Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/go_modules/golang.org/x/net-0.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored Aug 23, 2024
2 parents fd5128a + 1b725db commit e211f96
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 3 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ test_stress:
test_reconnect:
go test -tags=reconnect ./...

test_codecov: up wait
go test -coverprofile=coverage.out -covermode=atomic ./...

wait:
go run github.com/ThreeDotsLabs/wait-for@latest localhost:8085

Expand Down
5 changes: 5 additions & 0 deletions pkg/googlecloud/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type Unmarshaler interface {
// UUIDHeaderKey is the key of the Pub/Sub attribute that carries Waterfall UUID.
const UUIDHeaderKey = "_watermill_message_uuid"

// GoogleMessageIDHeaderKey is the key of the Pub/Sub attribute that carries Google Cloud Message ID.
// This ID is assigned by the server when the message is published and is guaranteed to be unique within the topic.
const GoogleMessageIDHeaderKey = "_watermill_message_google_message_id"

// DefaultMarshalerUnmarshaler implements Marshaler and Unmarshaler in the following way:
// All Google Cloud Pub/Sub attributes are equivalent to Waterfall Message metadata.
// Waterfall Message UUID is equivalent to an attribute with `UUIDHeaderKey` as key.
Expand Down Expand Up @@ -64,6 +68,7 @@ func (DefaultMarshalerUnmarshaler) Unmarshal(pubsubMsg *pubsub.Message) (*messag
}

metadata.Set("publishTime", pubsubMsg.PublishTime.String())
metadata.Set(GoogleMessageIDHeaderKey, pubsubMsg.ID)

msg := message.NewMessage(id, pubsubMsg.Data)
msg.Metadata = metadata
Expand Down
18 changes: 16 additions & 2 deletions pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ type PublisherConfig struct {
// ProjectID is the Google Cloud Engine project ID.
ProjectID string

// If true, `Publisher` does not check if the topic exists before publishing.
DoNotCheckTopicExistence bool

// If false (default), `Publisher` tries to create a topic if there is none with the requested name.
// Otherwise, trying to subscribe to non-existent subscription results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool
// Enables the topic message ordering
EnableMessageOrdering bool
// Enables automatic resume publish upon error
EnableMessageOrderingAutoResumePublishOnError bool

// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
Expand Down Expand Up @@ -144,6 +149,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
if err != nil {
return err
}
t.EnableMessageOrdering = p.config.EnableMessageOrdering

logFields := make(watermill.LogFields, 2)
logFields["topic"] = topic
Expand All @@ -160,11 +166,16 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
result := t.Publish(ctx, googlecloudMsg)
<-result.Ready()

_, err = result.Get(ctx)
serverMessageID, err := result.Get(ctx)
if err != nil {
if p.config.EnableMessageOrdering && p.config.EnableMessageOrderingAutoResumePublishOnError && googlecloudMsg.OrderingKey != "" {
t.ResumePublish(googlecloudMsg.OrderingKey)
}
return errors.Wrapf(err, "publishing message %s failed", msg.UUID)
}

msg.Metadata.Set(GoogleMessageIDHeaderKey, serverMessageID)

p.logger.Trace("Message published to Google PubSub", logFields)
}

Expand Down Expand Up @@ -207,14 +218,17 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e
}()

t = p.client.Topic(topic)
t.EnableMessageOrdering = p.config.EnableMessageOrdering

// todo: theoretically, one could want different publish settings per topic, which is supported by the client lib
// different instances of publisher may be used then
if p.config.PublishSettings != nil {
t.PublishSettings = *p.config.PublishSettings
}

if p.config.DoNotCheckTopicExistence {
return t, nil
}

exists, err := t.Exists(ctx)
if err != nil {
return nil, errors.Wrapf(err, "could not check if topic %s exists", topic)
Expand Down
76 changes: 76 additions & 0 deletions pkg/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,82 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
require.Equal(t, googlecloud.ErrUnexpectedTopic, errors.Cause(err))
}

func TestReceivedMessageContainsMessageId(t *testing.T) {
logger := watermill.NewStdLogger(true, true)

sub, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{}, logger)
require.NoError(t, err)

topic := fmt.Sprintf("topic_%d", rand.Int())

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messages, err := sub.Subscribe(ctx, topic)
require.NoError(t, err)

howManyMessages := 1
produceMessages(t, topic, howManyMessages)

msg := <-messages
if msg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey) == "" {
t.Fatalf("Message %s does not contain %s", msg.UUID, googlecloud.GoogleMessageIDHeaderKey)
}
}

func TestPublishedMessageIdMatchesReceivedMessageId(t *testing.T) {
logger := watermill.NewStdLogger(true, true)
topic := fmt.Sprintf("topic_message_id_match_%d", rand.Int())

// Set up subscriber
sub, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{}, logger)
require.NoError(t, err)

// Subscribe to the topic
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messages, err := sub.Subscribe(ctx, topic)
require.NoError(t, err)

// Set up publisher
pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil)
require.NoError(t, err)
defer pub.Close()

// Publish a message
publishedMsg := message.NewMessage(watermill.NewUUID(), []byte{})
require.NoError(t, pub.Publish(topic, publishedMsg))
publishedMessageId := publishedMsg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey)

if publishedMessageId == "" {
t.Fatalf("Published message %s does not contain %s", publishedMsg.UUID, googlecloud.GoogleMessageIDHeaderKey)
}

receivedMsg := <-messages
receivedMessageId := receivedMsg.Metadata.Get(googlecloud.GoogleMessageIDHeaderKey)
if publishedMessageId != receivedMessageId {
t.Fatalf("Published message ID %s does not match received message ID %s", publishedMessageId, receivedMessageId)
}
}

func TestPublisherDoesNotAttemptToCreateTopic(t *testing.T) {
topic := fmt.Sprintf("missing_topic_%d", rand.Int())

// Set up publisher
pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{
// DoNotCheckTopicExistence is set to true, so the publisher will not check
// if the topic exists and will also not attempt to create it.
DoNotCheckTopicExistence: true,
DoNotCreateTopicIfMissing: false,
}, nil)
require.NoError(t, err)
defer pub.Close()

// Publish a message
publishedMsg := message.NewMessage(watermill.NewUUID(), []byte{})
require.Error(t, pub.Publish(topic, publishedMsg), googlecloud.ErrTopicDoesNotExist)
}

func produceMessages(t *testing.T, topic string, howMany int) {
pub, err := googlecloud.NewPublisher(googlecloud.PublisherConfig{}, nil)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/googlecloud/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
}
s.logger.Info("Subscribing to Google Cloud PubSub topic", logFields)

output := make(chan *message.Message, 0)
output := make(chan *message.Message)

sub, err := s.subscription(ctx, subscriptionName, topic)
if err != nil {
Expand Down

0 comments on commit e211f96

Please sign in to comment.