diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index eadd3e69c..576e15009 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -364,7 +364,7 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream } if len(cfg.Sources) != 0 { - if len(cfg.Sources) != len(resp.Sources) { + if len(cfg.Sources) != len(resp.Config.Sources) { return nil, ErrStreamSourceNotSupported } for i := range cfg.Sources { @@ -443,7 +443,7 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream } if len(cfg.Sources) != 0 { - if len(cfg.Sources) != len(resp.Sources) { + if len(cfg.Sources) != len(resp.Config.Sources) { return nil, ErrStreamSourceNotSupported } for i := range cfg.Sources { diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index d9e5106ae..3168bfa67 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -1664,8 +1664,8 @@ func TestStreamConfigMatches(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - cfgSource := jetstream.StreamConfig{ - Name: "source", + cfg := jetstream.StreamConfig{ + Name: "stream", Description: "desc", Subjects: []string{"foo.*"}, Retention: jetstream.WorkQueuePolicy, @@ -1703,15 +1703,15 @@ func TestStreamConfigMatches(t *testing.T) { }, } - s, err := js.CreateStream(context.Background(), cfgSource) + s, err := js.CreateStream(context.Background(), cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !reflect.DeepEqual(s.CachedInfo().Config, cfgSource) { - t.Fatalf("StreamConfig doesn't match") + if !reflect.DeepEqual(s.CachedInfo().Config, cfg) { + t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config) } - cfg := jetstream.StreamConfig{ + cfgMirror := jetstream.StreamConfig{ Name: "mirror", MaxConsumers: 10, MaxMsgs: 100, @@ -1720,8 +1720,9 @@ func TestStreamConfigMatches(t *testing.T) { MaxMsgsPerSubject: 1000, MaxMsgSize: 10000, Replicas: 1, + Duplicates: 10 * time.Second, Mirror: &jetstream.StreamSource{ - Name: "source", + Name: "stream", OptStartSeq: 10, SubjectTransforms: []jetstream.SubjectTransformConfig{ {Source: ">", Destination: "transformed.>"}, @@ -1731,12 +1732,43 @@ func TestStreamConfigMatches(t *testing.T) { SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, } - s, err = js.CreateStream(context.Background(), cfg) + s, err = js.CreateStream(context.Background(), cfgMirror) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !reflect.DeepEqual(s.CachedInfo().Config, cfg) { - t.Fatalf("StreamConfig doesn't match") + if !reflect.DeepEqual(s.CachedInfo().Config, cfgMirror) { + t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config) + } + + cfgSourcing := jetstream.StreamConfig{ + Name: "sourcing", + Subjects: []string{"BAR"}, + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Replicas: 1, + Duplicates: 10 * time.Second, + Sources: []*jetstream.StreamSource{ + { + Name: "stream", + OptStartSeq: 10, + SubjectTransforms: []jetstream.SubjectTransformConfig{ + {Source: ">", Destination: "transformed.>"}, + }, + }, + }, + SubjectTransform: &jetstream.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + } + + s, err = js.CreateStream(context.Background(), cfgSourcing) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.CachedInfo().Config, cfgSourcing) { + t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config) } } @@ -1792,8 +1824,6 @@ func TestConsumerConfigMatches(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } if !reflect.DeepEqual(c.CachedInfo().Config, cfg) { - fmt.Printf("%#v\n", c.CachedInfo().Config) - fmt.Printf("%#v\n", cfg) t.Fatalf("ConsumerConfig doesn't match") } } diff --git a/jsm.go b/jsm.go index 720efe2ee..266bf0665 100644 --- a/jsm.go +++ b/jsm.go @@ -817,7 +817,7 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, ErrStreamSubjectTransformNotSupported } if len(cfg.Sources) != 0 { - if len(cfg.Sources) != len(resp.Sources) { + if len(cfg.Sources) != len(resp.Config.Sources) { return nil, ErrStreamSourceNotSupported } for i := range cfg.Sources { @@ -1029,7 +1029,7 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error } if len(cfg.Sources) != 0 { - if len(cfg.Sources) != len(resp.Sources) { + if len(cfg.Sources) != len(resp.Config.Sources) { return nil, ErrStreamSourceNotSupported } for i := range cfg.Sources { diff --git a/test/js_test.go b/test/js_test.go index aad1c5bc8..d7ac0936c 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2799,8 +2799,8 @@ func TestStreamConfigMatches(t *testing.T) { nc, js := jsClient(t, srv) defer nc.Close() - cfgSource := nats.StreamConfig{ - Name: "source", + cfg := nats.StreamConfig{ + Name: "stream", Description: "desc", Subjects: []string{"foo.*"}, Retention: nats.WorkQueuePolicy, @@ -2838,15 +2838,15 @@ func TestStreamConfigMatches(t *testing.T) { }, } - s, err := js.AddStream(&cfgSource) + s, err := js.AddStream(&cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !reflect.DeepEqual(s.Config, cfgSource) { - t.Fatalf("StreamConfig doesn't match") + if !reflect.DeepEqual(s.Config, cfg) { + t.Fatalf("StreamConfig doesn't match: %#v", s.Config) } - cfg := nats.StreamConfig{ + cfgMirror := nats.StreamConfig{ Name: "mirror", MaxConsumers: 10, MaxMsgs: 100, @@ -2855,8 +2855,9 @@ func TestStreamConfigMatches(t *testing.T) { MaxMsgsPerSubject: 1000, MaxMsgSize: 10000, Replicas: 1, + Duplicates: 10 * time.Second, Mirror: &nats.StreamSource{ - Name: "source", + Name: "stream", OptStartSeq: 10, SubjectTransforms: []nats.SubjectTransformConfig{ {Source: ">", Destination: "transformed.>"}, @@ -2866,12 +2867,43 @@ func TestStreamConfigMatches(t *testing.T) { SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, } - s, err = js.AddStream(&cfg) + s, err = js.AddStream(&cfgMirror) if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !reflect.DeepEqual(s.Config, cfg) { - t.Fatalf("StreamConfig doesn't match") + if !reflect.DeepEqual(s.Config, cfgMirror) { + t.Fatalf("StreamConfig doesn't match: %#v", s.Config) + } + + cfgSourcing := nats.StreamConfig{ + Name: "sourcing", + Subjects: []string{"BAR"}, + MaxConsumers: 10, + MaxMsgs: 100, + MaxBytes: 1000, + MaxAge: 100 * time.Second, + MaxMsgsPerSubject: 1000, + MaxMsgSize: 10000, + Replicas: 1, + Duplicates: 10 * time.Second, + Sources: []*nats.StreamSource{ + { + Name: "stream", + OptStartSeq: 10, + SubjectTransforms: []nats.SubjectTransformConfig{ + {Source: ">", Destination: "transformed.>"}, + }, + }, + }, + SubjectTransform: &nats.SubjectTransformConfig{Source: ">", Destination: "transformed.>"}, + } + + s, err = js.AddStream(&cfgSourcing) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !reflect.DeepEqual(s.Config, cfgSourcing) { + t.Fatalf("StreamConfig doesn't match: %#v", s.Config) } }