diff --git a/driver/driver.go b/driver/driver.go
index 8940da01b5..bea950b08d 100644
--- a/driver/driver.go
+++ b/driver/driver.go
@@ -9,6 +9,7 @@ import (
 	"github.com/brimsec/zq/ast"
 	"github.com/brimsec/zq/zbuf"
+	"github.com/brimsec/zq/zng"
 	"github.com/brimsec/zq/zng/resolver"
 	"github.com/brimsec/zq/zqd/api"
 )
@@ -147,3 +148,57 @@ func Copy(ctx context.Context, w zbuf.Writer, prog ast.Proc, zctx *resolver.Cont
 	d := &transformDriver{w: w}
 	return Run(ctx, d, prog, zctx, r, cfg)
 }
+
+type muxReader struct {
+	*muxOutput
+	batch       zbuf.Batch
+	cancel      context.CancelFunc
+	index       int
+	statsTickCh <-chan time.Time
+}
+
+func (mr *muxReader) Read() (*zng.Record, error) {
+read:
+	if mr.batch == nil {
+		chunk := mr.Pull(mr.statsTickCh)
+		if chunk.ID != 0 {
+			return nil, errors.New("transform proc with multiple tails")
+		}
+		if chunk.Batch != nil {
+			mr.batch = chunk.Batch
+		} else if chunk.Err != nil {
+			return nil, chunk.Err
+		} else if chunk.Warning != "" {
+			goto read
+		} else {
+			return nil, nil
+		}
+	}
+	if mr.index >= mr.batch.Length() {
+		mr.batch.Unref()
+		mr.batch, mr.index = nil, 0
+		goto read
+	}
+	rec := mr.batch.Index(mr.index)
+	mr.index++
+	return rec, nil
+}
+
+func (mr *muxReader) Close() error {
+	mr.cancel()
+	return nil
+}
+
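+// NewReader compiles program and runs it against reader, returning the
+// flowgraph's output as a zbuf.ReadCloser. Closing the returned reader
+// cancels the flowgraph's context. A minimal usage sketch (names are
+// illustrative; error handling elided):
+//
+//	r, _ := driver.NewReader(ctx, zql.MustParseProc("sort ts"), zctx, src)
+//	defer r.Close()
+//	for rec, _ := r.Read(); rec != nil; rec, _ = r.Read() {
+//		// consume each *zng.Record
+//	}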
+func NewReader(ctx context.Context, program ast.Proc, zctx *resolver.Context, reader zbuf.Reader) (zbuf.ReadCloser, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	mux, err := compileSingle(ctx, program, zctx, reader, Config{})
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	return &muxReader{
+		cancel:      cancel,
+		muxOutput:   mux,
+		statsTickCh: make(chan time.Time),
+	}, nil
+}
diff --git a/go.mod b/go.mod
index bb3513e5e3..c5d46c36b1 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/aws/aws-sdk-go v1.30.19
 	github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f
 	github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
+	github.com/fsnotify/fsnotify v1.4.9
 	github.com/go-resty/resty/v2 v2.2.0
 	github.com/golang/mock v1.4.3
 	github.com/google/gopacket v1.1.17
diff --git a/go.sum b/go.sum
index d7d1806b13..5a8464c879 100644
--- a/go.sum
+++ b/go.sum
@@ -62,6 +62,7 @@ github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fp
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
diff --git a/pkg/fs/tail.go b/pkg/fs/tail.go
new file mode 100644
index 0000000000..6a35f32e78
--- /dev/null
+++ b/pkg/fs/tail.go
@@ -0,0 +1,213 @@
+package fs
+
+import (
+	"errors"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+
+	"github.com/fsnotify/fsnotify"
+)
+
+var ErrIsDir = errors.New("path is a directory")
+
+type TFile struct {
+	f       *os.File
+	watcher *fsnotify.Watcher
+}
+
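+// TailFile opens the file at name for reading and returns a *TFile whose
+// Read blocks at EOF until more data is written, rather than returning.
+// A rough usage sketch (path hypothetical; error handling elided):
+//
+//	tf, _ := fs.TailFile("/var/log/app.log")
+//	defer tf.Close()
+//	io.Copy(os.Stdout, tf) // returns once tf.Stop() is called and data is drained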
+func TailFile(name string) (*TFile, error) {
+	info, err := os.Stat(name)
+	if err != nil {
+		return nil, err
+	}
+	if info.IsDir() {
+		return nil, ErrIsDir
+	}
+	f, err := OpenFile(name, os.O_RDONLY, 0)
+	if err != nil {
+		return nil, err
+	}
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		f.Close()
+		return nil, err
+	}
+	if err := watcher.Add(name); err != nil {
+		f.Close()
+		watcher.Close()
+		return nil, err
+	}
+	return &TFile{f, watcher}, nil
+}
+
+func (t *TFile) Read(b []byte) (int, error) {
+read:
+	n, err := t.f.Read(b)
+	if err == io.EOF {
+		if n > 0 {
+			return n, nil
+		}
+		if err := t.waitWrite(); err != nil {
+			return 0, err
+		}
+		goto read
+	}
+	if errors.Is(err, os.ErrClosed) {
+		err = io.EOF
+	}
+	return n, err
+}
+
+func (t *TFile) waitWrite() error {
+	for {
+		select {
+		case ev, ok := <-t.watcher.Events:
+			if !ok {
+				return io.EOF
+			}
+			if ev.Op == fsnotify.Write {
+				return nil
+			}
+		case err := <-t.watcher.Errors:
+			return err
+		}
+	}
+}
+
+func (t *TFile) Stop() error {
+	return t.watcher.Close()
+}
+
+func (t *TFile) Close() error {
+	return t.f.Close()
+}
+
+type FileOp int
+
+const (
+	FileOpCreated FileOp = iota
+	FileOpExisting
+	FileOpRemoved
+)
+
+func (o FileOp) Exists() bool {
+	return o == FileOpCreated || o == FileOpExisting
+}
+
+type FileEvent struct {
+	Name string
+	Op   FileOp
+	Err  error
+}
+
+// DirWatcher observes a directory and emits events when files are added or
+// removed. When opened for the first time, it emits an event for every
+// existing file.
+type DirWatcher struct {
+	Events chan FileEvent
+
+	dir     string
+	watched map[string]struct{}
+	watcher *fsnotify.Watcher
+}
+
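+// NewDirWatcher begins watching the directory dir, emitting one event per
+// existing file up front. A consumption sketch (path hypothetical; error
+// handling elided):
+//
+//	w, _ := fs.NewDirWatcher("/tmp/logs")
+//	for ev := range w.Events {
+//		if ev.Op.Exists() {
+//			fmt.Println("file to tail:", ev.Name)
+//		}
+//	}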
+func NewDirWatcher(dir string) (*DirWatcher, error) {
+	info, err := os.Stat(dir)
+	if err != nil {
+		return nil, err
+	}
+	if !info.IsDir() {
+		return nil, errors.New("provided path must be a directory")
+	}
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return nil, err
+	}
+	w := &DirWatcher{
+		Events:  make(chan FileEvent),
+		dir:     dir,
+		watched: make(map[string]struct{}),
+		watcher: watcher,
+	}
+	if err := w.watcher.Add(w.dir); err != nil {
+		watcher.Close()
+		return nil, err
+	}
+	go func() {
+		err := w.run()
+		if errc := w.watcher.Close(); err == nil {
+			err = errc
+		}
+		if err != nil {
+			w.Events <- FileEvent{Err: err}
+		}
+		close(w.Events)
+	}()
+	return w, nil
+}
+
+func (w *DirWatcher) run() error {
+	if err := w.poll(); err != nil {
+		return err
+	}
+	for ev := range w.watcher.Events {
+		switch {
+		case ev.Op&fsnotify.Create == fsnotify.Create:
+			if err := w.addFile(ev.Name); err != nil {
+				return err
+			}
+		case ev.Op&fsnotify.Rename == fsnotify.Rename, ev.Op&fsnotify.Remove == fsnotify.Remove:
+			if err := w.removeFile(ev.Name); err != nil {
+				return err
+			}
+		}
+	}
+	// The watcher has been closed; poll once more to make sure we haven't
+	// missed any files due to a race.
+	return w.poll()
+}
+
+func (w *DirWatcher) addFile(name string) error {
+	p, err := filepath.Abs(name)
+	if err != nil {
+		return err
+	}
+	if _, ok := w.watched[p]; !ok {
+		w.watched[p] = struct{}{}
+		w.Events <- FileEvent{Name: p, Op: FileOpCreated}
+	}
+	return nil
+}
+
+func (w *DirWatcher) removeFile(name string) error {
+	p, err := filepath.Abs(name)
+	if err != nil {
+		return err
+	}
+	if _, ok := w.watched[p]; ok {
+		delete(w.watched, p)
+		w.Events <- FileEvent{Name: p, Op: FileOpRemoved}
+	}
+	return nil
+}
+
+func (w *DirWatcher) poll() error {
+	infos, err := ioutil.ReadDir(w.dir)
+	if err != nil {
+		return err
+	}
+	for _, info := range infos {
+		if info.IsDir() {
+			continue
+		}
+		if err := w.addFile(filepath.Join(w.dir, info.Name())); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (w *DirWatcher) Stop() error {
+	return w.watcher.Close()
+}
diff --git a/pkg/fs/tail_test.go b/pkg/fs/tail_test.go
new file mode 100644
index 0000000000..74ede291e6
--- /dev/null
+++ b/pkg/fs/tail_test.go
@@ -0,0 +1,65 @@
+package fs
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTailFile(t *testing.T) {
+	f, err := ioutil.TempFile("", "tailfile.log")
+	require.NoError(t, err)
+	t.Cleanup(func() { os.Remove(f.Name()) })
+	tf, err := TailFile(f.Name())
+	require.NoError(t, err)
+	buf := make([]byte, 100)
+
+	for i := 0; i < 10; i++ {
+		str := fmt.Sprintf("line #%d\n", i)
+		_, err := f.WriteString(str)
+		require.NoError(t, err)
+		n, err := tf.Read(buf)
+		require.NoError(t, err)
+		assert.Equal(t, str, string(buf[:n]))
+	}
+	require.NoError(t, tf.Stop())
+	n, err := tf.Read(buf)
+	assert.Equal(t, 0, n)
+	assert.Equal(t, io.EOF, err)
+}
+
+func TestTailFileReadToEOF(t *testing.T) {
+	expected := `line #0
+line #1
+line #2
+line #3
+line #4
+line #5
+line #6
+line #7
+line #8
+line #9
+`
+	f, err := ioutil.TempFile("", "tailfile.log")
+	require.NoError(t, err)
+	defer os.Remove(f.Name())
+	tf, err := TailFile(f.Name())
+	require.NoError(t, err)
+
+	for i := 0; i < 10; i++ {
+		str := fmt.Sprintf("line #%d\n", i)
+		_, err := f.WriteString(str)
+		require.NoError(t, err)
+	}
+	require.NoError(t, tf.Stop())
+	buf := bytes.NewBuffer(nil)
+	_, err = io.Copy(buf, tf)
+	require.NoError(t, err)
+	assert.Equal(t, expected, buf.String())
+}
diff --git a/zqd/api/api.go b/zqd/api/api.go
index 6efcc4965a..76f5b6f751 100644
--- a/zqd/api/api.go
+++ b/zqd/api/api.go
@@ -166,7 +166,9 @@ type PcapPostStatus struct {
 	UpdateTime    nano.Ts    `json:"update_time"`
 	PcapSize      int64      `json:"pcap_total_size" unit:"bytes"`
 	PcapReadSize  int64      `json:"pcap_read_size" unit:"bytes"`
-	SnapshotCount int        `json:"snapshot_count"`
+	RecordBytes   int64      `json:"record_bytes,omitempty" unit:"bytes"`
+	RecordCount   int64      `json:"record_count,omitempty"`
+	SnapshotCount int        `json:"snapshot_count,omitempty"`
 	Span          *nano.Span `json:"span,omitempty"`
 }
diff --git a/zqd/handlers.go b/zqd/handlers.go
index e7a6f39278..7737270487 100644
--- a/zqd/handlers.go
+++ b/zqd/handlers.go
@@ -10,7 +10,6 @@ import (
 
 	"github.com/brimsec/zq/pcap"
 	"github.com/brimsec/zq/pkg/ctxio"
-	"github.com/brimsec/zq/pkg/nano"
 	"github.com/brimsec/zq/zbuf"
 	"github.com/brimsec/zq/zng/resolver"
 	"github.com/brimsec/zq/zqd/api"
@@ -191,12 +190,7 @@ func handlePcapSearch(c *Core, w http.ResponseWriter, r *http.Request) {
 		respondError(c, w, r, zqe.E(zqe.Invalid, err))
 		return
 	}
-	pspace, ok := s.(space.PcapSpace)
-	if !ok {
-		respondError(c, w, r, zqe.E(zqe.Invalid, "space does not support pcap searches"))
-		return
-	}
-	pcapstore := pspace.PcapStore()
+	pcapstore := s.PcapStore()
 	if pcapstore.Empty() {
 		respondError(c, w, r, zqe.E(zqe.NotFound, "no pcap in this space"))
 		return
@@ -365,18 +359,7 @@ func handlePcapPost(c *Core, w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	pspace, ok := s.(space.PcapSpace)
-	if !ok {
-		respondError(c, w, r, zqe.E(zqe.Invalid, "space does not support pcap import"))
-		return
-	}
-	pcapstore := pspace.PcapStore()
-	logstore, ok := s.Storage().(ingest.ClearableStore)
-	if !ok {
-		respondError(c, w, r, zqe.E(zqe.Invalid, "storage does not support pcap import"))
-		return
-	}
-	op, warnings, err := ingest.NewPcapOp(ctx, pcapstore, logstore, req.Path, c.Suricata, c.Zeek)
+	op, warnings, err := ingest.NewPcapOp(ctx, s, req.Path, c.Suricata, c.Zeek)
 	if err != nil {
 		respondError(c, w, r, err)
 		return
@@ -417,15 +400,8 @@ func handlePcapPost(c *Core, w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	status := api.PcapPostStatus{
-		Type:          "PcapPostStatus",
-		StartTime:     op.StartTime,
-		UpdateTime:    nano.Now(),
-		PcapSize:      op.PcapSize,
-		PcapReadSize:  op.PcapReadSize(),
-		SnapshotCount: op.SnapshotCount(),
-		Span:          &sum.Span,
-	}
+	status := op.Status()
+	status.Span = &sum.Span
 	if err := pipe.Send(status); err != nil {
 		logger.Warn("Error sending payload", zap.Error(err))
 		return
diff --git a/zqd/handlers_pcap_test.go b/zqd/handlers_pcap_test.go
index 3cc09bcbc6..3184ec18fd 100644
--- a/zqd/handlers_pcap_test.go
+++ b/zqd/handlers_pcap_test.go
@@ -71,9 +71,9 @@ func TestPcapPostSuccess(t *testing.T) {
 		require.NoError(t, err)
 		plen := len(p.payloads)
 		status := p.payloads[plen-2].(*api.PcapPostStatus)
-		assert.Equal(t, status.Type, "PcapPostStatus")
-		assert.Equal(t, status.PcapSize, info.Size())
-		assert.Equal(t, status.PcapReadSize, info.Size())
+		assert.Equal(t, "PcapPostStatus", status.Type)
+		assert.Equal(t, info.Size(), status.PcapSize)
+		assert.Equal(t, info.Size(), status.PcapReadSize)
 		assert.Equal(t, 1, status.SnapshotCount)
 		assert.Equal(t, nano.NewSpanTs(nano.Unix(1501770877, 471635000), nano.Unix(1501770880, 988247001)), *status.Span)
 	})
@@ -197,17 +197,6 @@ func TestPcapPostZeekFailAfterWrite(t *testing.T) {
 		last := p.payloads[len(p.payloads)-1]
 		require.Equal(t, expected, last)
 	})
-	t.Run("EmptySpaceInfo", func(t *testing.T) {
-		info, err := p.client.SpaceInfo(context.Background(), p.space.ID)
-		assert.NoError(t, err)
-		expected := api.SpaceInfo{
-			ID:          p.space.ID,
-			Name:        p.space.Name,
-			DataPath:    p.space.DataPath,
-			StorageKind: storage.FileStore,
-		}
-		require.Equal(t, &expected, info)
-	})
 }
 
 func launcherFromEnv(t *testing.T, key string) pcapanalyzer.Launcher {
diff --git a/zqd/handlers_test.go b/zqd/handlers_test.go
index e4618b52b2..fb9f8b0c55 100644
--- a/zqd/handlers_test.go
+++ b/zqd/handlers_test.go
@@ -688,15 +688,7 @@ func TestSpaceDataDir(t *testing.T) {
 
 func TestCreateArchiveSpace(t *testing.T) {
 	thresh := int64(1000)
-	root := createTempDir(t)
-
-	c, client := newCoreAtDir(t, root)
-
-	c.Zeek = testLauncher(func(tzp *testPcapProcess) error {
-		const s = "unexpected attempt to run zeek"
-		t.Error(s)
-		return errors.New(s)
-	}, nil)
+	_, client := newCore(t)
 
 	sp, err := client.SpacePost(context.Background(), api.SpacePostRequest{
 		Name: "arktest",
@@ -733,11 +725,6 @@ func TestCreateArchiveSpace(t *testing.T) {
 `
 	res := searchTzng(t, client, sp.ID, "s=harefoot-raucous")
 	require.Equal(t, test.Trim(exptzng), res)
-
-	// Verify pcap post not supported
-	_, err = client.PcapPostStream(context.Background(), sp.ID, api.PcapPostRequest{"foo"})
-	require.Error(t, err)
-	assert.Regexp(t, "space does not support pcap import", err.Error())
 }
 
 func TestBlankNameSpace(t *testing.T) {
diff --git a/zqd/ingest/archivepcap.go b/zqd/ingest/archivepcap.go
new file mode 100644
index 0000000000..ba2d934c64
--- /dev/null
+++ b/zqd/ingest/archivepcap.go
@@ -0,0 +1,253 @@
+package ingest
+
+import (
+	"context"
+	"io"
+	"io/ioutil"
+	"os"
+	"sync/atomic"
+
+	"github.com/brimsec/zq/driver"
+	"github.com/brimsec/zq/pkg/ctxio"
+	"github.com/brimsec/zq/pkg/iosrc"
+	"github.com/brimsec/zq/pkg/nano"
+	"github.com/brimsec/zq/zbuf"
+	"github.com/brimsec/zq/zio"
+	"github.com/brimsec/zq/zio/ndjsonio"
+	"github.com/brimsec/zq/zng"
+	"github.com/brimsec/zq/zng/resolver"
+	"github.com/brimsec/zq/zqd/api"
+	"github.com/brimsec/zq/zqd/pcapanalyzer"
+	"github.com/brimsec/zq/zqd/pcapstorage"
+	"github.com/brimsec/zq/zqd/storage"
+	"golang.org/x/sync/errgroup"
+)
+
+type archivePcapOp struct {
+	cleanupfns     []func()
+	err            error
+	done           chan struct{}
+	pcapuri        iosrc.URI
+	pcapstore      *pcapstorage.Store
+	store          storage.Storage
+	zeek, suricata pcapanalyzer.Launcher
+	zctx           *resolver.Context
+
+	// snap is not used for archive store pcap ingest; it exists for
+	// functional parity with legacyPcapOp and can be removed once filestore
+	// has been deprecated.
+	snap chan struct{}
+
+	// stat fields
+	startTime      nano.Ts
+	pcapBytesTotal int64
+	pcapCounter    *writeCounter
+	recordCounter  *recordCounter
+}
+
+func newArchivePcapOp(ctx context.Context, logstore storage.Storage, pcapstore *pcapstorage.Store, pcapuri iosrc.URI, suricata, zeek pcapanalyzer.Launcher) (PcapOp, []string, error) {
+	info, err := iosrc.Stat(ctx, pcapuri)
+	if err != nil {
+		return nil, nil, err
+	}
+	warn := make(chan string)
+	go func() {
+		err = pcapstore.Update(ctx, pcapuri, warn)
+		close(warn)
+	}()
+	var warnings []string
+	for w := range warn {
+		warnings = append(warnings, w)
+	}
+	if err != nil {
+		return nil, warnings, err
+	}
+	p := &archivePcapOp{
+		startTime:      nano.Now(),
+		pcapBytesTotal: info.Size(),
+		pcapstore:      pcapstore,
+		store:          logstore,
+		pcapuri:        pcapuri,
+		pcapCounter:    &writeCounter{},
+		recordCounter:  &recordCounter{},
+		done:           make(chan struct{}),
+		snap:           make(chan struct{}),
+		suricata:       suricata,
+		zeek:           zeek,
+		zctx:           resolver.NewContext(),
+	}
+	go func() {
+		p.err = p.run(ctx)
+		for _, fn := range p.cleanupfns {
+			fn()
+		}
+		close(p.done)
+	}()
+	return p, warnings, nil
+}
+
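+// run executes the ingest pipeline: the pcap stream is copied into the
+// stdin of each configured analyzer process, the analyzers' log outputs are
+// tailed and merged into a single stream ordered by the store's native
+// direction, and that stream is written to the space's storage.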
+func (p *archivePcapOp) run(ctx context.Context) error {
+	group, ctx := errgroup.WithContext(ctx)
+	pcapfile, err := iosrc.NewReader(ctx, p.pcapuri)
+	if err != nil {
+		return err
+	}
+	defer pcapfile.Close()
+	// Keeps track of bytes read from pcapfile.
+	r := io.TeeReader(pcapfile, p.pcapCounter)
+
+	group, zreaders, err := p.runAnalyzers(ctx, group, r)
+	if err != nil {
+		return err
+	}
+	combiner := zbuf.NewCombiner(zreaders, zbuf.RecordCompare(p.store.NativeDirection()))
+	defer combiner.Close()
+	// Track stats on records produced by the analyzers.
+	p.recordCounter.reader = combiner
+
+	if err := p.store.Write(ctx, p.zctx, p.recordCounter); err != nil {
+		return err
+	}
+	return group.Wait()
+}
+
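+// runAnalyzers starts a process for each configured analyzer and returns a
+// zbuf.Reader of each analyzer's logs, along with a goroutine (added to
+// group) that fans the pcap stream out to every analyzer's stdin pipe.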
+func (p *archivePcapOp) runAnalyzers(ctx context.Context, group *errgroup.Group, pcapstream io.Reader) (*errgroup.Group, []zbuf.Reader, error) {
+	var pipes []*io.PipeWriter
+	var zreaders []zbuf.Reader
+	if p.zeek != nil {
+		pw, dr, err := p.runAnalyzer(ctx, group, p.zeek)
+		if err != nil {
+			return nil, nil, err
+		}
+		pipes = append(pipes, pw)
+		zreaders = append(zreaders, dr)
+	}
+	if p.suricata != nil {
+		pw, dr, err := p.runAnalyzer(ctx, group, p.suricata)
+		if err != nil {
+			return nil, nil, err
+		}
+		pipes = append(pipes, pw)
+		// Suricata logs need a flowgraph to rename the timestamp field to ts.
+		tr, err := driver.NewReader(ctx, suricataTransform, p.zctx, dr)
+		if err != nil {
+			return nil, nil, err
+		}
+		zreaders = append(zreaders, tr)
+	}
+	group.Go(func() error {
+		var writers []io.Writer
+		for _, p := range pipes {
+			writers = append(writers, p)
+		}
+		_, err := ctxio.Copy(ctx, io.MultiWriter(writers...), pcapstream)
+		// Once the copy has completed, close the pipe writers, which
+		// instructs the analyzer processes to exit.
+		for _, p := range pipes {
+			p.Close()
+		}
+		return err
+	})
+	return group, zreaders, nil
+}
+
+func (p *archivePcapOp) runAnalyzer(ctx context.Context, group *errgroup.Group, ln pcapanalyzer.Launcher) (*io.PipeWriter, zbuf.Reader, error) {
+	logdir, err := ioutil.TempDir("", "zqd-pcap-ingest-")
+	if err != nil {
+		return nil, nil, err
+	}
+	p.cleanup(func() { os.RemoveAll(logdir) })
+	pr, pw := io.Pipe()
+	waiter, err := ln(ctx, pr, logdir)
+	if err != nil {
+		return nil, nil, err
+	}
+	dr, err := newLogTailer(p.zctx, logdir, zio.ReaderOpts{
+		JSON: ndjsonio.ReaderOpts{TypeConfig: suricataTC},
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+	group.Go(func() error {
+		err := waiter.Wait()
+		// The analyzer has either encountered an error or received an EOF
+		// from the pcap stream. Tell the log tailer to stop tailing files,
+		// which will in turn cause an EOF on the zbuf.Read stream once the
+		// remaining data has been read.
+		if errs := dr.Stop(); err == nil {
+			err = errs
+		}
+		return err
+	})
+	return pw, dr, nil
+}
+
+func (p *archivePcapOp) Status() api.PcapPostStatus {
+	return api.PcapPostStatus{
+		Type:         "PcapPostStatus",
+		StartTime:    p.startTime,
+		UpdateTime:   nano.Now(),
+		PcapSize:     p.pcapBytesTotal,
+		PcapReadSize: p.pcapCounter.Bytes(),
+		RecordCount:  p.recordCounter.Records(),
+		RecordBytes:  p.recordCounter.Bytes(),
+	}
+}
+
+type writeCounter struct {
+	writer io.Writer
+	count  int64
+}
+
+func (w *writeCounter) Write(b []byte) (int, error) {
+	atomic.AddInt64(&w.count, int64(len(b)))
+	return len(b), nil
+}
+
+func (w *writeCounter) Bytes() int64 {
+	return atomic.LoadInt64(&w.count)
+}
+
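+// recordCounter wraps a zbuf.Reader and atomically tracks the number of
+// records and raw bytes read through it so that Status can report ingest
+// progress while the pipeline is running.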
+type recordCounter struct {
+	reader      zbuf.Reader
+	bytesRead   int64
+	recordsRead int64
+}
+
+func (r *recordCounter) Read() (*zng.Record, error) {
+	rec, err := r.reader.Read()
+	if rec != nil {
+		atomic.AddInt64(&r.bytesRead, int64(len(rec.Raw)))
+		atomic.AddInt64(&r.recordsRead, 1)
+	}
+	return rec, err
+}
+
+func (r *recordCounter) Bytes() int64 {
+	return atomic.LoadInt64(&r.bytesRead)
+}
+
+func (r *recordCounter) Records() int64 {
+	return atomic.LoadInt64(&r.recordsRead)
+}
+
+// Snap for archivePcapOp is functionally useless; it exists only to satisfy
+// the PcapOp interface and will go away once filestore is deprecated.
+func (p *archivePcapOp) Snap() <-chan struct{} {
+	return p.snap
+}
+
+// Err returns an error if one occurred while the ingest process was running.
+// If the process is still running, Err waits for it to complete before
+// returning.
+func (p *archivePcapOp) Err() error {
+	<-p.done
+	return p.err
+}
+
+func (p *archivePcapOp) cleanup(fn func()) {
+	p.cleanupfns = append(p.cleanupfns, fn)
+}
+
+// Done returns a chan that emits when the ingest process is complete.
+func (p *archivePcapOp) Done() <-chan struct{} {
+	return p.done
+}
diff --git a/zqd/ingest/logtailer.go b/zqd/ingest/logtailer.go
new file mode 100644
index 0000000000..a93ee5c8c5
--- /dev/null
+++ b/zqd/ingest/logtailer.go
@@ -0,0 +1,150 @@
+package ingest
+
+import (
+	"path/filepath"
+	"sync"
+
+	"github.com/brimsec/zq/pkg/fs"
+	"github.com/brimsec/zq/zio"
+	"github.com/brimsec/zq/zio/detector"
+	"github.com/brimsec/zq/zng"
+	"github.com/brimsec/zq/zng/resolver"
+)
+
+type result struct {
+	rec *zng.Record
+	err error
+}
+
+// logTailer is a zbuf.Reader that watches a specified directory and tails
+// existing and newly created files in the directory for new logs. Newly
+// written log data are transformed into *zng.Records and returned on a
+// first-come-first-served basis.
+type logTailer struct {
+	opts    zio.ReaderOpts
+	readers map[string]*fs.TFile
+	watcher *fs.DirWatcher
+	zctx    *resolver.Context
+
+	// synchronization primitives
+	results chan result
+	once    sync.Once
+	wg      sync.WaitGroup
+}
+
+func newLogTailer(zctx *resolver.Context, dir string, opts zio.ReaderOpts) (*logTailer, error) {
+	dir = filepath.Clean(dir)
+	watcher, err := fs.NewDirWatcher(dir)
+	if err != nil {
+		return nil, err
+	}
+	r := &logTailer{
+		opts:    opts,
+		readers: make(map[string]*fs.TFile),
+		results: make(chan result, 5),
+		watcher: watcher,
+		zctx:    zctx,
+	}
+	return r, nil
+}
+
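+// start runs the watch loop: it spawns a tailing goroutine for each file
+// the directory watcher reports and, once the watcher shuts down, waits for
+// those goroutines to drain before emitting EOS on the results channel.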
+func (d *logTailer) start() {
+	var err error
+	for {
+		ev, ok := <-d.watcher.Events
+		// The watcher has closed. Instruct all goroutines to stop tailing
+		// files so they read the remaining data and then exit.
+		if !ok {
+			d.stopReaders(false)
+			break
+		}
+		if ev.Err != nil {
+			err = ev.Err
+			d.stopReaders(true)
+			break
+		}
+		if ev.Op.Exists() {
+			if terr := d.tailFile(ev.Name); terr != nil {
+				err = terr
+				d.stopReaders(true)
+				break
+			}
+		}
+	}
+	// Wait for all tailing goroutines to stop. We are about to close the
+	// results channel and do not want a write-to-closed-channel panic.
+	d.wg.Wait()
+	// Signify EOS and close the channel.
+	d.results <- result{err: err}
+	close(d.results)
+}
+
+// stopReaders instructs all open TFiles to stop tailing their respective
+// files. If close is false, the readers will read through the remaining data
+// in their files before emitting EOF. If close is true, the file descriptors
+// are closed and no further data is read.
+func (d *logTailer) stopReaders(close bool) {
+	for _, r := range d.readers {
+		if close {
+			r.Close()
+		}
+		r.Stop()
+	}
+}
+
+func (d *logTailer) tailFile(file string) error {
+	if _, ok := d.readers[file]; ok {
+		return nil
+	}
+	f, err := fs.TailFile(file)
+	if err == fs.ErrIsDir {
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+	d.readers[file] = f
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+		zr, err := detector.OpenFromNamedReadCloser(d.zctx, f, file, d.opts)
+		if err != nil {
+			d.results <- result{err: err}
+			return
+		}
+		var res result
+		for {
+			res.rec, res.err = zr.Read()
+			if res.rec != nil || res.err != nil {
+				d.results <- res
+			}
+			if res.rec == nil || res.err != nil {
+				return
+			}
+		}
+	}()
+	return nil
+}
+
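+// Read returns the next record from whichever tailed file produced one
+// first, lazily starting the watch loop on first use. It returns a nil
+// record (EOS) after Stop has been called and all remaining data has been
+// read.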
+func (d *logTailer) Read() (*zng.Record, error) {
+	d.once.Do(func() { go d.start() })
+	res, ok := <-d.results
+	if !ok {
+		// Channel already closed; return EOS.
+		return nil, nil
+	}
+	if res.err != nil {
+		d.watcher.Stop() // exits the start loop
+		// Drain results so the tailing goroutines can exit.
+		for range d.results {
+		}
+	}
+	return res.rec, res.err
+}
+
+// Stop instructs the directory watcher and individual file watchers to stop
+// watching for changes. Read will emit EOS once the remaining unread data in
+// the files has been read.
+func (d *logTailer) Stop() error {
+	return d.watcher.Stop()
+}
diff --git a/zqd/ingest/logtailer_test.go b/zqd/ingest/logtailer_test.go
new file mode 100644
index 0000000000..2fd000014c
--- /dev/null
+++ b/zqd/ingest/logtailer_test.go
@@ -0,0 +1,155 @@
+package ingest
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/brimsec/zq/driver"
+	"github.com/brimsec/zq/zio"
+	"github.com/brimsec/zq/zio/tzngio"
+	"github.com/brimsec/zq/zng/resolver"
+	"github.com/brimsec/zq/zql"
+	"github.com/stretchr/testify/suite"
+)
+
+var sortTs = zql.MustParseProc("sort ts")
+
+const expected = `#0:record[ts:time]
+0:[0;]
+0:[1;]
+0:[2;]
+0:[3;]
+0:[4;]
+0:[5;]
+0:[6;]
+0:[7;]
+0:[8;]
+0:[9;]
+0:[10;]
+0:[11;]
+0:[12;]
+0:[13;]
+0:[14;]
+0:[15;]
+0:[16;]
+0:[17;]
+0:[18;]
+0:[19;]
+`
+
+type logTailerTSuite struct {
+	suite.Suite
+	dir  string
+	zctx *resolver.Context
+	dr   *logTailer
+}
+
+func TestLogTailer(t *testing.T) {
+	suite.Run(t, new(logTailerTSuite))
+}
+
+func (s *logTailerTSuite) SetupTest() {
+	dir, err := ioutil.TempDir("", "TestLogTailer")
+	s.Require().NoError(err)
+	s.dir = dir
+	s.T().Cleanup(func() { os.RemoveAll(s.dir) })
+	s.zctx = resolver.NewContext()
+	s.dr, err = newLogTailer(s.zctx, s.dir, zio.ReaderOpts{Format: "tzng"})
+	s.Require().NoError(err)
+}
+
+func (s *logTailerTSuite) TestCreatedFiles() {
+	result, errCh := s.read()
+	f1 := s.createFile("test1.tzng")
+	f2 := s.createFile("test2.tzng")
+	s.write(f1, f2)
+	s.Require().NoError(<-errCh)
+	s.Equal(expected, <-result)
+}
+
+func (s *logTailerTSuite) TestIgnoreDir() {
+	result, errCh := s.read()
+	f1 := s.createFile("test1.tzng")
+	f2 := s.createFile("test2.tzng")
+	err := os.Mkdir(filepath.Join(s.dir, "testdir"), 0755)
+	s.Require().NoError(err)
+	s.write(f1, f2)
+	s.Require().NoError(<-errCh)
+	s.Equal(expected, <-result)
+}
+
+func (s *logTailerTSuite) TestExistingFiles() {
+	f1 := s.createFile("test1.tzng")
+	f2 := s.createFile("test2.tzng")
+	result, errCh := s.read()
+	s.write(f1, f2)
+	s.Require().NoError(<-errCh)
+	s.Equal(expected, <-result)
+}
+
+func (s *logTailerTSuite) TestInvalidFile() {
+	_, errCh := s.read()
+	f1 := s.createFile("test1.tzng")
+	_, err := f1.WriteString("#0:record[ts:time]\n")
+	s.Require().NoError(err)
+	_, err = f1.WriteString("this is an invalid line\n")
+	s.Require().NoError(err)
+	s.EqualError(<-errCh, "line 2: bad format")
+	s.NoError(s.dr.Stop())
+}
+
+func (s *logTailerTSuite) TestEmptyFile() {
+	result, errCh := s.read()
+	f1 := s.createFile("test1.tzng")
+	_ = s.createFile("test2.tzng")
+	s.write(f1)
+	s.Require().NoError(<-errCh)
+	s.Equal(expected, <-result)
+}
+
+func (s *logTailerTSuite) createFile(name string) *os.File {
+	f, err := os.Create(filepath.Join(s.dir, name))
+	s.Require().NoError(err)
+	// Call sync to ensure fs events are sent in a timely manner.
+	err = f.Sync()
+	s.Require().NoError(err)
+	return f
+}
+
+func (s *logTailerTSuite) read() (<-chan string, <-chan error) {
+	result := make(chan string)
+	errCh := make(chan error)
+	buf := bytes.NewBuffer(nil)
+	w := tzngio.NewWriter(zio.NopCloser(buf))
+	go func() {
+		err := driver.Copy(context.Background(), w, sortTs, s.zctx, s.dr, driver.Config{})
+		if err != nil {
+			close(result)
+			errCh <- err
+		} else {
+			close(errCh)
+			result <- buf.String()
+		}
+	}()
+	return result, errCh
+}
+
+func (s *logTailerTSuite) write(files ...*os.File) {
+	for _, f := range files {
+		_, err := f.WriteString("#0:record[ts:time]\n")
+		s.Require().NoError(err)
+	}
+	for i := 0; i < 20; {
+		for _, f := range files {
+			_, err := f.WriteString(fmt.Sprintf("0:[%d;]\n", i))
+			s.Require().NoError(err)
+			i++
+		}
+	}
+	s.Require().NoError(s.dr.Stop())
+}
diff --git a/zqd/ingest/pcap.go b/zqd/ingest/pcap.go
index 75514115c8..17784dd54c 100644
--- a/zqd/ingest/pcap.go
+++ b/zqd/ingest/pcap.go
@@ -22,44 +22,67 @@ import (
 	"github.com/brimsec/zq/zio/ndjsonio"
 	"github.com/brimsec/zq/zio/zngio"
 	"github.com/brimsec/zq/zng/resolver"
+	"github.com/brimsec/zq/zqd/api"
 	"github.com/brimsec/zq/zqd/pcapanalyzer"
 	"github.com/brimsec/zq/zqd/pcapstorage"
+	"github.com/brimsec/zq/zqd/space"
 	"github.com/brimsec/zq/zqd/storage"
 	"github.com/brimsec/zq/zql"
 )
 
 //go:generate go run ../../zio/ndjsonio/typegenerator -o ./suricata.go -package ingest -var suricataTC ./suricata-types.json
 
-type ClearableStore interface {
-	storage.Storage
-	Clear(ctx context.Context) error
+type PcapOp interface {
+	Status() api.PcapPostStatus
+	// Err returns an error if one occurred while the ingest process was
+	// running. If the process is still running, Err waits for it to
+	// complete before returning.
+	Err() error
+	Done() <-chan struct{}
+	Snap() <-chan struct{}
 }
 
-type PcapOp struct {
-	StartTime nano.Ts
-	PcapSize  int64
+var suricataTransform = zql.MustParseProc("rename ts=timestamp")
 
-	pcapstore            *pcapstorage.Store
-	store                ClearableStore
-	snapshots            int32
-	pcapuri              iosrc.URI
-	pcapReadSize         int64
-	logdir               string
-	done, snap           chan struct{}
-	err                  error
-	slauncher, zlauncher pcapanalyzer.Launcher
+type ClearableStore interface {
+	storage.Storage
+	Clear(ctx context.Context) error
 }
 
 // NewPcapOp kicks of the process for ingesting a pcap file into a space.
-// Should everything start out successfully, this will return a thread safe
-// Process instance once zeek log files have started to materialize in a tmp
-// directory. If zeekExec is an empty string, this will attempt to resolve zeek
-// from $PATH.
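+// The returned implementation depends on the space's storage kind: a
+// filestore-backed space uses the legacy snapshotting ingest, while any
+// other storage (e.g., archivestore) streams analyzer output directly via
+// archivePcapOp.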
-func NewPcapOp(ctx context.Context, pcapstore *pcapstorage.Store, store ClearableStore, pcap string, slauncher, zlauncher pcapanalyzer.Launcher) (*PcapOp, []string, error) {
+func NewPcapOp(ctx context.Context, space space.Space, pcap string, suricata, zeek pcapanalyzer.Launcher) (PcapOp, []string, error) {
 	pcapuri, err := iosrc.ParseURI(pcap)
 	if err != nil {
 		return nil, nil, err
 	}
+	if suricata == nil && zeek == nil {
+		return nil, nil, fmt.Errorf("must provide at least one launcher")
+	}
+	logstore, ok := space.Storage().(ClearableStore)
+	if ok {
+		return newFilePcapOp(ctx, space.PcapStore(), logstore, pcapuri, suricata, zeek)
+	}
+	return newArchivePcapOp(ctx, space.Storage(), space.PcapStore(), pcapuri, suricata, zeek)
+}
+
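+// legacyPcapOp is the filestore-backed implementation of PcapOp. Analyzer
+// logs accumulate in a temporary directory and are periodically committed
+// to the store as snapshots until the pcap has been fully processed.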
+type legacyPcapOp struct {
+	pcapstore  *pcapstorage.Store
+	store      ClearableStore
+	snapshots  int32
+	pcapuri    iosrc.URI
+	logdir     string
+	done, snap chan struct{}
+	err        error
+
+	slauncher, zlauncher pcapanalyzer.Launcher
+
+	// stats
+	startTime    nano.Ts
+	pcapSize     int64
+	pcapReadSize int64
+}
+
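+// newFilePcapOp starts the legacy ingest flow for filestore spaces,
+// preserving the behavior of the old NewPcapOp.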
+func newFilePcapOp(ctx context.Context, pcapstore *pcapstorage.Store, store ClearableStore, pcapuri iosrc.URI, slauncher, zlauncher pcapanalyzer.Launcher) (*legacyPcapOp, []string, error) {
 	if slauncher == nil && zlauncher == nil {
 		return nil, nil, fmt.Errorf("must provide at least one launcher")
 	}
@@ -83,9 +106,9 @@
 	if err != nil {
 		return nil, warnings, err
 	}
-	p := &PcapOp{
-		StartTime: nano.Now(),
-		PcapSize:  info.Size(),
+	p := &legacyPcapOp{
+		startTime: nano.Now(),
+		pcapSize:  info.Size(),
 		pcapstore: pcapstore,
 		store:     store,
 		pcapuri:   pcapuri,
@@ -103,7 +126,7 @@ func NewPcapOp(ctx context.Context, pcapstore *pcapstorage.Store, store Clearabl
 	return p, warnings, nil
 }
 
-func (p *PcapOp) run(ctx context.Context) error {
+func (p *legacyPcapOp) run(ctx context.Context) error {
 	var sErr, zErr error
 	var wg sync.WaitGroup
 	slurpDone := make(chan struct{})
@@ -177,7 +200,7 @@ outer:
 	return nil
 }
 
-func (p *PcapOp) runZeek(ctx context.Context) error {
+func (p *legacyPcapOp) runZeek(ctx context.Context) error {
 	pcapfile, err := iosrc.NewReader(ctx, p.pcapuri)
 	if err != nil {
 		return err
@@ -191,7 +214,7 @@ func (p *PcapOp) runZeek(ctx context.Context) error {
 	return zproc.Wait()
 }
 
-func (p *PcapOp) runSuricata(ctx context.Context) error {
+func (p *legacyPcapOp) runSuricata(ctx context.Context) error {
 	pcapfile, err := iosrc.NewReader(ctx, p.pcapuri)
 	if err != nil {
 		return err
@@ -207,36 +230,41 @@ func (p *PcapOp) runSuricata(ctx context.Context) error {
 	return p.convertSuricataLog(ctx)
 }
 
-// PcapReadSize returns the total size in bytes of data read from the underlying
-// pcap file.
-func (p *PcapOp) PcapReadSize() int64 {
-	return atomic.LoadInt64(&p.pcapReadSize)
+func (p *legacyPcapOp) Status() api.PcapPostStatus {
+	return api.PcapPostStatus{
+		Type:          "PcapPostStatus",
+		StartTime:     p.startTime,
+		UpdateTime:    nano.Now(),
+		PcapSize:      p.pcapSize,
+		PcapReadSize:  atomic.LoadInt64(&p.pcapReadSize),
+		SnapshotCount: int(atomic.LoadInt32(&p.snapshots)),
+	}
 }
 
 // Err returns the an error if an error occurred while the ingest process was
 // running. If the process is still running Err will wait for the process to
 // complete before returning.
-func (p *PcapOp) Err() error {
+func (p *legacyPcapOp) Err() error {
 	<-p.done
 	return p.err
 }
 
 // Done returns a chan that emits when the ingest process is complete.
-func (p *PcapOp) Done() <-chan struct{} {
+func (p *legacyPcapOp) Done() <-chan struct{} {
 	return p.done
 }
 
-func (p *PcapOp) SnapshotCount() int {
+func (p *legacyPcapOp) SnapshotCount() int {
 	return int(atomic.LoadInt32(&p.snapshots))
 }
 
 // Snap returns a chan that emits every time a snapshot is made. It
 // should no longer be read from after Done() has emitted.
-func (p *PcapOp) Snap() <-chan struct{} {
+func (p *legacyPcapOp) Snap() <-chan struct{} {
 	return p.snap
 }
 
-func (p *PcapOp) zeekFiles() []string {
+func (p *legacyPcapOp) zeekFiles() []string {
 	files, err := filepath.Glob(filepath.Join(p.logdir, "*.log"))
 	// Per filepath.Glob documentation the only possible error would be due to
 	// an invalid glob pattern. Ok to panic.
@@ -246,7 +274,7 @@
 	return files
 }
 
-func (p *PcapOp) suricataFiles() []string {
+func (p *legacyPcapOp) suricataFiles() []string {
 	path := filepath.Join(p.logdir, "eve.zng")
 	if _, err := os.Stat(path); err != nil {
 		return nil
@@ -254,7 +282,7 @@
 	return []string{path}
 }
 
-func (p *PcapOp) createSnapshot(ctx context.Context) error {
+func (p *legacyPcapOp) createSnapshot(ctx context.Context) error {
 	files := append(p.zeekFiles(), p.suricataFiles()...)
 	if len(files) == 0 {
 		return nil
@@ -273,9 +301,7 @@
 	return nil
 }
 
-func (p *PcapOp) convertSuricataLog(ctx context.Context) error {
-	var eveProc = zql.MustParseProc("rename ts=timestamp")
-
+func (p *legacyPcapOp) convertSuricataLog(ctx context.Context) error {
 	zctx := resolver.NewContext()
 	path := filepath.Join(p.logdir, "eve.json")
 	zr, err := detector.OpenFile(zctx, path, zio.ReaderOpts{JSON: ndjsonio.ReaderOpts{TypeConfig: suricataTC}})
@@ -285,11 +311,11 @@
 	defer zr.Close()
 	return fs.ReplaceFile(filepath.Join(p.logdir, "eve.zng"), os.FileMode(0666), func(w io.Writer) error {
 		zw := zngio.NewWriter(zio.NopCloser(w), zngio.WriterOpts{})
-		return driver.Copy(ctx, zw, eveProc, zctx, zr, driver.Config{})
+		return driver.Copy(ctx, zw, suricataTransform, zctx, zr, driver.Config{})
 	})
 }
 
-func (p *PcapOp) Write(b []byte) (int, error) {
+func (p *legacyPcapOp) Write(b []byte) (int, error) {
 	n := len(b)
 	atomic.AddInt64(&p.pcapReadSize, int64(n))
 	return n, nil
diff --git a/zqd/space/filespace.go b/zqd/space/filespace.go
index 8e31fb190a..b852831166 100644
--- a/zqd/space/filespace.go
+++ b/zqd/space/filespace.go
@@ -8,7 +8,6 @@ import (
 
 	"github.com/brimsec/zq/pkg/iosrc"
 	"github.com/brimsec/zq/zqd/api"
-	"github.com/brimsec/zq/zqd/pcapstorage"
 	"github.com/brimsec/zq/zqe"
 )
 
@@ -30,10 +29,6 @@ func (s *fileSpace) Info(ctx context.Context) (api.SpaceInfo, error) {
 	return si, nil
 }
 
-func (s *fileSpace) PcapStore() *pcapstorage.Store {
-	return s.pcapstore
-}
-
 func (s *fileSpace) dataURI() iosrc.URI {
 	du := s.conf.DataURI
 	if du.IsZero() {
diff --git a/zqd/space/space.go b/zqd/space/space.go
index eebb4e5a86..fba3a4474d 100644
--- a/zqd/space/space.go
+++ b/zqd/space/space.go
@@ -32,6 +32,7 @@ type Space interface {
 	ID() api.SpaceID
 	Name() string
 	Storage() storage.Storage
+	PcapStore() *pcapstorage.Store
 	Info(context.Context) (api.SpaceInfo, error)
 
 	// StartOp is called to register an operation is in progress; the
@@ -45,14 +46,6 @@ type Space interface {
 	update(api.SpacePutRequest) error
 }
 
-// PcapSpace denotes that a space is capable of storing pcap files and
-// indexes.
-// XXX Temporary. The should be removed once archive spaces are enabled to allow
-// pcap ingest.
-type PcapSpace interface {
-	PcapStore() *pcapstorage.Store
-}
-
 func newSpaceID() api.SpaceID {
 	id := ksuid.New()
 	return api.SpaceID(fmt.Sprintf("sp_%s", id.String()))
@@ -123,16 +116,16 @@ func loadSpaces(ctx context.Context, p iosrc.URI, conf config, logger *zap.Logge
 	}
 	id := api.SpaceID(path.Base(p.Path))
 	logger = logger.With(zap.String("space_id", string(id)))
+	pcapstore, err := loadPcapStore(ctx, datapath)
+	if err != nil {
+		return nil, err
+	}
 	switch conf.Storage.Kind {
 	case storage.FileStore:
 		store, err := filestore.Load(datapath)
 		if err != nil {
 			return nil, err
 		}
-		pcapstore, err := loadPcapStore(ctx, datapath)
-		if err != nil {
-			return nil, err
-		}
 		s := &fileSpace{
 			spaceBase: spaceBase{id, store, pcapstore, newGuard(), logger},
 			path:      p,
@@ -146,7 +139,7 @@ func loadSpaces(ctx context.Context, p iosrc.URI, conf config, logger *zap.Logge
 			return nil, err
 		}
 		parent := &archiveSpace{
-			spaceBase: spaceBase{id, store, nil, newGuard(), logger},
+			spaceBase: spaceBase{id, store, pcapstore, newGuard(), logger},
 			path:      p,
 			conf:      conf,
 		}
@@ -159,7 +152,7 @@ func loadSpaces(ctx context.Context, p iosrc.URI, conf config, logger *zap.Logge
 			return nil, err
 		}
 		sub := &archiveSubspace{
-			spaceBase: spaceBase{subcfg.ID, substore, nil, newGuard(), logger},
+			spaceBase: spaceBase{subcfg.ID, substore, pcapstore, newGuard(), logger},
 			parent:    parent,
 		}
 		ret = append(ret, sub)
@@ -199,6 +192,10 @@ func (s *spaceBase) Storage() storage.Storage {
 	return s.store
 }
 
+func (s *spaceBase) PcapStore() *pcapstorage.Store {
+	return s.pcapstore
+}
+
 func (s *spaceBase) Info(ctx context.Context) (api.SpaceInfo, error) {
 	sum, err := s.store.Summary(ctx)
 	if err != nil {
@@ -214,7 +211,7 @@ func (s *spaceBase) Info(ctx context.Context) (api.SpaceInfo, error) {
 		StorageKind: sum.Kind,
 		Size:        sum.DataBytes,
 	}
-	if s.pcapstore != nil && !s.pcapstore.Empty() {
+	if !s.pcapstore.Empty() {
 		pcapinfo, err := s.pcapstore.Info(ctx)
 		if err != nil {
 			if !zqe.IsNotFound(err) {
diff --git a/ztests/suite/zqd/archivestore/pcappost-suricata.yaml b/ztests/suite/zqd/archivestore/pcappost-suricata.yaml
new file mode 100644
index 0000000000..20e84236fc
--- /dev/null
+++ b/ztests/suite/zqd/archivestore/pcappost-suricata.yaml
@@ -0,0 +1,26 @@
+script: |
+  source services.sh
+  zapi -h $ZQD_HOST new -k archivestore testsp
+  zapi -h $ZQD_HOST -s testsp pcappost alerts.pcap >/dev/null
+  echo ===
+  zapi -h $ZQD_HOST -s testsp get -f tzng "event_type = alert | every 1s count()"
+  echo ===
+  zapi -h $ZQD_HOST -s testsp get -f tzng "_path != null | count()"
+
+inputs:
+  - name: alerts.pcap
+    source: ../alerts.pcap
+  - name: services.sh
+    source: ../services.sh
+
+outputs:
+  - name: stdout
+    data: |
+      testsp: space created
+      ===
+      #0:record[ts:time,count:uint64]
+      0:[1425568033;13;]
+      0:[1425567868;2;]
+      ===
+      #0:record[count:uint64]
+      0:[379;]
diff --git a/ztests/suite/zqd/archivestore/pcappost.yaml b/ztests/suite/zqd/archivestore/pcappost.yaml
new file mode 100644
index 0000000000..f940dc5df8
--- /dev/null
+++ b/ztests/suite/zqd/archivestore/pcappost.yaml
@@ -0,0 +1,34 @@
+script: |
+  source services.sh
+  zapi -h $ZQD_HOST new -k archivestore testsp
+  zapi -h $ZQD_HOST -s testsp pcappost ng.pcap >/dev/null
+  echo ===
+  zapi -h $ZQD_HOST -s testsp get -f tzng "_path != stats | cut -c uid | sort -r ts, _path"
+  echo ===
+  zapi -h $ZQD_HOST -s testsp get -f tzng "count()"
+
+inputs:
+  - name: ng.pcap
+    source: ../../pcap/ng.pcap
+  - name: services.sh
+    source: ../services.sh
+
+outputs:
+  - name: stdout
+    data: |
+      testsp: space created
+      ===
+      #0:record[_path:string,ts:time,ts_delta:duration,peer:bstring,gaps:uint64,acks:uint64,percent_lost:float64]
+      0:[capture_loss;1425568893.736974;0.001192;zeek;0;0;0;]
+      #port=uint16
+      #zenum=string
+      #1:record[_path:string,ts:time,id:record[orig_h:ip,orig_p:port,resp_h:ip,resp_p:port],proto:zenum,service:bstring,duration:duration,orig_bytes:uint64,resp_bytes:uint64,conn_state:bstring,local_orig:bool,local_resp:bool,missed_bytes:uint64,history:bstring,orig_pkts:uint64,orig_ip_bytes:uint64,resp_pkts:uint64,resp_ip_bytes:uint64,tunnel_parents:set[bstring],geo:record[orig:record[country_code:bstring,region:bstring,city:bstring,latitude:float64,longitude:float64],resp:record[country_code:bstring,region:bstring,city:bstring,latitude:float64,longitude:float64]],community_id:bstring]
+      1:[conn;1425568893.735782;[192.168.0.2;34446;130.236.100.79;80;]tcp;-;0.001192;0;2776;OTH;-;-;0;Ad;1;52;2;2880;-;[[-;-;-;-;-;][SE;E;Linköping;58.4167;15.6167;]]1:tOxXSQENCvoWlZaDPdf9YgwpshQ=;]
+      0:[capture_loss;1425568893.735782;1460.943301;zeek;0;0;0;]
+      1:[conn;1425567432.792481;[192.168.0.51;50858;192.168.0.1;80;]tcp;-;0.00074;338;0;OTH;-;-;0;ADa;2;418;1;40;-;[[-;-;-;-;-;][-;-;-;-;-;]]1:YrhsMQd8siqxEOoc6A5E/gKZDgY=;]
+      1:[conn;1425567047.804914;[192.168.0.51;33773;80.239.174.91;443;]tcp;-;-;-;-;OTH;-;-;0;^d;0;0;1;1440;-;[[-;-;-;-;-;][-;-;-;47;8;]]1:wPP6HnEPl0F9QDVCQ+E0du2PUi8=;]
+      1:[conn;1425567047.804906;[192.168.0.51;33773;80.239.174.91;443;]tcp;-;-;-;-;OTH;-;-;0;^d;0;0;1;1440;-;[[-;-;-;-;-;][-;-;-;47;8;]]1:wPP6HnEPl0F9QDVCQ+E0du2PUi8=;]
+      1:[conn;1425567047.803929;[192.168.0.51;33773;80.239.174.91;443;]tcp;-;-;-;-;OTH;-;-;0;^d;0;0;1;1440;-;[[-;-;-;-;-;][-;-;-;47;8;]]1:wPP6HnEPl0F9QDVCQ+E0du2PUi8=;]
+      ===
+      #0:record[count:uint64]
+      0:[10;]