Skip to content

Commit

Permalink
NOSNOW: review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-gjachimko committed Jul 16, 2024
1 parent 62209fd commit 3872b13
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
// This offset is would not be required for buffer-less channel, but we add it to keep buffered
// and non-buffered
// channel versions compatible.
private final AtomicLong latestConsumerOffset =
private final AtomicLong currentConsumerGroupOffset =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

// Indicates whether we need to skip and discard any leftover rows in the current batch, this
Expand Down Expand Up @@ -264,7 +264,7 @@ public DirectTopicPartitionChannel(
metricsJmxReporter,
this.offsetPersistedInSnowflake,
this.processedOffset,
this.latestConsumerOffset);
this.currentConsumerGroupOffset);
this.telemetryServiceV2.reportKafkaPartitionStart(
new SnowflakeTelemetryChannelCreation(this.tableName, this.channelNameFormatV1, startTime));

Expand Down Expand Up @@ -310,8 +310,8 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
final long currentProcessedOffset = this.processedOffset.get();

// for backwards compatibility - set the consumer offset to be the first one received from kafka
if (latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.latestConsumerOffset.set(kafkaSinkRecord.kafkaOffset());
if (currentConsumerGroupOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.currentConsumerGroupOffset.set(kafkaSinkRecord.kafkaOffset());
}

// Reset the value if it's a new batch
Expand Down Expand Up @@ -704,13 +704,16 @@ private void resetChannelMetadataAfterRecovery(
+ ", consumer offset: {}",
streamingApiFallbackInvoker,
this.getChannelNameFormatV1(),
this.latestConsumerOffset.get());
this.currentConsumerGroupOffset.get());
}

final long offsetToResetInKafka =
offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
? latestConsumerOffset.get()
? currentConsumerGroupOffset.get()
: offsetRecoveredFromSnowflake + 1L;
if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
return;
}

// Reset Offset in kafka for this topic partition.
this.sinkTaskContext.offset(this.topicPartition, offsetToResetInKafka);
Expand Down Expand Up @@ -927,7 +930,7 @@ public long getProcessedOffset() {
@Override
@VisibleForTesting
public long getLatestConsumerOffset() {
return this.latestConsumerOffset.get();
return this.currentConsumerGroupOffset.get();
}

@Override
Expand Down Expand Up @@ -957,8 +960,8 @@ public SnowflakeTelemetryChannelStatus getSnowflakeTelemetryChannelStatus() {

@Override
public void setLatestConsumerOffset(long consumerOffset) {
if (consumerOffset > this.latestConsumerOffset.get()) {
this.latestConsumerOffset.set(consumerOffset);
if (consumerOffset > this.currentConsumerGroupOffset.get()) {
this.currentConsumerGroupOffset.set(consumerOffset);
}
}

Expand Down

0 comments on commit 3872b13

Please sign in to comment.