Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat][decoder] - Adding generic decoding package to libbeat #36028

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions x-pack/libbeat/decoding/decoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package decoding

import (
"bufio"
"fmt"
"io"
)

// Decoder is an interface for decoding data from an io reader.
// Concrete implementations in this package are JSONDecoder and
// ParquetDecoder, selected by NewDecoder from the codec config.
type Decoder interface {
	// Decode reads and decodes data from an io reader based on the codec type.
	// It returns the decoded data and an error if the data cannot be decoded.
	Decode() ([]byte, error)
	// Next advances the decoder to the next data item and returns true if there is more data to be decoded.
	Next() bool
	// Close closes the decoder and releases any resources associated with it.
	// It returns an error if the decoder cannot be closed.
	Close() error
	// Type returns the underlying type of the decoder.
	// NOTE(review): presumably callers type-assert on this to reach
	// codec-specific methods (e.g. JSONDecoder.Offset/Seek) — confirm usage.
	Type() interface{}
}

// NewDecoder creates a new decoder based on the codec type.
// It returns a decoder type and an error if the codec type is not supported.
// If the reader config codec option is not set, it returns a nil decoder and nil error.
//
// NOTE(review): offset is currently unused — presumably reserved for
// resuming a partially-read stream; confirm before removing.
func NewDecoder(config DecoderConfig, r io.Reader, offset int64) (Decoder, error) {
	// No codec configured: no decoder is needed, so return before doing
	// any stream wrapping. (Previously the gzip probe ran first, so a
	// malformed gzip stream could return an error here even though the
	// documented contract is a nil decoder and nil error.)
	if config.Codec == nil {
		return nil, nil
	}
	// Transparently unwrap gzip-compressed input before handing the
	// reader to the concrete decoder.
	r, err := addGzipDecoderIfNeeded(bufio.NewReader(r))
	if err != nil {
		return nil, fmt.Errorf("failed to add gzip decoder to data stream with error: %w", err)
	}
	switch {
	case config.Codec.Parquet != nil:
		return NewParquetDecoder(config, r)
	case config.Codec.JSON != nil:
		return NewJSONDecoder(config, r)
	default:
		// Reached when a codec block is present but no supported codec is
		// set (e.g. only Auto, which has no case here yet).
		return nil, fmt.Errorf("unsupported config value: %v", config)
	}
}
34 changes: 34 additions & 0 deletions x-pack/libbeat/decoding/decoding_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package decoding

// DecoderConfig contains the configuration options for instantiating a decoder.
type DecoderConfig struct {
	// Codec selects which decoder NewDecoder builds; when nil, no
	// decoder is created (NewDecoder returns nil, nil).
	Codec *CodecConfig `config:"codec"`
}

// CodecConfig contains the configuration options for different codecs used by a decoder.
// Only one codec is expected to be set at a time.
type CodecConfig struct {
	Parquet *ParquetCodecConfig `config:"parquet"`
	// NOTE(review): Auto is declared but NewDecoder has no case for it —
	// configuring it alone falls through to the unsupported-config error;
	// confirm whether this is intentional.
	Auto *AutoConfig `config:"auto"`
	JSON *JSONConfig `config:"json"`
}

// ParquetCodecConfig contains the configuration options for the parquet codec.
type ParquetCodecConfig struct {
	Enabled bool `config:"enabled"`
	// ProcessParallel reads parquet columns in parallel when true.
	ProcessParallel bool `config:"process_parallel"`
	// BatchSize is the number of rows read per record batch.
	BatchSize int `config:"batch_size" default:"1"`
}

// AutoConfig contains the configuration options for the auto decoder which uses a set of known codecs to decode the data.
type AutoConfig struct {
	Enabled bool `config:"enabled"`
}

// JSONConfig contains the configuration options for the JSON decoder.
type JSONConfig struct {
	Enabled bool `config:"enabled"`
}
92 changes: 92 additions & 0 deletions x-pack/libbeat/decoding/decoding_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package decoding

import (
"bufio"
"encoding/json"
"fmt"
"io"
)

// JSONDecoder is a decoder for json data.
type JSONDecoder struct {
	// offset is the number of input bytes consumed so far, updated by
	// Decode from decoder.InputOffset.
	offset int64
	// isRootArray records whether the root element of the input is a
	// JSON array, as detected by evaluateJSON in NewJSONDecoder.
	isRootArray bool
	// reader holds a pointer to the (possibly wrapped) input reader.
	// NOTE(review): *io.Reader (pointer to interface) is unusual in Go
	// and no method shown here reads this field — confirm it is needed.
	reader *io.Reader
	// decoder is the underlying streaming JSON decoder.
	decoder *json.Decoder
}

