Skip to content

Commit

Permalink
YARN-3464. Race condition in LocalizerRunner kills localizer before l…
Browse files Browse the repository at this point in the history
…ocalizing all resources. (Zhihai Xu via kasha)

(cherry picked from commit 47279c3)
  • Loading branch information
kambatla committed Apr 26, 2015
1 parent 7aa3399 commit 4045c41
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 15 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ Release 2.8.0 - UNRELEASED
YARN-3537. NPE when NodeManager.serviceInit fails and stopRecoveryStore
invoked (Brahma Reddy Battula via jlowe)

YARN-3464. Race condition in LocalizerRunner kills localizer before
localizing all resources. (Zhihai Xu via kasha)

Release 2.7.1 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
Expand Down Expand Up @@ -716,7 +718,12 @@ public ContainerState transition(ContainerImpl container,
return ContainerState.LOCALIZING;
}

container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(LocalizationEventType.
CONTAINER_RESOURCES_LOCALIZED, container));

container.sendLaunchEvent();
container.metrics.endInitingContainer();

// If this is a recovered container that has already launched, skip
// uploading resources to the shared cache. We do this to avoid uploading
Expand All @@ -734,7 +741,6 @@ public ContainerState transition(ContainerImpl container,
SharedCacheUploadEventType.UPLOAD));
}

container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -108,6 +109,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
Expand Down Expand Up @@ -389,6 +391,9 @@ public void handle(LocalizationEvent event) {
case INIT_CONTAINER_RESOURCES:
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break;
case CONTAINER_RESOURCES_LOCALIZED:
handleContainerResourcesLocalized((ContainerLocalizationEvent) event);
break;
case CACHE_CLEANUP:
handleCacheCleanup(event);
break;
Expand Down Expand Up @@ -455,7 +460,18 @@ private void handleInitContainerResources(
}
}
}


/**
* Once a container's resources are localized, kill the corresponding
* {@link ContainerLocalizer}
*/
private void handleContainerResourcesLocalized(
ContainerLocalizationEvent event) {
Container c = event.getContainer();
String locId = ConverterUtils.toString(c.getContainerId());
localizerTracker.endContainerLocalization(locId);
}

private void handleCacheCleanup(LocalizationEvent event) {
ResourceRetentionSet retain =
new ResourceRetentionSet(delService, cacheTargetSize);
Expand Down Expand Up @@ -670,7 +686,7 @@ public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
response.setLocalizerAction(LocalizerAction.DIE);
return response;
}
return localizer.update(status.getResources());
return localizer.processHeartbeat(status.getResources());
}
}

Expand Down Expand Up @@ -724,6 +740,17 @@ public void cleanupPrivLocalizers(String locId) {
localizer.interrupt();
}
}

public void endContainerLocalization(String locId) {
LocalizerRunner localizer;
synchronized (privLocalizers) {
localizer = privLocalizers.get(locId);
if (null == localizer) {
return; // ignore
}
}
localizer.endContainerLocalization();
}
}


Expand Down Expand Up @@ -878,6 +905,7 @@ class LocalizerRunner extends Thread {
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
// Its a shared list between Private Localizer and dispatcher thread.
final List<LocalizerResourceRequestEvent> pending;
private AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);

// TODO: threadsafe, use outer?
private final RecordFactory recordFactory =
Expand All @@ -898,6 +926,10 @@ public void addResource(LocalizerResourceRequestEvent request) {
pending.add(request);
}

public void endContainerLocalization() {
killContainerLocalizer.set(true);
}

/**
* Find next resource to be given to a spawned localizer.
*
Expand Down Expand Up @@ -944,7 +976,7 @@ private LocalResource findNextResource() {
}
}

LocalizerHeartbeatResponse update(
LocalizerHeartbeatResponse processHeartbeat(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
Expand All @@ -953,7 +985,7 @@ LocalizerHeartbeatResponse update(
ApplicationId applicationId =
context.getContainerId().getApplicationAttemptId().getApplicationId();

LocalizerAction action = LocalizerAction.LIVE;
boolean fetchFailed = false;
// Update resource statuses.
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
Expand Down Expand Up @@ -989,7 +1021,7 @@ LocalizerHeartbeatResponse update(
case FETCH_FAILURE:
final String diagnostics = stat.getException().toString();
LOG.warn(req + " failed: " + diagnostics);
action = LocalizerAction.DIE;
fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent(
req, diagnostics));
Expand All @@ -1001,15 +1033,15 @@ LocalizerHeartbeatResponse update(
break;
default:
LOG.info("Unknown status: " + stat.getStatus());
action = LocalizerAction.DIE;
fetchFailed = true;
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
.handle(new ResourceFailedLocalizationEvent(
req, stat.getException().getMessage()));
break;
}
}
if (action == LocalizerAction.DIE) {
response.setLocalizerAction(action);
if (fetchFailed || killContainerLocalizer.get()) {
response.setLocalizerAction(LocalizerAction.DIE);
return response;
}

Expand Down Expand Up @@ -1037,12 +1069,9 @@ LocalizerHeartbeatResponse update(
} catch (URISyntaxException e) {
//TODO fail? Already translated several times...
}
} else if (pending.isEmpty()) {
// TODO: Synchronization
action = LocalizerAction.DIE;
}

response.setLocalizerAction(action);
response.setLocalizerAction(LocalizerAction.LIVE);
response.setResourceSpecs(rsrcs);
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ public enum LocalizationEventType {
CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES,
CONTAINER_RESOURCES_LOCALIZED,
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
Expand Down Expand Up @@ -975,7 +976,8 @@ public boolean matches(Object o) {
.thenReturn(Collections.<LocalResourceStatus>emptyList())
.thenReturn(Collections.singletonList(rsrc1success))
.thenReturn(Collections.singletonList(rsrc2pending))
.thenReturn(rsrcs4);
.thenReturn(rsrcs4)
.thenReturn(Collections.<LocalResourceStatus>emptyList());

String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
Path.SEPARATOR + "user0" + Path.SEPARATOR +
Expand Down Expand Up @@ -1019,7 +1021,13 @@ public boolean matches(Object o) {
assertTrue(localizedPath.getFile().endsWith(
localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));

// get shutdown
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());

spyService.handle(new ContainerLocalizationEvent(
LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));

// get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
response = spyService.heartbeat(stat);
assertEquals(LocalizerAction.DIE, response.getLocalizerAction());

Expand Down

0 comments on commit 4045c41

Please sign in to comment.