Skip to content

Commit

Permalink
User/ohbitton/add file options (#212)
Browse files Browse the repository at this point in the history
* add raw data size compression size

* Move compression to a new package

* revert

* fixed bad merge

* fmt

* b

* fix test

* fix

* fix test

* tests and change source->ingestoptions

* fmt

* fix

* fix

* revert

---------

Co-authored-by: Ohad Bitton <[email protected]>
  • Loading branch information
ohadbitt and ohbitton authored Dec 3, 2023
1 parent c557823 commit 03783a1
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 110 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `ResultsProgressiveDisable()` has been removed.
- Use `ResultsProgressiveEnabled()` to enable progressive queries.

### Added
- Add file options: RawDataSize, CompressionType
- New package ingest/ingestoptions now contains Compression properties (in the future will hold DataFormat)

### Fixed

- String quoting in default value of query parameters
Expand Down
18 changes: 17 additions & 1 deletion kusto/ingest/file_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/cenkalti/backoff/v4"
)
Expand Down Expand Up @@ -461,6 +462,21 @@ func ClientRequestId(clientRequestId string) FileOption {
}
}

// CompressionType sets the compression type of the source data.
// Use this when the file name (or reader source) does not expose the
// compression type. Declaring a compressed type (GZIP/ZIP) tells the client
// the data is already compressed, so it will not be re-compressed before
// upload.
// NOTE(review): this does not set DontCompress directly — it only records the
// type on the source options; the no-recompress decision is derived from it
// later (see queued.ShouldCompress).
func CompressionType(compressionType ingestoptions.CompressionType) FileOption {
	return option{
		run: func(p *properties.All) error {
			p.Source.CompressionType = compressionType
			return nil
		},
		clientScopes: QueuedClient | StreamingClient | ManagedClient,
		sourceScope:  FromFile | FromReader,
		name:         "CompressionType",
	}
}

// RawDataSize is the uncompressed data size. Should be used to communicate the file size to the service for efficient ingestion.
// Also used by managed client in the decision to use queued ingestion instead of streaming (if > 4mb)
func RawDataSize(size int64) FileOption {
Expand All @@ -469,8 +485,8 @@ func RawDataSize(size int64) FileOption {
p.Ingestion.RawDataSize = size
return nil
},
clientScopes: QueuedClient | ManagedClient,
sourceScope: FromFile | FromReader | FromBlob,
clientScopes: StreamingClient | ManagedClient | QueuedClient,
name: "RawDataSize",
}
}
27 changes: 27 additions & 0 deletions kusto/ingest/ingestoptions/ingestoptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ingestoptions

// CompressionType identifies the compression applied to a source file.
type CompressionType int8

// String implements fmt.Stringer.
func (c CompressionType) String() string {
	if c == GZIP {
		return "gzip"
	}
	if c == ZIP {
		return "zip"
	}
	// CTUnknown and CTNone have no canonical name; report them as unknown.
	return "unknown compression type"
}

//goland:noinspection GoUnusedConst - Part of the API
const (
	// CTUnknown indicates that the compression type was unset.
	CTUnknown CompressionType = 0
	// CTNone indicates that the file was not compressed.
	CTNone CompressionType = 1
	// GZIP indicates that the file is GZIP compressed.
	GZIP CompressionType = 2
	// ZIP indicates that the file is ZIP compressed.
	ZIP CompressionType = 3
)
92 changes: 36 additions & 56 deletions kusto/ingest/internal/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,12 @@ import (

"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/errors"

"github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
)

// CompressionType is a file's compression type.
type CompressionType int8

// String implements fmt.Stringer.
func (c CompressionType) String() string {
switch c {
case GZIP:
return "gzip"
case ZIP:
return "zip"
}
return "unknown compression type"
}

// MarshalJSON implements json.Marshaler.MarshalJSON.
func (c CompressionType) MarshalJSON() ([]byte, error) {
if c == 0 {
return nil, fmt.Errorf("CTUnknown is an invalid compression type")
}
return []byte(fmt.Sprintf("%q", c.String())), nil
}

//goland:noinspection GoUnusedConst - Part of the API
const (
// CTUnknown indicates that that the compression type was unset.
CTUnknown CompressionType = 0
// CTNone indicates that the file was not compressed.
CTNone CompressionType = 1
// GZIP indicates that the file is GZIP compressed.
GZIP CompressionType = 2
// ZIP indicates that the file is ZIP compressed.
ZIP CompressionType = 3
)

