From 03783a1ec8d86a1a27cf106f31923f8c5e0fcd6a Mon Sep 17 00:00:00 2001 From: ohad bitton <32278684+ohadbitt@users.noreply.github.com> Date: Sun, 3 Dec 2023 16:08:26 +0200 Subject: [PATCH] User/ohbitton/add file options (#212) * add raw data size compression size * Move compression to a new package * revert * fixed bad merge * fmt * b * fix test * fix * fix test * tests and change source->ingestoptions * fmt * fix * fix * revert --------- Co-authored-by: Ohad Bitton --- CHANGELOG.md | 4 + kusto/ingest/file_options.go | 18 +++- kusto/ingest/ingestoptions/ingestoptions.go | 27 ++++++ .../ingest/internal/properties/properties.go | 92 ++++++++----------- kusto/ingest/internal/queued/queued.go | 67 +++++++++----- kusto/ingest/internal/queued/queued_test.go | 74 +++++++++++++-- .../ingest/internal/utils/ingestion_utils.go | 16 ++-- kusto/ingest/managed.go | 14 +-- kusto/ingest/streaming.go | 11 +-- kusto/test/etoe/etoe_test.go | 5 +- kusto/test/etoe/ingestion_status_e2e_test.go | 2 +- 11 files changed, 220 insertions(+), 110 deletions(-) create mode 100644 kusto/ingest/ingestoptions/ingestoptions.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b84fd595..fa696c2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `ResultsProgressiveDisable()` has been removed. - Use `ResultsProgressiveEnabled()` to enable progressive queries. 
+### Added +- Add file options: RawDataSize, CompressionType +- New package ingest/ingestoptions now contains Compression properties (in the future will hold DataFormat) + ### Fixed - String quoting in default value of query parameters diff --git a/kusto/ingest/file_options.go b/kusto/ingest/file_options.go index 8bae8f6d..ba42f832 100644 --- a/kusto/ingest/file_options.go +++ b/kusto/ingest/file_options.go @@ -6,6 +6,7 @@ import ( "time" "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" "github.com/cenkalti/backoff/v4" ) @@ -461,6 +462,21 @@ func ClientRequestId(clientRequestId string) FileOption { } } +// CompressionType sets the compression type of the data. +// Use this if the file name does not expose the compression type. +// This sets DontCompress to true for compressed data. +func CompressionType(compressionType ingestoptions.CompressionType) FileOption { + return option{ + run: func(p *properties.All) error { + p.Source.CompressionType = compressionType + return nil + }, + clientScopes: QueuedClient | StreamingClient | ManagedClient, + sourceScope: FromFile | FromReader, + name: "CompressionType", + } +} + // RawDataSize is the uncompressed data size. Should be used to comunicate the file size to the service for efficient ingestion. 
+// Also used by managed client in the decision to use queued ingestion instead of streaming (if > 4mb) func RawDataSize(size int64) FileOption { @@ -469,8 +485,8 @@ func RawDataSize(size int64) FileOption { p.Ingestion.RawDataSize = size return nil }, + clientScopes: QueuedClient | ManagedClient, sourceScope: FromFile | FromReader | FromBlob, - clientScopes: StreamingClient | ManagedClient | QueuedClient, name: "RawDataSize", } } diff --git a/kusto/ingest/ingestoptions/ingestoptions.go b/kusto/ingest/ingestoptions/ingestoptions.go new file mode 100644 index 00000000..05bc6054 --- /dev/null +++ b/kusto/ingest/ingestoptions/ingestoptions.go @@ -0,0 +1,27 @@ +package ingestoptions + +// CompressionType is a file's compression type. +type CompressionType int8 + +// String implements fmt.Stringer. +func (c CompressionType) String() string { + switch c { + case GZIP: + return "gzip" + case ZIP: + return "zip" + } + return "unknown compression type" +} + +//goland:noinspection GoUnusedConst - Part of the API +const ( + // CTUnknown indicates that the compression type was unset. + CTUnknown CompressionType = 0 + // CTNone indicates that the file was not compressed. + CTNone CompressionType = 1 + // GZIP indicates that the file is GZIP compressed. + GZIP CompressionType = 2 + // ZIP indicates that the file is ZIP compressed. + ZIP CompressionType = 3 +) diff --git a/kusto/ingest/internal/properties/properties.go b/kusto/ingest/internal/properties/properties.go index 2bb3500d..beba76aa 100644 --- a/kusto/ingest/internal/properties/properties.go +++ b/kusto/ingest/internal/properties/properties.go @@ -14,44 +14,12 @@ import ( "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/data/errors" + + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" ) -// CompressionType is a file's compression type. -type CompressionType int8 - -// String implements fmt.Stringer. 
-func (c CompressionType) String() string { - switch c { - case GZIP: - return "gzip" - case ZIP: - return "zip" - } - return "unknown compression type" -} - -// MarshalJSON implements json.Marshaler.MarshalJSON. -func (c CompressionType) MarshalJSON() ([]byte, error) { - if c == 0 { - return nil, fmt.Errorf("CTUnknown is an invalid compression type") - } - return []byte(fmt.Sprintf("%q", c.String())), nil -} - -//goland:noinspection GoUnusedConst - Part of the API -const ( - // CTUnknown indicates that that the compression type was unset. - CTUnknown CompressionType = 0 - // CTNone indicates that the file was not compressed. - CTNone CompressionType = 1 - // GZIP indicates that the file is GZIP compressed. - GZIP CompressionType = 2 - // ZIP indicates that the file is ZIP compressed. - ZIP CompressionType = 3 -) - // DataFormat indicates what type of encoding format was used for source data. // Note: This is very similar to ingest.DataFormat, except this supports more formats. // We are not using a shared list, because this list is used only internally and is for the @@ -105,27 +73,28 @@ type dfDescriptor struct { jsonName string detectableExt string validMappingKind bool + shouldCompress bool } var dfDescriptions = []dfDescriptor{ - {"", "", "", false}, - {"Avro", "avro", ".avro", true}, - {"ApacheAvro", "avro", "", false}, - {"Csv", "csv", ".csv", true}, - {"Json", "json", ".json", true}, - {"MultiJson", "multijson", "", false}, - {"Orc", "orc", ".orc", true}, - {"Parquet", "parquet", ".parquet", true}, - {"Psv", "psv", ".psv", false}, - {"Raw", "raw", ".raw", false}, - {"Scsv", "scsv", ".scsv", false}, - {"Sohsv", "sohsv", ".sohsv", false}, - {"SStream", "sstream", ".ss", false}, - {"Tsv", "tsv", ".tsv", false}, - {"Tsve", "tsve", ".tsve", false}, - {"Txt", "txt", ".txt", false}, - {"W3cLogFile", "w3clogfile", ".w3clogfile", false}, - {"SingleJson", "singlejson", "", false}, + {"", "", "", false, true}, + {"Avro", "avro", ".avro", true, false}, + {"ApacheAvro", 
"avro", "", false, false}, + {"Csv", "csv", ".csv", true, true}, + {"Json", "json", ".json", true, true}, + {"MultiJson", "multijson", "", false, true}, + {"Orc", "orc", ".orc", true, false}, + {"Parquet", "parquet", ".parquet", true, false}, + {"Psv", "psv", ".psv", false, true}, + {"Raw", "raw", ".raw", false, true}, + {"Scsv", "scsv", ".scsv", false, true}, + {"Sohsv", "sohsv", ".sohsv", false, true}, + {"SStream", "sstream", ".ss", false, true}, + {"Tsv", "tsv", ".tsv", false, true}, + {"Tsve", "tsve", ".tsve", false, true}, + {"Txt", "txt", ".txt", false, true}, + {"W3cLogFile", "w3clogfile", ".w3clogfile", false, true}, + {"SingleJson", "singlejson", "", false, true}, } // IngestionReportLevel defines which ingestion statuses are reported by the DM. @@ -194,13 +163,21 @@ func (d DataFormat) MarshalJSON() ([]byte, error) { // IsValidMappingKind returns true if a dataformat can be used as a MappingKind. func (d DataFormat) IsValidMappingKind() bool { - if d > 0 && int(d) < len(dfDescriptions) { + if int(d) < len(dfDescriptions) { return dfDescriptions[d].validMappingKind } return false } +func (d DataFormat) ShouldCompress() bool { + if d > 0 && int(d) < len(dfDescriptions) { + return dfDescriptions[d].shouldCompress + } + + return true +} + // DataFormatDiscovery looks at the file name and tries to discern what the file format is. func DataFormatDiscovery(fName string) DataFormat { name := fName @@ -229,7 +206,7 @@ func DataFormatDiscovery(fName string) DataFormat { type All struct { // Ingestion is a set of properties that are used across all ingestion methods. Ingestion Ingestion - // Source provides options that are used when doing an ingestion on a filesystem. + // Source provides options that are used to operate on the source data. Source SourceOptions // Streaming provides options that are used when doing an ingestion from a stream. 
Streaming Streaming @@ -243,13 +220,13 @@ type ManagedStreaming struct { Backoff backoff.BackOff } -// Streaming provides options that are used when doing an ingestion from a stream. +// Streaming provides options that are used when doing a streaming ingestion. type Streaming struct { // ClientRequestID is the client request ID to use for the ingestion. ClientRequestId string } -// SourceOptions are options that the user provides about the source file that is going to be uploaded. +// SourceOptions are options that the user provides about the source that is going to be uploaded. type SourceOptions struct { // ID allows someone to set the UUID for upload themselves. We aren't providing this option at this time, but here // when we do. @@ -263,6 +240,9 @@ type SourceOptions struct { // OriginalSource is the path to the original source file, used for deletion. OriginalSource string + + // CompressionType is the type of compression used on the file. + CompressionType ingestoptions.CompressionType } // Ingestion is a JSON serializable set of options that must be provided to the service. 
diff --git a/kusto/ingest/internal/queued/queued.go b/kusto/ingest/internal/queued/queued.go index 678b7f06..846c43ec 100644 --- a/kusto/ingest/internal/queued/queued.go +++ b/kusto/ingest/internal/queued/queued.go @@ -14,6 +14,7 @@ import ( "time" "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/gzip" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/resources" @@ -151,24 +152,9 @@ func (i *Ingestion) Reader(ctx context.Context, reader io.Reader, props properti return "", errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry() } - shouldCompress := true - if props.Source.OriginalSource != "" { - shouldCompress = utils.CompressionDiscovery(props.Source.OriginalSource) == properties.CTNone - } - if props.Source.DontCompress { - shouldCompress = false - } - - extension := "gz" - if !shouldCompress { - if props.Source.OriginalSource != "" { - extension = filepath.Ext(props.Source.OriginalSource) - } else { - extension = props.Ingestion.Additional.Format.String() // Best effort - } - } - - blobName := fmt.Sprintf("%s_%s_%s_%s.%s", i.db, i.table, nower(), filepath.Base(uuid.New().String()), extension) + compression := utils.CompressionDiscovery(props.Source.OriginalSource) + shouldCompress := ShouldCompress(&props, compression) + blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(props.Source.OriginalSource), compression, shouldCompress, props.Ingestion.Additional.Format.String()) size := int64(0) @@ -332,10 +318,8 @@ var nower = time.Now // error if there was one. 
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) { compression := utils.CompressionDiscovery(from) - blobName := fmt.Sprintf("%s_%s_%s_%s_%s", i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from)) - if compression == properties.CTNone { - blobName = blobName + ".gz" - } + shouldCompress := ShouldCompress(props, compression) + blobName := GenBlobName(i.db, i.table, nower(), filepath.Base(uuid.New().String()), filepath.Base(from), compression, shouldCompress, props.Ingestion.Additional.Format.String()) file, err := os.Open(from) if err != nil { @@ -356,7 +340,7 @@ func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob ).SetNoRetry() } - if compression == properties.CTNone && !props.Source.DontCompress { + if shouldCompress { gstream := gzip.New() gstream.Reset(file) @@ -396,6 +380,43 @@ func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob return fullUrl(client, container, blobName), stat.Size(), nil } +func GenBlobName(databaseName string, tableName string, time time.Time, guid string, fileName string, compressionFileExtension ingestoptions.CompressionType, shouldCompress bool, dataFormat string) string { + extension := "gz" + if !shouldCompress { + if compressionFileExtension == ingestoptions.CTNone { + extension = dataFormat + } else { + extension = compressionFileExtension.String() + } + + extension = dataFormat + } + + blobName := fmt.Sprintf("%s_%s_%s_%s_%s.%s", databaseName, tableName, time, guid, fileName, extension) + + return blobName +} + +// Do not compress if user specified in DontCompress or CompressionType, +// if the file extension shows compression, or if the format is binary. 
+func ShouldCompress(props *properties.All, compressionFileExtension ingestoptions.CompressionType) bool { + if props.Source.DontCompress { + return false + } + + if props.Source.CompressionType != ingestoptions.CTUnknown { + if props.Source.CompressionType != ingestoptions.CTNone { + return false + } + } else { + if compressionFileExtension != ingestoptions.CTUnknown && compressionFileExtension != ingestoptions.CTNone { + return false + } + } + + return props.Ingestion.Additional.Format.ShouldCompress() +} + // This allows mocking the stat func later on var statFunc = os.Stat diff --git a/kusto/ingest/internal/queued/queued_test.go b/kusto/ingest/internal/queued/queued_test.go index 429c4bbb..2d658978 100644 --- a/kusto/ingest/internal/queued/queued_test.go +++ b/kusto/ingest/internal/queued/queued_test.go @@ -10,8 +10,10 @@ import ( "testing" "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/utils" + "github.com/stretchr/testify/assert" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" @@ -56,13 +58,13 @@ func TestCompressionDiscovery(t *testing.T) { tests := []struct { input string - want properties.CompressionType + want ingestoptions.CompressionType }{ - {"https://somehost.somedomain.com:8080/v1/somestuff/file.gz", properties.GZIP}, - {"https://somehost.somedomain.com:8080/v1/somestuff/file.zip", properties.ZIP}, - {"/path/to/a/file.gz", properties.GZIP}, - {"/path/to/a/file.zip", properties.ZIP}, - {"/path/to/a/file", properties.CTNone}, + {"https://somehost.somedomain.com:8080/v1/somestuff/file.gz", ingestoptions.GZIP}, + {"https://somehost.somedomain.com:8080/v1/somestuff/file.zip", ingestoptions.ZIP}, + {"/path/to/a/file.gz", ingestoptions.GZIP}, + {"/path/to/a/file.zip", ingestoptions.ZIP}, + {"/path/to/a/file", ingestoptions.CTNone}, } for _, test := range tests { 
@@ -301,3 +303,63 @@ func TestIsLocalPath(t *testing.T) { }) } } + +func TestShouldCompress(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + props *properties.All + want bool + }{ + { + name: "Some file", + props: &properties.All{Source: properties.SourceOptions{CompressionType: ingestoptions.CTUnknown, + OriginalSource: "https://somehost.somedomain.com:8080/v1/somestuff/file"}}, + want: true, + }, + { + name: "Some file2", + props: &properties.All{Source: properties.SourceOptions{CompressionType: ingestoptions.CTNone, + OriginalSource: "https://somehost.somedomain.com:8080/v1/somestuff/file"}}, + want: true, + }, + { + name: "Provided compression type is GZIP", + props: &properties.All{Source: properties.SourceOptions{CompressionType: ingestoptions.GZIP, + OriginalSource: "https://somehost.somedomain.com:8080/v1/somestuff/file"}}, + want: false, + }, + { + name: "Guess by name is GZIP", + props: &properties.All{Source: properties.SourceOptions{CompressionType: ingestoptions.CTUnknown, + OriginalSource: "https://somehost.somedomain.com:8080/v1/somestuff/file.gz"}}, + want: false, + }, + { + name: "DontCompress is true", + props: &properties.All{Source: properties.SourceOptions{CompressionType: ingestoptions.CTNone, + DontCompress: true, + OriginalSource: "https://somehost.somedomain.com:8080/v1/somestuff/file"}}, + want: false, + }, + { + name: "Binary format", + props: &properties.All{Source: properties.SourceOptions{CompressionType: ingestoptions.CTNone, + OriginalSource: "https://somehost.somedomain.com:8080/v1/somestuff/file.avro"}}, + want: false, + }, + } + + for _, test := range tests { + test := test // capture + t.Run(test.name, func(t *testing.T) { + t.Parallel() + CompleteFormatFromFileName(test.props, test.props.Source.OriginalSource) + + got := ShouldCompress(test.props, + utils.CompressionDiscovery(test.props.Source.OriginalSource)) + assert.Equal(t, test.want, got) + }) + } +} diff --git 
a/kusto/ingest/internal/utils/ingestion_utils.go b/kusto/ingest/internal/utils/ingestion_utils.go index 38cd6331..19a49190 100644 --- a/kusto/ingest/internal/utils/ingestion_utils.go +++ b/kusto/ingest/internal/utils/ingestion_utils.go @@ -15,7 +15,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" - "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/resources" ) @@ -79,10 +79,10 @@ func FetchBlobSize(fPath string, ctx context.Context, client *http.Client) (size return *properties.ContentLength, nil } -func EstimateRawDataSize(compression properties.CompressionType, fileSize int64) int64 { +func EstimateRawDataSize(compression ingestoptions.CompressionType, fileSize int64) int64 { switch compression { - case properties.GZIP: - case properties.ZIP: + case ingestoptions.GZIP: + case ingestoptions.ZIP: return fileSize * EstimatedCompressionFactor } @@ -92,7 +92,7 @@ func EstimateRawDataSize(compression properties.CompressionType, fileSize int64) // CompressionDiscovery looks at the file extension. If it is one we support, we return that // CompressionType that represents that value. Otherwise we return CTNone to indicate that the // file should not be compressed. 
-func CompressionDiscovery(fName string) properties.CompressionType { +func CompressionDiscovery(fName string) ingestoptions.CompressionType { var ext string if strings.HasPrefix(strings.ToLower(fName), "http") { ext = strings.ToLower(filepath.Ext(path.Base(fName))) @@ -102,9 +102,9 @@ func CompressionDiscovery(fName string) properties.CompressionType { switch ext { case ".gz": - return properties.GZIP + return ingestoptions.GZIP case ".zip": - return properties.ZIP + return ingestoptions.ZIP } - return properties.CTNone + return ingestoptions.CTNone } diff --git a/kusto/ingest/managed.go b/kusto/ingest/managed.go index cc224b3b..a57de919 100644 --- a/kusto/ingest/managed.go +++ b/kusto/ingest/managed.go @@ -8,8 +8,10 @@ import ( "time" "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/gzip" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" + "github.com/Azure/azure-kusto-go/kusto/ingest/internal/queued" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/utils" "github.com/cenkalti/backoff/v4" @@ -100,7 +102,7 @@ func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOpt if !local { var size int64 - var compressionTypeForEstimation properties.CompressionType + var compressionTypeForEstimation ingestoptions.CompressionType if size = props.Ingestion.RawDataSize; size == 0 { size, err = utils.FetchBlobSize(fPath, ctx, m.queued.client.HttpClient()) if err != nil { @@ -111,7 +113,7 @@ func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOpt props.Ingestion.RawDataSize = utils.EstimateRawDataSize(compressionTypeForEstimation, size) } else { // If user sets raw data size we always want to devide it for estimation - compressionTypeForEstimation = properties.CTNone + compressionTypeForEstimation = ingestoptions.CTNone } // File is not compressed and user says its compressed, raw 10 mb -> do 
@@ -129,9 +131,9 @@ func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOpt return m.managedStreamImpl(ctx, file, props) } -func shouldUseQueuedIngestBySize(compression properties.CompressionType, fileSize int64) bool { +func shouldUseQueuedIngestBySize(compression ingestoptions.CompressionType, fileSize int64) bool { switch compression { - case properties.GZIP, properties.ZIP: + case ingestoptions.GZIP, ingestoptions.ZIP: return fileSize > maxStreamingSize } @@ -153,7 +155,7 @@ func (m *Managed) FromReader(ctx context.Context, reader io.Reader, options ...F func (m *Managed) managedStreamImpl(ctx context.Context, payload io.ReadCloser, props properties.All) (*Result, error) { defer payload.Close() - compress := !props.Source.DontCompress + compress := queued.ShouldCompress(&props, ingestoptions.CTUnknown) var compressed io.Reader = payload if compress { compressed = gzip.Compress(io.NopCloser(payload)) @@ -167,7 +169,7 @@ func (m *Managed) managedStreamImpl(ctx context.Context, payload io.ReadCloser, return nil, err } - if shouldUseQueuedIngestBySize(properties.GZIP, int64(len(buf))) { + if shouldUseQueuedIngestBySize(ingestoptions.GZIP, int64(len(buf))) { combinedBuf := io.MultiReader(bytes.NewReader(buf), compressed) return m.queued.fromReader(ctx, combinedBuf, []FileOption{}, props) } diff --git a/kusto/ingest/streaming.go b/kusto/ingest/streaming.go index de7b10e4..273371ad 100644 --- a/kusto/ingest/streaming.go +++ b/kusto/ingest/streaming.go @@ -1,7 +1,6 @@ package ingest import ( - // "bytes" "bytes" "context" "encoding/json" @@ -10,6 +9,7 @@ import ( "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/data/errors" + "github.com/Azure/azure-kusto-go/kusto/ingest/ingestoptions" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/gzip" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/queued" @@ -87,16 +87,13 @@ func 
prepFileAndProps(fPath string, props *properties.All, options []FileOption, return nil, err, local } - props.Source.OriginalSource = fPath - if !local { return nil, nil, false } + props.Source.OriginalSource = fPath compression := utils.CompressionDiscovery(fPath) - if compression != properties.CTNone { - props.Source.DontCompress = true - } + props.Source.DontCompress = !queued.ShouldCompress(props, compression) err = queued.CompleteFormatFromFileName(props, fPath) if err != nil { @@ -127,7 +124,7 @@ func (i *Streaming) FromReader(ctx context.Context, reader io.Reader, options .. } func streamImpl(c streamIngestor, ctx context.Context, payload io.Reader, props properties.All, isBlobUri bool) (*Result, error) { - compress := !props.Source.DontCompress + compress := queued.ShouldCompress(&props, ingestoptions.CTUnknown) if compress && !isBlobUri { payload = gzip.Compress(payload) } diff --git a/kusto/test/etoe/etoe_test.go b/kusto/test/etoe/etoe_test.go index eaab9a63..34400189 100644 --- a/kusto/test/etoe/etoe_test.go +++ b/kusto/test/etoe/etoe_test.go @@ -43,7 +43,7 @@ var ( ), ) - // This is needed because of a bug in the backend that sometimes causes the tables not to drop and get stuck. 
+ // This is needed because streaming ingestion metadata is cached in the engine and needs to refresh clearStreamingCacheStatement = kql.New(".clear database cache streamingingestion schema") countStatement = kql.New("table(tableName) | count") @@ -1599,7 +1599,8 @@ func TestReaderIngestion(t *testing.T) { // ok _, isQueued := test.ingestor.(*ingest.Ingestion) _, isManaged := test.ingestor.(*ingest.Managed) if isQueued || isManaged { - test.options = append(test.options, ingest.FlushImmediately(), ingest.ReportResultToTable()) + test.options = append(test.options, ingest.FlushImmediately(), + ingest.ReportResultToTable()) } f, err := os.Open(test.src) diff --git a/kusto/test/etoe/ingestion_status_e2e_test.go b/kusto/test/etoe/ingestion_status_e2e_test.go index 91ebe42b..1e499b22 100644 --- a/kusto/test/etoe/ingestion_status_e2e_test.go +++ b/kusto/test/etoe/ingestion_status_e2e_test.go @@ -240,8 +240,8 @@ func TestIngestionStatus(t *testing.T) { defer cancel() f, err := os.Open(csvFile) - defer f.Close() require.NoError(t, err) + defer f.Close() reader, writer := io.Pipe() go func() {