Skip to content

Commit

Permalink
refacto codestyle
Browse files Browse the repository at this point in the history
  • Loading branch information
EvgeniiMunin committed Aug 22, 2024
1 parent b4bf1bd commit 54dd419
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

@ConditionalOnProperty(prefix = "hooks." + GreenbidsRealTimeDataModule.CODE, name = "enabled", havingValue = "true")
@Configuration
Expand All @@ -42,6 +43,8 @@ GreenbidsRealTimeDataModule greenbidsRealTimeDataModule(
.expireAfterWrite(cacheExpirationMinutes, TimeUnit.MINUTES)
.build();

final ReentrantLock lock = new ReentrantLock();

final GreenbidsRealTimeDataProperties globalProperties = GreenbidsRealTimeDataProperties.of(
modelCacheWithExpiration,
thresholdsCacheWithExpiration,
Expand All @@ -50,7 +53,8 @@ GreenbidsRealTimeDataModule greenbidsRealTimeDataModule(
gcsBucketName,
cacheExpirationMinutes,
onnxModelCacheKeyPrefix,
thresholdsCacheKeyPrefix
thresholdsCacheKeyPrefix,
lock
);

return new GreenbidsRealTimeDataModule(List.of(
Expand All @@ -62,6 +66,7 @@ GreenbidsRealTimeDataModule greenbidsRealTimeDataModule(
googleCloudGreenbidsProject,
gcsBucketName,
onnxModelCacheKeyPrefix,
thresholdsCacheKeyPrefix)));
thresholdsCacheKeyPrefix,
lock)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
@Value
public class ThrottlingThresholds {

@JsonProperty("featurizer")
String featurizer;

@JsonProperty("pipeline")
String pipeline;

@JsonProperty("thresholds")
List<Double> thresholds;

Expand All @@ -31,14 +25,10 @@ public class ThrottlingThresholds {

@JsonCreator
public ThrottlingThresholds(
@JsonProperty("featurizer") String featurizer,
@JsonProperty("pipeline") String pipeline,
@JsonProperty("thresholds") List<Double> thresholds,
@JsonProperty("tpr") List<Double> tpr,
@JsonProperty("fpr") List<Double> fpr,
@JsonProperty("version") String version) {
this.featurizer = featurizer;
this.pipeline = pipeline;
this.thresholds = thresholds;
this.tpr = tpr;
this.fpr = fpr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.Value;
import org.prebid.server.hooks.modules.greenbids.real.time.data.core.ThrottlingThresholds;

import java.util.concurrent.locks.Lock;

@Value(staticConstructor = "of")
public class GreenbidsRealTimeDataProperties {

Expand All @@ -31,4 +33,7 @@ public class GreenbidsRealTimeDataProperties {

@JsonProperty(value = "thresholdsCacheKeyPrefix", required = true)
String thresholdsCacheKeyPrefix;

@JsonProperty(value = "lock", required = true)
Lock lock;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,23 @@ public class ModelCache {

Storage storage;

ReentrantLock lock;

String onnxModelCacheKeyPrefix;

ReentrantLock lock;

public ModelCache(
String modelPath,
Storage storage,
String gcsBucketName,
Cache<String, OnnxModelRunner> cache,
String onnxModelCacheKeyPrefix) {
String onnxModelCacheKeyPrefix,
ReentrantLock lock) {
this.gcsBucketName = gcsBucketName;
this.modelPath = modelPath;
this.cache = cache;
this.storage = storage;
this.lock = new ReentrantLock();
this.onnxModelCacheKeyPrefix = onnxModelCacheKeyPrefix;
this.lock = lock;
}

public OnnxModelRunner getModelRunner(String pbuid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,27 @@ public class ThresholdCache {

Storage storage;

ReentrantLock lock;

ObjectMapper mapper;

String thresholdsCacheKeyPrefix;

ReentrantLock lock;

public ThresholdCache(
String thresholdPath,
Storage storage,
String gcsBucketName,
ObjectMapper mapper,
Cache<String, ThrottlingThresholds> cache,
String thresholdsCacheKeyPrefix) {
String thresholdsCacheKeyPrefix,
ReentrantLock lock) {
this.gcsBucketName = gcsBucketName;
this.thresholdPath = thresholdPath;
this.cache = cache;
this.storage = storage;
this.lock = new ReentrantLock();
this.mapper = mapper;
this.thresholdsCacheKeyPrefix = thresholdsCacheKeyPrefix;
this.lock = lock;
}

public ThrottlingThresholds getThrottlingThresholds(String pbuid) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
Expand All @@ -87,6 +88,7 @@ public class GreenbidsRealTimeDataProcessedAuctionRequestHook implements Process
private final String gcsBucketName;
private final String onnxModelCacheKeyPrefix;
private final String thresholdsCacheKeyPrefix;
private final ReentrantLock lock;

public GreenbidsRealTimeDataProcessedAuctionRequestHook(
ObjectMapper mapper,
Expand All @@ -96,7 +98,8 @@ public GreenbidsRealTimeDataProcessedAuctionRequestHook(
String googleCloudGreenbidsProject,
String gcsBucketName,
String onnxModelCacheKeyPrefix,
String thresholdsCacheKeyPrefix) {
String thresholdsCacheKeyPrefix,
ReentrantLock lock) {
this.mapper = Objects.requireNonNull(mapper);
this.jacksonMapper = new JacksonMapper(mapper);
this.modelCacheWithExpiration = modelCacheWithExpiration;
Expand All @@ -106,6 +109,7 @@ public GreenbidsRealTimeDataProcessedAuctionRequestHook(
this.gcsBucketName = gcsBucketName;
this.onnxModelCacheKeyPrefix = onnxModelCacheKeyPrefix;
this.thresholdsCacheKeyPrefix = thresholdsCacheKeyPrefix;
this.lock = lock;
}

@Override
Expand Down Expand Up @@ -189,7 +193,8 @@ private OnnxModelRunner retrieveOnnxModelRunner(Partner partner, Storage storage
storage,
gcsBucketName,
modelCacheWithExpiration,
onnxModelCacheKeyPrefix);
onnxModelCacheKeyPrefix,
lock);
return modelCache.getModelRunner(partner.getPbuid());
}

Expand All @@ -201,7 +206,8 @@ private Double retrieveThreshold(Partner partner, Storage storage) {
gcsBucketName,
mapper,
thresholdsCacheWithExpiration,
thresholdsCacheKeyPrefix);
thresholdsCacheKeyPrefix,
lock);
final ThrottlingThresholds throttlingThresholds = thresholdCache
.getThrottlingThresholds(partner.getPbuid());
return partner.getThresholdForPartner(throttlingThresholds);
Expand All @@ -211,8 +217,7 @@ private Map<String, Map<String, Boolean>> runModeAndFilterBidders(
OnnxModelRunner onnxModelRunner,
List<ThrottlingMessage> throttlingMessages,
String[][] throttlingInferenceRows,
Double threshold
) {
Double threshold) {
final Map<String, Map<String, Boolean>> impsBiddersFilterMap = new HashMap<>();
final OrtSession.Result results;
try {
Expand Down Expand Up @@ -332,8 +337,7 @@ private Map<String, Ortb2ImpExtResult> createOrtb2ImpExt(
greenbidsId, impBiddersFilterMap, isExploration);
return Ortb2ImpExtResult.of(
explorationResult, tid);
}
));
}));
}

private Tags toAnalyticsTags(List<AnalyticsResult> analyticsResults) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import org.prebid.server.hooks.modules.greenbids.real.time.data.core.ThrottlingThresholds;
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.AnalyticsResult;
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.ExplorationResult;
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.ModelCache;
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.OnnxModelRunner;
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.Ortb2ImpExtResult;
import org.prebid.server.hooks.modules.greenbids.real.time.data.model.ThresholdCache;
import org.prebid.server.hooks.modules.greenbids.real.time.data.v1.model.analytics.ActivityImpl;
import org.prebid.server.hooks.modules.greenbids.real.time.data.v1.model.analytics.AppliedToImpl;
import org.prebid.server.hooks.modules.greenbids.real.time.data.v1.model.analytics.ResultImpl;
Expand All @@ -47,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -80,10 +79,6 @@ public class GreenbidsRealTimeDataProcessedAuctionRequestHookTest {

private Cache<String, ThrottlingThresholds> thresholdsCacheWithExpiration;

private ModelCache modelCache;

private ThresholdCache thresholdCache;

@BeforeEach
public void setUp() {
final ObjectMapper mapper = new ObjectMapper();
Expand All @@ -102,20 +97,8 @@ public void setUp() {
GOOGLE_CLOUD_PROJECT,
GCS_BUCKET_NAME,
ONNX_MODEL_CACHE_KEY_PREFIX,
THRESHOLDS_CACHE_KEY_PREFIX);
modelCache = new ModelCache(
null,
null,
null,
modelCacheWithExpiration,
ONNX_MODEL_CACHE_KEY_PREFIX);
thresholdCache = new ThresholdCache(
null,
null,
null,
jacksonMapper.mapper(),
thresholdsCacheWithExpiration,
THRESHOLDS_CACHE_KEY_PREFIX);
THRESHOLDS_CACHE_KEY_PREFIX,
new ReentrantLock());
}

@Test
Expand All @@ -135,10 +118,10 @@ public void shouldExitEarlyWhenPartnerNotActivatedInBidRequest() throws IOExcept
final AuctionInvocationContext invocationContext = givenAuctionInvocationContext(auctionContext);
when(invocationContext.auctionContext()).thenReturn(auctionContext);

modelCache.getCache().cleanUp();
thresholdCache.getCache().cleanUp();
modelCache.getCache().put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdCache.getCache().put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());
modelCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.cleanUp();
modelCacheWithExpiration.put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdsCacheWithExpiration.put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());

// when
final Future<InvocationResult<AuctionRequestPayload>> future = target
Expand Down Expand Up @@ -182,9 +165,9 @@ public void shouldExitEarlyWhenThresholdIsNotAvailable() throws OrtException, IO
final AuctionInvocationContext invocationContext = givenAuctionInvocationContext(auctionContext);
when(invocationContext.auctionContext()).thenReturn(auctionContext);

modelCache.getCache().cleanUp();
thresholdCache.getCache().cleanUp();
modelCache.getCache().put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
modelCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.cleanUp();
modelCacheWithExpiration.put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());

