Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(businessAttribute): parallelize-business-attribute-propagation #10638

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -942,29 +942,27 @@ public RelatedEntitiesScrollResult scrollRelatedEntities(
final String edgeCriteria = relationshipFilterToCriteria(relationshipFilter);

final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();
String srcNodeLabel = "";

String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s";
if (relationshipDirection == RelationshipDirection.INCOMING) {
matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s";
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s";
}

String srcNodeLabel = StringUtils.EMPTY;
// Create a URN from the String. Only proceed if srcCriteria is not null or empty
if (srcCriteria != null && !srcCriteria.isEmpty()) {
if (StringUtils.isNotEmpty(srcCriteria)) {
final String urnValue =
sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString();
try {
final Urn urn = Urn.createFromString(urnValue);
srcNodeLabel = urn.getEntityType();
matchTemplate = matchTemplate.replace("(src ", "(src:%s ");
} catch (URISyntaxException e) {
log.error("Failed to parse URN: {} ", urnValue, e);
}
}
String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s";
if (relationshipDirection == RelationshipDirection.INCOMING) {
matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s";
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s";
}

final String returnNodes =
String.format(
"RETURN dest, src, type(r)"); // Return both related entity and the relationship type.
final String returnCount = "RETURN count(*)"; // For getting the total results.

String relationshipTypeFilter = "";
if (!relationshipTypes.isEmpty()) {
Expand All @@ -974,18 +972,34 @@ public RelatedEntitiesScrollResult scrollRelatedEntities(
String whereClause = computeEntityTypeWhereClause(sourceTypes, destinationTypes);

// Build Statement strings
String baseStatementString =
String.format(
matchTemplate,
srcNodeLabel,
srcCriteria,
relationshipTypeFilter,
edgeCriteria,
destCriteria,
whereClause);
String baseStatementString;

if (StringUtils.isNotEmpty(srcNodeLabel)) {
baseStatementString =
String.format(
matchTemplate,
srcNodeLabel,
srcCriteria,
relationshipTypeFilter,
edgeCriteria,
destCriteria,
whereClause);
} else {
baseStatementString =
String.format(
matchTemplate,
srcCriteria,
relationshipTypeFilter,
edgeCriteria,
destCriteria,
whereClause);
}
log.info(baseStatementString);

final String returnNodes =
"RETURN dest, src, type(r)"; // Return both related entity and the relationship type.
final String returnCount = "RETURN count(*)"; // For getting the total results.

final String resultStatementString =
String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes);
final String countStatementString = String.format("%s %s", baseStatementString, returnCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.platform.event.v1.EntityChangeEvent;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
Expand All @@ -41,18 +42,25 @@ public class BusinessAttributeUpdateHookService {
// Receives the UPSERT change events that are built for every related entity of a batch.
private final UpdateIndicesService updateIndicesService;
// Fetching stops once at least this many related entities have been consumed.
private final int relatedEntitiesCount;
// Page size handed to the graph scroll call.
private final int getRelatedEntitiesBatchSize;

// Worker pool for batch processing; handleChangeEvent assigns a fresh pool on each call.
// NOTE(review): a mutable field reassigned per event is unsafe if this hook can be invoked
// concurrently, and the pool is created before the early-return checks — confirm and consider
// a local variable or a single long-lived pool.
private ExecutorService executor;
// Change-event categories that trigger propagation.
public static final String TAG = "TAG";
public static final String GLOSSARY_TERM = "GLOSSARY_TERM";
public static final String DOCUMENTATION = "DOCUMENTATION";
// Requested pool size; a negative value lets the pool factory pick 2 * available processors.
private final int threadCount;
// Minutes to wait for the pool to drain after shutdown(), and again after shutdownNow().
// A true constant, so it is static rather than a per-instance field.
private static final int AWAIT_TERMINATION_TIME = 10;
// Keep-alive for idle pool threads, in seconds.
private final int keepAlive;

public BusinessAttributeUpdateHookService(
@NonNull UpdateIndicesService updateIndicesService,
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesCount}") int relatedEntitiesCount,
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize) {
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize,
@NonNull @Value("${businessAttribute.threadCount}") int threadCount,
@NonNull @Value("${businessAttribute.keepAliveTime}") int keepAlive) {
this.updateIndicesService = updateIndicesService;
this.relatedEntitiesCount = relatedEntitiesCount;
this.getRelatedEntitiesBatchSize = relatedBatchSize;
this.threadCount = threadCount;
this.keepAlive = keepAlive;
}

public void handleChangeEvent(
Expand All @@ -61,38 +69,51 @@ public void handleChangeEvent(
GenericRecordUtils.deserializePayload(
event.getPayload().getValue(), EntityChangeEvent.class);

executor = businessAttributePropagationWorkerPool(threadCount, keepAlive);

if (!entityChangeEvent.getEntityType().equals(Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME)) {
log.info("Skipping MCL event for entity:" + entityChangeEvent.getEntityType());
return;
}

final Set<String> businessAttributeCategories =
ImmutableSet.of(TAG, GLOSSARY_TERM, DOCUMENTATION);
if (!businessAttributeCategories.contains(entityChangeEvent.getCategory())) {
log.info("Skipping MCL event for category: " + entityChangeEvent.getCategory());
return;
}

Urn urn = entityChangeEvent.getEntityUrn();
log.info("Business Attribute update hook invoked for urn :" + urn);
log.info("Business Attribute update hook invoked for urn : {}", urn);
fetchRelatedEntities(
opContext,
urn,
(batch, batchNumber) -> processBatch(opContext, batch, batchNumber),
(batch, batchNumber, entityKey) -> processBatch(opContext, batch, batchNumber, entityKey),
null,
0,
1);

executor.shutdown();
try {
if (!executor.awaitTermination(AWAIT_TERMINATION_TIME, TimeUnit.MINUTES)) {
executor.shutdownNow(); // Cancel currently executing tasks
if (!executor.awaitTermination(AWAIT_TERMINATION_TIME, TimeUnit.MINUTES))
log.error("Business Attribute Propagation Executor is not terminating");
}
} catch (InterruptedException ie) {
executor.shutdownNow();
}
}

private void fetchRelatedEntities(
@NonNull final OperationContext opContext,
@NonNull final Urn urn,
@NonNull final BiConsumer<RelatedEntitiesScrollResult, Integer> resultConsumer,
@NonNull
final TriFunction<RelatedEntitiesScrollResult, Integer, String, Callable<ExecutionResult>>
resultFunction,
@Nullable String scrollId,
int consumedEntityCount,
int batchNumber) {
GraphRetriever graph = opContext.getRetrieverContext().get().getGraphRetriever();

final ArrayList<Future<ExecutionResult>> futureList = new ArrayList<>();
RelatedEntitiesScrollResult result =
graph.scrollRelatedEntities(
null,
Expand All @@ -106,52 +127,143 @@ private void fetchRelatedEntities(
getRelatedEntitiesBatchSize,
null,
null);
resultConsumer.accept(result, batchNumber);

futureList.add(
executor.submit(resultFunction.apply(result, batchNumber, urn.getEntityKey().toString())));

consumedEntityCount = consumedEntityCount + result.getEntities().size();
if (result.getScrollId() != null && consumedEntityCount < relatedEntitiesCount) {
batchNumber = batchNumber + 1;
fetchRelatedEntities(
opContext, urn, resultConsumer, result.getScrollId(), consumedEntityCount, batchNumber);
opContext, urn, resultFunction, result.getScrollId(), consumedEntityCount, batchNumber);
}

for (Future<ExecutionResult> future : futureList) {
try {
ExecutionResult futureResult = future.get();
if (futureResult.getException() != null) {
log.error(
"Batch {} for BA:{} is failed with exception",
futureResult.getBatchNumber(),
futureResult.getEntityKey(),
futureResult.getException());
} else {
log.info(futureResult.getResult());
}
} catch (InterruptedException | ExecutionException e) {
log.error("Business Attribute Propagation Parallel Processing Exception", e);
}
}
futureList.clear();
}

private void processBatch(
private Callable<ExecutionResult> processBatch(
@NonNull OperationContext opContext,
@NonNull RelatedEntitiesScrollResult batch,
int batchNumber) {
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
log.info("BA Update Batch {} started", batchNumber);
Set<Urn> entityUrns =
batch.getEntities().stream()
.map(RelatedEntity::getUrn)
.map(UrnUtils::getUrn)
.collect(Collectors.toSet());

Map<Urn, Map<String, Aspect>> entityAspectMap =
aspectRetriever.getLatestAspectObjects(
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));

entityAspectMap.entrySet().stream()
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
.forEach(
entry -> {
final Urn entityUrn = entry.getKey();
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);

updateIndicesService.handleChangeEvent(
opContext,
PegasusUtils.constructMCL(
null,
Constants.SCHEMA_FIELD_ENTITY_NAME,
entityUrn,
ChangeType.UPSERT,
Constants.BUSINESS_ATTRIBUTE_ASPECT,
opContext.getAuditStamp(),
new BusinessAttributes(aspect.data()),
null,
null,
null));
});
log.info("BA Update Batch {} completed", batchNumber);
int batchNumber,
String entityKey) {
return () -> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
log.info("Batch {} for BA:{} started", batchNumber, entityKey);
ExecutionResult executionResult = new ExecutionResult();
executionResult.setBatchNumber(batchNumber);
executionResult.setEntityKey(entityKey);
try {
Set<Urn> entityUrns =
batch.getEntities().stream()
.map(RelatedEntity::getUrn)
.map(UrnUtils::getUrn)
.collect(Collectors.toSet());

Map<Urn, Map<String, Aspect>> entityAspectMap =
aspectRetriever.getLatestAspectObjects(
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));

entityAspectMap.entrySet().stream()
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
.forEach(
entry -> {
final Urn entityUrn = entry.getKey();
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);
updateIndicesService.handleChangeEvent(
opContext,
PegasusUtils.constructMCL(
null,
Constants.SCHEMA_FIELD_ENTITY_NAME,
entityUrn,
ChangeType.UPSERT,
Constants.BUSINESS_ATTRIBUTE_ASPECT,
opContext.getAuditStamp(),
new BusinessAttributes(aspect.data()),
null,
null,
null));
});
stopWatch.stop();
String result =
String.format(
"Batch %s for BA:%s is completed in %s",
batchNumber, entityKey, TimeAgo.toDuration(stopWatch.getTime()))
.toString();
executionResult.setResult(result);
} catch (Exception e) {
executionResult.setException(e);
}
return executionResult;
};
}

/**
 * Creates the worker pool used to process business-attribute propagation batches.
 *
 * <p>Core and maximum pool size are identical and the work queue is an unbounded {@link
 * LinkedBlockingQueue}, so submissions are queued rather than rejected.
 *
 * @param numThreads requested pool size; zero or a negative value selects twice the number of
 *     available processors
 * @param keepAlive keep-alive time for idle threads, in seconds (must not be negative, otherwise
 *     {@link ThreadPoolExecutor} throws {@link IllegalArgumentException})
 * @return a new, not yet started executor
 */
private ExecutorService businessAttributePropagationWorkerPool(int numThreads, int keepAlive) {
  // ThreadPoolExecutor rejects a maximum pool size of 0, so 0 is treated like "not configured"
  // instead of failing on every change event.
  final int poolSize =
      numThreads <= 0 ? Runtime.getRuntime().availableProcessors() * 2 : numThreads;
  return new ThreadPoolExecutor(
      poolSize, poolSize, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}

/**
 * A function of three arguments, the three-argument counterpart of {@link
 * java.util.function.BiFunction}.
 *
 * @param <A> type of the first argument
 * @param <B> type of the second argument
 * @param <C> type of the third argument
 * @param <R> type of the result
 */
@FunctionalInterface
private interface TriFunction<A, B, C, R> {
  /** Applies this function to the three given arguments and returns its result. */
  R apply(A first, B second, C third);
}

@Data
private class ExecutionResult {
String result;
Throwable exception;
int batchNumber;
String entityKey;
}

private static final class TimeAgo {
private static final List<Long> times =
Arrays.asList(
TimeUnit.DAYS.toMillis(365),
TimeUnit.DAYS.toMillis(30),
TimeUnit.DAYS.toMillis(1),
TimeUnit.HOURS.toMillis(1),
TimeUnit.MINUTES.toMillis(1),
TimeUnit.SECONDS.toMillis(1),
TimeUnit.MILLISECONDS.toMillis(1));
private static final List<String> timesString =
Arrays.asList("year", "month", "day", "hour", "minute", "second", "milliseconds");

private static String toDuration(long duration) {

StringBuffer res = new StringBuffer();
for (int i = 0; i < times.size(); i++) {
Long current = times.get(i);
long temp = duration / current;
if (temp > 0) {
res.append(temp)
.append(" ")
.append(timesString.get(i))
.append(temp != 1 ? "s" : StringUtils.EMPTY)
.append(" ");
}
duration = duration % current;
}
if (StringUtils.EMPTY.equals(res.toString())) return "0 seconds ago";
else return res.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setupTest() throws URISyntaxException {
mockUpdateIndicesService = mock(UpdateIndicesService.class);
actorUrn = Urn.createFromString(TEST_ACTOR_URN);
businessAttributeServiceHook =
new BusinessAttributeUpdateHookService(mockUpdateIndicesService, 100, 1);
new BusinessAttributeUpdateHookService(mockUpdateIndicesService, 100, 1, 10, 60);
businessAttributeUpdateHook =
new BusinessAttributeUpdateHook(businessAttributeServiceHook, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ forms:
businessAttribute:
fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000}
fetchRelatedEntitiesBatchSize: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_BATCH_SIZE:1000}
threadCount: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_THREAD_COUNT:-1} # Thread pool size; a negative value (the default, -1) selects 2 * the number of available CPU cores
keepAliveTime: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_KEEP_ALIVE:60} # Number of seconds to keep inactive threads alive

metadataChangeProposal:
throttle:
Expand Down
Loading