Skip to content

Commit

Permalink
[ISSUE #462] fix the trace message was send failed. (#463)
Browse files Browse the repository at this point in the history
* fix(trace): fix the trace message was send failed.

* fix(bug): remove namespace in the trace message body

* fix the processid error in client id.
  • Loading branch information
ShannonDing authored Mar 27, 2020
1 parent 267dca5 commit 9a58797
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 18 deletions.
5 changes: 5 additions & 0 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,11 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M

msgCtx, _ := primitive.GetConsumerCtx(ctx)
msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
if realReply.ConsumeResult == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
} else {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
}
return e
})
return container.ConsumeResult, err
Expand Down
20 changes: 19 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,12 @@ func (c *rmqClient) Shutdown() {
}

func (c *rmqClient) ClientID() string {
id := c.option.ClientIP + "@" + c.option.InstanceName
id := c.option.ClientIP + "@"
if c.option.InstanceName == "DEFAULT" {
id += strconv.Itoa(os.Getpid())
} else {
id += c.option.InstanceName
}
if c.option.UnitName != "" {
id += "@" + c.option.UnitName
}
Expand Down Expand Up @@ -466,6 +471,19 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
brokerName := key.(string)
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
rlog.Debug("try to send heart beat to broker", map[string]interface{}{
"brokerName": brokerName,
"brokerId": id,
"brokerAddr": addr,
})
if hbData.ConsumerDatas.Len() == 0 && id != 0 {
rlog.Debug("notice, will not send heart beat to broker", map[string]interface{}{
"brokerName": brokerName,
"brokerId": id,
"brokerAddr": addr,
})
continue
}
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())

ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down
68 changes: 54 additions & 14 deletions internal/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,21 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.RegionId)
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.GroupName)
ss := strings.Split(ctx.GroupName, "%")
if len(ss) == 2 {
buffer.WriteString(ss[1])
} else {
buffer.WriteString(ctx.GroupName)
}

buffer.WriteRune(contentSplitter)
buffer.WriteString(bean.Topic)
ssTopic := strings.Split(bean.Topic, "%")
if len(ssTopic) == 2 {
buffer.WriteString(ssTopic[1])
} else {
buffer.WriteString(bean.Topic)
}
//buffer.WriteString(bean.Topic)
buffer.WriteRune(contentSplitter)
buffer.WriteString(bean.MsgId)
buffer.WriteRune(contentSplitter)
Expand Down Expand Up @@ -119,7 +131,12 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.RegionId)
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.GroupName)
ss := strings.Split(ctx.GroupName, "%")
if len(ss) == 2 {
buffer.WriteString(ss[1])
} else {
buffer.WriteString(ctx.GroupName)
}
buffer.WriteRune(contentSplitter)
buffer.WriteString(ctx.RequestId)
buffer.WriteRune(contentSplitter)
Expand Down Expand Up @@ -233,6 +250,9 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
}

cliOp := DefaultClientOptions()
cliOp.GroupName = traceCfg.GroupName
cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
cliOp.RetryTimes = 0
cliOp.Namesrv = srvs
cliOp.Credentials = traceCfg.Credentials
Expand Down Expand Up @@ -301,8 +321,9 @@ func (td *traceDispatcher) process() {
batch = append(batch, ctx)
if count == batchSize {
count = 0
batchSend := batch
go primitive.WithRecover(func() {
td.batchCommit(batch)
td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
}
Expand All @@ -312,15 +333,17 @@ func (td *traceDispatcher) process() {
count++
lastput = time.Now()
if len(batch) > 0 {
batchSend := batch
go primitive.WithRecover(func() {
td.batchCommit(batch)
td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)
}
}
case <-td.ctx.Done():
batchSend := batch
go primitive.WithRecover(func() {
td.batchCommit(batch)
td.batchCommit(batchSend)
})
batch = make([]TraceContext, 0)

Expand Down Expand Up @@ -403,30 +426,47 @@ func (td *traceDispatcher) flush(topic, regionID string, data []TraceTransferBea
}

func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, data string) {
msg := primitive.NewMessage(td.traceTopic, []byte(data))
traceTopic := td.traceTopic
if td.access == primitive.Cloud {
traceTopic = td.traceTopic + regionID
}
msg := primitive.NewMessage(traceTopic, []byte(data))
msg.WithKeys(keySet.slice())

mq, addr := td.findMq()
mq, addr := td.findMq(regionID)
if mq == nil {
return
}

var req = td.buildSendRequest(mq, msg)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
resp := new(primitive.SendResult)
if e != nil {
rlog.Error("send trace data error", map[string]interface{}{
rlog.Info("send trace data error.", map[string]interface{}{
"traceData": data,
})
} else {
td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
rlog.Debug("send trace data success:", map[string]interface{}{
"SendResult": resp,
"traceData": data,
})
}
})
rlog.Error("send trace data error when invoke", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
if err != nil {
rlog.Info("send trace data error when invoke", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
}
}

func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
func (td *traceDispatcher) findMq(regionID string) (*primitive.MessageQueue, string) {
traceTopic := td.traceTopic
if td.access == primitive.Cloud {
traceTopic = td.traceTopic + regionID
}
mqs, err := td.namesrvs.FetchPublishMessageQueues(traceTopic)
if err != nil {
rlog.Error("fetch publish message queues failed", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
Expand Down
13 changes: 10 additions & 3 deletions primitive/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (m *Message) Marshal() []byte {
type MessageExt struct {
Message
MsgId string
OffsetMsgId string
StoreSize int32
QueueOffset int64
SysFlag int32
Expand All @@ -263,9 +264,9 @@ func (msgExt *MessageExt) IsTraceOn() string {
}

func (msgExt *MessageExt) String() string {
return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
return fmt.Sprintf("[Message=%s, MsgId=%s, OffsetMsgId=%s,QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.Queue.QueueId,
"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.OffsetMsgId, msgExt.Queue.QueueId,
msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, msgExt.BornTimestamp, msgExt.BornHost,
msgExt.StoreTimestamp, msgExt.StoreHost, msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
msgExt.PreparedTransactionOffset)
Expand Down Expand Up @@ -364,11 +365,17 @@ func DecodeMessage(data []byte) []*MessageExt {
}
count += 2 + int(propertiesLength)

msg.MsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
msg.OffsetMsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
//count += 16
if msg.properties == nil {
msg.properties = make(map[string]string, 0)
}
msgID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex)
if len(msgID) == 0 {
msg.MsgId = msg.OffsetMsgId
} else {
msg.MsgId = msgID
}
msgs = append(msgs, msg)
}

Expand Down
1 change: 1 addition & 0 deletions primitive/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package primitive
// config for message trace.
type TraceConfig struct {
TraceTopic string
GroupName string
Access AccessChannel
NamesrvAddrs []string
Credentials // acl config for trace. omit if acl is closed on broker.
Expand Down

0 comments on commit 9a58797

Please sign in to comment.