From ce50b5f90c5a2dd7bc487f85cdf387ea14229b9f Mon Sep 17 00:00:00 2001 From: ShannonDing Date: Fri, 20 Sep 2019 14:57:13 +0800 Subject: [PATCH] feat(demos): add seperately demos --- demos/orderly_push_consumer.go | 82 ++++++++++++++++++++++++++++++++++ demos/producer.go | 66 +++++++++++++++++++++++++++ demos/producer_orderly.go | 60 +++++++++++++++++++++++++ demos/push_consumer.go | 77 +++++++++++++++++++++++++++++++ 4 files changed, 285 insertions(+) create mode 100644 demos/orderly_push_consumer.go create mode 100644 demos/producer.go create mode 100644 demos/producer_orderly.go create mode 100644 demos/push_consumer.go diff --git a/demos/orderly_push_consumer.go b/demos/orderly_push_consumer.go new file mode 100644 index 00000000..bf10055b --- /dev/null +++ b/demos/orderly_push_consumer.go @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "github.com/apache/rocketmq-client-go/core" + "math/rand" + "sync/atomic" +) + +func main() { + pConfig := &rocketmq.PushConsumerConfig{ + ClientConfig: rocketmq.ClientConfig{ + GroupID: "GID_XXXXXXXXXXXX", + NameServer: "http://XXXXXXXXXXXXXXXXXX:80", + Credentials: &rocketmq.SessionCredentials{ + AccessKey: "Your Access Key", + SecretKey: "Your Secret Key", + Channel: "ALIYUN/OtherChannel", + }, + }, + Model: rocketmq.Clustering, + ConsumerModel: rocketmq.Orderly, + } + ConsumeWithOrderly(pConfig) +} +func ConsumeWithOrderly(config *rocketmq.PushConsumerConfig) { + + consumer, err := rocketmq.NewPushConsumer(config) + if err != nil { + println("create Consumer failed, error:", err) + return + } + + ch := make(chan interface{}) + var count = (int64)(1000000) + // ******************************************** + // MUST subscribe topic before consumer started. + // ********************************************* + consumer.Subscribe("YourOrderlyTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus { + fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body) + if atomic.AddInt64(&count, -1) <= 0 { + ch <- "quit" + } + if 0 == rand.Int()%7 { + fmt.Printf("Consumer Later, MessageID:%s \n", msg.MessageID) + return rocketmq.ReConsumeLater + } + return rocketmq.ConsumeSuccess + }) + + err = consumer.Start() + if err != nil { + println("consumer start failed,", err) + return + } + + fmt.Printf("consumer: %s started...\n", consumer) + <-ch + err = consumer.Shutdown() + if err != nil { + println("consumer shutdown failed") + return + } + println("consumer has shutdown.") +} diff --git a/demos/producer.go b/demos/producer.go new file mode 100644 index 00000000..a9413051 --- /dev/null +++ b/demos/producer.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "github.com/apache/rocketmq-client-go/core" +) + +func main() { + pConfig := &rocketmq.ProducerConfig{ + ClientConfig: rocketmq.ClientConfig{ + GroupID: "GID_XXXXXXXXXXXX", + NameServer: "http://XXXXXXXXXXXXXXXXXX:80", + Credentials: &rocketmq.SessionCredentials{ + AccessKey: "Your Access Key", + SecretKey: "Your Secret Key", + Channel: "ALIYUN/OtherChannel", + }, + }, + //Set to Common Producer as default. + ProducerModel: rocketmq.CommonProducer, + } + sendMessage(pConfig) +} +func sendMessage(config *rocketmq.ProducerConfig) { + producer, err := rocketmq.NewProducer(config) + + if err != nil { + fmt.Println("create common producer failed, error:", err) + return + } + + err = producer.Start() + if err != nil { + fmt.Println("start common producer error", err) + return + } + defer producer.Shutdown() + + fmt.Printf("Common producer: %s started... \n", producer) + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i) + result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msg}) + if err != nil { + fmt.Println("Error:", err) + } + fmt.Printf("send message: %s result: %s\n", msg, result) + } + fmt.Println("shutdown common producer.") +} diff --git a/demos/producer_orderly.go b/demos/producer_orderly.go new file mode 100644 index 00000000..7822448b --- /dev/null +++ b/demos/producer_orderly.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package main + +import ( + "fmt" + "github.com/apache/rocketmq-client-go/core" + "time" +) + +func main() { + pConfig := &rocketmq.ProducerConfig{ + ClientConfig: rocketmq.ClientConfig{ + GroupID: "GID_XXXXXXXXXXXX", + NameServer: "http://XXXXXXXXXXXXXXXXXX:80", + Credentials: &rocketmq.SessionCredentials{ + AccessKey: "Your Access Key", + SecretKey: "Your Secret Key", + Channel: "ALIYUN/OtherChannel", + }, + }, + ProducerModel: rocketmq.OrderlyProducer, + } + sendMessageOrderlyByShardingKey(pConfig) +} +func sendMessageOrderlyByShardingKey(config *rocketmq.ProducerConfig) { + producer, err := rocketmq.NewProducer(config) + if err != nil { + fmt.Println("create Producer failed, error:", err) + return + } + + producer.Start() + defer producer.Shutdown() + for i := 0; i < 1000; i++ { + msg := fmt.Sprintf("%s-%d", "Hello Lite Orderly Message", i) + r, err := producer.SendMessageOrderlyByShardingKey( + &rocketmq.Message{Topic: "YourOrderLyTopicXXXXXXXX", Body: msg}, "ShardingKey" /*orderID*/) + if err != nil { + println("Send Orderly Message Error:", err) + } + fmt.Printf("send orderly message result:%+v\n", r) + time.Sleep(time.Duration(1) * time.Second) + } + +} diff --git a/demos/push_consumer.go b/demos/push_consumer.go new file mode 100644 index 00000000..af8a9a00 --- /dev/null +++ b/demos/push_consumer.go @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "github.com/apache/rocketmq-client-go/core" + "sync/atomic" +) + +func main() { + pConfig := &rocketmq.PushConsumerConfig{ + ClientConfig: rocketmq.ClientConfig{ + GroupID: "GID_XXXXXXXXXXXX", + NameServer: "http://XXXXXXXXXXXXXXXXXX:80", + Credentials: &rocketmq.SessionCredentials{ + AccessKey: "Your Access Key", + SecretKey: "Your Secret Key", + Channel: "ALIYUN/OtherChannel", + }, + }, + Model: rocketmq.Clustering, + ConsumerModel: rocketmq.CoCurrently, + } + ConsumeWithPush(pConfig) +} +func ConsumeWithPush(config *rocketmq.PushConsumerConfig) { + + consumer, err := rocketmq.NewPushConsumer(config) + if err != nil { + println("create Consumer failed, error:", err) + return + } + + ch := make(chan interface{}) + var count = (int64)(1000000) + // ******************************************** + // MUST subscribe topic before consumer started. + // ********************************************* + consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus { + fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body) + if atomic.AddInt64(&count, -1) <= 0 { + ch <- "quit" + } + return rocketmq.ConsumeSuccess + }) + + err = consumer.Start() + if err != nil { + println("consumer start failed,", err) + return + } + + fmt.Printf("consumer: %s started...\n", consumer) + <-ch + err = consumer.Shutdown() + if err != nil { + println("consumer shutdown failed") + return + } + println("consumer has shutdown.") +}