Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

low allocs (draft) #249

Merged
merged 47 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
87b8d16
To save
AsafMah Jun 5, 2024
6a20375
Initial solution
AsafMah Jun 6, 2024
edd474c
Initial solution
AsafMah Jun 6, 2024
7c92f7b
Initial solution
AsafMah Jun 6, 2024
6b3582d
faster json
AsafMah Jun 9, 2024
f6d8567
New method
AsafMah Jun 9, 2024
e3c4299
Fixed race
AsafMah Jun 9, 2024
203e031
Fixed tests
AsafMah Jun 9, 2024
1742a3b
Tests pass
AsafMah Jun 10, 2024
8bad818
Tests pass
AsafMah Jun 10, 2024
2ed02ef
Fixed etoe running
AsafMah Jun 10, 2024
953fb0c
Code re-org
AsafMah Jun 10, 2024
1a5df82
Code re-org
AsafMah Jun 10, 2024
6e2c874
More refactoring
AsafMah Jun 10, 2024
4e2a153
Added test
AsafMah Jun 10, 2024
cfab674
Merge branch 'refs/heads/master' into low-allocations
AsafMah Jun 17, 2024
3126c82
Update deps
AsafMah Jun 17, 2024
91afe77
Added missing tests
AsafMah Jun 17, 2024
520c1f3
Refactor frame reader
AsafMah Jun 22, 2024
1201c2b
Better error
AsafMah Jun 25, 2024
63460f3
Fixed sync error
AsafMah Jun 25, 2024
f807c7a
Fixed nested rows
AsafMah Jun 25, 2024
4eced91
removed indirection
AsafMah Jun 25, 2024
7dccb73
playing with benchmark
AsafMah Jun 25, 2024
bff7a68
Fixed race
AsafMah Jun 26, 2024
2c943d5
Added extensive docs
AsafMah Jul 1, 2024
da1deda
Fix
AsafMah Jul 1, 2024
79ec32c
Added more tests
AsafMah Jul 1, 2024
f01e369
Don't close the reader (as the response does it already)
AsafMah Jul 1, 2024
ff0c313
Merge branch 'refs/heads/master' into low-allocations
AsafMah Jul 1, 2024
c3d0297
Don't close the reader (as the response does it already)
AsafMah Jul 1, 2024
9218349
Restore method that allocates slightly less
AsafMah Jul 1, 2024
6f8a220
Merge branch 'master' into low-allocations
AsafMah Jul 23, 2024
1742173
comments
AsafMah Jul 31, 2024
66f59a7
Remove skip property
AsafMah Jul 31, 2024
8204441
Try to close reader
AsafMah Aug 1, 2024
3d3bb4d
Update CHANGELOG.md
AsafMah Aug 21, 2024
07ce92a
Clarify UseNumber
AsafMah Aug 21, 2024
ccfb9d5
Merge branch 'master' into low-allocations
AsafMah Aug 21, 2024
a34584d
Merge branch 'master' into low-allocations
AsafMah Aug 21, 2024
a60f812
try impersonate blob
AsafMah Aug 25, 2024
3508613
don't impersonate
AsafMah Aug 25, 2024
6da89cb
Try new storage
AsafMah Aug 25, 2024
13d5748
Try mi
AsafMah Aug 25, 2024
4fd1049
Merge branch 'master' into low-allocations
AsafMah Aug 25, 2024
5446b98
Fix + changelog
AsafMah Aug 25, 2024
883ea2e
Changelog
AsafMah Aug 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [1.0.0-preview-4] - 2024-06-05
### Changed
- IterativeTable.Skip() was renamed to IterativeTable.IsSkipped() to avoid confusion.
AsafMah marked this conversation as resolved.
Show resolved Hide resolved
- V2FrameCapacity was renamed to V2IoCapacity to better reflect its purpose.
- V2FragmentCapacity was renamed to V2TableCapacity to better reflect its purpose.
- Better defaults for buffer sizes.

- the `WithApplicationCertificate` on `KustoConnectionStringBuilder` was removed as it was ambiguous and not implemented correctly. Instead there are two new methods:
- `WithAppCertificatePath` - Receives the path to the certificate file.
- `WithAppCertificateBytes` - Receives the certificate bytes in-memory.
Both methods accept an optional password for the certificate.

### Fixed
- Fixed Mapping Kind not working correctly with certain formats.
- Fixed plenty of sync issues.
- Reduced allocations.


## [1.0.0-preview-3] - 2024-06-05
### Added
Expand Down
177 changes: 144 additions & 33 deletions azkustodata/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,86 @@ import (
"github.com/Azure/azure-kusto-go/azkustodata/kql"
queryv2 "github.com/Azure/azure-kusto-go/azkustodata/query/v2"
"io"
"os"
"strconv"
"strings"
"testing"
)

