Skip to content

Commit

Permalink
[ISSUE #122]add msg delay to send. resolve #122 (#123)
Browse files Browse the repository at this point in the history
* add msg delay to send. resolve #122

* add log
  • Loading branch information
xujianhai666 authored and zongtanghu committed Jul 22, 2019
1 parent f38894a commit 8a49505
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 0 deletions.
56 changes: 56 additions & 0 deletions examples/consumer/delay/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"
"time"

"github.com/apache/rocketmq-client-go"
"github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/primitive"
)

func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

for _, msg := range msgs {
t := time.Now().UnixNano()/int64(time.Millisecond) - msg.BornTimestamp
fmt.Printf("Receive message[msgId=%s] %d ms later\n", msg.MsgId, t)
}

return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
}
58 changes: 58 additions & 0 deletions examples/producer/delay/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"

"github.com/apache/rocketmq-client-go"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/producer"
)

func main() {
p, _ := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
for i := 0; i < 10; i++ {
msg := &primitive.Message{
Topic: "TopicTest",
Body: []byte("Hello RocketMQ Go Client!"),
}
msg.SetDelayTimeLevel(3)
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shundown producer error: %s", err.Error())
}
}
11 changes: 11 additions & 0 deletions primitive/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package primitive

import (
"fmt"
"strconv"

"github.com/apache/rocketmq-client-go/internal/utils"
)
Expand Down Expand Up @@ -73,6 +74,16 @@ func NewMessage(topic string, body []byte) *Message {
}
}

// SetDelayTimeLevel set message delay time to consume.
// reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
func (msg *Message) SetDelayTimeLevel(level int) {
if msg.Properties == nil {
msg.Properties = make(map[string]string)
}
msg.Properties[PropertyDelayTimeLevel] = strconv.Itoa(level)
}

func (msg *Message) String() string {
return fmt.Sprintf("[topic=%s, body=%s, Flag=%d, Properties=%v, TransactionId=%s]",
msg.Topic, string(msg.Body), msg.Flag, msg.Properties, msg.TransactionId)
Expand Down

0 comments on commit 8a49505

Please sign in to comment.