// when
final Future<InvocationResult<AuctionRequestPayload>> future = target
Expand Down Expand Up @@ -228,9 +211,9 @@ public void shouldExitEarlyWhenModelIsNotAvailable() throws IOException {
final AuctionInvocationContext invocationContext = givenAuctionInvocationContext(auctionContext);
when(invocationContext.auctionContext()).thenReturn(auctionContext);

modelCache.getCache().cleanUp();
thresholdCache.getCache().cleanUp();
thresholdCache.getCache().put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());
modelCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());

// when
final Future<InvocationResult<AuctionRequestPayload>> future = target
Expand Down Expand Up @@ -274,10 +257,10 @@ public void shouldNotFilterBiddersAndReturnAnalyticsTagWhenExploration() throws
final AuctionInvocationContext invocationContext = givenAuctionInvocationContext(auctionContext);
when(invocationContext.auctionContext()).thenReturn(auctionContext);

modelCache.getCache().cleanUp();
thresholdCache.getCache().cleanUp();
modelCache.getCache().put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdCache.getCache().put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());
modelCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.cleanUp();
modelCacheWithExpiration.put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdsCacheWithExpiration.put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());

final AnalyticsResult expectedAnalyticsResult = expectedAnalyticsResult(true, true);

Expand Down Expand Up @@ -330,10 +313,10 @@ public void shouldFilterBiddersBasedOnModelWhenAnyFeatureNotAvailable() throws O
final AuctionInvocationContext invocationContext = givenAuctionInvocationContext(auctionContext);
when(invocationContext.auctionContext()).thenReturn(auctionContext);

