Skip to content

Commit

Permalink
Fix of flaky `TestHoodieClientMultiWriter::testMultiWriterWithAsyncTa…
Browse files Browse the repository at this point in the history
…bleServicesWithConflict`
  • Loading branch information
geserdugarov committed Oct 20, 2024
1 parent 26a3a00 commit 6cf65af
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
Expand Down Expand Up @@ -461,7 +462,7 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {

@ParameterizedTest
@MethodSource("providerClassResolutionStrategyAndTableType")
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass,
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class<? extends LockProvider<?>> providerClass,
ConflictResolutionStrategy resolutionStrategy) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
Expand Down Expand Up @@ -526,6 +527,11 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);

// Test with concurrent operations could be flaky, to reduce possibility of wrong ordering some queue is added
// For InProcessLockProvider we could wait less
final int waitAndRunFirst = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
final int waitAndRunSecond = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;

// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
final String newCommitTime = client1.createNewInstantTime();
Expand All @@ -534,7 +540,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta

// We want the upsert to go through only after the compaction
// and cleaning schedule completion. So, waiting on latch here.
latchCountDownAndWait(scheduleCountDownLatch, 30000);
latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy instanceof PreferWriterConflictResolutionStrategy)) {
// HUDI-6897: Improve SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
// There is no need to throw concurrent modification exception for the simple strategy under NB-CC, because the compactor would finally resolve the conflicts instead.
Expand All @@ -561,12 +567,12 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
});
}
latchCountDownAndWait(scheduleCountDownLatch, 30000);
latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
});

Future future3 = executors.submit(() -> {
assertDoesNotThrow(() -> {
latchCountDownAndWait(scheduleCountDownLatch, 30000);
latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
String cleanCommitTime = client3.createNewInstantTime();
client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
});
Expand All @@ -590,15 +596,15 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
future1 = executors.submit(() -> {
final String newCommitTime = client1.createNewInstantTime();
final int numRecords = 100;
latchCountDownAndWait(runCountDownLatch, 30000);
latchCountDownAndWait(runCountDownLatch, waitAndRunSecond);
assertDoesNotThrow(() -> {
createCommitWithInserts(cfg, client1, thirdCommitTime, newCommitTime, numRecords, true);
validInstants.add(newCommitTime);
});
});

future2 = executors.submit(() -> {
latchCountDownAndWait(runCountDownLatch, 30000);
latchCountDownAndWait(runCountDownLatch, waitAndRunFirst);
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
Expand All @@ -609,7 +615,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
});

future3 = executors.submit(() -> {
latchCountDownAndWait(runCountDownLatch, 30000);
latchCountDownAndWait(runCountDownLatch, waitAndRunFirst);
assertDoesNotThrow(() -> {
client3.clean(pendingCleanTime, false);
validInstants.add(pendingCleanTime);
Expand Down

0 comments on commit 6cf65af

Please sign in to comment.