Skip to content

Commit

Permalink
Merge pull request #534 from sahilTakiar/getOrDefaultBug
Browse files Browse the repository at this point in the history
Fixing bug with GobblinMetricsRegistry.getOrDefault
  • Loading branch information
sahilTakiar committed Dec 17, 2015
2 parents d519e78 + 21448c6 commit 8232ddd
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,18 @@ public static MetricContext getMetricContext(State state, Class<?> klazz, List<T

List<Tag<?>> generatedTags = Lists.newArrayList();
if (construct != null) {
generatedTags.add(new Tag<String>("construct", construct.toString()));
generatedTags.add(new Tag<>("construct", construct.toString()));
}
if (!klazz.isAnonymousClass()) {
generatedTags.add(new Tag<String>("class", klazz.getCanonicalName()));
generatedTags.add(new Tag<>("class", klazz.getCanonicalName()));
}

GobblinMetrics gobblinMetrics;
MetricContext.Builder builder = state.contains(METRIC_CONTEXT_NAME_KEY) &&
(gobblinMetrics = GobblinMetricsRegistry.getInstance().get(state.getProp(METRIC_CONTEXT_NAME_KEY))) != null ?
gobblinMetrics.getMetricContext().childBuilder(klazz.getCanonicalName() + "." + randomId) :
Optional<GobblinMetrics> gobblinMetrics = state.contains(METRIC_CONTEXT_NAME_KEY) ?
GobblinMetricsRegistry.getInstance().get(state.getProp(METRIC_CONTEXT_NAME_KEY)) :
Optional.<GobblinMetrics>absent();

MetricContext.Builder builder = gobblinMetrics.isPresent() ?
gobblinMetrics.get().getMetricContext().childBuilder(klazz.getCanonicalName() + "." + randomId) :
MetricContext.builder(klazz.getCanonicalName() + "." + randomId);
return builder.
addTags(generatedTags).
Expand Down
17 changes: 11 additions & 6 deletions gobblin-core/src/main/java/gobblin/metrics/GobblinMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -120,8 +121,12 @@ public static GobblinMetrics get(String id, MetricContext parentContext) {
* @param tags the given list of {@link Tag}s
* @return a {@link GobblinMetrics} instance
*/
public static GobblinMetrics get(String id, MetricContext parentContext, List<Tag<?>> tags) {
return GOBBLIN_METRICS_REGISTRY.getOrDefault(id, new GobblinMetrics(id, parentContext, tags));
public static GobblinMetrics get(final String id, final MetricContext parentContext, final List<Tag<?>> tags) {
return GOBBLIN_METRICS_REGISTRY.getOrDefault(id, new Callable<GobblinMetrics>() {
@Override public GobblinMetrics call() throws Exception {
return new GobblinMetrics(id, parentContext, tags);
}
});
}

/**
Expand Down Expand Up @@ -436,10 +441,10 @@ private void buildFileMetricReporter(Properties properties) {
}

OutputStream output = append ? fs.append(metricLogFile) : fs.create(metricLogFile, true);
this.scheduledReporters.add(this.closer.register(OutputStreamReporter.forContext(this.metricContext)
.outputTo(output).build()));
this.scheduledReporters.add(this.closer.register(OutputStreamEventReporter.forContext(this.metricContext)
.outputTo(output).build()));
this.scheduledReporters
.add(this.closer.register(OutputStreamReporter.forContext(this.metricContext).outputTo(output).build()));
this.scheduledReporters
.add(this.closer.register(OutputStreamEventReporter.forContext(this.metricContext).outputTo(output).build()));

LOGGER.info("Will start reporting metrics to directory " + metricsLogDir);
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,30 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;


/**
* Registry that stores instances of {@link GobblinMetrics} identified by an arbitrary string id.
* The static method getInstance() provides a static instance of this this class that should be considered
* the global registry of metrics.
* An application could also instantiate one or more registries to for example separate instances of
* {@link GobblinMetrics} into different scopes.
* Registry that stores instances of {@link GobblinMetrics} identified by an arbitrary string id. The static method
* {@link #getInstance()} provides a static instance of this this class that should be considered the global registry of
* metrics.
*
* <p>
* An application could also instantiate one or more registries to, for example, separate instances of
* {@link GobblinMetrics} into different scopes.
* </p>
*/
public class GobblinMetricsRegistry {

private static final GobblinMetricsRegistry GLOBAL_INSTANCE = new GobblinMetricsRegistry();

private final Cache<String, GobblinMetrics> metricsMap = CacheBuilder.newBuilder().softValues().build();
private final Cache<String, GobblinMetrics> metricsCache = CacheBuilder.newBuilder().softValues().build();

private GobblinMetricsRegistry() {

// Do nothing
}

/**
Expand All @@ -47,39 +51,34 @@ private GobblinMetricsRegistry() {
* if there's no previous {@link GobblinMetrics} instance associated with the ID
*/
public GobblinMetrics putIfAbsent(String id, GobblinMetrics gobblinMetrics) {
return this.metricsMap.asMap().putIfAbsent(id, gobblinMetrics);
return this.metricsCache.asMap().putIfAbsent(id, gobblinMetrics);
}

/**
* Get the {@link GobblinMetrics} instance associated with a given ID.
*
* @param id the given {@link GobblinMetrics} ID
* @return the {@link GobblinMetrics} instance associated with the ID or {@code null}
* if no {@link GobblinMetrics} instance for the given ID is found
* @return the {@link GobblinMetrics} instance associated with the ID, wrapped in an {@link Optional} or
* {@link Optional#absent()} if no {@link GobblinMetrics} instance for the given ID is found
*/
public GobblinMetrics get(String id) {
return this.metricsMap.getIfPresent(id);
public Optional<GobblinMetrics> get(String id) {
return Optional.fromNullable(this.metricsCache.getIfPresent(id));
}

/**
* Get the {@link GobblinMetrics} instance associated with a given ID or the given default
* {@link GobblinMetrics} instance if no {@link GobblinMetrics} instance for the given ID
* is found.
* Get the {@link GobblinMetrics} instance associated with a given ID. If the ID is not found this method returns the
* {@link GobblinMetrics} returned by the given {@link Callable}, and creates a mapping between the specified ID
* and the {@link GobblinMetrics} instance returned by the {@link Callable}.
*
* @param id the given {@link GobblinMetrics} ID
* @param defaultValue the default {@link GobblinMetrics} instance
* @return the {@link GobblinMetrics} instance associated with a given ID or the given default
* {@link GobblinMetrics} instance if no {@link GobblinMetrics} instance for the given ID
* is found
* @param valueLoader a {@link Callable} that returns a {@link GobblinMetrics}, the {@link Callable} is only invoked
* if the given id is not found
*
* @return a {@link GobblinMetrics} instance associated with the id
*/
public GobblinMetrics getOrDefault(String id, final GobblinMetrics defaultValue) {
public GobblinMetrics getOrDefault(String id, Callable<? extends GobblinMetrics> valueLoader) {
try {
return this.metricsMap.get(id, new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
return defaultValue;
}
});
return this.metricsCache.get(id, valueLoader);
} catch (ExecutionException ee) {
throw Throwables.propagate(ee);
}
Expand All @@ -93,7 +92,7 @@ public GobblinMetrics call() throws Exception {
* {@link GobblinMetrics} instance for the given ID is found
*/
public GobblinMetrics remove(String id) {
return this.metricsMap.asMap().remove(id);
return this.metricsCache.asMap().remove(id);
}

/**
Expand All @@ -104,5 +103,4 @@ public GobblinMetrics remove(String id) {
public static GobblinMetricsRegistry getInstance() {
return GLOBAL_INSTANCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,10 @@ public MetricContext build() {
try {
return buildStrict();
} catch (NameConflictException nce) {
this.name = this.name + "_" + UUID.randomUUID().toString();
String uuid = UUID.randomUUID().toString();
LOG.warn("MetricContext with specified name already exists, appending UUID to the given name: " + uuid);

this.name = this.name + "_" + uuid;
try {
return buildStrict();
} catch (NameConflictException nce2) {
Expand Down
17 changes: 13 additions & 4 deletions gobblin-runtime/src/main/java/gobblin/runtime/util/JobMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package gobblin.runtime.util;

import java.util.List;
import java.util.concurrent.Callable;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -70,8 +71,12 @@ public static JobMetrics get(String jobId) {
* @param parentContext is the parent {@link MetricContext}
* @return a {@link JobMetrics} instance
*/
public static JobMetrics get(JobState jobState, MetricContext parentContext) {
return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new JobMetrics(jobState, parentContext));
public static JobMetrics get(final JobState jobState, final MetricContext parentContext) {
return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
@Override public GobblinMetrics call() throws Exception {
return new JobMetrics(jobState, parentContext);
}
});
}

/**
Expand All @@ -80,8 +85,12 @@ public static JobMetrics get(JobState jobState, MetricContext parentContext) {
* @param jobState the given {@link JobState} instance
* @return a {@link JobMetrics} instance
*/
public static JobMetrics get(JobState jobState) {
return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new JobMetrics(jobState));
public static JobMetrics get(final JobState jobState) {
return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
@Override public GobblinMetrics call() throws Exception {
return new JobMetrics(jobState);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package gobblin.runtime.util;

import java.util.List;
import java.util.concurrent.Callable;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -43,8 +44,12 @@ protected TaskMetrics(TaskState taskState) {
* @param taskState the given {@link TaskState} instance
* @return a {@link TaskMetrics} instance
*/
public static TaskMetrics get(TaskState taskState) {
return (TaskMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState), new TaskMetrics(taskState));
public static TaskMetrics get(final TaskState taskState) {
return (TaskMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState), new Callable<GobblinMetrics>() {
@Override public GobblinMetrics call() throws Exception {
return new TaskMetrics(taskState);
}
});
}

/**
Expand Down
11 changes: 8 additions & 3 deletions gobblin-yarn/src/main/java/gobblin/yarn/ContainerMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package gobblin.yarn;

import java.util.List;
import java.util.concurrent.Callable;

import org.apache.hadoop.yarn.api.records.ContainerId;

Expand Down Expand Up @@ -46,9 +47,13 @@ protected ContainerMetrics(State containerState, String applicationName, Contain
* @param containerId the {@link ContainerId} of the container
* @return a {@link ContainerMetrics} instance
*/
public static ContainerMetrics get(State containerState, String applicationName, ContainerId containerId) {
return (ContainerMetrics) GOBBLIN_METRICS_REGISTRY
.getOrDefault(name(containerId), new ContainerMetrics(containerState, applicationName, containerId));
public static ContainerMetrics get(final State containerState, final String applicationName,
final ContainerId containerId) {
return (ContainerMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(containerId), new Callable<GobblinMetrics>() {
@Override public GobblinMetrics call() throws Exception {
return new ContainerMetrics(containerState, applicationName, containerId);
}
});
}

private static String name(ContainerId containerId) {
Expand Down

0 comments on commit 8232ddd

Please sign in to comment.