modelCache.getCache().cleanUp();
thresholdCache.getCache().cleanUp();
modelCache.getCache().put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdCache.getCache().put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());
modelCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.cleanUp();
modelCacheWithExpiration.put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdsCacheWithExpiration.put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());

final BidRequest expectedBidRequest = expectedUpdatedBidRequest(
request -> request, jacksonMapper, explorationRate);
Expand Down Expand Up @@ -390,10 +373,10 @@ public void shouldFilterBiddersBasedOnModelResults() throws OrtException, IOExce
final AuctionInvocationContext invocationContext = givenAuctionInvocationContext(auctionContext);
when(invocationContext.auctionContext()).thenReturn(auctionContext);

modelCache.getCache().cleanUp();
thresholdCache.getCache().cleanUp();
modelCache.getCache().put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdCache.getCache().put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());
modelCacheWithExpiration.cleanUp();
thresholdsCacheWithExpiration.cleanUp();
modelCacheWithExpiration.put("onnxModelRunner_test-pbuid", givenOnnxModelRunner());
thresholdsCacheWithExpiration.put("throttlingThresholds_test-pbuid", givenThrottlingThresholds());

final BidRequest expectedBidRequest = expectedUpdatedBidRequest(
request -> request, jacksonMapper, explorationRate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
{
"featurizer": "featurizer",
"pipeline": "aaabbbccc",
"thresholds": [
0.4,
0.224,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,26 @@ public void requestShouldFailIfHttpRequestTimedOut(Vertx vertx, VertxTestContext
}));
}

@Test
public void requestShouldFailIfHttpResponseTimedOut(Vertx vertx, VertxTestContext context) {
// given
final BasicHttpClient httpClient = new BasicHttpClient(vertx, vertx.createHttpClient());
final int serverPort = 8888;

startServer(serverPort, 0L, 2000L);

// when
final Future<?> future = httpClient.get("http://localhost:" + serverPort, 1000L);

// then
future.onComplete(context.failing(e -> {
assertThat(e)
.isInstanceOf(TimeoutException.class)
.hasMessage("Timeout period of 1000ms has been exceeded");
context.completeNow();
}));
}

/**
* The server returns entire response or body with delay.
*/
Expand Down

0 comments on commit 54dd419

Please sign in to comment.