From 7460f489f92f8e86d043d13a45f950196ae55764 Mon Sep 17 00:00:00 2001 From: UnAfraid Date: Tue, 6 Dec 2022 20:49:08 +0200 Subject: [PATCH 01/10] Adding configuration to automatically resume publishing upon error with ordering key At the moment if a any error occurs while publishing and sorting key is set, google cloud pubsub library would refuse to send any subsequent messages to that topic with that sorting key until topic.ResumePublish(sortingKey) is called --- pkg/googlecloud/publisher.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index acb39fa..d8c599a 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -42,6 +42,8 @@ type PublisherConfig struct { DoNotCreateTopicIfMissing bool // Enables the topic message ordering EnableMessageOrdering bool + // Enables automatic resume publish upon error + EnableMessageOrderingAutoResumePublishOnError bool // ConnectTimeout defines the timeout for connecting to Pub/Sub ConnectTimeout time.Duration @@ -162,6 +164,9 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { _, err = result.Get(ctx) if err != nil { + if p.config.EnableMessageOrdering && p.config.EnableMessageOrderingAutoResumePublishOnError && googlecloudMsg.OrderingKey != "" { + t.ResumePublish(googlecloudMsg.OrderingKey) + } return errors.Wrapf(err, "publishing message %s failed", msg.UUID) } From 090171ee32bd8867cbde6f1ce851c6c1974e2723 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Tue, 18 Apr 2023 10:27:08 +0200 Subject: [PATCH 02/10] proper handling of EnableMessageOrdering where a new topic is created --- pkg/googlecloud/publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index d8c599a..cae94d9 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -146,6 +146,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { if err != nil { return err } + t.EnableMessageOrdering = p.config.EnableMessageOrdering logFields := make(watermill.LogFields, 2) logFields["topic"] = topic @@ -212,7 +213,6 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e }() t = p.client.Topic(topic) - t.EnableMessageOrdering = p.config.EnableMessageOrdering // todo: theoretically, one could want different publish settings per topic, which is supported by the client lib // different instances of publisher may be used then From ead7f6275e48f812f911998c2dd2c8c0b246d955 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Tue, 9 Apr 2024 22:11:01 +0200 Subject: [PATCH 03/10] Add server side generated messageId to message attributes --- pkg/googlecloud/marshaler.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/googlecloud/marshaler.go b/pkg/googlecloud/marshaler.go index c8fb379..28f3d8d 100644 --- a/pkg/googlecloud/marshaler.go +++ b/pkg/googlecloud/marshaler.go @@ -20,6 +20,10 @@ type Unmarshaler interface { // UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID. const UUIDHeaderKey = "_watermill_message_uuid" +// GoogleMessageIdHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID. +// This ID is assigned by the server when the message is published and is guaranteed to be unique within the topic. +const GoogleMessageIdHeaderKey = "_watermill_message_google_message_id" + // DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way: // All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata. // Waterfall Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key. @@ -64,6 +68,7 @@ func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*messag } metadata.Set("publishTime", pubsubMsg.PublishTime.String()) + metadata.Set(GoogleMessageIdHeaderKey, pubsubMsg.ID) msg := message.NewMessage(id, pubsubMsg.Data) msg.Metadata = metadata From f1e03d1986fb33bd248742265c37279074485eb8 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Tue, 9 Apr 2024 22:38:18 +0200 Subject: [PATCH 04/10] Add test for checking if received message contains message ID --- pkg/googlecloud/pubsub_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/googlecloud/pubsub_test.go b/pkg/googlecloud/pubsub_test.go index ccc7c31..7324c1e 100644 --- a/pkg/googlecloud/pubsub_test.go +++ b/pkg/googlecloud/pubsub_test.go @@ -176,6 +176,29 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) { require.Equal(t, googlecloud.ErrUnexpectedTopic, errors.Cause(err)) } +func TestReceivedMessageContainsMessageId(t *testing.T) { + logger := watermill.NewStdLogger(true, true) + + sub, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{}, logger) + require.NoError(t, err) + + topic := fmt.Sprintf("topic_%d", rand.Int()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + messages, err := sub.Subscribe(ctx, topic) + require.NoError(t, err) + + howManyMessages := 1 + produceMessages(t, topic, howManyMessages) + + msg := <-messages + if msg.Metadata.Get(googlecloud.GoogleMessageIdHeaderKey) == "" { + t.Fatalf("Message %s does not contain %s", msg.UUID, googlecloud.GoogleMessageIdHeaderKey) + } +} + func produceMessages(t *testing.T, topic string, howMany int) { pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil) require.NoError(t, err) From 9d5e023569a5de242e313730e37856b6eb8dbae1 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Sat, 13 Apr 2024 16:26:19 +0200 Subject: [PATCH 05/10] Update GoogleMessageIdHeaderKey to GoogleMessageIDHeaderKey in marshaler.go --- pkg/googlecloud/marshaler.go | 6 +++--- pkg/googlecloud/pubsub_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/googlecloud/marshaler.go b/pkg/googlecloud/marshaler.go index 28f3d8d..33bb38a 100644 --- a/pkg/googlecloud/marshaler.go +++ b/pkg/googlecloud/marshaler.go @@ -20,9 +20,9 @@ type Unmarshaler interface { // UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID. const UUIDHeaderKey = "_watermill_message_uuid" -// GoogleMessageIdHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID. +// GoogleMessageIDHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID. // This ID is assigned by the server when the message is published and is guaranteed to be unique within the topic. -const GoogleMessageIdHeaderKey = "_watermill_message_google_message_id" +const GoogleMessageIDHeaderKey = "_watermill_message_google_message_id" // DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way: // All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata. @@ -68,7 +68,7 @@ func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*messag } metadata.Set("publishTime", pubsubMsg.PublishTime.String()) - metadata.Set(GoogleMessageIdHeaderKey, pubsubMsg.ID) + metadata.Set(GoogleMessageIDHeaderKey, pubsubMsg.ID) msg := message.NewMessage(id, pubsubMsg.Data) msg.Metadata = metadata diff --git a/pkg/googlecloud/pubsub_test.go b/pkg/googlecloud/pubsub_test.go index 7324c1e..8ea7e23 100644 --- a/pkg/googlecloud/pubsub_test.go +++ b/pkg/googlecloud/pubsub_test.go @@ -194,8 +194,8 @@ func TestReceivedMessageContainsMessageId(t *testing.T) { produceMessages(t, topic, howManyMessages) msg := <-messages - if msg.Metadata.Get(googlecloud.GoogleMessageIdHeaderKey) == "" { - t.Fatalf("Message %s does not contain %s", msg.UUID, googlecloud.GoogleMessageIdHeaderKey) + if msg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) == "" { + t.Fatalf("Message %s does not contain %s", msg.UUID, googlecloud.GoogleMessageIDHeaderKey) } } From 302f78c45e86eedf9ff67c8da7bd02b3c6c651a0 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Sat, 13 Apr 2024 19:34:09 +0200 Subject: [PATCH 06/10] Set server message ID attribute after publishing message --- pkg/googlecloud/publisher.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index cae94d9..39bff1a 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -163,7 +163,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { result := t.Publish(ctx, googlecloudMsg) <-result.Ready() - _, err = result.Get(ctx) + serverMessageID, err := result.Get(ctx) if err != nil { if p.config.EnableMessageOrdering && p.config.EnableMessageOrderingAutoResumePublishOnError && googlecloudMsg.OrderingKey != "" { t.ResumePublish(googlecloudMsg.OrderingKey) @@ -171,6 +171,8 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { return errors.Wrapf(err, "publishing message %s failed", msg.UUID) } + msg.Metadata.Set(GoogleMessageIDHeaderKey, serverMessageID) + p.logger.Trace("Message published to Google PubSub", logFields) } From 328e3b3e4d26bf0b2f975a8ab5af86d224a49503 Mon Sep 17 00:00:00 2001 From: Arthur Silva Date: Sun, 14 Apr 2024 19:01:57 +0200 Subject: [PATCH 07/10] Add test for checking if published message ID matches received message ID --- pkg/googlecloud/pubsub_test.go | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/pkg/googlecloud/pubsub_test.go b/pkg/googlecloud/pubsub_test.go index 8ea7e23..85fff52 100644 --- a/pkg/googlecloud/pubsub_test.go +++ b/pkg/googlecloud/pubsub_test.go @@ -199,6 +199,41 @@ func TestReceivedMessageContainsMessageId(t *testing.T) { } } +func TestPublishedMessageIdMatchesReceivedMessageId(t *testing.T) { + logger := watermill.NewStdLogger(true, true) + topic := fmt.Sprintf("topic_message_id_match_%d", rand.Int()) + + // Set up subscriber + sub, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{}, logger) + require.NoError(t, err) + + // Subscribe to the topic + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + messages, err := sub.Subscribe(ctx, topic) + require.NoError(t, err) + + // Set up publisher + pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil) + require.NoError(t, err) + defer pub.Close() + + // Publish a message + publishedMsg := message.NewMessage(watermill.NewUUID(), []byte{}) + require.NoError(t, pub.Publish(topic, publishedMsg)) + publishedMessageId := publishedMsg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) + + if publishedMessageId == "" { + t.Fatalf("Published message %s does not contain %s", publishedMsg.UUID, googlecloud.GoogleMessageIDHeaderKey) + } + + receivedMsg := <-messages + receivedMessageId := receivedMsg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) + if publishedMessageId != receivedMessageId { + t.Fatalf("Published message ID %s does not match received message ID %s", publishedMessageId, receivedMessageId) + } +} + func produceMessages(t *testing.T, topic string, howMany int) { pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil) require.NoError(t, err) From 4af2cefc9cdf30c9e261851ac1e7fd2f9924599e Mon Sep 17 00:00:00 2001 From: Diego Frata Date: Sat, 13 Jul 2024 19:34:31 +0200 Subject: [PATCH 08/10] feat: skip topic exists check --- pkg/googlecloud/publisher.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index 39bff1a..307553d 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -37,6 +37,9 @@ type PublisherConfig struct { // ProjectID is the Google Cloud Engine project ID. ProjectID string + // If true, `Publisher` does not check if the topic exists before publishing. + DoNotCheckTopicExistence bool + // If false (default), `Publisher` tries to create a topic if there is none with the requested name. // Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`. DoNotCreateTopicIfMissing bool @@ -222,6 +225,10 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e t.PublishSettings = *p.config.PublishSettings } + if p.config.DoNotCheckTopicExistence { + return t, nil + } + exists, err := t.Exists(ctx) if err != nil { return nil, errors.Wrapf(err, "could not check if topic %s exists", topic) From 071945196f04398936c430510cb8dd4da3d9fa16 Mon Sep 17 00:00:00 2001 From: Diego Frata Date: Tue, 16 Jul 2024 12:34:29 +0200 Subject: [PATCH 09/10] chore: add test case --- pkg/googlecloud/pubsub_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/googlecloud/pubsub_test.go b/pkg/googlecloud/pubsub_test.go index 85fff52..e0d1f13 100644 --- a/pkg/googlecloud/pubsub_test.go +++ b/pkg/googlecloud/pubsub_test.go @@ -234,6 +234,24 @@ func TestPublishedMessageIdMatchesReceivedMessageId(t *testing.T) { } } +func TestPublisherDoesNotAttemptToCreateTopic(t *testing.T) { + topic := fmt.Sprintf("missing_topic_%d", rand.Int()) + + // Set up publisher + pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{ + // DoNotCheckTopicExistence is set to true, so the publisher will not check + // if the topic exists and will also not attempt to create it. + DoNotCheckTopicExistence: true, + DoNotCreateTopicIfMissing: false, + }, nil) + require.NoError(t, err) + defer pub.Close() + + // Publish a message + publishedMsg := message.NewMessage(watermill.NewUUID(), []byte{}) + require.Error(t, pub.Publish(topic, publishedMsg), googlecloud.ErrTopicDoesNotExist) +} + func produceMessages(t *testing.T, topic string, howMany int) { pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil) require.NoError(t, err) From 1b725dbb0c069f7d43dee973bdc14e05705a31a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Tue, 16 Jul 2024 19:31:50 +0200 Subject: [PATCH 10/10] Fix lint and build --- Makefile | 3 +++ pkg/googlecloud/subscriber.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 9da3a8a..27d1133 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,9 @@ test_stress: test_reconnect: go test -tags=reconnect ./... +test_codecov: up wait + go test -coverprofile=coverage.out -covermode=atomic ./... + wait: go run github.com/ThreeDotsLabs/wait-for@latest localhost:8085 diff --git a/pkg/googlecloud/subscriber.go b/pkg/googlecloud/subscriber.go index 1f7c225..8100a1a 100644 --- a/pkg/googlecloud/subscriber.go +++ b/pkg/googlecloud/subscriber.go @@ -176,7 +176,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa } s.logger.Info("Subscribing to Google Cloud PubSub topic", logFields) - output := make(chan *message.Message, 0) + output := make(chan *message.Message) sub, err := s.subscription(ctx, subscriptionName, topic) if err != nil {