Skip to content

Commit

Permalink
[ISSUE #487] Implement Unsubscribe method for push consumer (#626)
Browse files Browse the repository at this point in the history
* Implement Unsubscribe method for push consumer
  • Loading branch information
maixiaohai authored Mar 16, 2021
1 parent c97d492 commit 8019e59
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
13 changes: 12 additions & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
}

// add retry topic subscription for resubscribe
retryTopic := internal.GetRetryTopic(pc.consumerGroup)
_, exists := pc.subscriptionDataTable.Load(retryTopic)
if !exists {
sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
pc.subscriptionDataTable.Store(retryTopic, sub)
}

if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}
Expand All @@ -241,7 +249,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
return nil
}

func (pc *pushConsumer) Unsubscribe(string) error {
func (pc *pushConsumer) Unsubscribe(topic string) error {
pc.subscriptionDataTable.Delete(topic)
retryTopic := internal.GetRetryTopic(pc.consumerGroup)
pc.subscriptionDataTable.Delete(retryTopic)
return nil
}

Expand Down
17 changes: 17 additions & 0 deletions consumer/push_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ func TestStart(t *testing.T) {
return ConsumeSuccess, nil
})

_, exists := c.subscriptionDataTable.Load("TopicTest")
So(exists, ShouldBeTrue)

err = c.Unsubscribe("TopicTest")
So(err, ShouldBeNil)
_, exists = c.subscriptionDataTable.Load("TopicTest")
So(exists, ShouldBeFalse)

err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return ConsumeSuccess, nil
})

_, exists = c.subscriptionDataTable.Load("TopicTest")
So(exists, ShouldBeTrue)

client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
client.EXPECT().Start().Return()
client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil)
Expand Down

0 comments on commit 8019e59

Please sign in to comment.