Skip to content

Commit

Permalink
add vector compiler skeleton and basic logic (#5013)
Browse files Browse the repository at this point in the history
This commit adds basic comparison logic to the vector runtime
and some initial compiler framework to translate vectorized DAGs
more generally into runtime ops and exprs.  This is partially complete
but we want to merge as is so the team can begin work on vector-enabled
field search in parallel.

We also renamed the "zed dev vcache" command to "zed dev vector".

In a subsequent PR, we will finish out the mechanisms for handling
heterogeneously typed data with mixed-in errors building up a library
of helper functions for Zed variants to do so.  At that point, we will
begin adding comprehensive tests for the vector runtime.
  • Loading branch information
mccanne authored Feb 2, 2024
1 parent 961ae2e commit 5e986d3
Show file tree
Hide file tree
Showing 39 changed files with 1,981 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/outputflags"
devvcache "github.com/brimdata/zed/cmd/zed/dev/vcache"
"github.com/brimdata/zed/cmd/zed/dev/vector"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
Expand All @@ -27,13 +27,13 @@ The project command reads VNG vectors from
a VNG storage objects (local files or s3 objects) and outputs
the reconstructed ZNG row data as an aggregate function.
This command is most useful for testing the VNG vector cache.
This command is most useful for testing the vector cache and runtime.
`,
New: newCommand,
}

func init() {
devvcache.Cmd.Add(Agg)
vector.Cmd.Add(Agg)
}

type Command struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vcache
package vector

import (
"flag"
Expand All @@ -9,11 +9,11 @@ import (
)

var Cmd = &charm.Spec{
Name: "vcache",
Usage: "vcache sub-command [arguments...]",
Name: "vector",
Usage: "vector sub-command [arguments...]",
Short: "run specified VNG vector test",
Long: `
vcache runs various tests of the vector cache as specified by its sub-command.`,
vector runs various tests of the vector cache and runtime as specified by its sub-command.`,
New: New,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/outputflags"
devvcache "github.com/brimdata/zed/cmd/zed/dev/vcache"
"github.com/brimdata/zed/cmd/zed/dev/vector"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
Expand All @@ -30,7 +30,7 @@ This command is most useful for testing the VNG vector cache.
}

func init() {
devvcache.Cmd.Add(Copy)
vector.Cmd.Add(Copy)
}

type Command struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/outputflags"
devvcache "github.com/brimdata/zed/cmd/zed/dev/vcache"
"github.com/brimdata/zed/cmd/zed/dev/vector"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/field"
Expand All @@ -33,7 +33,7 @@ This command is most useful for testing the VNG vector cache.
}

func init() {
devvcache.Cmd.Add(Project)
vector.Cmd.Add(Project)
}