func getData(k int) (string, context.Context) {
// cache files in bench/test/{k}.json
ctx := context.Background()

if bytes, err := os.ReadFile("g:\\bench\\" + strconv.Itoa(k) + ".json"); err == nil {
return string(bytes), ctx
}

kcsb := NewConnectionStringBuilder("https://help.kusto.windows.net/").WithAzCli()
client, err := New(kcsb)

res, err := client.QueryToJson(context.Background(), "Samples", kql.New("StormEvents | limit ").AddInt(int32(k)),
V2FragmentPrimaryTables(), V2NewlinesBetweenFrames(), ResultsErrorReportingPlacement(ResultsErrorReportingPlacementEndOfTable))
res, err := client.QueryToJson(context.Background(), "Samples", kql.New("StormEvents | take ").AddInt(int32(k)).AddLiteral(" ; StormEvents | take ").AddInt(int32(k)).AddLiteral(" ; StormEvents | take ").AddInt(int32(k)),
V2FragmentPrimaryTables(), V2NewlinesBetweenFrames(), ResultsErrorReportingPlacement(ResultsErrorReportingPlacementEndOfTable), NoTruncation(), NoRequestTimeout())

if err != nil {
panic(err)
}

ctx := context.Background()
err = os.WriteFile("g:\\bench\\"+strconv.Itoa(k)+".json", []byte(res), 0644)
if err != nil {
return "", nil
}

return res, ctx

}

