Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zqd: Support pcap ingest for archive stores #1450

Merged
merged 10 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/brimsec/zq/ast"
"github.com/brimsec/zq/zbuf"
"github.com/brimsec/zq/zng"
"github.com/brimsec/zq/zng/resolver"
"github.com/brimsec/zq/zqd/api"
)
Expand Down Expand Up @@ -147,3 +148,57 @@ func Copy(ctx context.Context, w zbuf.Writer, prog ast.Proc, zctx *resolver.Cont
d := &transformDriver{w: w}
return Run(ctx, d, prog, zctx, r, cfg)
}

type muxReader struct {
*muxOutput
batch zbuf.Batch
cancel context.CancelFunc
index int
statsTickCh <-chan time.Time
}

func (mr *muxReader) Read() (*zng.Record, error) {
read:
if mr.batch == nil {
chunk := mr.Pull(mr.statsTickCh)
if chunk.ID != 0 {
return nil, errors.New("transform proc with multiple tails")
}
if chunk.Batch != nil {
mr.batch = chunk.Batch
} else if chunk.Err != nil {
return nil, chunk.Err
} else if chunk.Warning != "" {
goto read
} else {
return nil, nil
}
}
if mr.index >= mr.batch.Length() {
mr.batch.Unref()
mr.batch, mr.index = nil, 0
goto read
}
rec := mr.batch.Index(mr.index)
mr.index++
return rec, nil
}

func (mr *muxReader) Close() error {
mr.cancel()
return nil
}

