Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verticles: Make deployment synchronous #2313

Merged
merged 7 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ ContextRunner contextRunner(Vertx vertx, @Value("${vertx.init-timeout-ms}") long
}

@Bean
VerticleDeployer verticleDeployer(Vertx vertx) {
return new VerticleDeployer(vertx);
VerticleDeployer verticleDeployer(Vertx vertx, @Value("${vertx.init-timeout-ms}") long initTimeoutMs) {
return new VerticleDeployer(vertx, initTimeoutMs);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.prebid.server.spring.config.server.application;

import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
Expand Down Expand Up @@ -59,6 +58,7 @@
import org.prebid.server.util.HttpUtil;
import org.prebid.server.validation.BidderParamValidator;
import org.prebid.server.version.PrebidVersionProvider;
import org.prebid.server.vertx.verticles.InitializableVerticle;
import org.prebid.server.vertx.verticles.VerticleDefinition;
import org.prebid.server.vertx.verticles.server.ServerVerticle;
import org.prebid.server.vertx.verticles.server.application.ApplicationResource;
Expand Down Expand Up @@ -95,7 +95,7 @@ public class ApplicationServerConfiguration {
VerticleDefinition httpApplicationServerVerticleDefinition(
@Value("#{'${http.port:${server.http.port}}'}") Integer port,
@Value("#{'${vertx.http-server-instances:${server.http.server-instances}}'}") Integer instances,
BiFunction<String, SocketAddress, Verticle> applicationVerticleFactory) {
BiFunction<String, SocketAddress, InitializableVerticle> applicationVerticleFactory) {

return VerticleDefinition.ofMultiInstance(
() -> applicationVerticleFactory.apply(
Expand All @@ -108,7 +108,7 @@ VerticleDefinition httpApplicationServerVerticleDefinition(
VerticleDefinition unixSocketApplicationServerVerticleDefinition(
@Value("${server.unix-socket.path}") String path,
@Value("${server.unix-socket.server-instances}") Integer instances,
BiFunction<String, SocketAddress, Verticle> applicationVerticleFactory) {
BiFunction<String, SocketAddress, InitializableVerticle> applicationVerticleFactory) {

return VerticleDefinition.ofMultiInstance(
() -> applicationVerticleFactory.apply(
Expand Down Expand Up @@ -155,7 +155,7 @@ ExceptionHandler exceptionHandler(Metrics metrics) {
}

@Bean
BiFunction<String, SocketAddress, Verticle> applicationVerticleFactory(
BiFunction<String, SocketAddress, InitializableVerticle> applicationVerticleFactory(
@Qualifier("applicationServerRouterFactory") Supplier<Router> routerFactory,
HttpServerOptions httpServerOptions,
ExceptionHandler exceptionHandler) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.prebid.server.vertx.verticles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
* Base class for pbs Verticles, exists for making asynchronous verticles initialization synchronous,
* so that server bootstrap will crash if verticle can't init. Every child class should do initialization
* in {@link #initialize(Vertx, Context)} method
*/
public abstract class InitializableVerticle extends AbstractVerticle {

private final Promise<Void> verticleInitialization = Promise.promise();

public Future<Void> getVerticleInitialization() {
return verticleInitialization.future();
}

public abstract Future<Void> initialize(Vertx vertx, Context context);

@Override
public void init(Vertx vertx, Context context) {
initialize(vertx, context)
.onSuccess(verticleInitialization::tryComplete)
.onFailure(verticleInitialization::tryFail);
}
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package org.prebid.server.vertx.verticles;

import io.vertx.core.Verticle;
import lombok.Value;

import java.util.function.Supplier;

@Value(staticConstructor = "of")
public class VerticleDefinition {

Supplier<Verticle> factory;
Supplier<InitializableVerticle> factory;

int amount;

public static VerticleDefinition ofZeroInstances() {
return of(null, 0);
}

public static VerticleDefinition ofSingleInstance(Supplier<Verticle> factory) {
public static VerticleDefinition ofSingleInstance(Supplier<InitializableVerticle> factory) {
return of(factory, 1);
}

public static VerticleDefinition ofMultiInstance(Supplier<Verticle> factory, int amount) {
public static VerticleDefinition ofMultiInstance(Supplier<InitializableVerticle> factory, int amount) {
return of(factory, amount);
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package org.prebid.server.vertx.verticles;

import io.vertx.core.DeploymentOptions;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class VerticleDeployer {

private static final int TIMEOUT_MILLIS = 5000;

private final long timeoutMillis;
private final Vertx vertx;

public VerticleDeployer(Vertx vertx) {
public VerticleDeployer(Vertx vertx, long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
this.vertx = Objects.requireNonNull(vertx);
}

Expand All @@ -26,38 +29,43 @@ public void deploy(VerticleDefinition definition) {
}

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
final AtomicBoolean failed = new AtomicBoolean();

final DeploymentOptions deploymentOptions = new DeploymentOptions();
deploymentOptions.setInstances(amount);

vertx.deployVerticle(definition.getFactory(), deploymentOptions, result -> {
if (result.failed()) {
failureThrowable.set(result.cause());
failed.set(true);
}

latch.countDown();
});
final List<InitializableVerticle> verticles = toVerticles(definition);
final CompositeFuture verticlesInitialization = toVerticlesInitialization(verticles)
.onComplete(result -> latch.countDown());
verticles.forEach(vertx::deployVerticle);

try {
if (!latch.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instead of latch any timer approach with onComplete check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to block main thread until all verticles reported initialization completion. So we need some form of synchronization here. CountdownLatch is the simplest one.

throw new RuntimeException(
"Action has not completed within defined timeout %d ms".formatted(TIMEOUT_MILLIS));
"Action has not completed within defined timeout %d ms".formatted(timeoutMillis));
}

if (failed.get()) {
final Throwable cause = failureThrowable.get();
if (verticlesInitialization.failed()) {
final Throwable cause = verticlesInitialization.cause();
if (cause != null) {
throw new RuntimeException(cause);
} else {
throw new RuntimeException("Action failed");
}
}
} catch (InterruptedException e) {
} catch (
InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for action to complete", e);
}
}

private static List<InitializableVerticle> toVerticles(VerticleDefinition definition) {
return IntStream.range(0, definition.getAmount())
.mapToObj(i -> definition.getFactory().get())
.toList();
}

private static CompositeFuture toVerticlesInitialization(List<InitializableVerticle> verticles) {
final List<Future> verticlesInitializations = verticles.stream()
.map(InitializableVerticle::getVerticleInitialization)
.collect(Collectors.toCollection(ArrayList::new));

return CompositeFuture.all(verticlesInitializations);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.prebid.server.vertx.verticles.server;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
Expand All @@ -12,11 +13,12 @@
import io.vertx.ext.web.Router;
import org.apache.commons.lang3.ObjectUtils;
import org.prebid.server.handler.ExceptionHandler;
import org.prebid.server.vertx.verticles.InitializableVerticle;

import java.util.Objects;
import java.util.function.Supplier;

public class ServerVerticle extends AbstractVerticle {
public class ServerVerticle extends InitializableVerticle {

private static final Logger logger = LoggerFactory.getLogger(ServerVerticle.class);

Expand Down Expand Up @@ -48,7 +50,8 @@ public ServerVerticle(String name, SocketAddress address, Supplier<Router> route
}

@Override
public void init(Vertx vertx, Context context) {
public Future<Void> initialize(Vertx vertx, Context context) {
final Promise<Void> completionPromise = Promise.promise();
final HttpServerOptions httpServerOptions = ObjectUtils.defaultIfNull(serverOptions, new HttpServerOptions());
final HttpServer server = vertx.createHttpServer(httpServerOptions)
.requestHandler(router);
Expand All @@ -57,18 +60,20 @@ public void init(Vertx vertx, Context context) {
server.exceptionHandler(exceptionHandler);
}

server.listen(address, this::onServerStarted);
server.listen(address, result -> onServerStarted(result, completionPromise));
return completionPromise.future();
}

private void onServerStarted(AsyncResult<HttpServer> result) {
private void onServerStarted(AsyncResult<HttpServer> result, Promise<Void> completionPromise) {
if (result.succeeded()) {
completionPromise.tryComplete();
logger.info(
"Successfully started {0} instance on address: {1}, thread: {2}",
name,
address,
Thread.currentThread().getName());
} else {
throw new RuntimeException(result.cause());
completionPromise.tryFail(result.cause());
}
}
}