Skip to content

Commit

Permalink
Basic service supports failing out mid run
Browse files Browse the repository at this point in the history
* tested this on a private fork and it worked, will report that bits and bobs on the PR on monday
* if you're reading this, have a great weekend
  • Loading branch information
rdooley committed Nov 11, 2022
1 parent 518d214 commit 824de70
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion modules/distributor/receiver/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type receiversShim struct {
pusher TracesPusher
logger *log.RateLimitedLogger
metricViews []*view.View
fatal chan error
}

func (r *receiversShim) Capabilities() consumer.Capabilities {
Expand All @@ -94,6 +95,7 @@ func New(receiverCfg map[string]interface{}, pusher TracesPusher, middleware Mid
shim := &receiversShim{
pusher: pusher,
logger: log.NewRateLimitedLogger(logsPerSecond, level.Error(log.Logger)),
fatal: make(chan error),
}

// shim otel observability
Expand Down Expand Up @@ -217,10 +219,20 @@ func New(receiverCfg map[string]interface{}, pusher TracesPusher, middleware Mid
shim.receivers = append(shim.receivers, receiver)
}

shim.Service = services.NewIdleService(shim.starting, shim.stopping)
shim.Service = services.NewBasicService(shim.starting, shim.running, shim.stopping)

return shim, nil
}

func (r *receiversShim) running(ctx context.Context) error {
select {
case err := <-r.fatal:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (r *receiversShim) starting(ctx context.Context) error {
for _, receiver := range r.receivers {
err := receiver.Start(ctx, r)
Expand Down

0 comments on commit 824de70

Please sign in to comment.