Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix(messagequeue): fix flaky MessageQueue tests
Browse files Browse the repository at this point in the history
mock time in message queue to fix tests
  • Loading branch information
hannahhoward committed Jun 5, 2021
1 parent f60e47c commit 85f4d84
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 62 deletions.
4 changes: 2 additions & 2 deletions internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ type dontHaveTimeoutMgr struct {

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock.New(), nil)
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
Expand Down
51 changes: 40 additions & 11 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/benbjohnson/clock"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
Expand Down Expand Up @@ -92,10 +93,16 @@ type MessageQueue struct {
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
rebroadcastTimer *clock.Timer
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
msg bsmsg.BitSwapMessage

// For simulating time -- uses mock in test
clock clock.Clock

// Used to track things that happen asynchronously -- used only in test
events chan messageEvent
}

// recallWantlist keeps a list of pending wants and a list of sent wants
Expand Down Expand Up @@ -210,10 +217,19 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
}
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr)
clock := clock.New()
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil)
}

type messageEvent int

const (
messageQueued messageEvent = iota
messageFinishedSending
latenciesRecorded
)

// This constructor is used by the tests
func newMessageQueue(
ctx context.Context,
Expand All @@ -222,7 +238,9 @@ func newMessageQueue(
maxMsgSize int,
sendErrorBackoff time.Duration,
maxValidLatency time.Duration,
dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
dhTimeoutMgr DontHaveTimeoutManager,
clock clock.Clock,
events chan messageEvent) *MessageQueue {

ctx, cancel := context.WithCancel(ctx)
return &MessageQueue{
Expand All @@ -243,7 +261,9 @@ func newMessageQueue(
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
msg: bsmsg.New(false),
clock: clock,
events: events,
}
}

Expand Down Expand Up @@ -368,7 +388,7 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
mq.rebroadcastIntervalLk.RLock()
mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval)
mq.rebroadcastIntervalLk.RUnlock()
go mq.runQueue()
}
Expand All @@ -392,7 +412,7 @@ func (mq *MessageQueue) runQueue() {
defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
scheduleWork := time.NewTimer(0)
scheduleWork := mq.clock.Timer(0)
if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
// See: https://golang.org/pkg/time/#Timer.Stop
Expand Down Expand Up @@ -420,12 +440,15 @@ func (mq *MessageQueue) runQueue() {
// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCutoff ||
time.Since(workScheduled) >= sendMessageMaxDelay {
mq.clock.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
if mq.events != nil {
mq.events <- messageQueued
}
}

case <-scheduleWork.C:
Expand Down Expand Up @@ -476,7 +499,7 @@ func (mq *MessageQueue) transferRebroadcastWants() bool {

func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- time.Now():
case mq.outgoingWork <- mq.clock.Now():
default:
}
}
Expand Down Expand Up @@ -566,7 +589,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
// handleResponse is called when a response is received from the peer,
// with the CIDs of received blocks / HAVEs / DONT_HAVEs
func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
now := time.Now()
now := mq.clock.Now()
earliest := time.Time{}

mq.wllock.Lock()
Expand Down Expand Up @@ -606,6 +629,9 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
// Inform the timeout manager of the calculated latency
mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest))
}
if mq.events != nil {
mq.events <- latenciesRecorded
}
}

func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
Expand Down Expand Up @@ -787,7 +813,7 @@ FINISH:
// When the message has been sent, record the time at which each want was
// sent so we can calculate message latency
onSent := func() {
now := time.Now()
now := mq.clock.Now()

mq.wllock.Lock()
defer mq.wllock.Unlock()
Expand All @@ -803,6 +829,9 @@ FINISH:
mq.bcstWants.SentAt(e.Cid, now)
}
}
if mq.events != nil {
mq.events <- messageFinishedSending
}
}

return mq.msg, onSent
Expand Down
Loading

0 comments on commit 85f4d84

Please sign in to comment.