Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #415] feat(producer): auto create topic for producer #416

Merged
merged 1 commit into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions examples/producer/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"os"
"strconv"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
Expand All @@ -38,9 +39,14 @@ func main() {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 1000; i++ {
res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
[]byte("Hello RocketMQ Go Client!")))
topic := "test"

for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
return true
})
for topic := range publishTopicSet {
data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
c.UpdatePublishInfo(topic, data, changed)
}

Expand All @@ -514,7 +514,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
})

for topic := range subscribedTopicSet {
data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
c.updateSubscribeInfo(topic, data, changed)
}
}
Expand Down
32 changes: 26 additions & 6 deletions internal/mock_namesrv.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/namesrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Namesrvs interface {

cleanOfflineBroker()

UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool, err error)

FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)

Expand Down
38 changes: 30 additions & 8 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,45 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
return int(qIndex) % length
}

func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
return s.UpdateTopicRouteInfoWithDefault(topic, "", 0)
}

func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()

routeData, err := s.queryTopicRouteInfoFromServer(topic)
var (
routeData *TopicRouteData
err error
)

t := topic
if len(defaultTopic) > 0 {
t = defaultTopic
}
routeData, err = s.queryTopicRouteInfoFromServer(t)

if err != nil {
rlog.Warning("query topic route from server error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, false
}

if routeData == nil {
rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
return nil, false
return nil, false, err
}

if len(defaultTopic) > 0 {
for _, q := range routeData.QueueDataList {
if q.ReadQueueNums > defaultQueueNum {
q.ReadQueueNums = defaultQueueNum
q.WriteQueueNums = defaultQueueNum
}
}
}

oldRouteData, exist := s.routeDataMap.Load(topic)
Expand All @@ -150,7 +172,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
}
}

return routeData.clone(), changed
return routeData.clone(), changed, nil
}

func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
Expand Down Expand Up @@ -330,13 +352,13 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
rlog.Error("connect to namesrv failed.", map[string]interface{}{
"namesrv": s,
})
return nil, err
return nil, primitive.NewRemotingErr(err.Error())
}

switch response.Code {
case ResSuccess:
if response.Body == nil {
return nil, errors.New(response.Remark)
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
routeData := &TopicRouteData{}

Expand All @@ -351,7 +373,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
case ResTopicNotExist:
return nil, ErrTopicNotExist
default:
return nil, errors.New(response.Remark)
return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
}

Expand Down
30 changes: 30 additions & 0 deletions primitive/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,33 @@ type MQBrokerErr struct {
func (e MQBrokerErr) Error() string {
return "CODE: " + strconv.Itoa(int(e.ResponseCode)) + " DESC: " + e.ErrorMessage
}

func NewRemotingErr(s string) error {
return &RemotingErr{s: s}
}

type RemotingErr struct {
s string
}

func (e *RemotingErr) Error() string {
return e.s
}

func NewMQClientErr(code int16, msg string) error {
return &MQClientErr{code: code, msg: msg}
}

type MQClientErr struct {
code int16
msg string
}

func (e MQClientErr) Error() string {
return "CODE: " + strconv.Itoa(int(e.code)) + " DESC: " + e.msg
}

func IsRemotingErr(err error) bool {
_, ok := err.(*RemotingErr)
return ok
}
27 changes: 22 additions & 5 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ import (

func defaultProducerOptions() producerOptions {
opts := producerOptions{
ClientOptions: internal.DefaultClientOptions(),
Selector: NewHashQueueSelector(),
SendMsgTimeout: 3 * time.Second,
ClientOptions: internal.DefaultClientOptions(),
Selector: NewRoundRobinQueueSelector(),
SendMsgTimeout: 3 * time.Second,
DefaultTopicQueueNums: 4,
CreateTopicKey: "TBW102",
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
}

type producerOptions struct {
internal.ClientOptions
Selector QueueSelector
SendMsgTimeout time.Duration
Selector QueueSelector
SendMsgTimeout time.Duration
DefaultTopicQueueNums int
CreateTopicKey string // "TBW102" Will be created at broker when isAutoCreateTopicEnable. when topic is not created,
// and broker open isAutoCreateTopicEnable, topic will use "TBW102" config to create topic
}

type Option func(*producerOptions)
Expand Down Expand Up @@ -109,3 +114,15 @@ func WithCredentials(c primitive.Credentials) Option {
options.ClientOptions.Credentials = c
}
}

func WithDefaultTopicQueueNums(queueNum int) Option {
return func(options *producerOptions) {
options.DefaultTopicQueueNums = queueNum
}
}

func WithCreateTopicKey(topic string) Option {
return func(options *producerOptions) {
options.CreateTopicKey = topic
}
}
11 changes: 10 additions & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,16 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.

v, exist := p.publishInfo.Load(topic)
if !exist {
data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
data, changed, err := p.options.Namesrv.UpdateTopicRouteInfo(topic)
if err != nil && primitive.IsRemotingErr(err) {
return nil
}
p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}

if !exist {
data, changed, _ := p.options.Namesrv.UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}
Expand Down