HADOOP-11970. Replace uses of ThreadLocal&lt;Random> with JDK7 ThreadLocalRandom (Sean Busbey via Colin P. McCabe)
Colin Patrick McCabe committed May 19, 2015
1 parent c97f32e commit 470c87d
Showing 21 changed files with 101 additions and 102 deletions.
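The change itself is mechanical. Before JDK 7, getting a per-thread Random meant hand-rolling a ThreadLocal<Random> holder in every class that needed one; ThreadLocalRandom.current() does the same job with no field and no initialValue() boilerplate, and without the lock contention of a shared Random. A minimal before/after sketch (the class and method names here are illustrative, not from the patch):

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class PerThreadRandomSketch {
  // Before: one hand-rolled per-thread Random holder per class.
  private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
    @Override
    protected Random initialValue() {
      return new Random();
    }
  };

  static double oldStyle() {
    return RANDOM.get().nextDouble();
  }

  // After: the JDK 7 replacement; current() returns the calling thread's
  // own generator, so no holder field is needed at all.
  static double newStyle() {
    return ThreadLocalRandom.current().nextDouble();
  }

  public static void main(String[] args) {
    System.out.println(oldStyle() + " " + newStyle());
  }
}

One caveat carried by the new API: ThreadLocalRandom deliberately does not support setSeed() (it throws UnsupportedOperationException), so code that needs reproducible sequences still wants a plain Random.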
3 changes: 3 additions & 0 deletions hadoop-common-project/hadoop-common/CHANGES.txt
@@ -595,6 +595,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-11812. Implement listLocatedStatus for ViewFileSystem to speed up
     split calculation (gera)
 
+    HADOOP-11970. Replace uses of ThreadLocal<Random> with JDK7
+    ThreadLocalRandom. (Sean Busbey via Colin P. McCabe)
+
   BUG FIXES
     HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
     is an I/O error during requestShortCircuitShm (cmccabe)
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -28,7 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -47,13 +47,6 @@ public class RetryPolicies {
 
   public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
 
-  private static ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
-    @Override
-    protected Random initialValue() {
-      return new Random();
-    }
-  };
-
   /**
    * <p>
    * Try once, and fail by re-throwing the exception.
@@ -321,7 +314,8 @@ public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
       }
 
       //calculate sleep time and return.
-      final double ratio = RANDOM.get().nextDouble() + 0.5;//0.5 <= ratio <=1.5
+      // ensure 0.5 <= ratio <=1.5
+      final double ratio = ThreadLocalRandom.current().nextDouble() + 0.5;
       final long sleepTime = Math.round(p.sleepMillis * ratio);
       return new RetryAction(RetryAction.RetryDecision.RETRY, sleepTime);
     }
@@ -610,7 +604,7 @@ public RetryAction shouldRetry(Exception e, int retries,
     private static long calculateExponentialTime(long time, int retries,
         long cap) {
       long baseTime = Math.min(time * (1L << retries), cap);
-      return (long) (baseTime * (RANDOM.get().nextDouble() + 0.5));
+      return (long) (baseTime * (ThreadLocalRandom.current().nextDouble() + 0.5));
     }
 
     private static long calculateExponentialTime(long time, int retries) {
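Taken together, the RetryPolicies hunks leave the backoff math untouched and only swap generators. A compilable sketch of the jittered exponential backoff they implement (the method body is reproduced from the diff; the wrapper class and the sample parameters are assumptions):

import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
  // Double the base delay per retry, cap it, then scale by a random
  // factor in [0.5, 1.5) so that retrying clients do not synchronize.
  static long calculateExponentialTime(long time, int retries, long cap) {
    long baseTime = Math.min(time * (1L << retries), cap);
    return (long) (baseTime * (ThreadLocalRandom.current().nextDouble() + 0.5));
  }

  public static void main(String[] args) {
    // e.g. time=1000ms, cap=30s: retry 0 jitters around 1s, retry 3 around 8s.
    for (int retry = 0; retry < 5; retry++) {
      System.out.println("retry " + retry + ": "
          + calculateExponentialTime(1000, retry, 30000) + " ms");
    }
  }
}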
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -48,6 +48,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -316,7 +317,8 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
 
     this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
     this.clientName = "DFSClient_" + dfsClientConf.getTaskId() + "_" +
-        DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
+        ThreadLocalRandom.current().nextInt() + "_" +
+        Thread.currentThread().getId();
     int numResponseToDrop = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -40,6 +40,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -976,7 +977,9 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
           // expanded to 9000ms.
           final int timeWindow = dfsClient.getConf().getTimeWindow();
           double waitTime = timeWindow * failures + // grace period for the last round of attempt
-              timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
+              // expanding time window for each failure
+              timeWindow * (failures + 1) *
+              ThreadLocalRandom.current().nextDouble();
           DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
           Thread.sleep((long)waitTime);
         } catch (InterruptedException iex) {
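The chooseDataNode hunk is the same substitution inside an expanding, randomized retry wait: a guaranteed grace period of timeWindow * failures plus a uniform draw from a window that widens with each failure. A sketch with the arithmetic pulled out (assuming the 3000 ms default time window; variable names follow the diff):

import java.util.concurrent.ThreadLocalRandom;

public class ExpandingWindowSketch {
  // Guaranteed part: timeWindow * failures. Random part: uniform in
  // [0, timeWindow * (failures + 1)). With timeWindow = 3000 ms this gives
  // 0-3 s on the first failure, 3-9 s on the second, 6-15 s on the third,
  // matching the "expanded to 9000ms" comment in the context above.
  static double waitTime(int timeWindow, int failures) {
    return timeWindow * failures
        + timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble();
  }

  public static void main(String[] args) {
    for (int failures = 0; failures < 3; failures++) {
      System.out.printf("failures=%d -> wait %.0f ms%n",
          failures, waitTime(3000, failures));
    }
  }
}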
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -50,8 +50,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 import javax.net.SocketFactory;
 
@@ -103,12 +103,6 @@ public class DFSUtil {
   public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
 
   private DFSUtil() { /* Hidden constructor */ }
-  private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
-    @Override
-    protected Random initialValue() {
-      return new Random();
-    }
-  };
 
   private static final ThreadLocal<SecureRandom> SECURE_RANDOM = new ThreadLocal<SecureRandom>() {
     @Override
@@ -117,11 +111,6 @@ protected SecureRandom initialValue() {
     }
   };
 
-  /** @return a pseudo random number generator. */
-  public static Random getRandom() {
-    return RANDOM.get();
-  }
-
   /** @return a pseudo secure random number generator. */
   public static SecureRandom getSecureRandom() {
     return SECURE_RANDOM.get();
@@ -130,9 +119,8 @@ public static SecureRandom getSecureRandom() {
   /** Shuffle the elements in the given array. */
   public static <T> T[] shuffle(final T[] array) {
     if (array != null && array.length > 0) {
-      final Random random = getRandom();
       for (int n = array.length; n > 1; ) {
-        final int randomIndex = random.nextInt(n);
+        final int randomIndex = ThreadLocalRandom.current().nextInt(n);
         n--;
         if (n != randomIndex) {
           final T tmp = array[randomIndex];
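DFSUtil.shuffle is a Fisher-Yates shuffle, and the patch simply inlines the generator into the swap loop. Since the diff cuts off mid-method, here is the loop completed into a self-contained form (the swap tail is the standard completion of the algorithm shown):

import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class ShuffleSketch {
  /** Fisher-Yates shuffle over the array, in place. */
  public static <T> T[] shuffle(final T[] array) {
    if (array != null && array.length > 0) {
      for (int n = array.length; n > 1; ) {
        // Pick a random element among the n not yet fixed...
        final int randomIndex = ThreadLocalRandom.current().nextInt(n);
        n--;
        // ...and swap it into the last unfixed slot.
        if (n != randomIndex) {
          final T tmp = array[randomIndex];
          array[randomIndex] = array[n];
          array[n] = tmp;
        }
      }
    }
    return array;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(shuffle(new Integer[] {1, 2, 3, 4, 5})));
  }
}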
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -35,6 +35,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1027,7 +1028,8 @@ private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
     Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
-    int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
+    // starting from a random block
+    int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
     // skip blocks
     for(int i=0; i<startBlock; i++) {
       iter.next();
@@ -1669,7 +1671,7 @@ else if (node.isDecommissionInProgress()) {
       // switch to a different node randomly
       // this to prevent from deterministically selecting the same node even
       // if the node failed to replicate the block on previous iterations
-      if(DFSUtil.getRandom().nextBoolean())
+      if(ThreadLocalRandom.current().nextBoolean())
         srcNode = node;
     }
     if(numReplicas != null)
@@ -1920,7 +1922,7 @@ void rescanPostponedMisreplicatedBlocks() {
           datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
       long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
       if (base > 0) {
-        startIndex = DFSUtil.getRandom().nextLong() % (base+1);
+        startIndex = ThreadLocalRandom.current().nextLong() % (base+1);
         if (startIndex < 0) {
           startIndex += (base+1);
         }
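The rescanPostponedMisreplicatedBlocks hunk preserves an existing idiom worth noting: Java's % operator keeps the sign of the dividend, so nextLong() % (base+1) can be negative, and the follow-up if shifts it back into [0, base]. A standalone sketch:

import java.util.concurrent.ThreadLocalRandom;

public class StartIndexSketch {
  // Random start index in [0, base], as in the hunk above. nextLong()
  // spans negative values, and % keeps the dividend's sign, so a negative
  // remainder must be shifted up by (base + 1).
  static long startIndex(long base) {
    long startIndex = ThreadLocalRandom.current().nextLong() % (base + 1);
    if (startIndex < 0) {
      startIndex += (base + 1);
    }
    return startIndex;
  }

  public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
      System.out.println(startIndex(100)); // always in [0, 100]
    }
  }
}

ThreadLocalRandom also has a bounded nextLong(long n) overload that would avoid the correction; the patch keeps the original arithmetic and changes only the generator.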
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -51,6 +51,7 @@
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Manage datanodes, include decommission and other activities.
@@ -457,7 +458,7 @@ DatanodeDescriptor getDatanodeDescriptor(String address) {
       // Try something rack local.
       if (node == null && !rackNodes.isEmpty()) {
         node = (DatanodeDescriptor) (rackNodes
-            .get(DFSUtil.getRandom().nextInt(rackNodes.size())));
+            .get(ThreadLocalRandom.current().nextInt(rackNodes.size())));
       }
     }
 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
@@ -20,12 +20,12 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSUtil;
 
 /** A map from host names to datanode descriptors. */
 @InterfaceAudience.Private
@@ -161,7 +161,7 @@ DatanodeDescriptor getDatanodeByHost(String ipAddr) {
         return nodes[0];
       }
       // more than one node
-      return nodes[DFSUtil.getRandom().nextInt(nodes.length)];
+      return nodes[ThreadLocalRandom.current().nextInt(nodes.length)];
     } finally {
       hostmapLock.readLock().unlock();
     }
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -28,13 +28,13 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -122,7 +122,7 @@ static enum RunningState {
     this.dn = bpos.getDataNode();
     this.nnAddr = nnAddr;
     this.dnConf = dn.getDnConf();
-    prevBlockReportId = DFSUtil.getRandom().nextLong();
+    prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.blockReportInterval);
   }
 
@@ -409,7 +409,7 @@ private long generateUniqueBlockReportId() {
     // not send a 0 value ourselves.
     prevBlockReportId++;
     while (prevBlockReportId == 0) {
-      prevBlockReportId = DFSUtil.getRandom().nextLong();
+      prevBlockReportId = ThreadLocalRandom.current().nextLong();
     }
     return prevBlockReportId;
   }
@@ -1054,7 +1054,7 @@ long scheduleBlockReport(long delay) {
     if (delay > 0) { // send BR after random delay
       // Numerical overflow is possible here and is okay.
       nextBlockReportTime =
-          monotonicNow() + DFSUtil.getRandom().nextInt((int) (delay));
+          monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay));
     } else { // send at next heartbeat
       nextBlockReportTime = monotonicNow();
     }
@@ -1073,7 +1073,7 @@ void scheduleNextBlockReport() {
     // time before we start the periodic block reports.
     if (resetBlockReportTime) {
       nextBlockReportTime = monotonicNow() +
-          DFSUtil.getRandom().nextInt((int)(blockReportIntervalMs));
+          ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
       resetBlockReportTime = false;
     } else {
       /* say the last block report was at 8:20:14. The current report
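The BPServiceActor hunks all serve one goal: spread block reports out in time so a cluster's datanodes do not all report at once after a restart. The first report is scheduled at a random offset within the delay, and later hunks re-jitter after a reset. A minimal sketch of the pattern (the class around the two reproduced lines is illustrative):

import java.util.concurrent.ThreadLocalRandom;

public class BlockReportJitterSketch {
  private long nextBlockReportTime;

  // Monotonic clock in ms, as Hadoop's Time.monotonicNow() provides;
  // immune to wall-clock adjustments.
  static long monotonicNow() {
    return System.nanoTime() / 1000000L;
  }

  /** First block report lands at a uniform-random point in [now, now + delay). */
  void scheduleBlockReport(long delay) {
    if (delay > 0) { // send BR after random delay
      // The int cast can overflow for huge delays; the original notes this is okay.
      nextBlockReportTime =
          monotonicNow() + ThreadLocalRandom.current().nextInt((int) delay);
    } else { // send at next heartbeat
      nextBlockReportTime = monotonicNow();
    }
  }

  public static void main(String[] args) {
    BlockReportJitterSketch s = new BlockReportJitterSketch();
    s.scheduleBlockReport(21600000L); // e.g. a 6-hour block report interval
    System.out.println((s.nextBlockReportTime - monotonicNow())
        + " ms until first report");
  }
}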
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -41,7 +42,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -327,7 +327,8 @@ public long getGenStamp() {
 
   void start() {
     shouldRun = true;
-    long offset = DFSUtil.getRandom().nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
+    long offset = ThreadLocalRandom.current().nextInt(
+        (int) (scanPeriodMsecs/1000L)) * 1000L; //msec
     long firstScanTime = Time.now() + offset;
     LOG.info("Periodic Directory Tree Verification scan starting at "
         + firstScanTime + " with interval " + scanPeriodMsecs);
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -22,7 +22,6 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -33,6 +32,8 @@
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 /**
  *
  * This class is for maintaining the various DataNode statistics
@@ -177,7 +178,7 @@ public static DataNodeMetrics create(Configuration conf, String dnName) {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics jm = JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
-        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt()
+        ? "UndefinedDataNodeName"+ ThreadLocalRandom.current().nextInt()
         : dnName.replace(':', '-'));
 
     // Percentile measurement is off by default, by watching no intervals
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -32,6 +32,7 @@
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -599,7 +600,7 @@ public void format() throws IOException {
   private static int newNamespaceID() {
     int newID = 0;
     while(newID == 0)
-      newID = DFSUtil.getRandom().nextInt(0x7FFFFFFF); // use 31 bits only
+      newID = ThreadLocalRandom.current().nextInt(0x7FFFFFFF); // use 31 bits
     return newID;
   }
 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -44,7 +45,6 @@
 import org.apache.hadoop.hdfs.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -933,7 +933,7 @@ private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
     }
     DatanodeInfo chosenNode;
     do {
-      chosenNode = nodes[DFSUtil.getRandom().nextInt(nodes.length)];
+      chosenNode = nodes[ThreadLocalRandom.current().nextInt(nodes.length)];
     } while (deadNodes.contains(chosenNode));
     return chosenNode;
   }
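bestNode in the fsck path picks a replica by rejection sampling: draw a node uniformly and redraw while it is in the dead set. A self-contained sketch (String stands in for DatanodeInfo; note the loop would spin forever if every node were dead, so callers must rule that case out first):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

public class BestNodeSketch {
  /** Uniform pick over nodes not in deadNodes, matching the do/while above. */
  static String bestNode(String[] nodes, Set<String> deadNodes) {
    String chosenNode;
    do {
      chosenNode = nodes[ThreadLocalRandom.current().nextInt(nodes.length)];
    } while (deadNodes.contains(chosenNode));
    return chosenNode;
  }

  public static void main(String[] args) {
    Set<String> dead = new HashSet<>(Collections.singletonList("dn2"));
    System.out.println(bestNode(new String[] {"dn1", "dn2", "dn3"}, dead));
  }
}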
