Skip to content

Commit

Permalink
NOSNOW: improve troubleshooting experience for scenario with skipped …
Browse files Browse the repository at this point in the history
…offsets (snowflakedb#883)

(cherry picked from commit 7172481)
  • Loading branch information
sfc-gh-gjachimko authored and sangeet259 committed Aug 22, 2024
1 parent 563fd8f commit 0dc8ca0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,13 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
insertRecords(copiedStreamingBuffer);
}
} else {
LOGGER.debug(
"Skip adding offset:{} to buffer for channel:{} because"
+ " offsetPersistedInSnowflake:{}, processedOffset:{}",
kafkaSinkRecord.kafkaOffset(),
LOGGER.warn(
"Channel {} - skipping current record - expected offset {} but received {}. The current"
+ " offset stored in Snowflake: {}",
this.getChannelNameFormatV1(),
currentOffsetPersistedInSnowflake,
currentProcessedOffset);
currentProcessedOffset,
kafkaSinkRecord.kafkaOffset(),
currentOffsetPersistedInSnowflake);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,13 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
|| kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset + 1) {
transformAndSend(kafkaSinkRecord);
} else {
LOGGER.debug(
"Skip adding offset:{} to buffer for channel:{} because"
+ " offsetPersistedInSnowflake:{}, processedOffset:{}",
kafkaSinkRecord.kafkaOffset(),
LOGGER.warn(
"Channel {} - skipping current record - expected offset {} but received {}. The current"
+ " offset stored in Snowflake: {}",
this.getChannelNameFormatV1(),
currentOffsetPersistedInSnowflake,
currentProcessedOffset);
currentProcessedOffset,
kafkaSinkRecord.kafkaOffset(),
currentOffsetPersistedInSnowflake);
}
}

Expand Down

0 comments on commit 0dc8ca0

Please sign in to comment.