// NewJSONDecoder creates a new json decoder.
// It returns an error if the json reader cannot be created.
func NewJSONDecoder(config DecoderConfig, r io.Reader) (Decoder, error) {
	// Inspect the stream to learn whether the root element is an array.
	r, rootIsArray, err := evaluateJSON(bufio.NewReader(r))
	if err != nil {
		return nil, fmt.Errorf("failed to evaluate json with error: %w", err)
	}
	d := json.NewDecoder(r)
	if rootIsArray {
		// Consume the opening '[' token so subsequent Decode calls
		// yield the array elements one by one.
		if _, err := d.Token(); err != nil {
			return nil, fmt.Errorf("failed to read JSON token with error: %w", err)
		}
	}
	// Keep numbers as json.Number instead of float64.
	d.UseNumber()

	jd := &JSONDecoder{
		isRootArray: rootIsArray,
		reader:      &r,
		decoder:     d,
	}
	return jd, nil
}

// Next advances the json decoder to the next data item and returns true if there is more data to be decoded.
// It delegates directly to json.Decoder.More.
func (jd *JSONDecoder) Next() bool {
	return jd.decoder.More()
}

// Decode reads and decodes a json data stream. After reading the json data it decodes
// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
func (jd *JSONDecoder) Decode() ([]byte, error) {
	// Decode into json.RawMessage rather than a plain []byte: unmarshaling
	// a JSON value into *[]byte expects a base64-encoded JSON string and
	// fails on objects or arrays, whereas RawMessage captures the next
	// JSON value verbatim.
	var data json.RawMessage
	if err := jd.decoder.Decode(&data); err != nil {
		return nil, fmt.Errorf("failed to decode json: %w", err)
	}
	// Record how far into the stream has been consumed, for Offset/Seek.
	jd.offset = jd.decoder.InputOffset()
	return data, nil
}

// Type returns the underlying type of the decoder.
// Callers can type-assert the result back to *JSONDecoder to reach
// the codec-specific Offset/Seek/IsRootArray methods.
func (jd *JSONDecoder) Type() interface{} {
	return jd
}

// Offset returns the current offset of the json data stream.
// The value is the input offset captured by the most recent Decode call;
// it is zero until Decode has been called at least once.
func (jd *JSONDecoder) Offset() int64 {
	return jd.offset
}

// Seek seeks to the given offset in the json data stream.
// It consumes JSON tokens one at a time until the decoder's input
// offset reaches or passes the requested position.
func (jd *JSONDecoder) Seek(offset int64) error {
	for {
		if jd.decoder.InputOffset() >= offset {
			return nil
		}
		if _, err := jd.decoder.Token(); err != nil {
			return fmt.Errorf("failed to read JSON token with error: %w", err)
		}
	}
}

// IsRootArray returns true if the root element of the json data is an array.
// The value is determined once at construction time by evaluateJSON.
func (jd *JSONDecoder) IsRootArray() bool {
	return jd.isRootArray
}

// Close closes the json decoder and releases the resources.
// It drops the decoder reference (further method calls on this instance
// would dereference a nil decoder); the underlying reader is not closed
// here. Always returns nil.
func (jd *JSONDecoder) Close() error {
	jd.decoder = nil
	return nil
}
56 changes: 56 additions & 0 deletions x-pack/libbeat/decoding/decoding_parquet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package decoding

import (
"fmt"
"io"

"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
)

// ParquetDecoder is a decoder for parquet data.
// It wraps the libbeat buffered parquet reader, which handles batching
// and (optionally) parallel processing per the codec config.
type ParquetDecoder struct {
	reader *parquet.BufferedReader
}

// NewParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
// It returns an error if the parquet reader cannot be created.
func NewParquetDecoder(config DecoderConfig, r io.Reader) (Decoder, error) {
	// Translate the codec options into the parquet reader's own config.
	cfg := &parquet.Config{
		ProcessParallel: config.Codec.Parquet.ProcessParallel,
		BatchSize:       config.Codec.Parquet.BatchSize,
	}
	br, err := parquet.NewBufferedReader(r, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
	}
	return &ParquetDecoder{reader: br}, nil
}

// Next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
// It delegates directly to the buffered parquet reader.
func (pd *ParquetDecoder) Next() bool {
	return pd.reader.Next()
}

