From 2c36775ff514d07cf4e94c874772dba49d5da158 Mon Sep 17 00:00:00 2001 From: Jille Timmermans Date: Fri, 9 Feb 2024 15:50:06 +0100 Subject: [PATCH 1/2] s2: Add AsyncFlush method: Complete the block without flushing My use case is to transfer a large compressed S2 stream with a few changes very often. To get a small diff I want to end blocks at application decided points rather than at byte offsets. This allows me to remove the first byte without every single block changing. Flush() works for this, but it limits concurrency because it waits for the last block to be compressed rather than allowing that asynchronously. So I'd like to propose AsyncFlush, which flushes the buffer to a block, but doesn't flush the block to the io.Writer. There were actually a few places in the s2 code that also wanted to end the block, but didn't necessary want to flush to the writer. --- s2/writer.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/s2/writer.go b/s2/writer.go index bba66a8766..891f18decf 100644 --- a/s2/writer.go +++ b/s2/writer.go @@ -215,7 +215,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return 0, err } if len(w.ibuf) > 0 { - err := w.Flush() + err := w.AsyncFlush() if err != nil { return 0, err } @@ -225,7 +225,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { if err := w.EncodeBuffer(buf); err != nil { return 0, err } - return int64(len(buf)), w.Flush() + return int64(len(buf)), w.AsyncFlush() } for { inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen] @@ -354,7 +354,7 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { } // Flush queued data first. if len(w.ibuf) > 0 { - err := w.Flush() + err := w.AsyncFlush() if err != nil { return err } @@ -716,9 +716,9 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { return nRet, nil } -// Flush flushes the Writer to its underlying io.Writer. -// This does not apply padding. -func (w *Writer) Flush() error { +// AsyncFlush writes any buffered bytes to a block, but does not flush it to the +// underlying io.Writer. +func (w *Writer) AsyncFlush() error { if err := w.err(nil); err != nil { return err } @@ -738,6 +738,15 @@ func (w *Writer) Flush() error { } } } + return w.err(nil) +} + +// Flush flushes the Writer to its underlying io.Writer. +// This does not apply padding. +func (w *Writer) Flush() error { + if err := w.AsyncFlush(); err != nil { + return err + } if w.output == nil { return w.err(nil) } From 09a048f4ea815b49df8c24df99dac1ab6b43b507 Mon Sep 17 00:00:00 2001 From: Jille Timmermans Date: Sun, 11 Feb 2024 11:39:27 +0100 Subject: [PATCH 2/2] Update s2/writer.go Co-authored-by: Klaus Post --- s2/writer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/s2/writer.go b/s2/writer.go index 891f18decf..1253ea675c 100644 --- a/s2/writer.go +++ b/s2/writer.go @@ -716,8 +716,8 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { return nRet, nil } -// AsyncFlush writes any buffered bytes to a block, but does not flush it to the -// underlying io.Writer. +// AsyncFlush writes any buffered bytes to a block and starts compressing it. +// It does not wait for the output has been written as Flush() does. func (w *Writer) AsyncFlush() error { if err := w.err(nil); err != nil { return err