-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
108 lines (101 loc) · 3.48 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/ratelimit"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/awslabs/kinesis-hot-shard-advisor/analyse"
"github.com/awslabs/kinesis-hot-shard-advisor/analyse/aggregator"
"github.com/awslabs/kinesis-hot-shard-advisor/analyse/service"
)
// opts holds the command-line options for this program. Its fields are bound
// to flags in init and read by aggregatorBuilder and main.
var opts = new(options)
// init registers every command-line flag on the default flag set and binds
// each one to the matching field of opts. Parsing happens later, in main.
func init() {
	flag.StringVar(&opts.Stream, "stream", "", "Stream name")
	flag.IntVar(&opts.Limit, "limit", 10, "Number of keys to output in key distribution graph (Optional).")
	// The default is false so that it agrees with the help text ("Default is
	// false") and so that passing -cms actually opts in to count-min-sketch.
	flag.BoolVar(&opts.CMS, "cms", false, "Use count-min-sketch (Optional) algorithm for counting key distribution (Optional). Default is false. Use this method to avoid OOM condition when analysing busy streams with high cardinality.")
	flag.StringVar(&opts.Start, "from", "", "Start time in yyyy-mm-dd hh:mm format (Optional). Default value is current time - 5 minutes.")
	flag.StringVar(&opts.End, "to", "", "End time in yyyy-mm-dd hh:mm format (Optional). Default value is current time.")
	flag.StringVar(&opts.Out, "out", "out.html", "Path to output file (Optional). Default is out.html.")
	flag.StringVar(&opts.SIDs, "shard-ids", "", "Comma separated list of shard ids to analyse.")
	flag.IntVar(&opts.Top, "top", 10, "Number of shards to emit to the report(Optional). Use 0 to emit all shards. Emitting all shards can result in a large file that may take a lot of system resources to view in the browser.")
	flag.IntVar(&opts.MaxWorkers, "max-workers", 0, "Maximum number of workers to read shards concurrently. Set value accordingly to optimise host's resource utilisation")
	flag.BoolVar(&opts.AggregateKeys, "aggregate-keys", false, "Use this flag to aggregate keys.")
}
// aggregatorBuilder returns a factory that builds a new slice of aggregators
// on every call.
//
// Each slice always ends with a bytes-per-second and a count-per-second
// aggregator for the [start, end] period. When opts.AggregateKeys is set, a
// per-key aggregator is placed first: the count-min-sketch variant if opts.CMS
// is set, otherwise the plain count-by-key variant.
//
// The factory panics if the count-min-sketch aggregator cannot be constructed,
// because its signature offers no way to report an error.
func aggregatorBuilder(start, end time.Time) func() []service.Aggregator {
	build := func() []service.Aggregator {
		var set []service.Aggregator

		switch {
		case !opts.AggregateKeys:
			// Key aggregation was not requested; no per-key aggregator.
		case opts.CMS:
			sketch, err := aggregator.NewCMSByKey(5, 10000, opts.Limit)
			if err != nil {
				panic(err)
			}
			set = append(set, sketch)
		default:
			set = append(set, aggregator.NewCountByKey())
		}

		set = append(set, aggregator.NewBytesPerSecond(start, end))
		set = append(set, aggregator.NewCountPerSecond(start, end))
		return set
	}
	return build
}
// main is the program entry point. All work happens in run so that deferred
// cleanup executes before the process terminates; os.Exit itself does not run
// deferred functions.
func main() {
	os.Exit(run())
}

// run parses and validates the command-line flags, builds the analyser and
// starts it.
//
// It returns the process exit code: 0 on success, 1 when option validation,
// period resolution, AWS configuration loading or the analysis itself fails.
// Error values are written to standard error.
func run() int {
	flag.Parse()
	if !opts.Validate() {
		// No message is printed here; presumably Validate reports the
		// problem itself — confirm against its implementation.
		return 1
	}

	start, end, err := opts.Period()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return 1
	}

	// Create a context that is cancelled on SIGINT, so the AWS configuration
	// loader and the analyser are handed a context that reflects the
	// interrupt.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)
	go func() {
		select {
		case <-signals:
			cancel()
		case <-ctx.Done():
			// run returned without an interrupt; let the goroutine end.
		}
	}()

	cfg, err := config.LoadDefaultConfig(ctx, config.WithRetryer(func() aws.Retryer {
		return retry.NewStandard(func(so *retry.StandardOptions) {
			// NOTE(review): replaces the standard retryer's rate limiter
			// with a 1,000,000-token bucket, presumably so the client-side
			// retry quota is never the limiting factor — confirm intent.
			so.RateLimiter = ratelimit.NewTokenRateLimit(1000000)
		})
	}))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return 1
	}

	cmd := analyse.NewCMD(
		opts.Stream,
		kinesis.NewFromConfig(cfg),
		service.NewHTMLReporter(opts.Out),
		aggregatorBuilder(start, end),
		opts.Limit,
		opts.Top,
		start,
		end,
		opts.ShardIDs(),
		opts.CalculateMaxWorkers())
	if err := cmd.Start(ctx); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return 1
	}
	return 0
}