
Commit

Update docs.
klauspost committed Jan 10, 2022
1 parent 22da84c commit b6cc540
Showing 4 changed files with 209 additions and 8 deletions.
89 changes: 88 additions & 1 deletion s2/README.md
@@ -685,7 +685,8 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped,

Blocks can be concatenated using the `ConcatBlocks` function.

Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
Streams with indexes (see below) will currently not work on concatenated streams.
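
As a minimal sketch (assuming `b1` and `b2` hold valid compressed blocks), concatenation looks like this:

```
// Join two compressed blocks into a single block.
// b1 and b2 are assumed to be valid S2 or Snappy encoded blocks.
joined, err := s2.ConcatBlocks(nil, b1, b2)
if err != nil {
	// Handle invalid input blocks...
}
// Decoding 'joined' returns the content of b1 followed by b2.
```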

# Stream Seek Index

@@ -701,9 +702,27 @@ so the output remains compatible with other decoders.
To automatically add an index to a stream, add `WriterAddIndex()` option to your writer.
Then the index will be added to the stream when `Close()` is called.

```
// Add Index to stream...
enc := s2.NewWriter(w, s2.WriterAddIndex())
io.Copy(enc, r)
enc.Close()
```

If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`.
This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`.

```
// Get index for separate storage...
enc := s2.NewWriter(w)
io.Copy(enc, r)
index, err := enc.CloseIndex()
```

The `index` can then be used when reading from the stream later.
This means the index can be used without needing to seek to the end of the stream,
or when manually forwarding streams. See below.

## Using Indexes

To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available.
@@ -713,15 +732,83 @@ Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker)
If 'random' is specified, the returned io.Seeker can be used for random seeking; otherwise only forward seeking is supported.
Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.

```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(false, nil)
rs.Seek(wantOffset, io.SeekStart)
```

This gets a seeker for forward seeking only. Since no index is provided, the index is read from the stream.
This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.

A custom index can be supplied, in which case it will be used instead of reading the index from the input stream.

```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(false, index)
rs.Seek(wantOffset, io.SeekStart)
```

This will use the index supplied in `index` instead of reading it from the stream. Since we specify non-random (forward only) seeking, `r` does not have to be an io.Seeker.

```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(true, index)
rs.Seek(wantOffset, io.SeekStart)
```

Finally, since we specify that we want to do random seeking, `r` must be an io.Seeker.

The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader,
meaning changes performed to one are reflected in the other.
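
For example, a sketch reusing `dec`, `index` and `wantOffset` from above (`dst` is any destination writer you supply):

```
rs, err := dec.ReadSeeker(false, index)
if err != nil {
	// Handle missing or invalid index...
}
// Seeking on rs moves the shared reader state...
rs.Seek(wantOffset, io.SeekStart)
// ...so reading from dec continues at the new offset.
io.Copy(dst, dec)
```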

## Manually Forwarding Streams

Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type.
This can be used for parsing indexes, either stored separately or embedded in streams.

In some cases it may not be possible to serve a seekable stream.
This can for instance be an HTTP stream, where the Range request
is sent at the start of the stream.

With a little bit of extra code it is still possible to use the index to forward the stream to the wanted offset.

It is possible to load the index manually like this:
```
var index s2.Index
_, err = index.Load(idxBytes)
```

This can be used to figure out how much to offset the compressed stream:

```
compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
```

The `compressedOffset` is the number of bytes that should be skipped
from the beginning of the compressed file.

The `uncompressedOffset` will then be the offset of the uncompressed bytes returned
when decoding from that position. This will always be <= wantOffset, since decoding
can only resume at a block boundary at or before the wanted offset.

When creating a decoder, it must be specified that it should *not* expect a frame header
at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset`,
we create the decoder like this:

```
dec := s2.NewReader(r, s2.ReaderIgnoreFrameHeader())
```

We are not completely done. We still need to skip the decoded bytes between `uncompressedOffset` and the offset we want.
This is done using the regular `Skip` function:

```
err = dec.Skip(wantOffset - uncompressedOffset)
```

This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset.
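
Putting the pieces together for the HTTP case mentioned above, here is a hedged sketch; the URL, the `readFrom` helper and its error handling are illustrative and not part of the package:

```
// Sketch: decode from wantOffset of a remote compressed stream,
// using an HTTP Range request instead of seeking.
func readFrom(url string, index s2.Index, wantOffset int64) (io.Reader, error) {
	compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	// Start the transfer at the compressed offset.
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-", compressedOffset))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	// The body no longer starts with a frame header.
	dec := s2.NewReader(resp.Body, s2.ReaderIgnoreFrameHeader())
	// Skip the decoded bytes before the requested offset.
	if err := dec.Skip(wantOffset - uncompressedOffset); err != nil {
		resp.Body.Close()
		return nil, err
	}
	// The caller should close resp.Body when done reading.
	return dec, nil
}
```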

## Index Format

Each block is structured as a snappy skippable block, with the chunk ID 0x99.
24 changes: 18 additions & 6 deletions s2/decode.go
@@ -100,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
} else {
nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
}
nr.readHeader = nr.ignoreFrameHeader
nr.paramsOK = true
return &nr
}
@@ -143,6 +144,16 @@ func ReaderAllocBlock(blockSize int) ReaderOption {
}
}

// ReaderIgnoreFrameHeader will make the reader skip the expected
// frame header at the beginning of the stream.
// This can be used when serving a stream that has been forwarded to a specific point.
func ReaderIgnoreFrameHeader() ReaderOption {
return func(r *Reader) error {
r.ignoreFrameHeader = true
return nil
}
}

// ReaderSkippableCB will register a callback for chunks with the specified ID.
// ID must be a Reserved skippable chunk ID, 0x80-0xfd (inclusive).
// For each chunk with the ID, the callback is called with the content.
@@ -166,6 +177,7 @@ type Reader struct {
buf []byte
skippableCB [0x80]func(r io.Reader) error
blockStart int64 // Uncompressed offset at start of current.
index *Index

// decoded[i:j] contains decoded bytes that have not yet been passed on.
i, j int
@@ -174,11 +186,11 @@ type Reader struct {
// maximum expected buffer size.
maxBufSize int
// alloc a buffer this size if > 0.
lazyBuf int
readHeader bool
paramsOK bool
snappyFrame bool
ignoreFrameHeader bool
}

// ensureBufferSize will ensure that the buffer can take at least n bytes.
@@ -208,7 +220,7 @@ func (r *Reader) Reset(reader io.Reader) {
r.err = nil
r.i = 0
r.j = 0
r.readHeader = r.ignoreFrameHeader
}

func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
2 changes: 1 addition & 1 deletion s2/encode.go
@@ -1095,7 +1095,7 @@ func (w *Writer) Close() error {
// CloseIndex calls Close and returns an index on first call.
// This is not required if you are only adding an index to a stream.
func (w *Writer) CloseIndex() ([]byte, error) {
return w.closeIndex(true)
}

func (w *Writer) closeIndex(idx bool) ([]byte, error) {
102 changes: 102 additions & 0 deletions s2/index_test.go
@@ -0,0 +1,102 @@
package s2_test

import (
"bytes"
"fmt"
"io"
"math/rand"
"sync"

"github.com/klauspost/compress/s2"
)

func ExampleIndex_Load() {
fatalErr := func(err error) {
if err != nil {
panic(err)
}
}

// Create a test corpus
tmp := make([]byte, 5<<20)
rng := rand.New(rand.NewSource(0xbeefcafe))
rng.Read(tmp)
// Make it compressible...
for i, v := range tmp {
tmp[i] = '0' + v&3
}
// Compress it...
var buf bytes.Buffer
// We use smaller blocks just for the example...
enc := s2.NewWriter(&buf, s2.WriterBlockSize(100<<10), s2.WriterAddIndex())
err := enc.EncodeBuffer(tmp)
fatalErr(err)

// Close and get index...
idxBytes, err := enc.CloseIndex()
fatalErr(err)

// This is our compressed stream...
compressed := buf.Bytes()

var once sync.Once
for wantOffset := int64(0); wantOffset < int64(len(tmp)); wantOffset += 555555 {
// Let's assume we want to read from uncompressed offset 'wantOffset'
// and we cannot seek in the input, but we have the index.
want := tmp[wantOffset:]

// Load the index.
var index s2.Index
_, err = index.Load(idxBytes)
fatalErr(err)

// Find offset in file:
compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
fatalErr(err)

// Offset the input to the compressed offset.
// Notice how we do not provide any bytes before the offset.
input := io.Reader(bytes.NewBuffer(compressed[compressedOffset:]))
if _, ok := input.(io.Seeker); !ok {
// Notice how the input cannot be seeked...
once.Do(func() {
fmt.Println("Input does not support seeking...")
})
} else {
panic("did you implement seeking on bytes.Buffer?")
}

// When creating the decoder we must specify that it should not
// expect a frame header at the beginning of the stream.
dec := s2.NewReader(input, s2.ReaderIgnoreFrameHeader())

// We now have a reader, but it will start outputting at uncompressedOffset,
// and not the actual offset we want, so skip forward to that.
toSkip := wantOffset - uncompressedOffset
err = dec.Skip(toSkip)
fatalErr(err)

// Read the rest of the stream...
got, err := io.ReadAll(dec)
fatalErr(err)
if bytes.Equal(got, want) {
fmt.Println("Successfully skipped forward to", wantOffset)
} else {
fmt.Println("Failed to skip forward to", wantOffset)
}
}
// OUTPUT:
//Input does not support seeking...
//Successfully skipped forward to 0
//Successfully skipped forward to 555555
//Successfully skipped forward to 1111110
//Successfully skipped forward to 1666665
//Successfully skipped forward to 2222220
//Successfully skipped forward to 2777775
//Successfully skipped forward to 3333330
//Successfully skipped forward to 3888885
//Successfully skipped forward to 4444440
//Successfully skipped forward to 4999995
}
