diff --git a/Makefile b/Makefile index 9da3a8a..27d1133 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,9 @@ test_stress: test_reconnect: go test -tags=reconnect ./... +test_codecov: up wait + go test -coverprofile=coverage.out -covermode=atomic ./... + wait: go run github.com/ThreeDotsLabs/wait-for@latest localhost:8085 diff --git a/pkg/googlecloud/marshaler.go b/pkg/googlecloud/marshaler.go index c8fb379..33bb38a 100644 --- a/pkg/googlecloud/marshaler.go +++ b/pkg/googlecloud/marshaler.go @@ -20,6 +20,10 @@ type Unmarshaler interface { // UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID. const UUIDHeaderKey = "_watermill_message_uuid" +// GoogleMessageIDHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID. +// This ID is assigned by the server when the message is published and is guaranteed to be unique within the topic. +const GoogleMessageIDHeaderKey = "_watermill_message_google_message_id" + // DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way: // All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata. // Waterfall Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key. @@ -64,6 +68,7 @@ func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*messag } metadata.Set("publishTime", pubsubMsg.PublishTime.String()) + metadata.Set(GoogleMessageIDHeaderKey, pubsubMsg.ID) msg := message.NewMessage(id, pubsubMsg.Data) msg.Metadata = metadata diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index acb39fa..307553d 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -37,11 +37,16 @@ type PublisherConfig struct { // ProjectID is the Google Cloud Engine project ID. ProjectID string + // If true, `Publisher` does not check if the topic exists before publishing. + DoNotCheckTopicExistence bool + // If false (default), `Publisher` tries to create a topic if there is none with the requested name. // Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`. DoNotCreateTopicIfMissing bool // Enables the topic message ordering EnableMessageOrdering bool + // Enables automatic resume publish upon error + EnableMessageOrderingAutoResumePublishOnError bool // ConnectTimeout defines the timeout for connecting to Pub/Sub ConnectTimeout time.Duration @@ -144,6 +149,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { if err != nil { return err } + t.EnableMessageOrdering = p.config.EnableMessageOrdering logFields := make(watermill.LogFields, 2) logFields["topic"] = topic @@ -160,11 +166,16 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { result := t.Publish(ctx, googlecloudMsg) <-result.Ready() - _, err = result.Get(ctx) + serverMessageID, err := result.Get(ctx) if err != nil { + if p.config.EnableMessageOrdering && p.config.EnableMessageOrderingAutoResumePublishOnError && googlecloudMsg.OrderingKey != "" { + t.ResumePublish(googlecloudMsg.OrderingKey) + } return errors.Wrapf(err, "publishing message %s failed", msg.UUID) } + msg.Metadata.Set(GoogleMessageIDHeaderKey, serverMessageID) + p.logger.Trace("Message published to Google PubSub", logFields) } @@ -207,7 +218,6 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e }() t = p.client.Topic(topic) - t.EnableMessageOrdering = p.config.EnableMessageOrdering // todo: theoretically, one could want different publish settings per topic, which is supported by the client lib // different instances of publisher may be used then @@ -215,6 +225,10 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e t.PublishSettings = *p.config.PublishSettings } + if p.config.DoNotCheckTopicExistence { + return t, nil + } + exists, err := t.Exists(ctx) if err != nil { return nil, errors.Wrapf(err, "could not check if topic %s exists", topic) diff --git a/pkg/googlecloud/pubsub_test.go b/pkg/googlecloud/pubsub_test.go index ccc7c31..e0d1f13 100644 --- a/pkg/googlecloud/pubsub_test.go +++ b/pkg/googlecloud/pubsub_test.go @@ -176,6 +176,82 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) { require.Equal(t, googlecloud.ErrUnexpectedTopic, errors.Cause(err)) } +func TestReceivedMessageContainsMessageId(t *testing.T) { + logger := watermill.NewStdLogger(true, true) + + sub, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{}, logger) + require.NoError(t, err) + + topic := fmt.Sprintf("topic_%d", rand.Int()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + messages, err := sub.Subscribe(ctx, topic) + require.NoError(t, err) + + howManyMessages := 1 + produceMessages(t, topic, howManyMessages) + + msg := <-messages + if msg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) == "" { + t.Fatalf("Message %s does not contain %s", msg.UUID, googlecloud.GoogleMessageIDHeaderKey) + } +} + +func TestPublishedMessageIdMatchesReceivedMessageId(t *testing.T) { + logger := watermill.NewStdLogger(true, true) + topic := fmt.Sprintf("topic_message_id_match_%d", rand.Int()) + + // Set up subscriber + sub, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{}, logger) + require.NoError(t, err) + + // Subscribe to the topic + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + messages, err := sub.Subscribe(ctx, topic) + require.NoError(t, err) + + // Set up publisher + pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil) + require.NoError(t, err) + defer pub.Close() + + // Publish a message + publishedMsg := message.NewMessage(watermill.NewUUID(), []byte{}) + require.NoError(t, pub.Publish(topic, publishedMsg)) + publishedMessageId := publishedMsg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) + + if publishedMessageId == "" { + t.Fatalf("Published message %s does not contain %s", publishedMsg.UUID, googlecloud.GoogleMessageIDHeaderKey) + } + + receivedMsg := <-messages + receivedMessageId := receivedMsg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) + if publishedMessageId != receivedMessageId { + t.Fatalf("Published message ID %s does not match received message ID %s", publishedMessageId, receivedMessageId) + } +} + +func TestPublisherDoesNotAttemptToCreateTopic(t *testing.T) { + topic := fmt.Sprintf("missing_topic_%d", rand.Int()) + + // Set up publisher + pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{ + // DoNotCheckTopicExistence is set to true, so the publisher will not check + // if the topic exists and will also not attempt to create it. + DoNotCheckTopicExistence: true, + DoNotCreateTopicIfMissing: false, + }, nil) + require.NoError(t, err) + defer pub.Close() + + // Publish a message + publishedMsg := message.NewMessage(watermill.NewUUID(), []byte{}) + require.Error(t, pub.Publish(topic, publishedMsg), googlecloud.ErrTopicDoesNotExist) +} + func produceMessages(t *testing.T, topic string, howMany int) { pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil) require.NoError(t, err) diff --git a/pkg/googlecloud/subscriber.go b/pkg/googlecloud/subscriber.go index 1f7c225..8100a1a 100644 --- a/pkg/googlecloud/subscriber.go +++ b/pkg/googlecloud/subscriber.go @@ -176,7 +176,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa } s.logger.Info("Subscribing to Google Cloud PubSub topic", logFields) - output := make(chan *message.Message, 0) + output := make(chan *message.Message) sub, err := s.subscription(ctx, subscriptionName, topic) if err != nil {