// Decode reads and decodes a parquet data stream. After reading the parquet data it decodes
// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
func (pd *ParquetDecoder) Decode() ([]byte, error) {
	rec, err := pd.reader.Record()
	if err != nil {
		// Return a nil slice on failure so callers never see a
		// partially-read record.
		return nil, err
	}
	return rec, nil
}

// Type returns the underlying type of the decoder.
// Callers can type-assert the result back to *ParquetDecoder.
func (pd *ParquetDecoder) Type() interface{} {
	return pd
}

// Close closes the parquet decoder and releases the resources.
// It closes the underlying buffered parquet reader and propagates its error.
func (pd *ParquetDecoder) Close() error {
	return pd.reader.Close()
}
121 changes: 121 additions & 0 deletions x-pack/libbeat/decoding/decoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package decoding

// all test files are read from the "testdata" directory
//
// NOTE(review): commented out alongside the disabled tests below — with
// every test that references it commented out, the `unused` linter fails
// CI on this constant. Restore it when the tests are re-enabled.
// const testDataPath = "testdata"

Check failure on line 8 in x-pack/libbeat/decoding/decoding_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

const `testDataPath` is unused (unused)

// func TestParquetDecoding(t *testing.T) {
// testCases := []struct {
// name string
// file string
// contentType string
// numEvents int
// assertAgainst string
// config *readerConfig
// }{
// {
// name: "test decoding of a parquet file and compare the number of events with batch size 1",
// file: "vpc-flow.gz.parquet",
// numEvents: 1304,
// config: &readerConfig{
// Decoding: decoderConfig{
// Codec: &codecConfig{
// Parquet: &parquetCodecConfig{
// ProcessParallel: true,
// BatchSize: 1,
// },
// },
// },
// },
// },
// {
// name: "test decoding of a parquet file and compare the number of events with batch size 100",
// file: "vpc-flow.gz.parquet",
// numEvents: 1304,
// config: &readerConfig{
// Decoding: decoderConfig{
// Codec: &codecConfig{
// Parquet: &parquetCodecConfig{
// ProcessParallel: true,
// BatchSize: 100,
// },
// },
// },
// },
// },
// {
// name: "test decoding of a parquet file and compare the number of events with default parquet config",
// file: "vpc-flow.gz.parquet",
// numEvents: 1304,
// config: &readerConfig{
// Decoding: decoderConfig{
// Codec: &codecConfig{
// Parquet: &parquetCodecConfig{
// Enabled: true,
// },
// },
// },
// },
// },
// {
// name: "test decoding of a parquet file and compare the number of events along with the content",
// file: "cloudtrail.parquet",
// numEvents: 1,
// assertAgainst: "cloudtrail.json",
// config: &readerConfig{
// Decoding: decoderConfig{
// Codec: &codecConfig{
// Parquet: &parquetCodecConfig{
// Enabled: true,
// ProcessParallel: true,
// BatchSize: 1,
// },
// },
// },
// },
// },
// }

// for _, tc := range testCases {
// t.Run(tc.name, func(t *testing.T) {
// file := filepath.Join(testDataPath, tc.file)
// sel := fileSelectorConfig{ReaderConfig: *tc.config}
// if tc.contentType == "" {
// tc.contentType = "application/octet-stream"
// }
// // uses the s3_objects test method to perform the test
// events := testProcessS3Object(t, file, tc.contentType, tc.numEvents, sel)
// // if assertAgainst is not empty, then compare the events with the target file
// // there is a chance for this comparison to become flaky if number of events > 1 as
// // the order of events are not guaranteed by beats
// if tc.assertAgainst != "" {
// targetData := readJSONFromFile(t, filepath.Join(testDataPath, tc.assertAgainst))
// assert.Equal(t, len(targetData), len(events))

// for i, event := range events {
// msg, err := event.Fields.GetValue("message")
// assert.NoError(t, err)
// assert.JSONEq(t, targetData[i], msg.(string))
// }
// }
// })
// }
// }

// // readJSONFromFile reads the json file and returns the data as a slice of strings
// func readJSONFromFile(t *testing.T, filepath string) []string {
// fileBytes, err := os.ReadFile(filepath)
// assert.NoError(t, err)
// var rawMessages []json.RawMessage
// err = json.Unmarshal(fileBytes, &rawMessages)
// assert.NoError(t, err)
// var data []string

// for _, rawMsg := range rawMessages {
// data = append(data, string(rawMsg))
// }
// return data
// }
Loading
Loading