Skip to content

Commit

Permalink
[ISSUE #271] Fix bugs that can't consume multiple topics in single co…
Browse files Browse the repository at this point in the history
…nsumer (#310)

* fix issue 271
  • Loading branch information
wenfengwang authored and ShannonDing committed Nov 26, 2019
1 parent c9b9f93 commit 82bd638
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 10 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ env:

before_script:
- cd ${TRAVIS_HOME}
- wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip
- unzip rocketmq-all-4.5.2-bin-release.zip
- cd rocketmq-all-4.5.2-bin-release
- wget http://us.mirrors.quenda.co/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
- unzip rocketmq-all-4.6.0-bin-release.zip
- cd rocketmq-all-4.6.0-bin-release
- perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
- nohup sh bin/mqnamesrv &
- nohup sh bin/mqbroker -n localhost:9876 &
Expand Down
43 changes: 36 additions & 7 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/apache/rocketmq-client-go/internal"
"github.com/apache/rocketmq-client-go/internal/remote"
"github.com/apache/rocketmq-client-go/internal/utils"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)
Expand All @@ -46,11 +47,20 @@ const (
Mb = 1024 * 1024
)

type PushConsumerCallback struct {
topic string
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)
}

func (callback PushConsumerCallback) UniqueID() string {
return callback.topic
}

type pushConsumer struct {
*defaultConsumer
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
consume func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)
consumeFunc utils.Set
submitToConsume func(*processQueue, *primitive.MessageQueue)
subscribedTopic map[string]string
interceptor primitive.Interceptor
Expand Down Expand Up @@ -98,6 +108,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
queueLock: newQueueLock(),
lockTicker: time.NewTicker(dc.option.RebalanceLockInterval),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
}
dc.mqChanged = p.messageQueueChanged
if p.consumeOrderly {
Expand All @@ -111,7 +122,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
return p, nil
}

// TODO: add shutdown on pushConsumer.
func (pc *pushConsumer) Start() error {
var err error
pc.once.Do(func() {
Expand All @@ -123,13 +133,14 @@ func (pc *pushConsumer) Start() error {
pc.state = internal.StateStartFailed
pc.validate()

err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
if err != nil {
pc.state = internal.StateStartFailed
rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
err = ErrCreated
return
}

err = pc.defaultConsumer.start()
Expand Down Expand Up @@ -160,7 +171,6 @@ func (pc *pushConsumer) Start() error {

go func() {
// todo start clean msg expired
// TODO quit
for {
select {
case pr := <-pc.prCh:
Expand All @@ -177,6 +187,10 @@ func (pc *pushConsumer) Start() error {
}()
})

if err != nil {
return err
}

pc.client.UpdateTopicRouteInfo()
for k := range pc.subscribedTopic {
_, exist := pc.topicSubscribeInfoTable.Load(k)
Expand Down Expand Up @@ -224,7 +238,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
pc.subscribedTopic[retryTopic] = ""
}

pc.consume = f
pc.consumeFunc.Add(&PushConsumerCallback{
f: f,
topic: topic,
})
return nil
}

Expand Down Expand Up @@ -752,14 +769,26 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue
}

func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
if len(subMsgs) == 0 {
return ConsumeRetryLater, errors.New("msg list empty")
}

f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
if !exist {
return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
}

callback, ok := f.(*PushConsumerCallback)
if !ok {
return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
}
if pc.interceptor == nil {
return pc.consume(ctx, subMsgs...)
return callback.f(ctx, subMsgs...)
} else {
var container ConsumeResultHolder
err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
msgs := req.([]*primitive.MessageExt)
r, e := pc.consume(ctx, msgs...)
r, e := callback.f(ctx, msgs...)

realReply := reply.(*ConsumeResultHolder)
realReply.ConsumeResult = r
Expand Down
7 changes: 7 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,13 @@ func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, re
}

func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
_, exist := c.consumerMap.Load(group)
if exist {
rlog.Warning("the consumer group exist already", map[string]interface{}{
rlog.LogKeyConsumerGroup: group,
})
return fmt.Errorf("the consumer group exist already")
}
c.consumerMap.Store(group, consumer)
return nil
}
Expand Down

0 comments on commit 82bd638

Please sign in to comment.