Skip to content
This repository has been archived by the owner on Dec 11, 2023. It is now read-only.

Commit

Permalink
add bounded subscriptions for kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
odacremolbap committed Jul 26, 2023
1 parent e63635b commit bc9134c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 122 deletions.
3 changes: 3 additions & 0 deletions pkg/backend/impl/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func (s *kafka) Subscribe(name string, bounds *broker.TriggerBounds, ccb backend
// caller's callback for dispatching events from Kafka.
ccbDispatch: ccb,

// caller's callback for setting subscription status.
scb: scb,

// cancel function let us control when we want to exit the subscription loop.
ctx: ctx,
cancel: cancel,
Expand Down
133 changes: 11 additions & 122 deletions pkg/backend/impl/kafka/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.uber.org/zap"

"github.com/triggermesh/brokers/pkg/backend"
"github.com/triggermesh/brokers/pkg/status"
)

const (
Expand Down Expand Up @@ -63,8 +64,8 @@ type subscription struct {
// caller's callback for dispatching events from Redis.
ccbDispatch backend.ConsumerDispatcher

// // caller's callback for subscription status changes
// scb backend.SubscriptionStatusChange
// caller's callback for subscription status changes
scb backend.SubscriptionStatusChange

// cancel function let us control when the subscription loop should exit.
ctx context.Context
Expand Down Expand Up @@ -145,18 +146,17 @@ func (s *subscription) start() {
return
}

// TODO add checkbounds check
// // If an end date has been specified, compare the current message ID
// // with the end date. If the message ID is newer than the end date,
// // exit the loop.
// if s.checkBoundsExceeded != nil {
// if exitLoop = s.checkBoundsExceeded(record.Offset); exitLoop {
// s.scb(&status.SubscriptionStatus{
// Status: status.SubscriptionStatusComplete,
// })
// break
// }
// }
if s.checkBoundsExceeded != nil {
if exitLoop = s.checkBoundsExceeded(record); exitLoop {
s.scb(&status.SubscriptionStatus{
Status: status.SubscriptionStatusComplete,
})
return
}
}

if s.trackingEnabled {
if err := ce.Context.SetExtension(BackendIDAttribute, record.Offset); err != nil {
Expand All @@ -172,111 +172,6 @@ func (s *subscription) start() {
zap.Error(err))
}
}(record)

// // If we are processing pending messages the ACK might take a
// // while to be sent. We need to set the message ID so that the
// // next requested element is not any of the pending being processed.
// if id != ">" {
// id = msg.ID
// }
// })

// streams, err := s.client.XReadGroup(s.ctx, &goredis.XReadGroupArgs{
// Group: s.group,
// Consumer: s.instance,
// Streams: []string{s.stream, id},
// Count: 1,
// // Setting block low since cancelling the context
// // does not force the read to finish, making the process slow
// // to exit.
// Block: 3 * time.Second,
// NoAck: false,
// }).Result()

// if err != nil {
// // Ignore errors when the blocking period ends without
// // receiving any event, and errors when the context is
// // canceled
// if !errors.Is(err, goredis.Nil) &&
// !strings.HasSuffix(err.Error(), "i/o timeout") &&
// err.Error() != "context canceled" {
// s.logger.Errorw("Error reading CloudEvents from consumer group", zap.String("group", s.group), zap.Error(err))
// }
// continue
// }

// if len(streams) != 1 {
// s.logger.Errorw("unexpected number of streams read", zap.Any("streams", streams))
// continue
// }

// // If we are processing pending messages from Redis and we reach
// // EOF, switch to reading new messages.
// if len(streams[0].Messages) == 0 && id != ">" {
// id = ">"
// }

// for _, msg := range streams[0].Messages {
// ce := &cloudevents.Event{}
// for k, v := range msg.Values {
// if k != ceKey {
// s.logger.Debug(fmt.Sprintf("Ignoring non expected key at message from backend: %s", k))
// continue
// }

// if err = ce.UnmarshalJSON([]byte(v.(string))); err != nil {
// s.logger.Errorw("Could not unmarshal CloudEvent from Redis", zap.Error(err))
// continue
// }
// }

// // If there was no valid CE in the message ACK so that we do not receive it again.
// if err = ce.Validate(); err != nil {
// s.logger.Warn(fmt.Sprintf("Removing non CloudEvent message from backend: %s", msg.ID))
// if err = s.ack(msg.ID); err != nil {
// s.logger.Errorw(fmt.Sprintf("could not ACK the Redis message %s containing a non valid CloudEvent", id),
// zap.Error(err))
// }

// continue
// }

// // If an end date has been specified, compare the current message ID
// // with the end date. If the message ID is newer than the end date,
// // exit the loop.
// if s.checkBoundsExceeded != nil {
// if exitLoop = s.checkBoundsExceeded(msg.ID); exitLoop {
// s.scb(&status.SubscriptionStatus{
// Status: status.SubscriptionStatusComplete,
// })
// break
// }
// }

// if s.trackingEnabled {
// if err = ce.Context.SetExtension(BackendIDAttribute, msg.ID); err != nil {
// s.logger.Errorw(fmt.Sprintf("could not set %s attributes for the Redis message %s. Tracking will not be possible.", BackendIDAttribute, msg.ID),
// zap.Error(err))
// }
// }

// go func(msgID string) {
// s.ccbDispatch(ce)
// if err := s.ack(msgID); err != nil {
// s.logger.Errorw(fmt.Sprintf("could not ACK the Redis message %s containing CloudEvent %s", msgID, ce.Context.GetID()),
// zap.Error(err))
// }
// }(msg.ID)

// // If we are processing pending messages the ACK might take a
// // while to be sent. We need to set the message ID so that the
// // next requested element is not any of the pending being processed.
// if id != ">" {
// id = msg.ID
// }
// }
// }

})
}

Expand All @@ -291,9 +186,3 @@ func (s *subscription) start() {

}()
}

// func (s *subscription) ack(id string) error {
// res := s.client.XAck(s.ctx, s.stream, s.group, id)
// _, err := res.Result()
// return err
// }

0 comments on commit bc9134c

Please sign in to comment.