diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index be2cf613..3221c6c9 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -770,6 +770,8 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { goto NEXT } + pc.processPullResult(request.mq, result, sd) + switch result.Status { case primitive.PullFound: rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil) @@ -779,8 +781,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { rt := time.Now().Sub(beginTime) / time.Millisecond pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt)) - pc.processPullResult(request.mq, result, sd) - msgFounded := result.GetMessageExts() firstMsgOffset := int64(math.MaxInt64) if len(msgFounded) != 0 {