Skip to content
This repository has been archived by the owner on Dec 11, 2023. It is now read-only.

Commit

Permalink
add producing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
odacremolbap committed Jul 21, 2023
1 parent b00ab1b commit 34591a6
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pkg/backend/impl/kafka/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type KafkaArgs struct {
// TLSKey string `help:"TLS Certificate key to connect to Redis." env:"TLS_KEY"`
// TLSCACertificate string `help:"CA Certificate to connect to Redis." name:"tls-ca-certificate" env:"TLS_CA_CERTIFICATE"`

ConsumerGroup string `help:"Kafka consumer group name." env:"CONSUMER_GROUP" default:"default"`
ConsumerGroupPrefix string `help:"Kafka consumer group name." env:"CONSUMER_GROUP_PREFIX" default:"default"`
// Instance at the Kafka consumer group. Copied from the InstanceName at the global args.
Instance string `kong:"-"`

Expand Down
141 changes: 138 additions & 3 deletions pkg/backend/impl/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
Expand All @@ -19,12 +20,20 @@ import (
"github.com/triggermesh/brokers/pkg/config/broker"
)

const (
// Disconnect timeout
disconnectTimeout = time.Second * 20

// Unsubscribe timeout
unsubscribeTimeout = time.Second * 10
)

func New(args *KafkaArgs, logger *zap.SugaredLogger) backend.Interface {
return &kafka{
args: args,
logger: logger,
disconnecting: false,
subs: make(map[string]subscription),
subs: make(map[string]*subscription),
}
}

Expand All @@ -38,7 +47,11 @@ type kafka struct {
client *kgo.Client

// subscription list indexed by the name.
subs map[string]subscription
subs map[string]*subscription

// Waitgroup that should be used to wait for subscribers
// before disconnecting.
wgSubs sync.WaitGroup

// disconnecting is set to avoid setting up new subscriptions
// when the broker is shutting down.
Expand Down Expand Up @@ -104,11 +117,57 @@ func (s *kafka) Start(ctx context.Context) error {
s.ctx = ctx
<-ctx.Done()

// This prevents new subscriptions from being setup
s.disconnecting = true

s.mutex.Lock()
defer s.mutex.Unlock()

for name := range s.subs {
s.unsubscribe(name)
}

// wait for all subscriptions to finish
// before returning.
allSubsFinished := make(chan struct{})
go func() {
defer close(allSubsFinished)
s.wgSubs.Wait()
}()

select {
case <-allSubsFinished:
// Clean exit.
case <-time.After(disconnectTimeout):
// Timed out, some events have not been delivered.
s.logger.Error(fmt.Sprintf("Disconnection from Redis timed out after %d", disconnectTimeout))
}

s.client.Close()
return nil
}

func (s *kafka) Produce(ctx context.Context, event *cloudevents.Event) error {
// TODO Produce
b, err := event.MarshalJSON()
if err != nil {
return fmt.Errorf("could not serialize CloudEvent: %w", err)
}

r := &kgo.Record{
Topic: s.args.Topic,
Value: b,
}

var wg sync.WaitGroup
wg.Add(1)
if err := s.client.ProduceSync(ctx, r).FirstErr(); err != nil {
return fmt.Errorf("could not produce CloudEvent to backend: %w", err)
}

s.logger.Debug(fmt.Sprintf("CloudEvent %s/%s produced to the backend as %d",
event.Context.GetSource(),
event.Context.GetID(),
r.Offset))

return nil
}
Expand All @@ -119,7 +178,58 @@ func (s *kafka) Subscribe(name string, bounds *broker.TriggerBounds, ccb backend
s.mutex.Lock()
defer s.mutex.Unlock()

// avoid subscriptions if disconnection is going on
if s.disconnecting {
return errors.New("cannot create new subscriptions while disconnecting")
}

if _, ok := s.subs[name]; ok {
return fmt.Errorf("subscription for %q alredy exists", name)
}

// TODO calculate bounds

kopts := append(s.kopts,
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.ConsumerGroup(s.args.ConsumerGroupPrefix+"."+name))

client, err := kgo.NewClient(kopts...)
if err != nil {
return fmt.Errorf("client for subscription could not be created: %w", err)
}

// We don't use the parent context but create a new one so that we can control
// how subscriptions are finished by calling cancel at our will, either when the
// global context is called, or when unsubscribing.
ctx, cancel := context.WithCancel(context.Background())

subs := &subscription{
instance: s.args.Instance,
topic: s.args.Topic,
name: name,
group: s.args.ConsumerGroupPrefix,
// TODO exceed bounds
trackingEnabled: s.args.TrackingIDEnabled,

// caller's callback for dispatching events from Kafka.
ccbDispatch: ccb,

// cancel function let us control when we want to exit the subscription loop.
ctx: ctx,
cancel: cancel,
// stoppedCh signals when a subscription has completely finished.
stoppedCh: make(chan struct{}),

client: client,
logger: s.logger,
}

s.subs[name] = subs
s.wgSubs.Add(1)
subs.start()

return nil

}

func (s *kafka) Unsubscribe(name string) {
Expand All @@ -129,6 +239,31 @@ func (s *kafka) Unsubscribe(name string) {
}

func (s *kafka) unsubscribe(name string) {
sub, ok := s.subs[name]
if !ok {
s.logger.Infow("Unsubscribe action was not needed since the subscription did not exist",
zap.String("name", name))
return
}

// Finish the subscription's context.
sub.cancel()

// Wait for the subscription to finish
select {
case <-sub.stoppedCh:
s.logger.Debugw("Graceful shutdown of subscription", zap.String("name", name))

// Clean exit.
case <-time.After(unsubscribeTimeout):
// Timed out, some events have not been delivered.
s.logger.Errorw(fmt.Sprintf("Unsubscribing from Redis timed out after %d", unsubscribeTimeout),
zap.String("name", name))
}

s.client.Close()
delete(s.subs, name)
s.wgSubs.Done()
}

func (s *kafka) Probe(ctx context.Context) error {
Expand Down
60 changes: 31 additions & 29 deletions pkg/backend/impl/kafka/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,48 @@ const (
BackendIDAttribute = "triggermeshbackendid"
)

type exceedBounds func(id string) bool

func newExceedBounds(offset string) exceedBounds {
return func(id string) bool {
// Use the greater or equal here to make it
// exclusive on bounds. When the ID matches the
// one configured at the upper bound, the message
// wont be produced.
return id >= offset
}
}
// type exceedBounds func(id string) bool

// func newExceedBounds(offset string) exceedBounds {
// return func(id string) bool {
// // Use the greater or equal here to make it
// // exclusive on bounds. When the ID matches the
// // one configured at the upper bound, the message
// // wont be produced.
// return id >= offset
// }
// }

type subscription struct {
instance string
stream string
name string
group string
checkBoundsExceeded exceedBounds
instance string
topic string
name string
group string
// checkBoundsExceeded exceedBounds

trackingEnabled bool

// caller's callback for dispatching events from Redis.
ccbDispatch backend.ConsumerDispatcher

// caller's callback for subscription status changes
scb backend.SubscriptionStatusChange
// // caller's callback for subscription status changes
// scb backend.SubscriptionStatusChange

// cancel function let us control when the subscription loop should exit.
ctx context.Context
cancel context.CancelFunc
// stoppedCh signals when a subscription has completely finished.
stoppedCh chan struct{}

client kgo.Client
client *kgo.Client
logger *zap.SugaredLogger
}

func (s *subscription) start() {
s.logger.Infow("Starting Kafka consumer",
zap.String("group", s.group),
zap.String("instance", s.instance),
zap.String("stream", s.stream))
zap.String("topic", s.topic))
// Start reading all pending messages
// id := "0"

Expand All @@ -72,7 +72,7 @@ func (s *subscription) start() {
s.logger.Debugw("Waiting for last XReadGroup operation to finish before exiting subscription",
zap.String("group", s.group),
zap.String("instance", s.instance),
zap.String("stream", s.stream))
zap.String("topic", s.topic))
exitLoop = true
}()

Expand Down Expand Up @@ -255,16 +255,18 @@ func (s *subscription) start() {
// }
// }

// s.logger.Debugw("Exited Redis subscription",
// zap.String("group", s.group),
// zap.String("instance", s.instance),
// zap.String("stream", s.stream))

// // Close stoppedCh to signal external viewers that processing for this
// // subscription is no longer running.
// close(s.stoppedCh)
})
}

s.logger.Debugw("Exited Kafka subscription",
zap.String("group", s.group),
zap.String("instance", s.instance),
zap.String("topic", s.group))

// Close stoppedCh to signal external viewers that processing for this
// subscription is no longer running.
close(s.stoppedCh)

}()
}

Expand Down

0 comments on commit 34591a6

Please sign in to comment.