// DataFormat indicates what type of encoding format was used for source data.
// Note: This is very similar to ingest.DataFormat, except this supports more formats.
// We are not using a shared list, because this list is used only internally and is for the
Expand Down Expand Up @@ -105,27 +73,28 @@ type dfDescriptor struct {
jsonName string
detectableExt string
validMappingKind bool
shouldCompress bool
}

var dfDescriptions = []dfDescriptor{
{"", "", "", false},
{"Avro", "avro", ".avro", true},
{"ApacheAvro", "avro", "", false},
{"Csv", "csv", ".csv", true},
{"Json", "json", ".json", true},
{"MultiJson", "multijson", "", false},
{"Orc", "orc", ".orc", true},
{"Parquet", "parquet", ".parquet", true},
{"Psv", "psv", ".psv", false},
{"Raw", "raw", ".raw", false},
{"Scsv", "scsv", ".scsv", false},
{"Sohsv", "sohsv", ".sohsv", false},
{"SStream", "sstream", ".ss", false},
{"Tsv", "tsv", ".tsv", false},
{"Tsve", "tsve", ".tsve", false},
{"Txt", "txt", ".txt", false},
{"W3cLogFile", "w3clogfile", ".w3clogfile", false},
{"SingleJson", "singlejson", "", false},
{"", "", "", false, true},
{"Avro", "avro", ".avro", true, false},
{"ApacheAvro", "avro", "", false, false},
{"Csv", "csv", ".csv", true, true},
{"Json", "json", ".json", true, true},
{"MultiJson", "multijson", "", false, true},
{"Orc", "orc", ".orc", true, false},
{"Parquet", "parquet", ".parquet", true, false},
{"Psv", "psv", ".psv", false, true},
{"Raw", "raw", ".raw", false, true},
{"Scsv", "scsv", ".scsv", false, true},
{"Sohsv", "sohsv", ".sohsv", false, true},
{"SStream", "sstream", ".ss", false, true},
{"Tsv", "tsv", ".tsv", false, true},
{"Tsve", "tsve", ".tsve", false, true},
{"Txt", "txt", ".txt", false, true},
{"W3cLogFile", "w3clogfile", ".w3clogfile", false, true},
{"SingleJson", "singlejson", "", false, true},
}

