Skip to content

Commit

Permalink
feat(client): add mutex for rebalance (#316)
Browse files Browse the repository at this point in the history
- add rbMutex for client rebalance
- use atomic value to avoid concurrency

Closes #313
  • Loading branch information
xujianhai666 authored and wenfengwang committed Dec 10, 2019
1 parent bda9f6a commit fdbb58a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
10 changes: 4 additions & 6 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
if exist {
pq := v.(*processQueue)
pq.WithLock(true)
pq.lastConsumeTime = time.Now()
pq.lastLockTime = time.Now()
pq.UpdateLastConsumeTime()
pq.UpdateLastLockTime()
}
if _mq.Equals(mq) {
lockOK = true
Expand Down Expand Up @@ -532,7 +532,7 @@ func (dc *defaultConsumer) lockAll() {
if exist {
pq := v.(*processQueue)
pq.WithLock(true)
pq.lastConsumeTime = time.Now()
pq.UpdateLastConsumeTime()
}
set[_mq] = true
}
Expand All @@ -543,7 +543,7 @@ func (dc *defaultConsumer) lockAll() {
if exist {
pq := v.(*processQueue)
pq.WithLock(false)
pq.lastLockTime = time.Now()
pq.UpdateLastLockTime()
rlog.Info("lock MessageQueue", map[string]interface{}{
"lockOK": false,
rlog.LogKeyConsumerGroup: dc.consumerGroup,
Expand Down Expand Up @@ -653,14 +653,12 @@ func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*pr
return result
}

// TODO 问题不少 需要再好好对一下
func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
var changed bool
mqSet := make(map[primitive.MessageQueue]bool)
for idx := range mqs {
mqSet[*mqs[idx]] = true
}
// TODO
dc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
Expand Down
40 changes: 31 additions & 9 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ type processQueue struct {
queueOffsetMax int64
dropped *uatomic.Bool
lastPullTime time.Time
lastConsumeTime time.Time
lastConsumeTime atomic.Value
locked *uatomic.Bool
lastLockTime time.Time
lastLockTime atomic.Value
consuming bool
msgAccCnt int64
lockConsume sync.Mutex
Expand All @@ -63,11 +63,17 @@ type processQueue struct {
func newProcessQueue(order bool) *processQueue {
consumingMsgOrderlyTreeMap := treemap.NewWith(gods_util.Int64Comparator)

lastConsumeTime := atomic.Value{}
lastConsumeTime.Store(time.Now())

lastLockTime := atomic.Value{}
lastLockTime.Store(time.Now())

pq := &processQueue{
msgCache: treemap.NewWith(utils.Int64Comparator),
lastPullTime: time.Now(),
lastConsumeTime: time.Now(),
lastLockTime: time.Now(),
lastConsumeTime: lastConsumeTime,
lastLockTime: lastLockTime,
msgCh: make(chan []*primitive.MessageExt, 32),
consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
order: order,
Expand Down Expand Up @@ -131,6 +137,22 @@ func (pq *processQueue) IsDroppd() bool {
return pq.dropped.Load()
}

func (pq *processQueue) UpdateLastConsumeTime() {
pq.lastConsumeTime.Store(time.Now())
}

func (pq *processQueue) LastConsumeTime() time.Time {
return pq.lastConsumeTime.Load().(time.Time)
}

func (pq *processQueue) UpdateLastLockTime() {
pq.lastLockTime.Store(time.Now())
}

func (pq *processQueue) LastLockTime() time.Time {
return pq.lastLockTime.Load().(time.Time)
}

func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageExt) {
pq.mutex.Lock()
for _, msg := range messages {
Expand All @@ -144,7 +166,7 @@ func (pq *processQueue) makeMessageToCosumeAgain(messages ...*primitive.MessageE
func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
result := int64(-1)
pq.mutex.Lock()
pq.lastConsumeTime = time.Now()
pq.UpdateLastConsumeTime()
if !pq.msgCache.Empty() {
result = pq.queueOffsetMax + 1
removedCount := 0
Expand All @@ -169,7 +191,7 @@ func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
}

func (pq *processQueue) isLockExpired() bool {
return time.Now().Sub(pq.lastLockTime) > _RebalanceLockMaxTime
return time.Now().Sub(pq.LastLockTime()) > _RebalanceLockMaxTime
}

func (pq *processQueue) isPullExpired() bool {
Expand Down Expand Up @@ -332,10 +354,10 @@ func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
info := internal.ProcessQueueInfo{
Locked: pq.locked.Load(),
TryUnlockTimes: pq.tryUnlockTimes,
LastLockTimestamp: pq.lastLockTime.UnixNano() / 10e6,
LastLockTimestamp: pq.LastLockTime().UnixNano() / int64(time.Millisecond),
Dropped: pq.dropped.Load(),
LastPullTimestamp: pq.lastPullTime.UnixNano() / 10e6,
LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 10e6,
LastPullTimestamp: pq.lastPullTime.UnixNano() / int64(time.Millisecond),
LastConsumeTimestamp: pq.LastConsumeTime().UnixNano() / int64(time.Millisecond),
}

if !pq.msgCache.Empty() {
Expand Down
3 changes: 3 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ type rmqClient struct {
remoteClient remote.RemotingClient
hbMutex sync.Mutex
close bool
rbMutex sync.Mutex
namesrvs *namesrvs
done chan struct{}
shutdownOnce sync.Once
Expand Down Expand Up @@ -618,6 +619,8 @@ func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
}

func (c *rmqClient) RebalanceImmediately() {
c.rbMutex.Lock()
defer c.rbMutex.Unlock()
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
consumer.Rebalance()
Expand Down

0 comments on commit fdbb58a

Please sign in to comment.