type Command struct {
Expand Down
86 changes: 86 additions & 0 deletions cmd/zed/dev/vector/query/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package query

import (
"errors"
"flag"

"github.com/brimdata/zed"
"github.com/brimdata/zed/cli/outputflags"
"github.com/brimdata/zed/cmd/zed/dev/vector"
"github.com/brimdata/zed/cmd/zed/root"
"github.com/brimdata/zed/compiler"
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

var query = &charm.Spec{
Name: "query",
Usage: "query [flags] query path",
Short: "run a Zed query on a VNG file",
Long: `
The query command runs a query on a VNG file presuming the
query is entirely vectorizable. The VNG object is read through
the vcache and projected as needed into the runtime.
This command is most useful for testing the vector runtime
in isolation from a Zed lake.
`,
New: newCommand,
}

func init() {
vector.Cmd.Add(query)
}

type Command struct {
*root.Command
outputFlags outputflags.Flags
}

func newCommand(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c := &Command{Command: parent.(*root.Command)}
c.outputFlags.SetFlags(f)
return c, nil
}

func (c *Command) Run(args []string) error {
ctx, cleanup, err := c.Init(&c.outputFlags)
if err != nil {
return err
}
defer cleanup()
if len(args) != 2 {
return errors.New("usage: query followed by a single path argument of VNG data")
}
text := args[0]
uri, err := storage.ParseURI(args[1])
if err != nil {
return err
}
local := storage.NewLocalEngine()
cache := vcache.NewCache(local)
object, err := cache.Fetch(ctx, uri, ksuid.Nil)
if err != nil {
return err
}
defer object.Close()
rctx := runtime.NewContext(ctx, zed.NewContext())
puller, err := compiler.VectorCompile(rctx, text, object)
if err != nil {
return err
}
writer, err := c.outputFlags.Open(ctx, local)
if err != nil {
return err
}
if err := zio.Copy(writer, zbuf.PullerReader(puller)); err != nil {
writer.Close()
return err
}
return writer.Close()
}
7 changes: 4 additions & 3 deletions cmd/zed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
_ "github.com/brimdata/zed/cmd/zed/dev/dig/section"
_ "github.com/brimdata/zed/cmd/zed/dev/dig/slice"
_ "github.com/brimdata/zed/cmd/zed/dev/dig/trailer"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/agg"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/copy"
_ "github.com/brimdata/zed/cmd/zed/dev/vcache/project"
_ "github.com/brimdata/zed/cmd/zed/dev/vector/agg"
_ "github.com/brimdata/zed/cmd/zed/dev/vector/copy"
_ "github.com/brimdata/zed/cmd/zed/dev/vector/project"
_ "github.com/brimdata/zed/cmd/zed/dev/vector/query"
_ "github.com/brimdata/zed/cmd/zed/dev/vng"
"github.com/brimdata/zed/cmd/zed/drop"
zedinit "github.com/brimdata/zed/cmd/zed/init"
Expand Down
25 changes: 25 additions & 0 deletions compiler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/brimdata/zed/lakeparse"
"github.com/brimdata/zed/runtime"
"github.com/brimdata/zed/runtime/sam/op"
"github.com/brimdata/zed/runtime/vam"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
)
Expand Down Expand Up @@ -140,3 +142,26 @@ type anyCompiler struct{}
func (*anyCompiler) Parse(src string, filenames ...string) (ast.Seq, error) {
return Parse(src, filenames...)
}

// VectorCompile is used for testing queries over single VNG object scans
// where the entire query is vectorizable. It does not call optimize
// nor does it compute the demand of the query to prune the projection
// from the vcache.
func VectorCompile(rctx *runtime.Context, query string, object *vcache.Object) (zbuf.Puller, error) {
seq, err := Parse(query)
if err != nil {
return nil, err
}
src := &data.Source{}
entry, err := semantic.Analyze(rctx.Context, seq, src, nil)
if err != nil {
return nil, err
}
puller := vam.NewVectorProjection(rctx.Zctx, object, nil) //XXX project all
builder := kernel.NewBuilder(rctx, src)
outputs, err := builder.BuildWithPuller(entry, puller)
if err != nil {
return nil, err
}
return vam.NewMaterializer(outputs[0]), nil
}
55 changes: 22 additions & 33 deletions compiler/kernel/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/brimdata/zed"
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/compiler/data"
"github.com/brimdata/zed/compiler/optimizer"
"github.com/brimdata/zed/compiler/optimizer/demand"
"github.com/brimdata/zed/lake"
"github.com/brimdata/zed/order"
Expand Down Expand Up @@ -39,7 +38,6 @@ import (
"github.com/brimdata/zed/runtime/sam/op/uniq"
"github.com/brimdata/zed/runtime/sam/op/yield"
"github.com/brimdata/zed/runtime/vam"
vamop "github.com/brimdata/zed/runtime/vam/op"
"github.com/brimdata/zed/vector"
"github.com/brimdata/zed/zbuf"
"github.com/brimdata/zed/zio"
Expand Down Expand Up @@ -84,6 +82,10 @@ func (b *Builder) Build(seq dag.Seq, readers ...zio.Reader) ([]zbuf.Puller, erro
return b.compileSeq(seq, nil)
}

func (b *Builder) BuildWithPuller(seq dag.Seq, parent vector.Puller) ([]vector.Puller, error) {
return b.compileVamSeq(seq, []vector.Puller{parent})
}

func (b *Builder) zctx() *zed.Context {
return b.rctx.Zctx
}
Expand Down Expand Up @@ -305,7 +307,24 @@ func (b *Builder) compileLeaf(o dag.Op, parent zbuf.Puller) (zbuf.Puller, error)
case *dag.Load:
return load.New(b.rctx, b.source.Lake(), parent, v.Pool, v.Branch, v.Author, v.Message, v.Meta), nil
case *dag.Vectorize:
return b.compileVectorize(v.Body, parent)
// If the first op is SeqScan, then pull it out so we can
// give the scanner a zio.Puller parent (i.e., the lister).
if scan, ok := v.Body[0].(*dag.SeqScan); ok {
puller, err := b.compileVamScan(scan, parent)
if err != nil {
return nil, err
}
if len(v.Body) > 1 {
p, err := b.compileVamSeq(v.Body[1:], []vector.Puller{puller})
if err != nil {
return nil, err
}
puller = p[0]
}
return vam.NewMaterializer(puller), nil
}
//XXX
return nil, errors.New("dag.Vectorize must begin with SeqScan")
default:
return nil, fmt.Errorf("unknown DAG operator type: %v", v)
}
Expand Down Expand Up @@ -672,33 +691,3 @@ func isEntry(seq dag.Seq) bool {
}
return false
}

func (b *Builder) compileVectorize(seq dag.Seq, parent zbuf.Puller) (zbuf.Puller, error) {
var vamParent vector.Puller
for _, o := range seq {
switch o := o.(type) {
case *dag.SeqScan:
pool, err := b.lookupPool(o.Pool)
if err != nil {
return nil, err
}
//XXX check VectorCache not nil
vamParent = vamop.NewScanner(b.rctx, b.source.Lake().VectorCache(), parent, pool, o.Fields, nil, nil)
case *dag.Summarize:
if name, ok := optimizer.IsCountByString(o); ok {
vamParent = vamop.NewCountByString(b.rctx.Zctx, vamParent, name)
} else if name, ok := optimizer.IsSum(o); ok {
vamParent = vamop.NewSum(b.rctx.Zctx, vamParent, name)
} else {
return nil, fmt.Errorf("internal error: unhandled dag.Summarize: %#v", o)
}

default:
return nil, fmt.Errorf("internal error: unknown dag.Op: %#v", o)
}
}
if vamParent == nil {
return nil, fmt.Errorf("internal error: vectorized DAG did not compile")
}
return vam.NewMaterializer(vamParent), nil
}
Loading

0 comments on commit 5e986d3

Please sign in to comment.