Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client optimizations. #37

Merged
merged 10 commits into from
Nov 16, 2023
Merged
6 changes: 3 additions & 3 deletions java/benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ java {
application {
// Define the main class for the application.
mainClass = 'javababushka.benchmarks.BenchmarkingApp'
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release"
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
}

tasks.withType(Test) {
tasks.withType(Test) {
testLogging {
exceptionFormat "full"
events "started", "skipped", "passed", "failed"
showStandardStreams true
}
jvmArgs "-Djava.library.path=${projectDir}/../target/debug"
jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void connectToRedis() {

@Override
public void connectToRedis(ConnectionSettings connectionSettings) {
waitForResult(asyncConnectToRedis(connectionSettings));
waitForResult(asyncConnectToRedis(connectionSettings));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ public static void printResults(
public static void testClientSetGet(
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
for (int concurrentNum : config.concurrentTasks) {
int iterations = 100000;
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
int iterations =
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
for (int clientCount : config.clientCount) {
for (int dataSize : config.dataSize) {
System.out.printf(
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientCreator.get().getName(), clientCount, concurrentNum, dataSize);
AtomicInteger iterationCounter = new AtomicInteger(0);
// Collections.synchronizedList

Map<ChosenAction, List<Long>> actionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
Expand All @@ -172,6 +169,12 @@ public static void testClientSetGet(
clients.add(newClient);
}

String clientName = clients.get(0).getName();

System.out.printf(
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientName, clientCount, concurrentNum, dataSize);

for (int taskNum = 0; taskNum < concurrentNum; taskNum++) {
final int taskNumDebugging = taskNum;
tasks.add(
Expand Down Expand Up @@ -214,7 +217,7 @@ public static void testClientSetGet(
});
}
if (config.debugLogging) {
System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName());
System.out.printf("%s client Benchmarking: %n", clientName);
System.out.printf(
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientCount, tasks.size());
Expand Down Expand Up @@ -257,7 +260,7 @@ public static void testClientSetGet(
calculatedResults,
config.resultsFile.get(),
dataSize,
clientCreator.get().getName(),
clientName,
clientCount,
concurrentNum,
iterations / ((after - before) / TPS_NORMALIZATION));
Expand Down
20 changes: 16 additions & 4 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ dependencies {

tasks.register('protobuf', Exec) {
doFirst {
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString())
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString())
}
commandLine 'protoc',
'-Iprotobuf=babushka-core/src/protobuf/',
'--java_out=java/client/src/main/java/org/babushka/javababushka/generated',
'--java_out=java/client/src/main/java/javababushka/generated',
'babushka-core/src/protobuf/connection_request.proto',
'babushka-core/src/protobuf/redis_request.proto',
'babushka-core/src/protobuf/response.proto'
workingDir Paths.get(project.rootDir.path, '..').toFile()
}

tasks.register('cleanProtobuf') {
doFirst {
project.delete(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString())
}
}

tasks.register('buildRust', Exec) {
commandLine 'cargo', 'build', '--release'
workingDir project.rootDir
Expand All @@ -54,7 +60,13 @@ tasks.register('buildAll') {
}

compileJava.dependsOn('protobuf')
clean.dependsOn('cleanProtobuf')

test {
systemProperty("java.library.path", "${projectDir}/../target/release")
tasks.withType(Test) {
testLogging {
exceptionFormat "full"
events "started", "skipped", "passed", "failed"
showStandardStreams true
}
jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
}
30 changes: 21 additions & 9 deletions java/client/src/main/java/javababushka/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,28 +227,40 @@ public void run() {
}

public void closeConnection() {
try {
channel.flush();

// flush and close the channel
channel.flush();
channel.close();
// TODO: check that the channel is closed

// shutdown the event loop group gracefully by waiting for the remaining response
// and then shutting down the connection
try {
long waitStarted = System.nanoTime();
long waitUntil =
waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos
for (var future : responses) {
if (future == null || future.isDone()) {
for (var responseFuture : responses) {
if (responseFuture == null || responseFuture.isDone()) {
continue;
}
try {
future.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS);
responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException ignored) {
// TODO: print warning
} catch (TimeoutException e) {
future.cancel(true);
// TODO cancel the rest
responseFuture.cancel(true);
// TODO: cancel the rest
break;
}
}
} finally {
// channel.closeFuture().sync()
group.shutdownGracefully();
var shuttingDown = group.shutdownGracefully();
try {
shuttingDown.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
assert group.isShutdown() : "Redis connection did not shutdown gracefully";
}
}

Expand Down
Loading