Skip to content

Commit

Permalink
feat(kafka): add an error loop
Browse files Browse the repository at this point in the history
  • Loading branch information
grandwizard28 committed Oct 8, 2024
1 parent feccf15 commit e8d7aec
Showing 1 changed file with 57 additions and 30 deletions.
87 changes: 57 additions & 30 deletions receiver/signozkafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,29 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
}()
<-consumerGroup.ready

select {
case p := <-consumerGroup.pausePartition:
c.settings.Logger.Info("pausing partition", zap.Int32("partition", p))
c.consumerGroup.Pause(map[string][]int32{c.topics[0]: {p}})
case p := <-consumerGroup.resumePartition:
c.settings.Logger.Info("resuming partition", zap.Int32("partition", p))
c.consumerGroup.Resume(map[string][]int32{c.topics[0]: {p}})
case <-ctx.Done():
return nil
}
go func() {
c.errorLoop(ctx, consumerGroup)
}()

return nil
}

func (c *kafkaTracesConsumer) errorLoop(ctx context.Context, tracesConsumerGroup *tracesConsumerGroupHandler) {
for {
select {
case p := <-tracesConsumerGroup.pausePartition:
c.settings.Logger.Info("pausing partition", zap.Int32("partition", p))
c.consumerGroup.Pause(map[string][]int32{c.topics[0]: {p}})
case p := <-tracesConsumerGroup.resumePartition:
c.settings.Logger.Info("resuming partition", zap.Int32("partition", p))
c.consumerGroup.Resume(map[string][]int32{c.topics[0]: {p}})
case <-ctx.Done():
c.settings.Logger.Info("Consumer Error loop stopped", zap.Error(ctx.Err()))
return
}
}
}

func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
Expand Down Expand Up @@ -277,20 +286,29 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
}()
<-metricsConsumerGroup.ready

select {
case p := <-metricsConsumerGroup.pausePartition:
c.settings.Logger.Info("pausing partition", zap.Int32("partition", p))
c.consumerGroup.Pause(map[string][]int32{c.topics[0]: {p}})
case p := <-metricsConsumerGroup.resumePartition:
c.settings.Logger.Info("resuming partition", zap.Int32("partition", p))
c.consumerGroup.Resume(map[string][]int32{c.topics[0]: {p}})
case <-ctx.Done():
return nil
}
go func() {
c.errorLoop(ctx, metricsConsumerGroup)
}()

return nil
}

func (c *kafkaMetricsConsumer) errorLoop(ctx context.Context, metricsConsumerGroup *metricsConsumerGroupHandler) {
for {
select {
case p := <-metricsConsumerGroup.pausePartition:
c.settings.Logger.Info("pausing partition", zap.Int32("partition", p))
c.consumerGroup.Pause(map[string][]int32{c.topics[0]: {p}})
case p := <-metricsConsumerGroup.resumePartition:
c.settings.Logger.Info("resuming partition", zap.Int32("partition", p))
c.consumerGroup.Resume(map[string][]int32{c.topics[0]: {p}})
case <-ctx.Done():
c.settings.Logger.Info("Consumer Error loop stopped", zap.Error(ctx.Err()))
return
}
}
}

func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
Expand Down Expand Up @@ -421,20 +439,29 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
}()
<-logsConsumerGroup.ready

select {
case p := <-logsConsumerGroup.pausePartition:
c.settings.Logger.Info("pausing partition", zap.Int32("partition", p))
c.consumerGroup.Pause(map[string][]int32{c.topics[0]: {p}})
case p := <-logsConsumerGroup.resumePartition:
c.settings.Logger.Info("resuming partition", zap.Int32("partition", p))
c.consumerGroup.Resume(map[string][]int32{c.topics[0]: {p}})
case <-ctx.Done():
return nil
}
go func() {
c.errorLoop(ctx, logsConsumerGroup)
}()

return nil
}

func (c *kafkaLogsConsumer) errorLoop(ctx context.Context, logsConsumerGroup *logsConsumerGroupHandler) {
for {
select {
case p := <-logsConsumerGroup.pausePartition:
c.settings.Logger.Info("pausing partition", zap.Int32("partition", p))
c.consumerGroup.Pause(map[string][]int32{c.topics[0]: {p}})
case p := <-logsConsumerGroup.resumePartition:
c.settings.Logger.Info("resuming partition", zap.Int32("partition", p))
c.consumerGroup.Resume(map[string][]int32{c.topics[0]: {p}})
case <-ctx.Done():
c.settings.Logger.Info("Consumer Error loop stopped", zap.Error(ctx.Err()))
return
}
}
}

func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
for {
// `Consume` should be called inside an infinite loop, when a
Expand Down

0 comments on commit e8d7aec

Please sign in to comment.