Skip to content

Commit

Permalink
simplify and improve VNG format (#4984)
Browse files Browse the repository at this point in the history
This commit simplifies the VNG format with a new VNG metadata object
called Variant, which represents a sequence of typed values (without
putting the types into a union), in line with the Zed data model.
This replaces the ad hoc data structures that were encoded in the
VNG trailer.  We also changed vectors so they are all represented in
a single segment in the data section, which simplifies vector stats and
reading.  To support streaming, we replaced the trailer with
a header and added framing so that VNG objects can simply be concatenated
in a stream or file.  We added the command "zed dev vng" to display
the fixed VNG framing header (with magic) and the metadata as Zed data.

Nulls are now handled completely by the Nulls metadata structure and
vector metadata now includes length information, which will be used by the
vcache in a subsequent commit to facilitate easier flattening of leaf
vectors so that all leaf values within a type all have the same length
(except perhaps for vectors below unions, TBD).

The VNG write path now stores each object entirely in memory.  Once
all writes have completed, the vectors are encoded in parallel (e.g.,
applying compression). Then a DFS traversal computes the data layout
and produces a single Zed value representing the vector metadata
with data locations.  The header, metadata section, and data vectors
are then written out sequentially from a single thread such that the
buffers end up being written in the same locations that were computed
in the metadata pass.
  • Loading branch information
mccanne authored Jan 15, 2024
1 parent cebfb3a commit 8a48163
Show file tree
Hide file tree
Showing 36 changed files with 930 additions and 887 deletions.
7 changes: 0 additions & 7 deletions cli/outputflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/anyio"
"github.com/brimdata/zed/zio/emitter"
"github.com/brimdata/zed/zio/vngio"
"github.com/brimdata/zed/zio/zngio"
)

