Skip to content

Commit

Permalink
Stop Distributor on fatal error (#1887)
Browse files Browse the repository at this point in the history
* Stop Distributor on fatal error

* occasionally otel receivers will report a fatal error, and expected
  behavior is that the Host is stopped
* match this behavior by stopping the receiver shim service and letting
  the distributor stop itself

* Basic service supports failing out mid run

* cant use idle service to do this i think
* also add changelog entry
* fix my bad copypaste
  • Loading branch information
rdooley authored Nov 21, 2022
1 parent ee97e3f commit cf6a933
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## main / unreleased
* [BUGFIX] Stop distributors on Otel receiver fatal error[#1887](https:/grafana/tempo/pull/1887) (@rdooley)
* [CHANGE] **BREAKING CHANGE** Use snake case on Azure Storage config [#1879](https:/grafana/tempo/issues/1879) (@faustodavid)
Example of using snake case on Azure Storage config:
```
Expand Down
15 changes: 14 additions & 1 deletion modules/distributor/receiver/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type receiversShim struct {
pusher TracesPusher
logger *log.RateLimitedLogger
metricViews []*view.View
fatal chan error
}

func (r *receiversShim) Capabilities() consumer.Capabilities {
Expand All @@ -94,6 +95,7 @@ func New(receiverCfg map[string]interface{}, pusher TracesPusher, middleware Mid
shim := &receiversShim{
pusher: pusher,
logger: log.NewRateLimitedLogger(logsPerSecond, level.Error(log.Logger)),
fatal: make(chan error),
}

// shim otel observability
Expand Down Expand Up @@ -217,10 +219,20 @@ func New(receiverCfg map[string]interface{}, pusher TracesPusher, middleware Mid
shim.receivers = append(shim.receivers, receiver)
}

shim.Service = services.NewIdleService(shim.starting, shim.stopping)
shim.Service = services.NewBasicService(shim.starting, shim.running, shim.stopping)

return shim, nil
}

func (r *receiversShim) running(ctx context.Context) error {
select {
case err := <-r.fatal:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (r *receiversShim) starting(ctx context.Context) error {
for _, receiver := range r.receivers {
err := receiver.Start(ctx, r)
Expand Down Expand Up @@ -279,6 +291,7 @@ func (r *receiversShim) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
// ReportFatalError implements component.Host
func (r *receiversShim) ReportFatalError(err error) {
_ = level.Error(log.Logger).Log("msg", "fatal error reported", "err", err)
r.fatal <- err
}

// GetFactory implements component.Host
Expand Down

0 comments on commit cf6a933

Please sign in to comment.