
Commit

SNOW-1061855 Fix the E2E test for SchemaEvolutionDropTable with a single buffer (snowflakedb#882)
sfc-gh-gjachimko authored Jul 18, 2024
1 parent 7fce0f5 commit fbd15d6
Showing 4 changed files with 31 additions and 14 deletions.
@@ -80,6 +80,12 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
private final AtomicLong processedOffset =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

// This offset would not be required for a buffer-less channel, but we add it to keep the
// buffered and non-buffered channel versions compatible.
private final AtomicLong currentConsumerGroupOffset =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

// Indicates whether we need to skip and discard any leftover rows in the current batch, this
// could happen when the channel gets invalidated and reset, then anything left in the buffer
// should be skipped
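
For readers skimming the diff, a minimal, self-contained sketch of the pattern this new field follows may help: an AtomicLong seeded with a sentinel, filled in lazily from the first record seen, and afterwards only raised to the highest offset reported. The class name, the -1 sentinel value, and the atomic update style below are illustrative assumptions, not code taken from the connector.

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: the class name and the -1 sentinel are assumptions,
// not the connector's actual constants.
class ConsumerGroupOffsetTracker {
  // Sentinel meaning "no consumer offset known yet".
  private static final long NO_OFFSET = -1L;

  private final AtomicLong currentConsumerGroupOffset = new AtomicLong(NO_OFFSET);

  // Seed the tracker exactly once, from the first record seen for the partition.
  void onFirstRecord(long kafkaOffset) {
    currentConsumerGroupOffset.compareAndSet(NO_OFFSET, kafkaOffset);
  }

  // Keep only the largest offset reported by the Connect framework.
  void setLatestConsumerOffset(long consumerOffset) {
    currentConsumerGroupOffset.accumulateAndGet(consumerOffset, Math::max);
  }

  long getLatestConsumerOffset() {
    return currentConsumerGroupOffset.get();
  }
}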
@@ -258,7 +264,7 @@ public DirectTopicPartitionChannel(
metricsJmxReporter,
this.offsetPersistedInSnowflake,
this.processedOffset,
new AtomicLong(0));
this.currentConsumerGroupOffset);
this.telemetryServiceV2.reportKafkaPartitionStart(
new SnowflakeTelemetryChannelCreation(this.tableName, this.channelNameFormatV1, startTime));

@@ -303,6 +309,11 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartitionInBatch) {
final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get();
final long currentProcessedOffset = this.processedOffset.get();

// for backwards compatibility - set the consumer offset to be the first one received from kafka
if (currentConsumerGroupOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.currentConsumerGroupOffset.set(kafkaSinkRecord.kafkaOffset());
}

// Reset the value if it's a new batch
if (isFirstRowPerPartitionInBatch) {
needToSkipCurrentBatch = false;
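
As a quick illustration of the seeding behaviour added above (the first Kafka offset wins, later records leave the value alone, and the framework can still move it forward), a hypothetical usage of the tracker sketch from the previous hunk; it is not taken from the connector's test suite.

// Hypothetical usage of the ConsumerGroupOffsetTracker sketch above.
ConsumerGroupOffsetTracker tracker = new ConsumerGroupOffsetTracker();
tracker.onFirstRecord(100L);            // first record of the partition seeds the offset
tracker.onFirstRecord(101L);            // later records do not overwrite the seed
tracker.setLatestConsumerOffset(150L);  // the framework reports a newer consumer offset
assert tracker.getLatestConsumerOffset() == 150L;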
@@ -689,17 +700,18 @@ private void resetChannelMetadataAfterRecovery(
SnowflakeStreamingIngestChannel newChannel) {
if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
LOGGER.info(
"{} Channel:{}, offset token is NULL",
"{} Channel:{}, offset token is NULL, will attempt to use offset managed by the connector"
+ ", consumer offset: {}",
streamingApiFallbackInvoker,
this.getChannelNameFormatV1());
this.getChannelNameFormatV1(),
this.currentConsumerGroupOffset.get());
}

final long offsetToResetInKafka = offsetRecoveredFromSnowflake + 1L;
final long offsetToResetInKafka =
offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
? currentConsumerGroupOffset.get()
: offsetRecoveredFromSnowflake + 1L;
if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
LOGGER.info(
"There is no offset registered for {} channel in Snowflake. Stop recovering the channel"
+ " metadata.",
this.getChannelNameFormatV1());
return;
}
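
The substantive change in this hunk is the fallback: when Snowflake returns no offset token for the channel, the reset target becomes the consumer offset tracked by the connector instead of a value derived from the missing token, and recovery stops only if that fallback is unset as well. A standalone sketch of the decision, with an assumed -1 sentinel and illustrative names:

// Illustrative sketch of the reset-offset decision; names and the -1 sentinel are assumptions.
static long offsetToResetInKafka(long offsetRecoveredFromSnowflake, long consumerGroupOffset) {
  final long NO_OFFSET = -1L;
  // The persisted offset token points at the last committed row, so resume one past it.
  if (offsetRecoveredFromSnowflake != NO_OFFSET) {
    return offsetRecoveredFromSnowflake + 1L;
  }
  // No token in Snowflake: fall back to the offset the connector tracked from the consumer.
  // The caller stops the recovery if this is still NO_OFFSET.
  return consumerGroupOffset;
}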

@@ -917,9 +929,8 @@ public long getProcessedOffset() {

@Override
@VisibleForTesting
@Deprecated
public long getLatestConsumerOffset() {
return 0;
return this.currentConsumerGroupOffset.get();
}

@Override
@@ -948,8 +959,11 @@ public SnowflakeTelemetryChannelStatus getSnowflakeTelemetryChannelStatus() {
}

@Override
@Deprecated
public void setLatestConsumerOffset(long consumerOffset) {}
public void setLatestConsumerOffset(long consumerOffset) {
if (consumerOffset > this.currentConsumerGroupOffset.get()) {
this.currentConsumerGroupOffset.set(consumerOffset);
}
}
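
One design note on this setter: the compare-then-set is two separate operations on the AtomicLong, which is fine when the Connect framework drives it from a single task thread; if concurrent callers ever mattered, a lock-free, monotonic equivalent exists. This is an observation about the trade-off, not a change the commit makes:

// Atomic, monotonic alternative to the compare-then-set above (not part of this commit).
this.currentConsumerGroupOffset.accumulateAndGet(consumerOffset, Math::max);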

/**
* Converts the original kafka sink record into a Json Record. i.e key and values are converted
@@ -15,7 +15,7 @@
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": "false",
"snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
@@ -15,7 +15,7 @@
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": "false",
"snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
"snowflake.streaming.closeChannelsInParallel.enabled": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
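
Both connector config files now read the single-buffer flag from the $SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER placeholder instead of hard-coding "false", which is what lets the same E2E scenario run with either buffering mode. How the placeholder is expanded is up to the test harness; a minimal sketch of that kind of substitution follows (the class and method names are illustrative, not the harness's actual code):

import java.util.Map;

// Illustrative sketch of environment-placeholder substitution; not the harness's actual code.
class ConfigTemplating {
  static String render(String template, Map<String, String> env) {
    String rendered = template;
    for (Map.Entry<String, String> e : env.entrySet()) {
      rendered = rendered.replace("$" + e.getKey(), e.getValue());
    }
    return rendered;
  }
}

// Example: render(configJson, Map.of("SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER", "true"))
// replaces "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER" with "true" in the rendered config.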
3 changes: 3 additions & 0 deletions test/test_suit/test_confluent_protobuf_protobuf.py
@@ -29,6 +29,9 @@ def __init__(self, driver, nameSalt):
self.sensor.uint64_val = (1 << 64) - 1

self.schema_registry_client = SchemaRegistryClient({'url': driver.schemaRegistryAddress})
#uncomment for local tests
#self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
#self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client, {'use.deprecated.format': True})
self.keyProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
self.valueProtobufSerializer = ProtobufSerializer(sensor_pb2.SensorReading, self.schema_registry_client)
producer_conf = {
