diff --git a/s2/README.md b/s2/README.md index a6ed53f184..e145aee1d6 100644 --- a/s2/README.md +++ b/s2/README.md @@ -685,7 +685,8 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped, Blocks can be concatenated using the `ConcatBlocks` function. -Snappy blocks/streams can safely be concatenated with S2 blocks and streams. +Snappy blocks/streams can safely be concatenated with S2 blocks and streams. +Streams with indexes (see below) will currently not work on concatenated streams. # Stream Seek Index @@ -701,9 +702,27 @@ so the output remains compatible with other decoders. To automatically add an index to a stream, add `WriterAddIndex()` option to your writer. Then the index will be added to the stream when `Close()` is called. +``` + // Add Index to stream... + enc := s2.NewWriter(w, s2.WriterAddIndex()) + io.Copy(enc, r) + enc.Close() +``` + If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`. This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`. +``` + // Get index for separate storage... + enc := s2.NewWriter(w) + io.Copy(enc, r) + index, err := enc.CloseIndex() +``` + +The `index` can then be used needing to read from the stream. +This means the index can be used without needing to seek to the end of the stream +or for manually forwarding streams. See below. + ## Using Indexes To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available. @@ -713,15 +732,83 @@ Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeek If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported. Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(false, nil) + rs.Seek(wantOffset, io.SeekStart) +``` + +Get a seeker to seek forward. Since no index is provided, the index is read from the stream. +This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. + A custom index can be specified which will be used if supplied. When using a custom index, it will not be read from the input stream. +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(false, index) + rs.Seek(wantOffset, io.SeekStart) +``` + +This will read the index from `index`. Since we specify non-random (forward only) seeking `r` does not have to be an io.Seeker + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(true, index) + rs.Seek(wantOffset, io.SeekStart) +``` + +Finally, since we specify that we want to do random seeking `r` must be an io.Seeker. + The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader, meaning changes performed to one is reflected in the other. +## Manually Forwarding Streams + Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type. This can be used for parsing indexes, either separate or in streams. +In some cases it may not be possible to serve a seekable stream. +This can for instance be an HTTP stream, where the Range request +is sent at the start of the stream. + +With a little bit of extra code it is still possible to forward + +It is possible to load the index manually like this: +``` + var index s2.Index + _, err = index.Load(idxBytes) +``` + +This can be used to figure out how much to offset the compressed stream: + +``` + compressedOffset, uncompressedOffset, err := index.Find(wantOffset) +``` + +The `compressedOffset` is the number of bytes that should be skipped +from the beginning of the compressed file. + +The `uncompressedOffset` will then be offset of the uncompressed bytes returned +when decoding from that position. This will always be <= wantOffset. + +When creating a decoder it must be specified that it should *not* expect a frame header +at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset` +we create the decoder like this: + +``` + dec := s2.NewReader(r, s2.ReaderIgnoreFrameHeader()) +``` + +We are not completely done. We still need to forward the stream the uncompressed bytes we didn't want. +This is done using the regular "Skip" function: + +``` + err = dec.Skip(wantOffset - uncompressedOffset) +``` + +This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset. + ## Index Format: Each block is structured as a snappy skippable block, with the chunk ID 0x99. diff --git a/s2/decode.go b/s2/decode.go index 55644d1d69..5f2e4e4f1c 100644 --- a/s2/decode.go +++ b/s2/decode.go @@ -100,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader { } else { nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize) } + nr.readHeader = nr.ignoreFrameHeader nr.paramsOK = true return &nr } @@ -143,6 +144,16 @@ func ReaderAllocBlock(blockSize int) ReaderOption { } } +// ReaderIgnoreFrameHeader will make the reader skip the expected +// frame header at the beginning of the stream. +// This can be used when serving a stream that has been forwarded to a specific point. +func ReaderIgnoreFrameHeader() ReaderOption { + return func(r *Reader) error { + r.ignoreFrameHeader = true + return nil + } +} + // ReaderSkippableCB will register a callback for chuncks with the specified ID. // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive). // For each chunk with the ID, the callback is called with the content. @@ -166,6 +177,7 @@ type Reader struct { buf []byte skippableCB [0x80]func(r io.Reader) error blockStart int64 // Uncompressed offset at start of current. + index *Index // decoded[i:j] contains decoded bytes that have not yet been passed on. i, j int @@ -174,11 +186,11 @@ type Reader struct { // maximum expected buffer size. maxBufSize int // alloc a buffer this size if > 0. - lazyBuf int - readHeader bool - paramsOK bool - snappyFrame bool - index *Index + lazyBuf int + readHeader bool + paramsOK bool + snappyFrame bool + ignoreFrameHeader bool } // ensureBufferSize will ensure that the buffer can take at least n bytes. @@ -208,7 +220,7 @@ func (r *Reader) Reset(reader io.Reader) { r.err = nil r.i = 0 r.j = 0 - r.readHeader = false + r.readHeader = r.ignoreFrameHeader } func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { diff --git a/s2/encode.go b/s2/encode.go index 2ae3520bc9..59f992ca6e 100644 --- a/s2/encode.go +++ b/s2/encode.go @@ -1095,7 +1095,7 @@ func (w *Writer) Close() error { // CloseIndex calls Close and returns an index on first call. // This is not required if you are only adding index to a stream. func (w *Writer) CloseIndex() ([]byte, error) { - return w.closeIndex(false) + return w.closeIndex(true) } func (w *Writer) closeIndex(idx bool) ([]byte, error) { diff --git a/s2/index_test.go b/s2/index_test.go new file mode 100644 index 0000000000..49a60204eb --- /dev/null +++ b/s2/index_test.go @@ -0,0 +1,102 @@ +package s2_test + +import ( + "bytes" + "fmt" + "io" + "math/rand" + "sync" + + "github.com/klauspost/compress/s2" +) + +func ExampleIndex_Load() { + fatalErr := func(err error) { + if err != nil { + panic(err) + } + } + + // Create a test corpus + tmp := make([]byte, 5<<20) + rng := rand.New(rand.NewSource(0xbeefcafe)) + rng.Read(tmp) + // Make it compressible... + for i, v := range tmp { + tmp[i] = '0' + v&3 + } + // Compress it... + var buf bytes.Buffer + // We use smaller blocks just for the example... + enc := s2.NewWriter(&buf, s2.WriterBlockSize(100<<10), s2.WriterAddIndex()) + err := enc.EncodeBuffer(tmp) + fatalErr(err) + + // Close and get index... + idxBytes, err := enc.CloseIndex() + fatalErr(err) + + // This is our compressed stream... + compressed := buf.Bytes() + + var once sync.Once + for wantOffset := int64(0); wantOffset < int64(len(tmp)); wantOffset += 555555 { + // Let's assume we want to read from uncompressed offset 'i' + // and we cannot seek in input, but we have the index. + want := tmp[wantOffset:] + + // Load the index. + var index s2.Index + _, err = index.Load(idxBytes) + fatalErr(err) + + // Find offset in file: + compressedOffset, uncompressedOffset, err := index.Find(wantOffset) + fatalErr(err) + + // Offset the input to the compressed offset. + // Notice how we do not provide any bytes before the offset. + input := io.Reader(bytes.NewBuffer(compressed[compressedOffset:])) + if _, ok := input.(io.Seeker); !ok { + // Notice how the input cannot be seeked... + once.Do(func() { + fmt.Println("Input does not support seeking...") + }) + } else { + panic("did you implement seeking on bytes.Buffer?") + } + + // When creating the decoder we must specify that it should not + // expect a frame header at the beginning og the frame. + dec := s2.NewReader(input, s2.ReaderIgnoreFrameHeader()) + + rs, err := dec.ReadSeeker(true, nil) + rs.Seek(wantOffset, io.SeekStart) + // We now have a reader, but it will start outputting at uncompressedOffset, + // and not the actual offset we want, so skip forward to that. + toSkip := wantOffset - uncompressedOffset + err = dec.Skip(toSkip) + fatalErr(err) + + // Read the rest of the stream... + got, err := io.ReadAll(dec) + fatalErr(err) + if bytes.Equal(got, want) { + fmt.Println("Successfully skipped forward to", wantOffset) + } else { + fmt.Println("Failed to skip forward to", wantOffset) + } + } + // OUTPUT: + //Input does not support seeking... + //Successfully skipped forward to 0 + //Successfully skipped forward to 555555 + //Successfully skipped forward to 1111110 + //Successfully skipped forward to 1666665 + //Successfully skipped forward to 2222220 + //Successfully skipped forward to 2777775 + //Successfully skipped forward to 3333330 + //Successfully skipped forward to 3888885 + //Successfully skipped forward to 4444440 + //Successfully skipped forward to 4999995 +}