From 9f006dc1de5470aa6275a1caeb6d9e794af2f0d1 Mon Sep 17 00:00:00 2001 From: Gautam Kumar Date: Thu, 14 Sep 2023 18:02:55 +0530 Subject: [PATCH] Follow-up changes based on #3768 - Addressing comments on the PR #3768 - The changes in this PR are around organization of the code only. There are no functional changes. --- .../apache/gobblin/salesforce/Histogram.java | 28 +++++++++-- .../gobblin/salesforce/HistogramGroup.java | 33 ------------- ...ava => RecordModTimeHistogramService.java} | 20 ++++---- .../gobblin/salesforce/SalesforceSource.java | 49 +++++++++---------- .../salesforce/SalesforceSourceTest.java | 10 ++-- 5 files changed, 62 insertions(+), 78 deletions(-) delete mode 100644 gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java rename gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/{SalesforceHistogramService.java => RecordModTimeHistogramService.java} (95%) diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java index bb0350392cc..45b52f22ba2 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/Histogram.java @@ -18,20 +18,24 @@ import java.util.ArrayList; import java.util.List; +import lombok.AllArgsConstructor; import lombok.Getter; +/** + * A class encapsulating the count of records in the consecutive intervals. + */ @Getter public class Histogram { private long totalRecordCount; - private final List groups; + private final List groups; - Histogram() { + public Histogram() { totalRecordCount = 0; groups = new ArrayList<>(); } - void add(HistogramGroup group) { + void add(Group group) { groups.add(group); totalRecordCount += group.getCount(); } @@ -41,7 +45,7 @@ void add(Histogram histogram) { totalRecordCount += histogram.totalRecordCount; } - HistogramGroup get(int idx) { + Group get(int idx) { return this.groups.get(idx); } @@ -49,4 +53,20 @@ HistogramGroup get(int idx) { public String toString() { return groups.toString(); } + + /** + * A class to encapsulate the key and the corresponding frequency/count, in the context of a + * histogram. It represents one data point in the histogram. + */ + @Getter + @AllArgsConstructor + static class Group { + private final String key; + private final int count; + + @Override + public String toString() { + return key + ":" + count; + } + } } diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java deleted file mode 100644 index f43d6106030..00000000000 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/HistogramGroup.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gobblin.salesforce; - -import lombok.AllArgsConstructor; -import lombok.Getter; - - -@Getter -@AllArgsConstructor -class HistogramGroup { - private final String key; - private final int count; - - @Override - public String toString() { - return key + ":" + count; - } -} \ No newline at end of file diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/RecordModTimeHistogramService.java similarity index 95% rename from gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java rename to gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/RecordModTimeHistogramService.java index ec09da57a35..08ab968ed0c 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceHistogramService.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/RecordModTimeHistogramService.java @@ -54,7 +54,7 @@ * mapping of number of records to be fetched by time intervals. */ @Slf4j -public class SalesforceHistogramService { +public class RecordModTimeHistogramService { private static final int MIN_SPLIT_TIME_MILLIS = 1000; private static final String ZERO_TIME_SUFFIX = "-00:00:00"; private static final Gson GSON = new Gson(); @@ -74,7 +74,7 @@ public class SalesforceHistogramService { protected SalesforceConnector salesforceConnector; private final SfConfig sfConfig; - SalesforceHistogramService(SfConfig sfConfig, SalesforceConnector connector) { + RecordModTimeHistogramService(SfConfig sfConfig, SalesforceConnector connector) { this.sfConfig = sfConfig; salesforceConnector = connector; } @@ -97,9 +97,9 @@ Histogram getHistogram(String entity, String watermarkColumn, SourceState state, // exchange the first histogram group key with the global low watermark to ensure that the low watermark is captured // in the range of generated partitions - HistogramGroup firstGroup = histogram.get(0); + Histogram.Group firstGroup = histogram.get(0); Date lwmDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); - histogram.getGroups().set(0, new HistogramGroup(Utils.epochToDate(lwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), + histogram.getGroups().set(0, new Histogram.Group(Utils.epochToDate(lwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), firstGroup.getCount())); // refine the histogram @@ -178,8 +178,8 @@ private Histogram getRefinedHistogram(SalesforceConnector connector, String enti log.info("Refining histogram with bucket size limit {}.", bucketSizeLimit); - HistogramGroup currentGroup; - HistogramGroup nextGroup; + Histogram.Group currentGroup; + Histogram.Group nextGroup; final TableCountProbingContext probingContext = new TableCountProbingContext(connector, entity, watermarkColumn, bucketSizeLimit, probeLimit); @@ -188,9 +188,9 @@ private Histogram getRefinedHistogram(SalesforceConnector connector, String enti } // make a copy of the histogram list and add a dummy entry at the end to avoid special processing of the last group - List list = new ArrayList(histogram.getGroups()); + List list = new ArrayList(histogram.getGroups()); Date hwmDate = Utils.toDate(partition.getHighWatermark(), Partitioner.WATERMARKTIMEFORMAT); - list.add(new HistogramGroup(Utils.epochToDate(hwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), 0)); + list.add(new Histogram.Group(Utils.epochToDate(hwmDate.getTime(), SalesforceSource.SECONDS_FORMAT), 0)); for (int i = 0; i < list.size() - 1; i++) { currentGroup = list.get(i); @@ -261,7 +261,7 @@ private Histogram parseDayBucketingHistogram(JsonArray records) { String time = element.get("time").getAsString() + ZERO_TIME_SUFFIX; int count = element.get("cnt").getAsInt(); - histogram.add(new HistogramGroup(time, count)); + histogram.add(new Histogram.Group(time, count)); } return histogram; @@ -278,7 +278,7 @@ private void getHistogramRecursively(TableCountProbingContext probingContext, Hi if (count <= probingContext.bucketSizeLimit || probingContext.probeCount > probingContext.probeLimit || (midpointEpoch - startEpoch < MIN_SPLIT_TIME_MILLIS)) { - histogram.add(new HistogramGroup(Utils.epochToDate(startEpoch, SalesforceSource.SECONDS_FORMAT), count)); + histogram.add(new Histogram.Group(Utils.epochToDate(startEpoch, SalesforceSource.SECONDS_FORMAT), count)); return; } diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java index aae7681e1eb..10a876f279d 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java @@ -75,15 +75,12 @@ public class SalesforceSource extends QueryBasedSource { public static final String USE_ALL_OBJECTS = "use.all.objects"; public static final boolean DEFAULT_USE_ALL_OBJECTS = false; - @VisibleForTesting - static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; - static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize"; + public static final String ENABLE_DYNAMIC_PROBING = "salesforce.enableDynamicProbing"; + public static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize"; static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000; - @VisibleForTesting - static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; - @VisibleForTesting - static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit"; + public static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; + public static final String EARLY_STOP_TOTAL_RECORDS_LIMIT = "salesforce.earlyStopTotalRecordsLimit"; private static final long DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT = DEFAULT_MIN_TARGET_PARTITION_SIZE * 4; static final String SECONDS_FORMAT = "yyyy-MM-dd-HH:mm:ss"; @@ -91,7 +88,7 @@ public class SalesforceSource extends QueryBasedSource { private boolean isEarlyStopped = false; protected SalesforceConnector salesforceConnector = null; - private SalesforceHistogramService salesforceHistogramService; + private RecordModTimeHistogramService histogramService; public SalesforceSource() { this.lineageInfo = Optional.absent(); @@ -103,9 +100,9 @@ public SalesforceSource() { } @VisibleForTesting - SalesforceSource(SalesforceHistogramService salesforceHistogramService) { + SalesforceSource(RecordModTimeHistogramService histogramService) { this.lineageInfo = Optional.absent(); - this.salesforceHistogramService = salesforceHistogramService; + this.histogramService = histogramService; } @Override @@ -133,11 +130,11 @@ protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity } @Override protected List generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { - SalesforceConnector connector = getConnector(state); SfConfig sfConfig = new SfConfig(state.getProperties()); - if (salesforceHistogramService == null) { - salesforceHistogramService = new SalesforceHistogramService(sfConfig, connector); + if (histogramService == null) { + salesforceConnector = getConnector(state); + histogramService = new RecordModTimeHistogramService(sfConfig, getConnector(state)); } List workUnits; @@ -294,7 +291,7 @@ List generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st Partition partition = partitioner.getGlobalPartition(previousWatermark); Histogram histogram = - salesforceHistogramService.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); + histogramService.getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); // we should look if the count is too big, cut off early if count exceeds the limit, or bucket size is too large @@ -303,7 +300,7 @@ List generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st // TODO: we should consider move this logic into getRefinedHistogram so that we can early terminate the search if (isEarlyStopEnabled(state)) { histogramAdjust = new Histogram(); - for (HistogramGroup group : histogram.getGroups()) { + for (Histogram.Group group : histogram.getGroups()) { histogramAdjust.add(group); long earlyStopRecordLimit = state.getPropAsLong(EARLY_STOP_TOTAL_RECORDS_LIMIT, DEFAULT_EARLY_STOP_TOTAL_RECORDS_LIMIT); if (histogramAdjust.getTotalRecordCount() > earlyStopRecordLimit) { @@ -316,7 +313,7 @@ List generateWorkUnitsHelper(SourceEntity sourceEntity, SourceState st long expectedHighWatermark = partition.getHighWatermark(); if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) { - HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size()); + Histogram.Group lastPlusOne = histogram.get(histogramAdjust.getGroups().size()); long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT)); log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark); @@ -354,13 +351,13 @@ String generateSpecifiedPartitions(Histogram histogram, int minTargetPartitionSi log.info("maxPartitions: " + maxPartitions); log.info("interval: " + interval); - List groups = histogram.getGroups(); + List groups = histogram.getGroups(); List partitionPoints = new ArrayList<>(); DescriptiveStatistics statistics = new DescriptiveStatistics(); int count = 0; - HistogramGroup group; - Iterator it = groups.iterator(); + Histogram.Group group; + Iterator it = groups.iterator(); while (it.hasNext()) { group = it.next(); @@ -427,18 +424,18 @@ protected Set getSourceEntities(State state) { return super.getSourceEntities(state); } - SalesforceConnector connector = getConnector(state); + salesforceConnector = getConnector(state); try { - if (!connector.connect()) { + if (!salesforceConnector.connect()) { throw new RuntimeException("Failed to connect."); } } catch (RestApiConnectionException e) { throw new RuntimeException("Failed to connect.", e); } - List commands = RestApiConnector.constructGetCommand(connector.getFullUri("/sobjects")); + List commands = RestApiConnector.constructGetCommand(salesforceConnector.getFullUri("/sobjects")); try { - CommandOutput response = connector.getResponse(commands); + CommandOutput response = salesforceConnector.getResponse(commands); Iterator itr = (Iterator) response.getResults().values().iterator(); if (itr.hasNext()) { String next = itr.next(); @@ -462,9 +459,9 @@ private static Set getSourceEntities(String response) { } protected SalesforceConnector getConnector(State state) { - if (this.salesforceConnector == null) { - this.salesforceConnector = new SalesforceConnector(state); + if (salesforceConnector == null) { + salesforceConnector = new SalesforceConnector(state); } - return this.salesforceConnector; + return salesforceConnector; } } diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java index 9e01b9c737d..8048236e383 100644 --- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java +++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java @@ -58,7 +58,7 @@ void testSourceLineageInfo() { @Test void testGenerateSpecifiedPartitionFromSinglePointHistogram() { Histogram histogram = new Histogram(); - histogram.add(new HistogramGroup("2014-02-13-00:00:00", 10)); + histogram.add(new Histogram.Group("2014-02-13-00:00:00", 10)); SalesforceSource source = new SalesforceSource(); long expectedHighWatermark = 20170407152123L; @@ -114,13 +114,13 @@ void testGenerateWorkUnitsHelperForSinglePartitionAndEarlyStop(long earlyStopRec state.setProp(SalesforceSource.EARLY_STOP_TOTAL_RECORDS_LIMIT, earlyStopRecordCount); long previousWtm = 20140213000000L; - SalesforceHistogramService salesforceHistogramService = mock(SalesforceHistogramService.class); + RecordModTimeHistogramService histogramService = mock(RecordModTimeHistogramService.class); String deltaFieldKey = state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY); Partition partition = new Partitioner(state).getGlobalPartition(previousWtm); - when(salesforceHistogramService.getHistogram(sourceEntity.getSourceEntityName(), deltaFieldKey, state, partition)) + when(histogramService.getHistogram(sourceEntity.getSourceEntityName(), deltaFieldKey, state, partition)) .thenReturn(getHistogram()); - List actualWorkUnits = new SalesforceSource(salesforceHistogramService).generateWorkUnitsHelper(sourceEntity, state, previousWtm); + List actualWorkUnits = new SalesforceSource(histogramService).generateWorkUnitsHelper(sourceEntity, state, previousWtm); Assert.assertEquals(actualWorkUnits.size(), 1); double actualHighWtm = (double) new Gson().fromJson(actualWorkUnits.get(0).getExpectedHighWatermark(), HashMap.class).get("value"); Assert.assertEquals(actualHighWtm, Double.parseDouble(String.valueOf(expectedHighWtm))); @@ -139,7 +139,7 @@ private Histogram getHistogram() { Histogram histogram = new Histogram(); for (String group: HISTOGRAM.split(", ")) { String[] groupInfo = group.split("::"); - histogram.add(new HistogramGroup(groupInfo[0], Integer.parseInt(groupInfo[1]))); + histogram.add(new Histogram.Group(groupInfo[0], Integer.parseInt(groupInfo[1]))); } return histogram; }