diff --git a/.travis.yml b/.travis.yml index e68fd314..2562e74c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,9 +19,9 @@ env: before_script: - cd ${TRAVIS_HOME} - - wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip - - unzip rocketmq-all-4.5.2-bin-release.zip - - cd rocketmq-all-4.5.2-bin-release + - wget http://us.mirrors.quenda.co/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip + - unzip rocketmq-all-4.6.0-bin-release.zip + - cd rocketmq-all-4.6.0-bin-release - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh - nohup sh bin/mqnamesrv & - nohup sh bin/mqbroker -n localhost:9876 & diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index aa999c5b..3c5b79a4 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -29,6 +29,7 @@ import ( "github.com/apache/rocketmq-client-go/internal" "github.com/apache/rocketmq-client-go/internal/remote" + "github.com/apache/rocketmq-client-go/internal/utils" "github.com/apache/rocketmq-client-go/primitive" "github.com/apache/rocketmq-client-go/rlog" ) @@ -46,11 +47,20 @@ const ( Mb = 1024 * 1024 ) +type PushConsumerCallback struct { + topic string + f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error) +} + +func (callback PushConsumerCallback) UniqueID() string { + return callback.topic +} + type pushConsumer struct { *defaultConsumer queueFlowControlTimes int queueMaxSpanFlowControlTimes int - consume func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error) + consumeFunc utils.Set submitToConsume func(*processQueue, *primitive.MessageQueue) subscribedTopic map[string]string interceptor primitive.Interceptor @@ -98,6 +108,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) { queueLock: newQueueLock(), lockTicker: time.NewTicker(dc.option.RebalanceLockInterval), done: make(chan struct{}, 1), + consumeFunc: utils.NewSet(), } dc.mqChanged = p.messageQueueChanged if p.consumeOrderly { @@ -111,7 +122,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) { return p, nil } -// TODO: add shutdown on pushConsumer. func (pc *pushConsumer) Start() error { var err error pc.once.Do(func() { @@ -123,13 +133,14 @@ func (pc *pushConsumer) Start() error { pc.state = internal.StateStartFailed pc.validate() - err := pc.client.RegisterConsumer(pc.consumerGroup, pc) + err = pc.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { pc.state = internal.StateStartFailed rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{ rlog.LogKeyConsumerGroup: pc.consumerGroup, }) err = ErrCreated + return } err = pc.defaultConsumer.start() @@ -160,7 +171,6 @@ func (pc *pushConsumer) Start() error { go func() { // todo start clean msg expired - // TODO quit for { select { case pr := <-pc.prCh: @@ -177,6 +187,10 @@ func (pc *pushConsumer) Start() error { }() }) + if err != nil { + return err + } + pc.client.UpdateTopicRouteInfo() for k := range pc.subscribedTopic { _, exist := pc.topicSubscribeInfoTable.Load(k) @@ -224,7 +238,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, pc.subscribedTopic[retryTopic] = "" } - pc.consume = f + pc.consumeFunc.Add(&PushConsumerCallback{ + f: f, + topic: topic, + }) return nil } @@ -752,14 +769,26 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue } func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) { + if len(subMsgs) == 0 { + return ConsumeRetryLater, errors.New("msg list empty") + } + f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic) + if !exist { + return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic) + } + + callback, ok := f.(*PushConsumerCallback) + if !ok { + return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic) + } if pc.interceptor == nil { - return pc.consume(ctx, subMsgs...) + return callback.f(ctx, subMsgs...) } else { var container ConsumeResultHolder err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error { msgs := req.([]*primitive.MessageExt) - r, e := pc.consume(ctx, msgs...) + r, e := callback.f(ctx, msgs...) realReply := reply.(*ConsumeResultHolder) realReply.ConsumeResult = r diff --git a/internal/client.go b/internal/client.go index f66e8b04..33369440 100644 --- a/internal/client.go +++ b/internal/client.go @@ -565,6 +565,13 @@ func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, re } func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error { + _, exist := c.consumerMap.Load(group) + if exist { + rlog.Warning("the consumer group exist already", map[string]interface{}{ + rlog.LogKeyConsumerGroup: group, + }) + return fmt.Errorf("the consumer group exist already") + } c.consumerMap.Store(group, consumer) return nil }