Skip to content

Commit

Permalink
feat(producer): auto create topic for producer
Browse files Browse the repository at this point in the history
Closes #415
  • Loading branch information
Jianhai Xu authored and Jianhai Xu committed Feb 22, 2020
1 parent adc707f commit 626dcdf
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 26 deletions.
12 changes: 9 additions & 3 deletions examples/producer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand All @@ -38,9 +39,14 @@ func main() {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 1000; i++ {
res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
[]byte("Hello RocketMQ Go Client!")))
topic := "test"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
return true
})
for topic := range publishTopicSet {
data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
c.UpdatePublishInfo(topic, data, changed)
}

Expand All @@ -514,7 +514,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
})

for topic := range subscribedTopicSet {
data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
c.updateSubscribeInfo(topic, data, changed)
}
}
Expand Down
32 changes: 26 additions & 6 deletions internal/mock_namesrv.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/namesrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Namesrvs interface {

cleanOfflineBroker()

UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool, err error)

FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)

Expand Down
38 changes: 30 additions & 8 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,45 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
return int(qIndex) % length
}

func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
return s.UpdateTopicRouteInfoWithDefault(topic, "", 0)
}

func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()

routeData, err := s.queryTopicRouteInfoFromServer(topic)
var (
routeData *TopicRouteData
err error
)

t := topic
if len(defaultTopic) > 0 {
t = defaultTopic
}
routeData, err = s.queryTopicRouteInfoFromServer(t)

if err != nil {
rlog.Warning("query topic route from server error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, false
}

if routeData == nil {
rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
return nil, false
return nil, false, err
}

if len(defaultTopic) > 0 {
for _, q := range routeData.QueueDataList {
if q.ReadQueueNums > defaultQueueNum {
q.ReadQueueNums = defaultQueueNum
q.WriteQueueNums = defaultQueueNum
}
}
}

oldRouteData, exist := s.routeDataMap.Load(topic)
Expand All @@ -150,7 +172,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
}
}

return routeData.clone(), changed
return routeData.clone(), changed, nil
}

func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
Expand Down Expand Up @@ -330,13 +352,13 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
rlog.Error("connect to namesrv failed.", map[string]interface{}{
"namesrv": s,
})
return nil, err
return nil, primitive.NewRemotingErr(err.Error())
}

switch response.Code {
case ResSuccess:
if response.Body == nil {
return nil, errors.New(response.Remark)
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
routeData := &TopicRouteData{}

Expand All @@ -351,7 +373,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
case ResTopicNotExist:
return nil, ErrTopicNotExist
default:
return nil, errors.New(response.Remark)
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
}

Expand Down
30 changes: 30 additions & 0 deletions primitive/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,33 @@ type MQBrokerErr struct {
func (e MQBrokerErr) Error() string {
return "CODE: " + strconv.Itoa(int(e.ResponseCode)) + " DESC: " + e.ErrorMessage
}

func NewRemotingErr(s string) error {
return &RemotingErr{s: s}
}

type RemotingErr struct {
s string
}

func (e *RemotingErr) Error() string {
return e.s
}

func NewMQClientErr(code int16, msg string) error {
return &MQClientErr{code: code, msg: msg}
}

type MQClientErr struct {
code int16
msg string
}

func (e MQClientErr) Error() string {
return "CODE: " + strconv.Itoa(int(e.code)) + " DESC: " + e.msg
}

func IsRemotingErr(err error) bool {
_, ok := err.(*RemotingErr)
return ok
}
27 changes: 22 additions & 5 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ import (

func defaultProducerOptions() producerOptions {
opts := producerOptions{
ClientOptions: internal.DefaultClientOptions(),
Selector: NewHashQueueSelector(),
SendMsgTimeout: 3 * time.Second,
ClientOptions: internal.DefaultClientOptions(),
Selector: NewRoundRobinQueueSelector(),
SendMsgTimeout: 3 * time.Second,
DefaultTopicQueueNums: 4,
CreateTopicKey: "TBW102",
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
}

type producerOptions struct {
internal.ClientOptions
Selector QueueSelector
SendMsgTimeout time.Duration
Selector QueueSelector
SendMsgTimeout time.Duration
DefaultTopicQueueNums int
CreateTopicKey string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
}

type Option func(*producerOptions)
Expand Down Expand Up @@ -109,3 +114,15 @@ func WithCredentials(c primitive.Credentials) Option {
options.ClientOptions.Credentials = c
}
}

func WithDefaultTopicQueueNums(queueNum int) Option {
return func(options *producerOptions) {
options.DefaultTopicQueueNums = queueNum
}
}

func WithCreateTopicKey(topic string) Option {
return func(options *producerOptions) {
options.CreateTopicKey = topic
}
}
11 changes: 10 additions & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,16 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.

v, exist := p.publishInfo.Load(topic)
if !exist {
data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
data, changed, err := p.options.Namesrv.UpdateTopicRouteInfo(topic)
if err != nil && primitive.IsRemotingErr(err) {
return nil
}
p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}

if !exist {
data, changed, _ := p.options.Namesrv.UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}
Expand Down

0 comments on commit 626dcdf

Please sign in to comment.