
Memory queue: cancel in-progress writes on queue closed, not producer closed #38094

Merged 8 commits on Mar 4, 2024
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -95,6 +95,7 @@ fields added to events containing the Beats version. {pull}37553[37553]
- Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799]
- [threatintel] MISP pagination fixes {pull}37898[37898]
- Fix file handle leak when handling errors in filestream {pull}37973[37973]
- Fix a race condition that could crash Filebeat with a "negative WaitGroup counter" error {pull}38094[38094]

*Heartbeat*

39 changes: 27 additions & 12 deletions libbeat/publisher/queue/memqueue/produce.go
@@ -36,9 +36,10 @@ type ackProducer struct {
}

type openState struct {
log *logp.Logger
done chan struct{}
events chan pushRequest
log *logp.Logger
done chan struct{}
queueDone <-chan struct{}
events chan pushRequest
}

// producerID stores the order of events within a single producer, so multiple
@@ -58,9 +59,10 @@ type ackHandler func(count int)

func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
done: make(chan struct{}),
events: b.pushChan,
log: b.logger,
done: make(chan struct{}),
queueDone: b.ctx.Done(),
events: b.pushChan,
}

if cb != nil {
@@ -143,27 +145,40 @@ func (st *openState) Close() {
func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
select {
case st.events <- req:
// If the output is blocked and the queue is full, `req` is written
// to `st.events`, however the queue never writes back to `req.resp`,
// which effectively blocks for ever. So we also need to select on the
// done channel to ensure we don't miss the shutdown signal.
// The events channel is buffered, which means we may successfully
// write to it even if the queue is shutting down. To avoid blocking
// forever during shutdown, we also have to wait on the queue's
// shutdown channel.
select {
case resp := <-req.resp:
return resp, true
case <-st.done:
case <-st.queueDone:
st.events = nil
return 0, false
}
case <-st.done:
st.events = nil
return 0, false
case <-st.queueDone:
st.events = nil
return 0, false
}
}

func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) {
select {
case st.events <- req:
return <-req.resp, true
// The events channel is buffered, which means we may successfully
// write to it even if the queue is shutting down. To avoid blocking
// forever during shutdown, we also have to wait on the queue's
// shutdown channel.
select {
case resp := <-req.resp:
return resp, true
case <-st.queueDone:
st.events = nil
return 0, false
}
case <-st.done:
st.events = nil
return 0, false
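The comment added to `publish` describes a subtle hazard: `st.events` is buffered, so a send can succeed even while the queue is shutting down, in which case nothing will ever answer `req.resp`. The fix is to wait on the queue's shutdown channel at both steps. A minimal, self-contained sketch of that pattern (hypothetical `request`/`publish` names, not the Beats types):

```go
package main

import "fmt"

// request mimics a publish request that expects a reply on resp.
type request struct {
	resp chan int
}

// publish sends req into a buffered events channel, then waits for a
// response. At both steps it also selects on queueDone, so a queue that
// shuts down (before or after accepting the request) cannot block the
// producer forever.
func publish(events chan request, queueDone <-chan struct{}, req request) (int, bool) {
	select {
	case events <- req:
		// The send succeeded, but the queue may still never respond.
		select {
		case id := <-req.resp:
			return id, true
		case <-queueDone:
			return 0, false
		}
	case <-queueDone:
		return 0, false
	}
}

func main() {
	events := make(chan request, 1) // buffered: accepts a request with no consumer
	queueDone := make(chan struct{})
	close(queueDone) // simulate a queue that has already shut down

	_, ok := publish(events, queueDone, request{resp: make(chan int)})
	fmt.Println(ok) // prints "false": either select branch gives up cleanly
}
```

Note that both outer select cases are ready here (the buffered send and the closed `queueDone`), and Go picks one at random; the inner wait on `queueDone` is what guarantees the producer still unblocks when the send wins.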
16 changes: 8 additions & 8 deletions libbeat/publisher/queue/memqueue/queue_test.go
@@ -77,17 +77,17 @@ func TestProduceConsumer(t *testing.T) {
t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
}

// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish
// does not block indefinitely.
// TestProducerDoesNotBlockWhenQueueClosed ensures the producer Publish
// does not block indefinitely during queue shutdown.
//
// Once we get a producer `p` from the queue we want to ensure
// Once we get a producer `p` from the queue `q` we want to ensure
// that if p.Publish is called and blocks it will unblock once
// p.Cancel is called.
// `q.Close` is called.
//
// For this test we start a queue with size 2 and try to add more
// than 2 events to it, p.Publish will block, once we call p.Cancel,
// than 2 events to it, p.Publish will block, once we call q.Close,
// we ensure the 3rd event was not successfully published.
func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
q := NewQueue(nil, nil,
Settings{
Events: 2, // Queue size
@@ -138,8 +138,8 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
time.Millisecond,
"the first two events were not successfully published")

// Cancel the producer, this should unblock its Publish method
p.Cancel()
// Close the queue, this should unblock the pending Publish call
Contributor Author:

> This test was checking the wrong thing, for the same reason that the select cases were wrong: the producer shouldn't unblock when closed if its queue publish request has already been sent, because the queue can't tell that the producer has given up, so both the producer and the queue would "own" the cleanup for the event. The correct behavior is to unblock when the queue itself is closed.

Member:

> This would be a good comment to add to the code

Contributor Author:

> Done
q.Close()

require.Eventually(
t,