Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix flaky tests in message queue #497

Merged
merged 1 commit into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ type dontHaveTimeoutMgr struct {

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock.New(), nil)
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
Expand Down
51 changes: 40 additions & 11 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/benbjohnson/clock"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
Expand Down Expand Up @@ -92,10 +93,16 @@ type MessageQueue struct {
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
rebroadcastTimer *clock.Timer
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
msg bsmsg.BitSwapMessage

// For simulating time -- uses mock in test
clock clock.Clock

// Used to track things that happen asynchronously -- used only in test
events chan messageEvent
}

// recallWantlist keeps a list of pending wants and a list of sent wants
Expand Down Expand Up @@ -210,10 +217,19 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
}
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr)
clock := clock.New()
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil)
}

type messageEvent int

const (
messageQueued messageEvent = iota
messageFinishedSending
latenciesRecorded
)

// This constructor is used by the tests
func newMessageQueue(
ctx context.Context,
Expand All @@ -222,7 +238,9 @@ func newMessageQueue(
maxMsgSize int,
sendErrorBackoff time.Duration,
maxValidLatency time.Duration,
dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
dhTimeoutMgr DontHaveTimeoutManager,
clock clock.Clock,
events chan messageEvent) *MessageQueue {

ctx, cancel := context.WithCancel(ctx)
return &MessageQueue{
Expand All @@ -243,7 +261,9 @@ func newMessageQueue(
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
msg: bsmsg.New(false),
clock: clock,
events: events,
}
}

Expand Down Expand Up @@ -368,7 +388,7 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
go mq.runQueue()
}
Expand All @@ -392,7 +412,7 @@ func (mq *MessageQueue) runQueue() {
defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
scheduleWork := time.NewTimer(0)
scheduleWork := mq.clock.Timer(0)
if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
// See: https://golang.org/pkg/time/#Timer.Stop
Expand Down Expand Up @@ -420,12 +440,15 @@ func (mq *MessageQueue) runQueue() {
// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCutoff ||
time.Since(workScheduled) >= sendMessageMaxDelay {
mq.clock.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
if mq.events != nil {
mq.events <- messageQueued
}
}

case <-scheduleWork.C:
Expand Down Expand Up @@ -476,7 +499,7 @@ func (mq *MessageQueue) transferRebroadcastWants() bool {

func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- time.Now():
case mq.outgoingWork <- mq.clock.Now():
default:
}
}
Expand Down Expand Up @@ -566,7 +589,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
// handleResponse is called when a response is received from the peer,
// with the CIDs of received blocks / HAVEs / DONT_HAVEs
func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
now := time.Now()
now := mq.clock.Now()
earliest := time.Time{}

mq.wllock.Lock()
Expand Down Expand Up @@ -606,6 +629,9 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
// Inform the timeout manager of the calculated latency
mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest))
}
if mq.events != nil {
mq.events <- latenciesRecorded
}
}

func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
Expand Down Expand Up @@ -787,7 +813,7 @@ FINISH:
// When the message has been sent, record the time at which each want was
// sent so we can calculate message latency
onSent := func() {
now := time.Now()
now := mq.clock.Now()

mq.wllock.Lock()
defer mq.wllock.Unlock()
Expand All @@ -803,6 +829,9 @@ FINISH:
mq.bcstWants.SentAt(e.Cid, now)
}
}
if mq.events != nil {
mq.events <- messageFinishedSending
}
}

return mq.msg, onSent
Expand Down
Loading