func NewReader(ctx context.Context, program ast.Proc, zctx *resolver.Context, reader zbuf.Reader) (zbuf.ReadCloser, error) {
ctx, cancel := context.WithCancel(ctx)
mux, err := compileSingle(ctx, program, zctx, reader, Config{})
if err != nil {
cancel()
return nil, err
}
return &muxReader{
cancel: cancel,
muxOutput: mux,
statsTickCh: make(chan time.Time),
}, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go v1.30.19
github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f
github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
github.com/fsnotify/fsnotify v1.4.9
github.com/go-resty/resty/v2 v2.2.0
github.com/golang/mock v1.4.3
github.com/google/gopacket v1.1.17
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fp
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down
213 changes: 213 additions & 0 deletions pkg/fs/tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package fs

import (
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/fsnotify/fsnotify"
)

var ErrIsDir = errors.New("path is a directory")

type TFile struct {
f *os.File
watcher *fsnotify.Watcher
}

func TailFile(name string) (*TFile, error) {
info, err := os.Stat(name)
if err != nil {
return nil, err
}
if info.IsDir() {
return nil, ErrIsDir
}
f, err := OpenFile(name, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
f.Close()
return nil, err
}
if err := watcher.Add(name); err != nil {
f.Close()
watcher.Close()
return nil, err
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
}
return &TFile{f, watcher}, nil
}

func (t *TFile) Read(b []byte) (int, error) {
read:
n, err := t.f.Read(b)
if err == io.EOF {
if n > 0 {
return n, nil
}
if err := t.waitWrite(); err != nil {
return 0, err
}
goto read
}
if errors.Is(err, os.ErrClosed) {
err = io.EOF
}
return n, err
}

func (t *TFile) waitWrite() error {
for {
select {
case ev, ok := <-t.watcher.Events:
if !ok {
return io.EOF
}
if ev.Op == fsnotify.Write {
return nil
}
case err := <-t.watcher.Errors:
return err
}
}
}

func (t *TFile) Stop() error {
return t.watcher.Close()
}

func (t *TFile) Close() error {
return t.f.Close()
}

type FileOp int

const (
FileOpCreated FileOp = iota
FileOpExisting
FileOpRemoved
)

func (o FileOp) Exists() bool {
return o == FileOpCreated || o == FileOpExisting
}

type FileEvent struct {
Name string
Op FileOp
Err error
}

// DirWatcher observes a directory and will emit events when files are added
// or removed. When open for the first time this will emit an event for
// every existing file.
type DirWatcher struct {
Events chan FileEvent

dir string
watched map[string]struct{}
watcher *fsnotify.Watcher
}

func NewDirWatcher(dir string) (*DirWatcher, error) {
info, err := os.Stat(dir)
if err != nil {
return nil, err
}
if !info.IsDir() {
return nil, errors.New("provided path must be a directory")
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
w := &DirWatcher{
mattnibs marked this conversation as resolved.
Show resolved Hide resolved
Events: make(chan FileEvent),
dir: dir,
watched: make(map[string]struct{}),
watcher: watcher,
}
if err := w.watcher.Add(w.dir); err != nil {
return nil, err
}
go func() {
err := w.run()
if errc := w.watcher.Close(); err == nil {
err = errc
}
if err != nil {
w.Events <- FileEvent{Err: err}
}
close(w.Events)
}()
return w, nil
}

func (w *DirWatcher) run() error {
if err := w.poll(); err != nil {
return err
}
for ev := range w.watcher.Events {
switch {
case ev.Op&fsnotify.Create == fsnotify.Create:
if err := w.addFile(ev.Name); err != nil {
return err
}
case ev.Op&fsnotify.Rename == fsnotify.Rename, ev.Op&fsnotify.Remove == fsnotify.Remove:
if err := w.removeFile(ev.Name); err != nil {
return err
}
}
}
// watcher has been closed, poll once more to make sure we haven't missed
// any files due to race.
return w.poll()
}

func (w *DirWatcher) addFile(name string) error {
p, err := filepath.Abs(name)
if err != nil {
return err
}
if _, ok := w.watched[p]; !ok {
w.watched[p] = struct{}{}
w.Events <- FileEvent{Name: p, Op: FileOpCreated}
}
return nil
}

func (w *DirWatcher) removeFile(name string) error {
p, err := filepath.Abs(name)
if err != nil {
return err
}
if _, ok := w.watched[p]; ok {
delete(w.watched, p)
w.Events <- FileEvent{Name: p, Op: FileOpRemoved}
}
return nil
}

func (w *DirWatcher) poll() error {
infos, err := ioutil.ReadDir(w.dir)
if err != nil {
return err
}
for _, info := range infos {
if info.IsDir() {
continue
}
if err := w.addFile(filepath.Join(w.dir, info.Name())); err != nil {
return err
}
}
return nil
}

func (w *DirWatcher) Stop() error {
return w.watcher.Close()
}
65 changes: 65 additions & 0 deletions pkg/fs/tail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package fs

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTailFile(t *testing.T) {
f, err := ioutil.TempFile("", "tailfile.log")
require.NoError(t, err)
t.Cleanup(func() { os.Remove(f.Name()) })
tf, err := TailFile(f.Name())
require.NoError(t, err)
buf := make([]byte, 100)

for i := 0; i < 10; i++ {
str := fmt.Sprintf("line #%d\n", i)
_, err := f.WriteString(str)
require.NoError(t, err)
n, err := tf.Read(buf)
require.NoError(t, err)
assert.Equal(t, str, string(buf[:n]))
}
go require.NoError(t, tf.Stop())
n, err := tf.Read(buf)
assert.Equal(t, 0, n)
assert.Error(t, io.EOF, err)
}

func TestTailFileReadToEOF(t *testing.T) {
expected := `line #0
line #1
line #2
line #3
line #4
line #5
line #6
line #7
line #8
line #9
`
f, err := ioutil.TempFile("", "tailfile.log")
require.NoError(t, err)
defer os.Remove(f.Name())
tf, err := TailFile(f.Name())
require.NoError(t, err)

for i := 0; i < 10; i++ {
str := fmt.Sprintf("line #%d\n", i)
_, err := f.WriteString(str)
require.NoError(t, err)
}
require.NoError(t, tf.Stop())
buf := bytes.NewBuffer(nil)
_, err = io.Copy(buf, tf)
require.NoError(t, err)
assert.Equal(t, expected, buf.String())
}
4 changes: 3 additions & 1 deletion zqd/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ type PcapPostStatus struct {
UpdateTime nano.Ts `json:"update_time"`
PcapSize int64 `json:"pcap_total_size" unit:"bytes"`
PcapReadSize int64 `json:"pcap_read_size" unit:"bytes"`
SnapshotCount int `json:"snapshot_count"`
RecordBytes int64 `json:"record_bytes,omitempty" unit:"bytes"`
RecordCount int64 `json:"record_count,omitempty"`
SnapshotCount int `json:"snapshot_count,omitempty"`
Span *nano.Span `json:"span,omitempty"`
}

Expand Down
Loading