From d4a7a8984305306f3f0a4c594f64634cd65114a9 Mon Sep 17 00:00:00 2001 From: Christopher Tubbs Date: Sat, 23 Dec 2023 03:08:15 -0500 Subject: [PATCH] Revert "Fix bug in compaction props (#4092)" This reverts commit 67fd967415612ab0770b422a5b24430655275946. Reverting in favor of #4117 --- .../accumulo/core/conf/ConfigurationCopy.java | 24 +- .../compaction/DefaultCompactionPlanner.java | 3 +- .../CompactionPlannerInitParams.java | 2 +- .../DefaultCompactionPlannerTest.java | 361 +++++++++--------- 4 files changed, 176 insertions(+), 214 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java index e487e2840a8..14424c196a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java @@ -30,8 +30,6 @@ * configuration */ public class ConfigurationCopy extends AccumuloConfiguration { - - AccumuloConfiguration parent = null; private long updateCount = 0; final Map copy = Collections.synchronizedMap(new HashMap<>()); @@ -44,17 +42,6 @@ public ConfigurationCopy(Map config) { this(config.entrySet()); } - /** - * Creates a new configuration - * - * @param config - configuration property key/value pairs to copy - * @param parent - Higher level accumulo config to allow for property overrides - */ - public ConfigurationCopy(Map config, AccumuloConfiguration parent) { - this(config.entrySet()); - this.parent = parent; - } - /** * Creates a new configuration. * @@ -82,20 +69,11 @@ public ConfigurationCopy() { @Override public String get(Property property) { - if (copy.containsKey(property.getKey())) { - return copy.get(property.getKey()); - } else if (parent != null) { - return parent.get(property); - } else { - return null; - } + return copy.get(property.getKey()); } @Override public void getProperties(Map props, Predicate filter) { - if (parent != null) { - parent.getProperties(props, filter); - } for (Entry entry : copy.entrySet()) { if (filter.test(entry.getKey())) { props.put(entry.getKey(), entry.getValue()); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index 9385806831b..be8e25299bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@ -221,8 +221,7 @@ private void determineMaxFilesToCompact(InitParameters params) { this.maxFilesToCompact = Integer.parseInt(params.getServiceEnvironment().getConfiguration() .get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey())); } else { - this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", - Property.TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN.getDefaultValue())); + this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", "10")); } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java index eb287153d66..0f79ce4df00 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java @@ -60,7 +60,7 @@ public Map getOptions() { @Override public String getFullyQualifiedOption(String key) { - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".planner.opts." + key; + return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key; } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index c6262f1e02d..568b57cd2d1 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@ -22,34 +22,27 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; -import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; -import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.common.ServiceEnvironment.Configuration; import org.apache.accumulo.core.spi.compaction.CompactionPlan.Builder; -import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; -import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; -import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DefaultCompactionPlannerTest { @@ -57,12 +50,6 @@ private static T getOnlyElement(Collection c) { return c.stream().collect(onlyElement()); } - private static final Configuration defaultConf = - new ConfigurationImpl(DefaultConfiguration.getInstance()); - private static final CompactionServiceId csid = CompactionServiceId.of("cs1"); - - private static final Logger log = LoggerFactory.getLogger(DefaultCompactionPlannerTest.class); - @Test public void testFindFilesToCompact() { @@ -143,13 +130,7 @@ public void testFindFilesToCompact() { @Test public void testRunningCompaction() { - String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," - + "{'name':'huge','type': 'internal','numThreads':4}]"; - - var planner = createPlanner(defaultConf, executors); - + var planner = createPlanner(true); var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", "F5", "13M"); var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M"); var compacting = @@ -171,126 +152,12 @@ public void testRunningCompaction() { // planner should compact. var job = getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); - } - - /** - * Tests that the maxOpen property overrides the deprecated open.max property with the default - * service - */ - @Test - @SuppressWarnings("removal") - public void testOverrideMaxOpenDefaultService() { - // Set old property and use that for max open files. - ConfigurationCopy aconf = new ConfigurationCopy(Map.of(), DefaultConfiguration.getInstance()); - aconf.set(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17"); - ConfigurationImpl config = new ConfigurationImpl(aconf); - - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(config).anyTimes(); - EasyMock.replay(senv); - - // Use the CompactionServicesConfig to create options based on default property values - var compactionServices = new CompactionServicesConfig(aconf, log::warn); - var options = compactionServices.getOptions().get("default"); - - var initParams = - new CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv); - - var planner = new DefaultCompactionPlanner(); - planner.init(initParams); - - var all = createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", "F5", "14M", "F6", - "15M", "F7", "16M", "F8", "17M", "F9", "18M", "FA", "19M", "FB", "20M", "FC", "21M", "FD", - "22M", "FE", "23M", "FF", "24M", "FG", "25M", "FH", "26M"); - Set compacting = Set.of(); - var params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - var plan = planner.makePlan(params); - var job = getOnlyElement(plan.getJobs()); - assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"), "large"), - job.getExecutor()); - - aconf.set(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "5"); - // Create new initParams so executor IDs can be reused - initParams = new CompactionPlannerInitParams(CompactionServiceId.of("default"), options, senv); - planner = new DefaultCompactionPlanner(); - planner.init(initParams); - - params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - plan = planner.makePlan(params); - job = getOnlyElement(plan.getJobs()); - assertEquals(createCFs("F1", "10M", "F2", "11M", "F3", "12M", "F4", "13M", "F5", "14M"), - job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(CompactionServiceId.of("default"), "medium"), - job.getExecutor()); - } - - /** - * Tests that the maxOpen property overrides the deprecated open.max property - */ - @Test - @SuppressWarnings("removal") - public void testOverrideMaxOpen() { - ConfigurationCopy aconf = new ConfigurationCopy(Map.of(), DefaultConfiguration.getInstance()); - // Set old property and use that for max open files. - aconf.set(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "17"); - ConfigurationImpl config = new ConfigurationImpl(aconf); - - String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," - + "{'name':'huge','type': 'internal','numThreads':4}]"; - - var planner = createPlanner(config, executors); - var all = createCFs("F1", "1M", "F2", "2M", "F3", "4M", "F4", "8M", "F5", "16M", "F6", "32M", - "F7", "64M", "F8", "128M", "F9", "256M", "FA", "512M", "FB", "1G", "FC", "2G", "FD", "4G", - "FE", "8G", "FF", "16G", "FG", "32G", "FH", "64G"); - Set compacting = Set.of(); - var params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - var plan = planner.makePlan(params); - var job = getOnlyElement(plan.getJobs()); - assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), job.getExecutor()); - - // Set new property that overrides the old property. - aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "15"); - config = new ConfigurationImpl(aconf); - planner = createPlanner(config, executors); - params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - plan = planner.makePlan(params); - - // 17 files that do not meet the compaction ratio. When max files to compact is 15, - // the plan should do 3 files then 15 - job = getOnlyElement(plan.getJobs()); - assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); - - aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "5"); - // 17 files that do not meet the compaction ratio. When max files to compact is 5 should do 5, - // files then another 5, then the final 5. - config = new ConfigurationImpl(aconf); - planner = createPlanner(config, executors); - params = createPlanningParams(all, all, compacting, 2, CompactionKind.USER); - plan = planner.makePlan(params); - job = getOnlyElement(plan.getJobs()); - assertEquals(createCFs("F4", "8M", "F3", "4M", "F2", "2M", "F1", "1M", "F5", "16M"), - job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); } @Test public void testUserCompaction() { - ConfigurationCopy aconf = new ConfigurationCopy(DefaultConfiguration.getInstance()); - aconf.set(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "15"); - ConfigurationImpl config = new ConfigurationImpl(aconf); - - String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}," - + "{'name':'huge','type': 'internal','numThreads':4}]"; - - var planner = createPlanner(config, executors); + var planner = createPlanner(true); var all = createCFs("F1", "3M", "F2", "3M", "F3", "11M", "F4", "12M", "F5", "13M"); var candidates = createCFs("F3", "11M", "F4", "12M", "F5", "13M"); var compacting = @@ -301,7 +168,7 @@ public void testUserCompaction() { // a running non-user compaction should not prevent a user compaction var job = getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); // should only run one user compaction at a time compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "3M", "F2", "3M"))); @@ -319,7 +186,7 @@ public void testUserCompaction() { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor()); // should compact all 15 all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", "64M", "F8", "128M", @@ -329,7 +196,7 @@ public void testUserCompaction() { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "huge"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("huge"), job.getExecutor()); // For user compaction, can compact a subset that meets the compaction ratio if there is also a // larger set of files the meets the compaction ratio @@ -339,7 +206,7 @@ public void testUserCompaction() { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "small"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor()); // There is a subset of small files that meets the compaction ratio, but the larger set does not // so compact everything to avoid doing more than logarithmic work @@ -348,17 +215,13 @@ public void testUserCompaction() { plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "medium"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("medium"), job.getExecutor()); } @Test public void testMaxSize() { - String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," - + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]"; - - var planner = createPlanner(defaultConf, executors); + var planner = createPlanner(false); var all = createCFs("F1", "128M", "F2", "129M", "F3", "130M", "F4", "131M", "F5", "132M"); var params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); var plan = planner.makePlan(params); @@ -366,14 +229,14 @@ public void testMaxSize() { // should only compact files less than max size var job = getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor()); // user compaction can exceed the max size params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER); plan = planner.makePlan(params); job = getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorIdImpl.internalId(csid, "large"), job.getExecutor()); + assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor()); } /** @@ -382,12 +245,18 @@ public void testMaxSize() { @Test public void testErrorInternalTypeNoNumThreads() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - String executors = "[{'name':'small','type':'internal','maxSize':'32M'}," - + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type':'internal','maxSize':'512M','numThreads':3}]"; + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + String executors = getExecutors("'type': 'internal','maxSize':'32M'", + "'type': 'internal','maxSize':'128M','numThreads':2", + "'type': 'internal','maxSize':'512M','numThreads':3"); var e = assertThrows(NullPointerException.class, - () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("numThreads"), "Error message didn't contain numThreads"); } @@ -397,12 +266,18 @@ public void testErrorInternalTypeNoNumThreads() { @Test public void testErrorExternalTypeNumThreads() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," - + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type':'external','maxSize':'512M','numThreads':3}]"; + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", + "'type': 'internal','maxSize':'128M','numThreads':2", + "'type': 'external','maxSize':'512M','numThreads':3"); var e = assertThrows(IllegalArgumentException.class, - () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("numThreads"), "Error message didn't contain numThreads"); } @@ -412,12 +287,18 @@ public void testErrorExternalTypeNumThreads() { @Test public void testErrorExternalNoQueue() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," - + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type':'external','maxSize':'512M'}]"; + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", + "'type': 'internal','maxSize':'128M','numThreads':2", + "'type': 'external','maxSize':'512M'"); var e = assertThrows(NullPointerException.class, - () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("queue"), "Error message didn't contain queue"); } @@ -427,12 +308,17 @@ public void testErrorExternalNoQueue() { @Test public void testErrorOnlyOneMaxSize() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," - + "{'name':'medium','type':'internal','numThreads':2}," - + "{'name':'large','type':'external','queue':'q1'}]"; + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", + "'type': 'internal','numThreads':2", "'type': 'external','queue':'q1'"); var e = assertThrows(IllegalArgumentException.class, - () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize"); } @@ -442,20 +328,69 @@ public void testErrorOnlyOneMaxSize() { @Test public void testErrorDuplicateMaxSize() { DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - String executors = "[{'name':'small','type':'internal','maxSize':'32M', 'numThreads':1}," - + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," - + "{'name':'large','type':'external','maxSize':'128M','queue':'q1'}]"; + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", + "'type': 'internal','maxSize':'128M','numThreads':2", + "'type': 'external','maxSize':'128M','queue':'q1'"); var e = assertThrows(IllegalArgumentException.class, - () -> planner.init(getInitParams(defaultConf, executors)), "Failed to throw error"); + () -> planner.init(getInitParams(senv, executors)), "Failed to throw error"); assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize"); } + private CompactionPlanner.InitParameters getInitParams(ServiceEnvironment senv, + String executors) { + return new CompactionPlanner.InitParameters() { + + @Override + public ServiceEnvironment getServiceEnvironment() { + return senv; + } + + @Override + public Map getOptions() { + return Map.of("executors", executors, "maxOpen", "15"); + } + + @Override + public String getFullyQualifiedOption(String key) { + assertEquals("maxOpen", key); + return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; + } + + @Override + public ExecutorManager getExecutorManager() { + return new ExecutorManager() { + @Override + public CompactionExecutorId createExecutor(String name, int threads) { + return CompactionExecutorIdImpl.externalId(name); + } + + @Override + public CompactionExecutorId getExternalExecutor(String name) { + return CompactionExecutorIdImpl.externalId(name); + } + }; + } + }; + } + + private String getExecutors(String small, String medium, String large) { + String execBldr = "[{'name':'small'," + small + "},{'name':'medium'," + medium + "}," + + "{'name':'large'," + large + "}]"; + return execBldr.replaceAll("'", "\""); + } + private CompactionJob createJob(CompactionKind kind, Set all, Set files) { return new CompactionPlanImpl.BuilderImpl(kind, all, all) - .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files) - .build().getJobs().iterator().next(); + .addJob((short) all.size(), CompactionExecutorIdImpl.externalId("small"), files).build() + .getJobs().iterator().next(); } private static Set createCFs(String... namesSizePairs) { @@ -551,30 +486,80 @@ public Builder createPlanBuilder() { }; } - private static CompactionPlanner.InitParameters getInitParams(Configuration conf, - String executors) { + private static DefaultCompactionPlanner createPlanner(boolean withHugeExecutor) { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); - String maxOpen = - conf.get(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen"); - Map options = new HashMap<>(); - options.put("executors", executors.replaceAll("'", "\"")); + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - if (maxOpen != null) { - options.put("maxOpen", maxOpen); + EasyMock.replay(conf, senv); + + StringBuilder execBldr = + new StringBuilder("[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}"); + + if (withHugeExecutor) { + execBldr.append(",{'name':'huge','type': 'internal','numThreads':4}]"); + } else { + execBldr.append("]"); } - ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); - EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); - EasyMock.replay(senv); + String executors = execBldr.toString().replaceAll("'", "\""); - return new CompactionPlannerInitParams(csid, options, senv); - } + planner.init(new CompactionPlanner.InitParameters() { - private static DefaultCompactionPlanner createPlanner(Configuration conf, String executors) { - DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); - var initParams = getInitParams(conf, executors); + @Override + public ServiceEnvironment getServiceEnvironment() { + return senv; + } + + @Override + public Map getOptions() { + return Map.of("executors", executors, "maxOpen", "15"); + } + + @Override + public String getFullyQualifiedOption(String key) { + assertEquals("maxOpen", key); + return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; + } + + @Override + public ExecutorManager getExecutorManager() { + return new ExecutorManager() { + @Override + public CompactionExecutorId createExecutor(String name, int threads) { + switch (name) { + case "small": + assertEquals(1, threads); + break; + case "medium": + assertEquals(2, threads); + break; + case "large": + assertEquals(3, threads); + break; + case "huge": + assertEquals(4, threads); + break; + default: + fail("Unexpected name " + name); + break; + } + return CompactionExecutorIdImpl.externalId(name); + } + + @Override + public CompactionExecutorId getExternalExecutor(String name) { + throw new UnsupportedOperationException(); + } + }; + } + }); - planner.init(initParams); return planner; } }