Expand All @@ -42,12 +41,6 @@ func (f *Flags) Options() anyio.WriterOpts {
func (f *Flags) setFlags(fs *flag.FlagSet) {
// zio stuff
fs.BoolVar(&f.color, "color", true, "enable/disable color formatting for -Z and lake text output")
f.VNG = &vngio.WriterOpts{
ColumnThresh: vngio.DefaultColumnThresh,
SkewThresh: vngio.DefaultSkewThresh,
}
fs.Var(&f.VNG.ColumnThresh, "vng.colthresh", "minimum VNG frame size")
fs.Var(&f.VNG.SkewThresh, "vng.skewthresh", "minimum VNG skew size")
f.ZNG = &zngio.WriterOpts{}
fs.BoolVar(&f.ZNG.Compress, "zng.compress", true, "compress ZNG frames")
fs.IntVar(&f.ZNG.FrameThresh, "zng.framethresh", zngio.DefaultFrameThresh,
Expand Down
145 changes: 145 additions & 0 deletions cmd/zed/dev/vng/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package vng

import (
"bufio"
"errors"
"flag"
"fmt"
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/outputflags"
"github.com/brimdata/zed/cmd/zed/dev"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/vng"
"github.com/brimdata/zed/zio"
"github.com/brimdata/zed/zio/zngio"
"github.com/brimdata/zed/zson"
)

// Cmd is the charm spec for the "vng" dev subcommand, which takes a single
// URI argument and dumps the metadata of the VNG data found there.
var Cmd = &charm.Spec{
Name: "vng",
Usage: "vng uri",
Short: "dump vng metadata",
Long: `
vng decodes an input uri and emits the metadata sections in the format desired.`,
New: New,
}

// init registers the vng command as a child of the dev command.
func init() {
dev.Cmd.Add(Cmd)
}

// Command implements the vng dev subcommand.
type Command struct {
// Command is the embedded parent root command; Run calls its Init method.
*root.Command
// outputFlags holds the output flags registered in New and is used to
// open the output writer in Run.
outputFlags outputflags.Flags
}

// New builds the vng command beneath parent, which must be a *root.Command,
// and registers the command's output flags on f.
func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
	cmd := new(Command)
	cmd.Command = parent.(*root.Command)
	cmd.outputFlags.SetFlags(f)
	return cmd, nil
}

// Run opens the single URI named in args, reads the VNG headers and metadata
// found there, and copies them as Zed values to the output selected by the
// command's output flags. It returns an error if args does not contain
// exactly one element or if any step of opening, copying, or closing fails.
func (c *Command) Run(args []string) error {
	ctx, cleanup, err := c.Init(&c.outputFlags)
	if err != nil {
		return err
	}
	defer cleanup()
	if len(args) != 1 {
		return errors.New("a single file is required")
	}
	uri, err := storage.ParseURI(args[0])
	if err != nil {
		return err
	}
	local := storage.NewLocalEngine()
	src, err := local.Get(ctx, uri)
	if err != nil {
		return err
	}
	defer src.Close()
	out, err := c.outputFlags.Open(ctx, local)
	if err != nil {
		return err
	}
	copyErr := zio.Copy(out, newReader(src))
	// Always close the writer; a copy error takes precedence over a close error.
	if closeErr := out.Close(); copyErr == nil {
		return closeErr
	}
	return copyErr
}

// reader is a zio.Reader that walks a stream of VNG objects and yields, for
// each object, its header followed by the values of its metadata section.
type reader struct {
// zctx is the type context used for the marshaler and the metadata reader.
zctx *zed.Context
// reader is the buffered input positioned within the VNG stream.
reader *bufio.Reader
// meta reads the metadata section of the current object; it is nil
// whenever the next thing to read from the input is an object header.
meta *zngio.Reader
// marshaler converts each VNG header into a Zed value.
marshaler *zson.MarshalZNGContext
// dataSize is the data section size taken from the current object's
// header; that many bytes are skipped after the metadata is consumed.
dataSize int
}

// Compile-time check that *reader implements zio.Reader.
var _ zio.Reader = (*reader)(nil)

// newReader returns a reader that pulls VNG headers and metadata from r
// through a buffered reader, using a fresh Zed context shared by the reader
// and its marshaler.
func newReader(r io.Reader) *reader {
	rd := &reader{
		zctx:   zed.NewContext(),
		reader: bufio.NewReader(r),
	}
	rd.marshaler = zson.NewZNGMarshalerWithContext(rd.zctx)
	return rd
}

// Read implements zio.Reader. For each VNG object in the input it first
// returns the framing header marshaled as a Zed value, then each value read
// from the object's metadata section, and finally skips over the object's
// data section before moving on to the next object. It returns (nil, nil)
// when the input ends cleanly where the next header would begin, and an
// error if a header, metadata section, or data section cannot be read.
func (r *reader) Read() (*zed.Value, error) {
	for {
		if r.meta == nil {
			hdr, err := r.readHeader()
			if err != nil {
				if err == io.EOF {
					// EOF where a header would start means end of stream.
					err = nil
				}
				return nil, err
			}
			// Bound the metadata reader so it cannot consume the data section.
			r.meta = zngio.NewReader(r.zctx, io.LimitReader(r.reader, int64(hdr.MetaSize)))
			r.dataSize = int(hdr.DataSize)
			val, err := r.marshaler.Marshal(hdr)
			if err != nil {
				return nil, err
			}
			return val.Ptr(), nil
		}
		val, err := r.meta.Read()
		if val != nil || err != nil {
			return val, err
		}
		// Metadata section exhausted: release its reader and step past the
		// data section to reach the next object's header.
		if err := r.meta.Close(); err != nil {
			return nil, err
		}
		r.meta = nil
		// Report a truncated data section instead of silently ignoring it.
		if err := r.skip(r.dataSize); err != nil {
			return nil, err
		}
	}
}

// readHeader reads and deserializes the next fixed-size VNG header from the
// input. It returns io.EOF unchanged when no bytes remain, so callers can
// detect a clean end of stream, and a "truncated VNG file" error when the
// input ends partway through a header.
func (r *reader) readHeader() (vng.Header, error) {
	var buf [vng.HeaderSize]byte
	// bufio.Reader.Read may return fewer bytes than requested even when more
	// data is available, so use io.ReadFull to get the whole header.
	n, err := io.ReadFull(r.reader, buf[:])
	if err != nil {
		if errors.Is(err, io.ErrUnexpectedEOF) {
			return vng.Header{}, fmt.Errorf("truncated VNG file: %d bytes of %d read", n, vng.HeaderSize)
		}
		return vng.Header{}, err
	}
	var h vng.Header
	if err := h.Deserialize(buf[:]); err != nil {
		return vng.Header{}, err
	}
	return h, nil
}

// skip discards the next n bytes of input. If fewer than n bytes could be
// discarded it returns a "truncated VNG data" error; otherwise it returns
// whatever error the discard itself reported.
func (r *reader) skip(n int) error {
	discarded, err := r.reader.Discard(n)
	if discarded == n {
		return err
	}
	return fmt.Errorf("truncated VNG data: data section %d but read only %d", n, discarded)
}
1 change: 1 addition & 0 deletions cmd/zed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/agg"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/copy"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/project"
_ "github.com/brimdata/zed/cmd/zed/dev/vng"
"github.com/brimdata/zed/cmd/zed/drop"
zedinit "github.com/brimdata/zed/cmd/zed/init"
"github.com/brimdata/zed/cmd/zed/load"
Expand Down
4 changes: 2 additions & 2 deletions fuzz/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func WriteZNG(t testing.TB, valuesIn []zed.Value, buf *bytes.Buffer) {
require.NoError(t, writer.Close())
}

func WriteVNG(t testing.TB, valuesIn []zed.Value, buf *bytes.Buffer, opts vngio.WriterOpts) {
writer, err := vngio.NewWriterWithOpts(zio.NopCloser(buf), opts)
func WriteVNG(t testing.TB, valuesIn []zed.Value, buf *bytes.Buffer) {
writer, err := vngio.NewWriter(zio.NopCloser(buf))
require.NoError(t, err)
require.NoError(t, zio.Copy(writer, zbuf.NewArray(valuesIn)))
require.NoError(t, writer.Close())
Expand Down
101 changes: 32 additions & 69 deletions runtime/vcache/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package vcache

import (
"context"
"fmt"
"io"
"sync"

"github.com/brimdata/zed"
Expand All @@ -26,31 +26,25 @@ type Object struct {
id ksuid.KSUID
uri *storage.URI
engine storage.Engine
reader storage.Reader
reader io.ReaderAt
// We keep a local context for each object since a new type context is created
// for each query and we need to map the VNG object context to the query
// context. Of course, with Zed, this is very cheap.
local *zed.Context

//XXX this is all gonna change in a subsequent PR when we get the Variant
// data type working across vng, vcache, and vector
metas []meta.Metadata
// There is one vector per Zed type and the typeKeys array provides
// the sequence order of each vector to be accessed.
vectors []vector.Any
typeDict []zed.Type
typeKeys []int32
//slots map[int32][]int32 //XXX handle this differently?
types []zed.Type
tags []int32

vectors []vector.Any
}

// NewObject creates a new in-memory Object corresponding to a VNG object
// residing in storage. It loads the list of VNG root types (one per value
// in the file) and the VNG metadata for vector reassembly. A table for each
// type is also created to map the global slot number in the object to the local
// slot number in the type so that an element's local position in the vector
// (within a particular type) can be related to its slot number in the object,
// e.g., so that filtering of a local vector can be turned into the list of
// matching object slots. The object provides the metadata needed to load vectors
// on demand only as they are referenced. A vector is loaded by calling its Load method,
// which decodes its zcode.Bytes into its native representation.
// XXX we may want to change the VNG format to code vectors in native format.
// residing in storage. The VNG header and metadata section are read and
// the metadata is deserialized so that vectors can be loaded into the cache
// on demand only as needed and retained in memory for future use.
func NewObject(ctx context.Context, engine storage.Engine, uri *storage.URI, id ksuid.KSUID) (*Object, error) {
// XXX currently we open a storage.Reader for every object and never close it.
// We should either close after a timeout and reopen when needed or change the
Expand All @@ -61,50 +55,36 @@ func NewObject(ctx context.Context, engine storage.Engine, uri *storage.URI, id
if err != nil {
return nil, err
}
size, err := storage.Size(reader)
if err != nil {
return nil, err
}
// XXX use the query's zctx so we don't have to map?,
// or maybe use a single context across all objects in the cache?
zctx := zed.NewContext()
z, err := vng.NewObject(zctx, reader, size)
z, err := vng.NewObject(zctx, reader)
if err != nil {
return nil, err
}
typeKeys, metas, err := z.FetchMetadata()
types, metas, tags, err := z.MiscMeta()
if err != nil {
return nil, err
}
if len(metas) == 0 {
return nil, fmt.Errorf("empty VNG object: %s", uri)
}
if len(metas) > MaxTypesPerObject {
return nil, fmt.Errorf("too many types in VNG object: %s", uri)
}
typeDict := make([]zed.Type, 0, len(metas))
for _, meta := range metas {
typeDict = append(typeDict, meta.Type(zctx)) //XXX commanet about context locality
}
vectors := make([]vector.Any, len(metas))
return &Object{
mu: make([]sync.Mutex, len(typeDict)),
id: id,
uri: uri,
engine: engine,
reader: reader,
local: zctx,
metas: metas,
vectors: vectors,
typeDict: typeDict,
typeKeys: typeKeys,
//slots: slots,
mu: make([]sync.Mutex, len(metas)),
id: id,
uri: uri,
engine: engine,
reader: z.DataReader(),
local: zctx,
metas: metas,
types: types,
tags: tags,
vectors: make([]vector.Any, len(metas)),
}, nil
}

func (o *Object) Close() error {
if o.reader != nil {
return o.reader.Close()
if closer, ok := o.reader.(io.Closer); ok {
return closer.Close()
}
}
return nil
}
Expand All @@ -114,31 +94,14 @@ func (o *Object) LocalContext() *zed.Context {
}

func (o *Object) Types() []zed.Type {
return o.typeDict
}

func (o *Object) TypeKeys() []int32 {
return o.typeKeys
}

func (o *Object) LookupType(typeKey uint32) zed.Type {
return o.typeDict[typeKey]
}

func (o *Object) Len() int {
return len(o.typeKeys)
return o.types
}

// XXX fix comment
// Due to the heterogenous nature of Zed data, a given path can appear in
// multiple types and a given type can have multiple vectors XXX (due to union
// types in the hiearchy). Load returns a Group for each type and the Group
// may contain multiple vectors.
func (o *Object) Load(typeKey uint32, path field.Path) (vector.Any, error) {
func (o *Object) Load(tag uint32, path field.Path) (vector.Any, error) {
l := loader{o.local, o.reader}
o.mu[typeKey].Lock()
defer o.mu[typeKey].Unlock()
return l.loadVector(&o.vectors[typeKey], o.typeDict[typeKey], path, o.metas[typeKey])
o.mu[tag].Lock()
defer o.mu[tag].Unlock()
return l.loadVector(&o.vectors[tag], o.types[tag], path, o.metas[tag])
}

func (o *Object) NewReader() *Reader {
Expand Down
24 changes: 4 additions & 20 deletions runtime/vcache/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,11 @@ func (l *loader) loadPrimitive(typ zed.Type, m *meta.Primitive) (vector.Any, err
// The VNG primitive columns are stored as one big
// list of Zed values. So we can just read the data in
// all at once, compute the byte offsets of each value
// (for random access, not used yet).
var n int
for _, segment := range m.Segmap {
n += int(segment.MemLength)
}
// (for random access, not used yet).XXX update comment
n := int(m.Location.MemLength)
bytes := make([]byte, n)
var off int
for _, segment := range m.Segmap {
if err := segment.Read(l.r, bytes[off:]); err != nil {
return nil, err
}
off += int(segment.MemLength)
if err := m.Location.Read(l.r, bytes); err != nil {
return nil, err
}
if len(m.Dict) > 0 {
var b []byte
Expand Down Expand Up @@ -107,12 +100,3 @@ type Const struct {
func NewConst(m *meta.Const) *Const {
return &Const{bytes: m.Value.Bytes()}
}

/*
func (c *Const) NewIter(r io.ReaderAt) (iterator, error) {
return func(b *zcode.Builder) error {
b.Append(c.bytes)
return nil
}, nil
}
*/
Loading

0 comments on commit 8a48163

Please sign in to comment.