diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d8a8058799e46..1f857783bfbdf 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -220,6 +220,9 @@ Release 2.8.0 - UNRELEASED YARN-3537. NPE when NodeManager.serviceInit fails and stopRecoveryStore invoked (Brahma Reddy Battula via jlowe) + YARN-3464. Race condition in LocalizerRunner kills localizer before + localizing all resources. (Zhihai Xu via kasha) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c9874a63bf078..68669aad61aad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; @@ -716,7 +718,12 @@ public ContainerState transition(ContainerImpl container, return ContainerState.LOCALIZING; } + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationEvent(LocalizationEventType. + CONTAINER_RESOURCES_LOCALIZED, container)); + container.sendLaunchEvent(); + container.metrics.endInitingContainer(); // If this is a recovered container that has already launched, skip // uploading resources to the shared cache. We do this to avoid uploading @@ -734,7 +741,6 @@ public ContainerState transition(ContainerImpl container, SharedCacheUploadEventType.UPLOAD)); } - container.metrics.endInitingContainer(); return ContainerState.LOCALIZED; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 611fe80a4a0c6..cdd252cd71387 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -35,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -108,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -389,6 +391,9 @@ public void handle(LocalizationEvent event) { case INIT_CONTAINER_RESOURCES: handleInitContainerResources((ContainerLocalizationRequestEvent) event); break; + case CONTAINER_RESOURCES_LOCALIZED: + handleContainerResourcesLocalized((ContainerLocalizationEvent) event); + break; case CACHE_CLEANUP: handleCacheCleanup(event); break; @@ -455,7 +460,18 @@ private void handleInitContainerResources( } } } - + + /** + * Once a container's resources are localized, kill the corresponding + * {@link ContainerLocalizer} + */ + private void handleContainerResourcesLocalized( + ContainerLocalizationEvent event) { + Container c = event.getContainer(); + String locId = ConverterUtils.toString(c.getContainerId()); + localizerTracker.endContainerLocalization(locId); + } + private void handleCacheCleanup(LocalizationEvent event) { ResourceRetentionSet retain = new ResourceRetentionSet(delService, cacheTargetSize); @@ -670,7 +686,7 @@ public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { response.setLocalizerAction(LocalizerAction.DIE); return response; } - return localizer.update(status.getResources()); + return localizer.processHeartbeat(status.getResources()); } } @@ -724,6 +740,17 @@ public void cleanupPrivLocalizers(String locId) { localizer.interrupt(); } } + + public void endContainerLocalization(String locId) { + LocalizerRunner localizer; + synchronized (privLocalizers) { + localizer = privLocalizers.get(locId); + if (null == localizer) { + return; // ignore + } + } + localizer.endContainerLocalization(); + } } @@ -878,6 +905,7 @@ class LocalizerRunner extends Thread { final Map scheduled; // Its a shared list between Private Localizer and dispatcher thread. final List pending; + private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false); // TODO: threadsafe, use outer? private final RecordFactory recordFactory = @@ -898,6 +926,10 @@ public void addResource(LocalizerResourceRequestEvent request) { pending.add(request); } + public void endContainerLocalization() { + killContainerLocalizer.set(true); + } + /** * Find next resource to be given to a spawned localizer. * @@ -944,7 +976,7 @@ private LocalResource findNextResource() { } } - LocalizerHeartbeatResponse update( + LocalizerHeartbeatResponse processHeartbeat( List remoteResourceStatuses) { LocalizerHeartbeatResponse response = recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); @@ -953,7 +985,7 @@ LocalizerHeartbeatResponse update( ApplicationId applicationId = context.getContainerId().getApplicationAttemptId().getApplicationId(); - LocalizerAction action = LocalizerAction.LIVE; + boolean fetchFailed = false; // Update resource statuses. for (LocalResourceStatus stat : remoteResourceStatuses) { LocalResource rsrc = stat.getResource(); @@ -989,7 +1021,7 @@ LocalizerHeartbeatResponse update( case FETCH_FAILURE: final String diagnostics = stat.getException().toString(); LOG.warn(req + " failed: " + diagnostics); - action = LocalizerAction.DIE; + fetchFailed = true; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, diagnostics)); @@ -1001,15 +1033,15 @@ LocalizerHeartbeatResponse update( break; default: LOG.info("Unknown status: " + stat.getStatus()); - action = LocalizerAction.DIE; + fetchFailed = true; getLocalResourcesTracker(req.getVisibility(), user, applicationId) .handle(new ResourceFailedLocalizationEvent( req, stat.getException().getMessage())); break; } } - if (action == LocalizerAction.DIE) { - response.setLocalizerAction(action); + if (fetchFailed || killContainerLocalizer.get()) { + response.setLocalizerAction(LocalizerAction.DIE); return response; } @@ -1037,12 +1069,9 @@ LocalizerHeartbeatResponse update( } catch (URISyntaxException e) { //TODO fail? Already translated several times... } - } else if (pending.isEmpty()) { - // TODO: Synchronization - action = LocalizerAction.DIE; } - response.setLocalizerAction(action); + response.setLocalizerAction(LocalizerAction.LIVE); response.setResourceSpecs(rsrcs); return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java index 5134349d9c7f4..4785fba4229fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java @@ -23,4 +23,5 @@ public enum LocalizationEventType { CACHE_CLEANUP, CLEANUP_CONTAINER_RESOURCES, DESTROY_APPLICATION_RESOURCES, + CONTAINER_RESOURCES_LOCALIZED, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index d3c3521f30568..2edaf458772e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; @@ -975,7 +976,8 @@ public boolean matches(Object o) { .thenReturn(Collections.emptyList()) .thenReturn(Collections.singletonList(rsrc1success)) .thenReturn(Collections.singletonList(rsrc2pending)) - .thenReturn(rsrcs4); + .thenReturn(rsrcs4) + .thenReturn(Collections.emptyList()); String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR + "user0" + Path.SEPARATOR + @@ -1019,7 +1021,13 @@ public boolean matches(Object o) { assertTrue(localizedPath.getFile().endsWith( localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12")); - // get shutdown + response = spyService.heartbeat(stat); + assertEquals(LocalizerAction.LIVE, response.getLocalizerAction()); + + spyService.handle(new ContainerLocalizationEvent( + LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c)); + + // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event response = spyService.heartbeat(stat); assertEquals(LocalizerAction.DIE, response.getLocalizerAction());