feat: read all lines
youen committed Sep 12, 2024
1 parent ce9ac32 commit 0d0ab8d
Showing 3 changed files with 54 additions and 35 deletions.
1 change: 0 additions & 1 deletion pkg/model/process_mask.go
@@ -48,7 +48,6 @@ func (mep *MaskEngineProcess) ProcessDictionary(dictionary Dictionary, out Colle
over.AddGlobalFields("path")
over.MDC().Set("path", mep.selector)
defer func() { over.MDC().Remove("path") }()

result := dictionary
applied := mep.selector.Apply(result, func(rootContext, parentContext Dictionary, key string, value Entry) (Action, Entry) {
switch {
86 changes: 53 additions & 33 deletions pkg/parquet/parquet.go
@@ -32,33 +32,35 @@ const BufferSize = 4

// NewSource creates a new Source.
func NewSource(path string) model.Source {
return &Source{path, nil, make([]parquet.Row, BufferSize), 0, model.NewDictionary(), nil, false, 0}
return &Source{path, nil, make([]parquet.Row, BufferSize), nil, 0, 0, false, model.NewDictionary(), nil, false, 0}
}

// NewPackedSource creates a new packed Source.
func NewPackedSource(path string) model.Source {
log.Trace().Msg("NewPackedSource")

return &Source{path, nil, make([]parquet.Row, BufferSize), 0, model.NewDictionary(), nil, true, 0}
return &Source{path, nil, make([]parquet.Row, BufferSize), nil, 0, 0, false, model.NewDictionary(), nil, true, 0}
}

// Source export line to JSON format.
type Source struct {
path string
file *parquet.File
buffer []parquet.Row
groupIndex int
value model.Dictionary
err error
packed bool
size int
path string
file *parquet.File
buffer []parquet.Row
rows parquet.Rows
groupIndex int
bufferIndex int
endOfFile bool
value model.Dictionary
err error
packed bool
size int
}

func (s *Source) Open() error {
log.Trace().Msg("open parquet file")

f, err := os.Open(s.path)
print(err)
if err != nil {
return err
}
@@ -69,27 +71,47 @@ func (s *Source) Open() error {
}
s.file, err = parquet.OpenFile(f, stat.Size())

print(s.file.Schema().String())

if err != nil {
return err
}

s.groupIndex = 0
log.Printf("%v", s.file.RowGroups())
s.readGroup()

err = s.readRows()

if err != nil {
return err
}

return nil
}

func (s *Source) readGroup() {
s.rows = s.file.RowGroups()[s.groupIndex].Rows()
}

s.size, err = s.file.RowGroups()[0].Rows().ReadRows(s.buffer)
func (s *Source) readRows() error {
log.Trace().Int("groupIndex", s.groupIndex).Msg("read rows")

if err != nil && errors.Is(err, io.EOF) {
size, err := s.rows.ReadRows(s.buffer)
s.size = size
s.bufferIndex = 0

if errors.Is(err, io.EOF) {
s.endOfFile = true
} else if err != nil {
return err
}

return nil
}

func (s *Source) readColumn() {
func (s *Source) readColumns() {
log.Trace().Int("bufferIndex", s.bufferIndex).Msg("read columns")

dict := model.NewDictionary()
s.buffer[s.groupIndex].Range(func(columnIndex int, columnValues []parquet.Value) bool {
s.buffer[s.bufferIndex].Range(func(columnIndex int, columnValues []parquet.Value) bool {
column := s.file.Schema().Columns()[columnIndex][0]
switch columnValues[0].Kind() {
case parquet.ByteArray:
@@ -108,33 +130,31 @@ func (s *Source) readColumn() {
dict = dict.With(column, columnValues[0].Double())
}
s.value = model.CleanDictionary(dict)
s.groupIndex++
return true
})
s.bufferIndex++
}

// Next convert next line to model.Dictionary
func (s *Source) Next() bool {
// read next buffer
if s.groupIndex < BufferSize {
s.readColumn()
return true
if s.err != nil {
return false
}

return false
for _, rowGroup := range s.file.RowGroups() {
rows := rowGroup.Rows()
_, err := rows.ReadRows(s.buffer)
if err != nil {
s.err = err
// read next buffer
if s.bufferIndex >= s.size && !s.endOfFile {
s.err = s.readRows()
if s.err != nil {
return false
}
}
s.buffer[0].Range(func(columnIndex int, columnValues []parquet.Value) bool {
print(columnValues[0].GoString())

if s.bufferIndex < s.size {
s.readColumns()
return true
})
return false // s.err == nil
}

return false
}

func (s *Source) Value() model.Entry {
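For context, here is a minimal standalone sketch (not part of this commit) of the buffered reading pattern the reworked Source follows: read each row group in fixed-size batches via ReadRows until io.EOF, then move on to the next group. It relies only on the parquet-go calls already visible in the diff above (parquet.OpenFile, File.RowGroups, RowGroup.Rows, Rows.ReadRows); the import path and the readAllRows helper are illustrative assumptions, not code from the repository.

package main

import (
	"errors"
	"fmt"
	"io"
	"os"

	"github.com/parquet-go/parquet-go" // assumed import path; the project may pin a different fork
)

// readAllRows is a hypothetical helper: it reads every row of a parquet
// file in batches of bufferSize rows, walking all row groups in order.
func readAllRows(path string, bufferSize int) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		return err
	}

	pf, err := parquet.OpenFile(f, stat.Size())
	if err != nil {
		return err
	}

	buffer := make([]parquet.Row, bufferSize)

	for _, group := range pf.RowGroups() {
		rows := group.Rows()
		for {
			n, err := rows.ReadRows(buffer)
			// ReadRows can return rows together with io.EOF,
			// so process the n rows before checking the error.
			for i := 0; i < n; i++ {
				fmt.Println(buffer[i]) // process one row
			}
			if errors.Is(err, io.EOF) {
				break
			}
			if err != nil {
				rows.Close()
				return err
			}
		}
		if err := rows.Close(); err != nil {
			return err
		}
	}

	return nil
}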
2 changes: 1 addition & 1 deletion pkg/parquet/parquet_test.go
@@ -30,7 +30,7 @@ func TestSourceReturnDictionary(t *testing.T) {
err := pipeline.AddSink(model.NewSinkToSlice(&result)).Run()
assert.Nil(t, err)

assert.GreaterOrEqual(t, 1, len(result))
assert.Equal(t, 8, len(result))
document := result[0].(model.Dictionary)

assert.Equal(t, true, document.Get("bool_col"))