// IngestionReportLevel defines which ingestion statuses are reported by the DM.
Expand Down Expand Up @@ -194,13 +163,21 @@ func (d DataFormat) MarshalJSON() ([]byte, error) {

// IsValidMappingKind returns true if a dataformat can be used as a MappingKind.
func (d DataFormat) IsValidMappingKind() bool {
if d > 0 && int(d) < len(dfDescriptions) {
if int(d) < len(dfDescriptions) {
return dfDescriptions[d].validMappingKind
}

return false
}

// ShouldCompress reports whether data in this format should be compressed
// before upload. Per the descriptor table, binary formats (Avro, ORC,
// Parquet) are not compressed; out-of-range or unset formats default to true.
func (d DataFormat) ShouldCompress() bool {
	if d <= 0 || int(d) >= len(dfDescriptions) {
		return true
	}

	return dfDescriptions[d].shouldCompress
}

// DataFormatDiscovery looks at the file name and tries to discern what the file format is.
func DataFormatDiscovery(fName string) DataFormat {
name := fName
Expand Down Expand Up @@ -229,7 +206,7 @@ func DataFormatDiscovery(fName string) DataFormat {
type All struct {
// Ingestion is a set of properties that are used across all ingestion methods.
Ingestion Ingestion
// Source provides options that are used when doing an ingestion on a filesystem.
// Source provides options that are used to operate on the source data.
Source SourceOptions
// Streaming provides options that are used when doing an ingestion from a stream.
Streaming Streaming
Expand All @@ -243,13 +220,13 @@ type ManagedStreaming struct {
Backoff backoff.BackOff
}

// Streaming provides options that are used when doing an ingestion from a stream.
// Streaming provides options that are used when doing a streaming ingestion.
type Streaming struct {
// ClientRequestID is the client request ID to use for the ingestion.
ClientRequestId string
}

// SourceOptions are options that the user provides about the source file that is going to be uploaded.
// SourceOptions are options that the user provides about the source that is going to be uploaded.
type SourceOptions struct {
// ID allows someone to set the UUID for upload themselves. We aren't providing this option at this time, but here
// when we do.
Expand All @@ -263,6 +240,9 @@ type SourceOptions struct {

// OriginalSource is the path to the original source file, used for deletion.
OriginalSource string

// CompressionType is the type of compression used on the file.
CompressionType ingestoptions.CompressionType
}

// Ingestion is a JSON serializable set of options that must be provided to the service.
Expand Down
67 changes: 44 additions & 23 deletions kusto/ingest/internal/queued/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/gzip"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/resources"
Expand Down Expand Up @@ -151,24 +152,9 @@ func (i *Ingestion) Reader(ctx context.Context, reader io.Reader, props properti
return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
}

shouldCompress := true
if props.Source.OriginalSource != "" {
shouldCompress = utils.CompressionDiscovery(props.Source.OriginalSource) == properties.CTNone
}
if props.Source.DontCompress {
shouldCompress = false
}

extension := "gz"
if !shouldCompress {
if props.Source.OriginalSource != "" {
extension = filepath.Ext(props.Source.OriginalSource)
} else {
extension = props.Ingestion.Additional.Format.String() // Best effort
}
}

blobName := fmt.Sprintf("%s_%s_%s_%s.%s", i.db, i.table, nower(), filepath.Base(uuid.New().String()), extension)
compression := utils.CompressionDiscovery(props.Source.OriginalSource)
shouldCompress := ShouldCompress(&props, compression)
blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(props.Source.OriginalSource), compression, shouldCompress, props.Ingestion.Additional.Format.String())

size := int64(0)

Expand Down Expand Up @@ -332,10 +318,8 @@ var nower = time.Now
// error if there was one.
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) {
compression := utils.CompressionDiscovery(from)
blobName := fmt.Sprintf("%s_%s_%s_%s_%s", i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from))
if compression == properties.CTNone {
blobName = blobName + ".gz"
}
shouldCompress := ShouldCompress(props, compression)
blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from), compression, shouldCompress, props.Ingestion.Additional.Format.String())

file, err := os.Open(from)
if err != nil {
Expand All @@ -356,7 +340,7 @@ func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob
).SetNoRetry()
}

if compression == properties.CTNone && !props.Source.DontCompress {
if shouldCompress {
gstream := gzip.New()
gstream.Reset(file)

Expand Down Expand Up @@ -396,6 +380,43 @@ func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob
return fullUrl(client, container, blobName), stat.Size(), nil
}

// GenBlobName builds the destination blob name for an ingestion upload:
// "<db>_<table>_<time>_<guid>_<fileName>.<extension>".
//
// The extension reflects what the blob will actually contain: "gz" when the
// client is about to gzip the data, the detected compression type ("gzip"/
// "zip") when the data is already compressed, and the data format otherwise.
//
// BUG FIX(review): the original body computed the extension in the if/else
// below and then unconditionally overwrote it with dataFormat, leaving the
// whole branch dead (staticcheck SA4006). The dead store is removed so the
// branch logic takes effect; CTUnknown is treated like CTNone so that
// CompressionType.String()'s "unknown compression type" never leaks into a
// blob name.
func GenBlobName(databaseName string, tableName string, time time.Time, guid string, fileName string, compressionFileExtension ingestoptions.CompressionType, shouldCompress bool, dataFormat string) string {
	// Data we are about to compress always carries the gzip extension.
	extension := "gz"
	if !shouldCompress {
		if compressionFileExtension == ingestoptions.CTUnknown || compressionFileExtension == ingestoptions.CTNone {
			// Uncompressed data that must not be compressed (e.g. a binary
			// format such as Parquet) keeps its data-format extension.
			extension = dataFormat
		} else {
			// Already-compressed data keeps its compression extension.
			extension = compressionFileExtension.String()
		}
	}

	blobName := fmt.Sprintf("%s_%s_%s_%s_%s.%s", databaseName, tableName, time, guid, fileName, extension)

	return blobName
}

// ShouldCompress decides whether the client should gzip the source before
// uploading it. Compression is skipped when the user explicitly opted out
// (DontCompress), when the data is already compressed — either declared via
// the CompressionType file option or detected from the file extension — or
// when the data format itself should not be compressed (binary formats).
func ShouldCompress(props *properties.All, compressionFileExtension ingestoptions.CompressionType) bool {
	if props.Source.DontCompress {
		return false
	}

	declared := props.Source.CompressionType
	if declared != ingestoptions.CTUnknown {
		// An explicit user declaration wins over extension discovery.
		if declared != ingestoptions.CTNone {
			return false
		}
	} else if compressionFileExtension != ingestoptions.CTUnknown && compressionFileExtension != ingestoptions.CTNone {
		return false
	}

	return props.Ingestion.Additional.Format.ShouldCompress()
}

// This allows mocking the stat func later on
var statFunc = os.Stat

Expand Down
Loading

0 comments on commit 03783a1

Please sign in to comment.