Fix Ingesters Occasionally Double Flushing #364

Merged
merged 14 commits on Nov 24, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
* [ENHANCEMENT] Add support for S3 V2 signatures. [#352](https://github.com/grafana/tempo/pull/352)
* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
* [BUGFIX] Fix distributors panicking on rollout [#343](https://github.com/grafana/tempo/pull/343)
* [BUGFIX] Fix ingesters occasionally double flushing [#364](https://github.com/grafana/tempo/pull/364)

## v0.3.0

20 changes: 10 additions & 10 deletions modules/ingester/flush.go
@@ -52,8 +52,8 @@ func (i *Ingester) Flush() {
}
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
// mark all head blocks as ready to flush.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
i.sweepUsers(true)
w.WriteHeader(http.StatusNoContent)
@@ -104,9 +104,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {

// see if any complete blocks are ready to be flushed
if instance.GetBlockToBeFlushed() != nil {
i.flushQueueIndex++
flushQueueIndex := i.flushQueueIndex % i.cfg.ConcurrentFlushes
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{
i.flushQueues.Enqueue(&flushOp{
time.Now().Unix(),
instance.instanceID,
})
@@ -120,23 +118,25 @@ func (i *Ingester) flushLoop(j int) {
}()

for {
o := i.flushQueues[j].Dequeue()
o := i.flushQueues.Dequeue(j)
if o == nil {
return
}
op := o.(*flushOp)

level.Debug(util.Logger).Log("msg", "flushing stream", "userid", op.userID, "fp")
level.Debug(util.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
}

if err != nil {
// re-queue failed flush
op.from += int64(flushBackoff)
i.flushQueues[j].Enqueue(op)
i.flushQueues.Requeue(op)
continue
}

i.flushQueues.ClearKey(op.Key())
}
}

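The flushLoop change above is the core of the fix: every loop worker now drains the shared ExclusiveQueues, requeues a failed op without releasing its key, and only calls ClearKey once the flush succeeds. A compressed sketch of that lifecycle follows; sketchOp and flushTenant are illustrative stand-ins for the real flushOp and flushUserTraces, and the backoff constant is invented for the example.

package ingestersketch

import (
	"time"

	"github.com/grafana/tempo/pkg/flushqueues"
)

// sketchOp mirrors the shape of the ingester's flushOp for illustration only.
type sketchOp struct {
	from   int64
	userID string
}

func (o *sketchOp) Key() string     { return o.userID }
func (o *sketchOp) Priority() int64 { return -o.from }

// flushLoopSketch shows the dequeue -> flush -> requeue/clear cycle from flush.go.
func flushLoopSketch(queues *flushqueues.ExclusiveQueues, j int, flushTenant func(userID string) error) {
	for {
		o := queues.Dequeue(j)
		if o == nil {
			return // queues were closed during shutdown
		}
		op := o.(*sketchOp)

		if err := flushTenant(op.userID); err != nil {
			// A failed flush keeps its key active and goes back on a queue,
			// so no duplicate op for this tenant can be enqueued in the meantime.
			op.from += int64(30 * time.Second) // crude backoff, standing in for flushBackoff
			queues.Requeue(op)
			continue
		}

		// Only a successful flush releases the key for future sweeps.
		queues.ClearKey(op.Key())
	}
}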
13 changes: 8 additions & 5 deletions modules/ingester/ingester.go
@@ -19,6 +19,7 @@ import (

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/validation"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
@@ -47,9 +48,7 @@ type Ingester struct {
lifecycler *ring.Lifecycler
store storage.Store

// One queue per flush thread.
flushQueues []*util.PriorityQueue
flushQueueIndex int
flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup

limiter *Limiter
@@ -63,12 +62,11 @@ func New(cfg Config, store storage.Store, limits *overrides.Overrides) (*Ingeste
cfg: cfg,
instances: map[string]*instance{},
store: store,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
}

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
for j := 0; j < cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(metricFlushQueueLength)
go i.flushLoop(j)
}

@@ -136,6 +134,11 @@ func (i *Ingester) stopping(_ error) error {
return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}

if i.flushQueues != nil {
i.flushQueues.Stop()
i.flushQueuesDone.Wait()
}

return nil
}

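Taken together, the constructor and stopping changes pair every flush goroutine with the shared queue set and make shutdown wait for them to drain. A minimal sketch of that wiring, with the worker body and metric name invented for the example:

package ingestersketch

import (
	"sync"

	"github.com/grafana/tempo/pkg/flushqueues"
	"github.com/prometheus/client_golang/prometheus"
)

type miniIngester struct {
	flushQueues     *flushqueues.ExclusiveQueues
	flushQueuesDone sync.WaitGroup
}

func newMiniIngester(concurrentFlushes int) *miniIngester {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "sketch_flush_queue_length"})
	i := &miniIngester{
		flushQueues: flushqueues.New(concurrentFlushes, gauge),
	}

	// One goroutine per queue; each exits when Dequeue returns nil after Stop.
	i.flushQueuesDone.Add(concurrentFlushes)
	for j := 0; j < concurrentFlushes; j++ {
		go func(j int) {
			defer i.flushQueuesDone.Done()
			for {
				if op := i.flushQueues.Dequeue(j); op == nil {
					return
				}
				// flush work would happen here
			}
		}(j)
	}
	return i
}

func (i *miniIngester) stop() {
	// Closing the queues unblocks every Dequeue; waiting ensures in-flight
	// flush workers finish before shutdown proceeds.
	i.flushQueues.Stop()
	i.flushQueuesDone.Wait()
}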
3 changes: 2 additions & 1 deletion modules/ingester/instance.go
@@ -53,7 +53,8 @@ type instance struct {
headBlock *tempodb_wal.AppendBlock
completingBlock *tempodb_wal.AppendBlock
completeBlocks []*tempodb_wal.CompleteBlock
lastBlockCut time.Time

lastBlockCut time.Time

instanceID string
tracesCreatedTotal prometheus.Counter
63 changes: 63 additions & 0 deletions pkg/flushqueues/exclusivequeues.go
@@ -0,0 +1,63 @@
package flushqueues

import (
"sync"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

type ExclusiveQueues struct {
queues []*util.PriorityQueue
index int
activeKeys sync.Map
}

// New creates a new set of flush queues with a prom gauge to track current depth
func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
f := &ExclusiveQueues{
queues: make([]*util.PriorityQueue, queues),
}

for j := 0; j < queues; j++ {
f.queues[j] = util.NewPriorityQueue(metric)
}

return f
}

// Enqueue adds the op to the next queue and prevents any other items from being added with this key
func (f *ExclusiveQueues) Enqueue(op util.Op) {
_, ok := f.activeKeys.Load(op.Key())
if ok {
return
}

f.activeKeys.Store(op.Key(), struct{}{})
f.Requeue(op)
}

// Dequeue removes the next op from the requested queue. After dequeueing the calling
// process either needs to call ClearKey or Requeue
func (f *ExclusiveQueues) Dequeue(q int) util.Op {
return f.queues[q].Dequeue()
}

// Requeue adds an op that is presumed to already be covered by activeKeys
func (f *ExclusiveQueues) Requeue(op util.Op) {
f.index++
flushQueueIndex := f.index % len(f.queues)
f.queues[flushQueueIndex].Enqueue(op)
}

// ClearKey unblocks the requested key. This should be called only after a flush has been successful
func (f *ExclusiveQueues) ClearKey(key string) {
f.activeKeys.Delete(key)
}

// Stop closes all queues
func (f *ExclusiveQueues) Stop() {
for _, q := range f.queues {
q.Close()
}
}
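The key-exclusion behavior above is what stops the double flush: a second Enqueue for the same key is silently dropped until ClearKey runs. A small, self-contained sketch of that flow (the op type, tenant ID, and gauge name are illustrative; only the flushqueues calls come from the file above):

package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/flushqueues"
	"github.com/prometheus/client_golang/prometheus"
)

type op struct{ key string }

func (o op) Key() string     { return o.key }
func (o op) Priority() int64 { return 0 }

func main() {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "sketch_queue_length"})
	q := flushqueues.New(1, gauge)

	q.Enqueue(op{key: "tenant-1"})
	q.Enqueue(op{key: "tenant-1"}) // dropped: the key is still active

	first := q.Dequeue(0)
	fmt.Println("dequeued", first.Key()) // tenant-1

	// Until ClearKey is called, another Enqueue for tenant-1 remains a no-op,
	// which is exactly what prevents a second, concurrent flush of the same data.
	q.ClearKey(first.Key())
	q.Enqueue(op{key: "tenant-1"}) // admitted again

	q.Stop()
}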
106 changes: 106 additions & 0 deletions pkg/flushqueues/exclusivequeues_test.go
@@ -0,0 +1,106 @@
package flushqueues

import (
"testing"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)

type mockOp struct {
key string
}

func (m mockOp) Key() string {
return m.key
}

func (m mockOp) Priority() int64 {
return 0
}

func TestExclusiveQueues(t *testing.T) {
gauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "testersons",
})

q := New(1, gauge)
op := mockOp{
key: "not unique",
}

// enqueue twice
q.Enqueue(op)
length, err := test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 1, int(length))

q.Enqueue(op)
length, err = test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 1, int(length))

// dequeue -> requeue
_ = q.Dequeue(0)
length, err = test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 0, int(length))

q.Requeue(op)
length, err = test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 1, int(length))

// dequeue -> clearkey -> enqueue
_ = q.Dequeue(0)
length, err = test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 0, int(length))

q.ClearKey(op.key)
length, err = test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 0, int(length))

q.Enqueue(op)
length, err = test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, 1, int(length))
}

func TestMultipleQueues(t *testing.T) {
gauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "test",
Name: "testersons",
})

totalQueues := 10
totalItems := 10
q := New(totalQueues, gauge)

// add stuff to the queue and confirm the length matches expected
for i := 0; i < totalItems; i++ {
op := mockOp{
key: uuid.New().String(),
}

q.Enqueue(op)

length, err := test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, i+1, int(length))
}

// each queue should have 1 thing
for i := 0; i < totalQueues; i++ {
op := q.Dequeue(i)
assert.NotNil(t, op)

length, err := test.GetGaugeValue(gauge)
assert.NoError(t, err)
assert.Equal(t, totalQueues-(i+1), int(length))
}
}
32 changes: 32 additions & 0 deletions pkg/util/test/metrics.go
@@ -0,0 +1,32 @@
package test

import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

func GetCounterValue(metric prometheus.Counter) (float64, error) {
var m = &dto.Metric{}
err := metric.Write(m)
if err != nil {
return 0, err
}
return m.Counter.GetValue(), nil
}

func GetGaugeValue(metric prometheus.Gauge) (float64, error) {
var m = &dto.Metric{}
err := metric.Write(m)
if err != nil {
return 0, err
}
return m.Gauge.GetValue(), nil
}

func GetCounterVecValue(metric *prometheus.CounterVec, label string) (float64, error) {
var m = &dto.Metric{}
if err := metric.WithLabelValues(label).Write(m); err != nil {
return 0, err
}
return m.Counter.GetValue(), nil
}
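These helpers read a metric's current value straight from its protobuf representation, which is how the queue tests above assert on gauge depth. A quick hypothetical test showing the same pattern (names are illustrative):

package test_test

import (
	"testing"

	"github.com/grafana/tempo/pkg/util/test"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
)

func TestGetGaugeValueSketch(t *testing.T) {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "sketch",
		Name:      "queue_depth",
	})
	gauge.Add(3)

	// GetGaugeValue writes the gauge into a dto.Metric and returns its value.
	value, err := test.GetGaugeValue(gauge)
	assert.NoError(t, err)
	assert.Equal(t, 3.0, value)
}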