Skip to content

Commit

Permalink
YARN-3336. FileSystem memory leak in DelegationTokenRenewer.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnauroth committed Mar 23, 2015
1 parent 7e6f384 commit 6ca1f12
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,9 @@ Release 2.7.0 - UNRELEASED
YARN-3384. TestLogAggregationService.verifyContainerLogs fails after
YARN-2777. (Naganarasimha G R via ozawa)

YARN-3336. FileSystem memory leak in DelegationTokenRenewer.
(Zhihai Xu via cnauroth)

Release 2.6.0 - 2014-11-18

INCOMPATIBLE CHANGES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ private void requestNewHdfsDelegationToken(ApplicationId applicationId,
rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
}

@VisibleForTesting
protected Token<?>[] obtainSystemTokensForUser(String user,
final Credentials credentials) throws IOException, InterruptedException {
// Get new hdfs tokens on behalf of this user
Expand All @@ -615,8 +616,16 @@ protected Token<?>[] obtainSystemTokensForUser(String user,
proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
@Override
public Token<?>[] run() throws Exception {
return FileSystem.get(getConfig()).addDelegationTokens(
UserGroupInformation.getLoginUser().getUserName(), credentials);
FileSystem fs = FileSystem.get(getConfig());
try {
return fs.addDelegationTokens(
UserGroupInformation.getLoginUser().getUserName(),
credentials);
} finally {
// Close the FileSystem created by the new proxy user,
// So that we don't leave an entry in the FileSystem cache
fs.close();
}
}
});
return newTokens;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,16 @@ public String toString() {
* exception
*/
static class MyFS extends DistributedFileSystem {

public MyFS() {}
public void close() {}
private static AtomicInteger instanceCounter = new AtomicInteger();
public MyFS() {
instanceCounter.incrementAndGet();
}
public void close() {
instanceCounter.decrementAndGet();
}
public static int getInstanceCounter() {
return instanceCounter.get();
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {}

Expand All @@ -299,6 +306,11 @@ public MyToken getDelegationToken(String renewer) throws IOException {
LOG.info("Called MYDFS.getdelegationtoken " + result);
return result;
}

public Token<?>[] addDelegationTokens(
final String renewer, Credentials credentials) throws IOException {
return new Token<?>[0];
}
}

/**
Expand Down Expand Up @@ -1022,4 +1034,16 @@ public void testAppSubmissionWithPreviousToken() throws Exception{
// app2 completes, app1 is still running, check the token is not cancelled
Assert.assertFalse(Renewer.cancelled);
}

// Test FileSystem memory leak in obtainSystemTokensForUser.
@Test
public void testFSLeakInObtainSystemTokensForUser() throws Exception{
Credentials credentials = new Credentials();
String user = "test";
int oldCounter = MyFS.getInstanceCounter();
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
Assert.assertEquals(oldCounter, MyFS.getInstanceCounter());
}
}

0 comments on commit 6ca1f12

Please sign in to comment.