Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed Aug 26, 2024
1 parent a6cac07 commit d754752
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
if err != nil {
return err
}
t.EnableMessageOrdering = p.config.EnableMessageOrdering

logFields := make(watermill.LogFields, 2)
logFields["topic"] = topic
Expand Down Expand Up @@ -211,6 +210,7 @@ func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, e

p.topicsLock.Lock()
defer func() {
t.EnableMessageOrdering = p.config.EnableMessageOrdering
if err == nil {
p.topics[topic] = t
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package googlecloud_test
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -276,3 +278,65 @@ func produceMessages(t *testing.T, topic string, howMany int) {

require.NoError(t, pub.Publish(topic, messages...))
}

func TestPublishOrdering(t *testing.T) {
t.Skip("to investigate")

pub, sub := newPubSub(
t,
true,
googlecloud.NewOrderingMarshaler(func(topic string, msg *message.Message) (string, error) {
return msg.Metadata["ordering"], nil
}),
googlecloud.NewOrderingUnmarshaler(func(orderingKey string, msg *message.Message) error {
return nil
}),
googlecloud.TopicSubscriptionNameWithSuffix("TestPublishOrdering"),
)

defer func() {
_ = pub.Close()
_ = sub.Close()
}()

topic := fmt.Sprintf("topic_ordering_%v", uuid.NewString())

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messages, err := sub.Subscribe(ctx, topic)
require.NoError(t, err)

newMsg := func(id string, ordering string) *message.Message {
msg := message.NewMessage(id, []byte{})
msg.Metadata["ordering"] = ordering
return msg
}

toPublish := []*message.Message{
newMsg("1", "A"),
newMsg("2", "A"),
newMsg("3", "B"),
newMsg("4", "B"),
}

for i := range toPublish {
err := pub.Publish(topic, toPublish[i])
require.NoError(t, err)
}

received := map[string][]string{}

for i := 0; i < len(toPublish); i++ {
select {
case msg := <-messages:
received[msg.Metadata["ordering"]] = append(received[msg.Metadata["ordering"]], msg.UUID)
msg.Ack()
case <-time.After(5 * time.Second):
t.Fatal("timeout")
}
}

assert.Equal(t, []string{"1", "2"}, received["A"])
assert.Equal(t, []string{"3", "4"}, received["B"])
}

0 comments on commit d754752

Please sign in to comment.