func benchmarkIterative(b *testing.B, k int) {
func benchmarkIterative(b *testing.B, k int, frameCapacity int, rowCapacity int, fragmentCapacity int) {
res, ctx := getData(k)
b.ReportAllocs()
b.ResetTimer()

for k := 0; k < b.N; k++ {
dataset, err := queryv2.NewIterativeDataset(ctx, io.NopCloser(strings.NewReader(res)), queryv2.DefaultFrameCapacity, queryv2.DefaultRowCapacity, queryv2.DefaultFragmentCapacity)
//factor := 10000000 / k
factor := 1

/* results, err := os.Create("g:\\bench\\" + strconv.Itoa(k) + ".results.txt")
if err != nil {
panic(err)
}

for tableResult := range dataset.Tables() {
if tableResult.Err() != nil {
panic(tableResult.Err())
}
defer results.Close()
*/
for u := 0; u < b.N; u++ {
for i := 0; i < factor; i++ {
dataset, err := queryv2.NewIterativeDataset(ctx, io.NopCloser(strings.NewReader(res)), frameCapacity, rowCapacity, fragmentCapacity)

table := tableResult.Table()
if !table.IsPrimaryResult() {
break
if err != nil {
panic(err)
}

c := int32(0)
for res := range table.Rows() {
if res.Err() != nil {
panic(res.Err())
for tableResult := range dataset.Tables() {
if tableResult.Err() != nil {
panic(tableResult.Err())
}
id, err := res.Row().IntByName("EventId")
if err != nil {
panic(err)

table := tableResult.Table()
if !table.IsPrimaryResult() {
break
}
if id == nil || *id == 0 {
panic("invalid id")

c := int32(0)
for res := range table.Rows() {
if res.Err() != nil {
panic(res.Err())
}
id, err := res.Row().IntByName("EventId")
if err != nil {
panic(err)
}
if id == nil || *id == 0 {
panic("invalid id")
}
//results.WriteString(fmt.Sprintf("%d:%d,", c, *id))
c++
}
c++
}
}
}
Expand All @@ -70,7 +95,7 @@ func benchmarkFull(b *testing.B, k int) {
b.ResetTimer()

for k := 0; k < b.N; k++ {
dataset, err := queryv2.NewIterativeDataset(ctx, io.NopCloser(strings.NewReader(res)), queryv2.DefaultFrameCapacity, queryv2.DefaultRowCapacity, queryv2.DefaultFragmentCapacity)
dataset, err := queryv2.NewIterativeDataset(ctx, io.NopCloser(strings.NewReader(res)), queryv2.DefaultIoCapacity, queryv2.DefaultRowCapacity, queryv2.DefaultTableCapacity)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -100,19 +125,105 @@ func benchmarkFull(b *testing.B, k int) {
}
}

func BenchmarkIterative1(b *testing.B) { benchmarkIterative(b, 1) }
func BenchmarkIterative10(b *testing.B) { benchmarkIterative(b, 10) }
func BenchmarkIterative100(b *testing.B) { benchmarkIterative(b, 100) }
func BenchmarkIterative1000(b *testing.B) { benchmarkIterative(b, 1000) }
func BenchmarkIterative10000(b *testing.B) { benchmarkIterative(b, 10000) }
func BenchmarkIterative100000(b *testing.B) { benchmarkIterative(b, 100000) }
// default values

/*
func BenchmarkIterative1(b *testing.B) {
benchmarkIterative(b, 1, queryv2.DefaultIoCapacity, queryv2.DefaultTableCapacity, queryv2.DefaultRowCapacity)
}

func BenchmarkIterative10(b *testing.B) {
benchmarkIterative(b, 10, queryv2.DefaultIoCapacity, queryv2.DefaultTableCapacity, queryv2.DefaultRowCapacity)
}
*/
func BenchmarkIterative100(b *testing.B) {
benchmarkIterative(b, 100, 0, queryv2.DefaultRowCapacity, 1)
}

/*
func BenchmarkIterative1000(b *testing.B) {
benchmarkIterative(b, 1000, 0, queryv2.DefaultTableCapacity, queryv2.DefaultRowCapacity)
}
*/
func BenchmarkIterative10000(b *testing.B) {
benchmarkIterative(b, 10000, 0, queryv2.DefaultRowCapacity, 1)
}

func BenchmarkIterative118132(b *testing.B) {
benchmarkIterative(b, 118132, 0, queryv2.DefaultRowCapacity, 1)
}

/*
func BenchmarkIterative1000000(b *testing.B) {
benchmarkIterative(b, 1000000, queryv2.DefaultIoCapacity, queryv2.DefaultTableCapacity, queryv2.DefaultRowCapacity)
}

func BenchmarkIterative10000000(b *testing.B) {
benchmarkIterative(b, 10000000, queryv2.DefaultIoCapacity, queryv2.DefaultTableCapacity, queryv2.DefaultRowCapacity)
}
*/
func BenchmarkIterativeNoBuffer100(b *testing.B) {
benchmarkIterative(b, 100, 0, 0, 1)
}

func BenchmarkIterative1000000(b *testing.B) { benchmarkIterative(b, 1000000) }
func BenchmarkIterativeNoBuffer10000(b *testing.B) {
benchmarkIterative(b, 10000, 0, 0, 1)
}

func BenchmarkFull1(b *testing.B) { benchmarkFull(b, 1) }
func BenchmarkIterativeNoBuffer118132(b *testing.B) {
benchmarkIterative(b, 118132, 0, 0, 1)
}

/*func BenchmarkIterativeNoBuffer1000000(b *testing.B) {
benchmarkIterative(b, 1000000, 0, 0, 0)
}
*/
/*func BenchmarkIterativeNoBuffer10000000(b *testing.B) {
benchmarkIterative(b, 10000000, 0, 0, 0)
}
*/
func BenchmarkIterativeOneBuffer100(b *testing.B) {
benchmarkIterative(b, 100, 0, 1, 1)
}
func BenchmarkIterativeOneBuffer10000(b *testing.B) {
benchmarkIterative(b, 10000, 0, 1, 1)
}

func BenchmarkIterativeOneBuffer118132(b *testing.B) {
benchmarkIterative(b, 118132, 0, 1, 1)
}

/*func BenchmarkIterativeOneBuffer1000000(b *testing.B) {
benchmarkIterative(b, 1000000, 1, 1, 1)
}
*/
/*func BenchmarkIterativeOneBuffer10000000(b *testing.B) {
benchmarkIterative(b, 10000000, 1, 1, 1)
}
*/
func BenchmarkIterativeBigBuffer100(b *testing.B) {
benchmarkIterative(b, 100, 0, queryv2.DefaultRowCapacity*100, 1)
}
func BenchmarkIterativeBigBuffer10000(b *testing.B) {
benchmarkIterative(b, 10000, 0, queryv2.DefaultRowCapacity*100, 1)
}
func BenchmarkIterativeBigBuffer118132(b *testing.B) {
benchmarkIterative(b, 118132, 0, queryv2.DefaultRowCapacity*100, 1)
}

/*func BenchmarkIterativeBigBuffer1000000(b *testing.B) {
benchmarkIterative(b, 1000000, queryv2.DefaultIoCapacity*100, queryv2.DefaultTableCapacity*100, queryv2.DefaultRowCapacity*100)
}
*/
/*func BenchmarkIterativeBigBuffer10000000(b *testing.B) {
benchmarkIterative(b, 10000000, queryv2.DefaultIoCapacity*100, queryv2.DefaultTableCapacity*100, queryv2.DefaultRowCapacity*100)
}
*/
/*func BenchmarkFull1(b *testing.B) { benchmarkFull(b, 1) }
func BenchmarkFull10(b *testing.B) { benchmarkFull(b, 10) }
func BenchmarkFull100(b *testing.B) { benchmarkFull(b, 100) }
func BenchmarkFull1000(b *testing.B) { benchmarkFull(b, 1000) }
func BenchmarkFull10000(b *testing.B) { benchmarkFull(b, 10000) }
func BenchmarkFull100000(b *testing.B) { benchmarkFull(b, 100000) }
func BenchmarkFull1000000(b *testing.B) { benchmarkFull(b, 1000000) }
*/
Loading
Loading