diff --git a/pkg/model/process_mask.go b/pkg/model/process_mask.go index bbd135c9..fd073293 100644 --- a/pkg/model/process_mask.go +++ b/pkg/model/process_mask.go @@ -48,7 +48,6 @@ func (mep *MaskEngineProcess) ProcessDictionary(dictionary Dictionary, out Colle over.AddGlobalFields("path") over.MDC().Set("path", mep.selector) defer func() { over.MDC().Remove("path") }() - result := dictionary applied := mep.selector.Apply(result, func(rootContext, parentContext Dictionary, key string, value Entry) (Action, Entry) { switch { diff --git a/pkg/parquet/parquet.go b/pkg/parquet/parquet.go index 96c9515a..0b75c07e 100755 --- a/pkg/parquet/parquet.go +++ b/pkg/parquet/parquet.go @@ -32,33 +32,35 @@ const BufferSize = 4 // NewSource creates a new Source. func NewSource(path string) model.Source { - return &Source{path, nil, make([]parquet.Row, BufferSize), 0, model.NewDictionary(), nil, false, 0} + return &Source{path, nil, make([]parquet.Row, BufferSize), nil, 0, 0, false, model.NewDictionary(), nil, false, 0} } // NewPackedSource creates a new packed Source. func NewPackedSource(path string) model.Source { log.Trace().Msg("NewPackedSource") - return &Source{path, nil, make([]parquet.Row, BufferSize), 0, model.NewDictionary(), nil, true, 0} + return &Source{path, nil, make([]parquet.Row, BufferSize), nil, 0, 0, false, model.NewDictionary(), nil, true, 0} } // Source export line to JSON format. type Source struct { - path string - file *parquet.File - buffer []parquet.Row - groupIndex int - value model.Dictionary - err error - packed bool - size int + path string + file *parquet.File + buffer []parquet.Row + rows parquet.Rows + groupIndex int + bufferIndex int + endOfFile bool + value model.Dictionary + err error + packed bool + size int } func (s *Source) Open() error { log.Trace().Msg("open parquet file") f, err := os.Open(s.path) - print(err) if err != nil { return err } @@ -69,27 +71,47 @@ func (s *Source) Open() error { } s.file, err = parquet.OpenFile(f, stat.Size()) - print(s.file.Schema().String()) - if err != nil { return err } s.groupIndex = 0 - log.Printf("%v", s.file.RowGroups()) + s.readGroup() + + err = s.readRows() + + if err != nil { + return err + } + + return nil +} + +func (s *Source) readGroup() { + s.rows = s.file.RowGroups()[s.groupIndex].Rows() +} - s.size, err = s.file.RowGroups()[0].Rows().ReadRows(s.buffer) +func (s *Source) readRows() error { + log.Trace().Int("groupIndex", s.groupIndex).Msg("read rows") - if err != nil && errors.Is(err, io.EOF) { + size, err := s.rows.ReadRows(s.buffer) + s.size = size + s.bufferIndex = 0 + + if errors.Is(err, io.EOF) { + s.endOfFile = true + } else if err != nil { return err } return nil } -func (s *Source) readColumn() { +func (s *Source) readColumns() { + log.Trace().Int("bufferIndex", s.bufferIndex).Msg("read columns") + dict := model.NewDictionary() - s.buffer[s.groupIndex].Range(func(columnIndex int, columnValues []parquet.Value) bool { + s.buffer[s.bufferIndex].Range(func(columnIndex int, columnValues []parquet.Value) bool { column := s.file.Schema().Columns()[columnIndex][0] switch columnValues[0].Kind() { case parquet.ByteArray: @@ -108,33 +130,31 @@ func (s *Source) readColumn() { dict = dict.With(column, columnValues[0].Double()) } s.value = model.CleanDictionary(dict) - s.groupIndex++ return true }) + s.bufferIndex++ } // Next convert next line to model.Dictionary func (s *Source) Next() bool { - // read next buffer - if s.groupIndex < BufferSize { - s.readColumn() - return true + if s.err != nil { + return false } - return false - for _, rowGroup := range s.file.RowGroups() { - rows := rowGroup.Rows() - _, err := rows.ReadRows(s.buffer) - if err != nil { - s.err = err + // read next buffer + if s.bufferIndex >= s.size && !s.endOfFile { + s.err = s.readRows() + if s.err != nil { return false } } - s.buffer[0].Range(func(columnIndex int, columnValues []parquet.Value) bool { - print(columnValues[0].GoString()) + + if s.bufferIndex < s.size { + s.readColumns() return true - }) - return false // s.err == nil + } + + return false } func (s *Source) Value() model.Entry { diff --git a/pkg/parquet/parquet_test.go b/pkg/parquet/parquet_test.go index 1c671de7..838761d8 100755 --- a/pkg/parquet/parquet_test.go +++ b/pkg/parquet/parquet_test.go @@ -30,7 +30,7 @@ func TestSourceReturnDictionary(t *testing.T) { err := pipeline.AddSink(model.NewSinkToSlice(&result)).Run() assert.Nil(t, err) - assert.GreaterOrEqual(t, 1, len(result)) + assert.Equal(t, 8, len(result)) document := result[0].(model.Dictionary) assert.Equal(t, true, document.Get("bool_col"))