HADOOP-19295. S3A: large uploads can timeout over slow links #7089

Merged
@@ -398,6 +398,21 @@ private Constants() {
public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
Duration.ofSeconds(60);

/**
* Timeout for uploading all of a small object or a single part
* of a larger one.
* {@value}.
* Default unit is milliseconds for consistency with other options.
*/
public static final String PART_UPLOAD_TIMEOUT =
"fs.s3a.connection.part.upload.timeout";

/**
* Default part upload timeout: 15 minutes.
*/
public static final Duration DEFAULT_PART_UPLOAD_TIMEOUT =
Duration.ofMinutes(15);

/**
* Should TCP Keepalive be enabled on the socket?
* This adds some network IO, but finds failures faster.
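For context, a minimal sketch of how a deployment behind a slow uplink might raise this timeout, assuming the option is parsed with Hadoop's standard time-duration handling (bare numbers are milliseconds, per the javadoc above); the 30-minute value is illustrative. As a rough scale, the 15-minute default requires a sustained rate of about 0.6 Mbit/s to push a single 64 MB part (512 Mbit / 900 s).

```java
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

public class SlowLinkTimeoutExample {
  public static Configuration withLongPartUploadTimeout() {
    Configuration conf = new Configuration();
    // Illustrative: allow 30 minutes per object/part upload over a slow link.
    conf.setTimeDuration("fs.s3a.connection.part.upload.timeout",
        30, TimeUnit.MINUTES);
    // A value of 0 means "no custom timeout": requests are left unpatched.
    return conf;
  }
}
```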
@@ -1286,6 +1286,13 @@ protected RequestFactory createRequestFactory() {
STORAGE_CLASS);
}

// optional custom timeout for bulk uploads
Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
PART_UPLOAD_TIMEOUT,
DEFAULT_PART_UPLOAD_TIMEOUT,
TimeUnit.MILLISECONDS,
Duration.ZERO);

return RequestFactoryImpl.builder()
.withBucket(requireNonNull(bucket))
.withCannedACL(getCannedACL())
Expand All @@ -1295,6 +1302,7 @@ protected RequestFactory createRequestFactory() {
.withContentEncoding(contentEncoding)
.withStorageClass(storageClass)
.withMultipartUploadEnabled(isMultipartUploadEnabled)
.withPartUploadTimeout(partUploadTimeout)
Contributor


How is this part upload timeout different from the multipart upload timeout?

Contributor Author


It's my new option: the same value is used for a simple PUT as for each part of a multipart upload; we patch the individual requests.

Contributor Author


Anyway:

  1. will cut
  2. will modify the info log to print only once per stream, to keep that log noise down.

.build();
}
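For reference, a hedged approximation of what the duration lookup above resolves to, expressed with the plain Configuration API; it assumes ConfigurationHelper.getDuration adds only the Duration wrapping and the Duration.ZERO lower bound on top of Hadoop's standard time-duration parsing. The result is then passed to the request factory via withPartUploadTimeout(...), as in the builder call above.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;

public class PartUploadTimeoutLookup {
  /** Approximate equivalent of the ConfigurationHelper.getDuration() call above. */
  public static Duration resolve(Configuration conf) {
    long millis = conf.getTimeDuration(PART_UPLOAD_TIMEOUT,
        DEFAULT_PART_UPLOAD_TIMEOUT.toMillis(),
        TimeUnit.MILLISECONDS);   // bare numbers are milliseconds; suffixed values such as "30m" are also accepted
    return Duration.ofMillis(Math.max(0, millis));   // assumed floor of zero, matching the Duration.ZERO argument
  }
}
```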

@@ -26,6 +26,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryMode;
@@ -623,4 +625,24 @@ static ConnectionSettings createConnectionSettings(Configuration conf) {
socketTimeout);
}

/**
* Set a custom ApiCallTimeout for a single request.
* This allows a longer timeout to be used for data upload
* requests than for all other S3 interactions;
* this does not happen by default in the V2 SDK
* (see HADOOP-19295).
* <p>
* If the timeout is zero, the request is not patched.
* @param builder builder to patch.
* @param timeout timeout
*/
public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
if (!timeout.isZero()) {
builder.overrideConfiguration(
AwsRequestOverrideConfiguration.builder()
.apiCallTimeout(timeout)
.apiCallAttemptTimeout(timeout)
.build());
}
}
}
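A hypothetical usage sketch of the helper above, outside the request factory; the bucket and key are illustrative. Since PutObjectRequest.Builder and UploadPartRequest.Builder are both AwsRequest.Builder subtypes, the same call covers simple PUTs and multipart parts.

```java
import java.time.Duration;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;

public class RequestTimeoutExample {
  public static PutObjectRequest buildSlowLinkPut() {
    PutObjectRequest.Builder put = PutObjectRequest.builder()
        .bucket("example-bucket")        // illustrative bucket
        .key("data/large-object.bin");   // illustrative key
    // Give this single PUT a longer api call timeout than the client-wide
    // default; a Duration.ZERO value would leave the request unpatched.
    setRequestTimeout(put, Duration.ofMinutes(15));
    return put.build();
  }
}
```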
@@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a.impl;

import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
@@ -59,7 +60,9 @@
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -128,6 +131,12 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final boolean isMultipartUploadEnabled;

/**
* Timeout for uploading objects/parts.
* This will be set on data put/post operations only.
*/
private final Duration partUploadTimeout;

/**
* Constructor.
* @param builder builder with all the configuration.
@@ -142,6 +151,7 @@ protected RequestFactoryImpl(
this.contentEncoding = builder.contentEncoding;
this.storageClass = builder.storageClass;
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
this.partUploadTimeout = builder.partUploadTimeout;
}

/**
@@ -344,6 +354,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
putObjectRequestBuilder.storageClass(storageClass);
}

// Set the timeout for object uploads but not directory markers.
if (!isDirectoryMarker) {
setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
}

return prepareRequest(putObjectRequestBuilder);
}

@@ -595,6 +610,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder(
.partNumber(partNumber)
.contentLength(size);
uploadPartEncryptionParameters(builder);

// Set the request timeout for the part upload
setRequestTimeout(builder, partUploadTimeout);
return prepareRequest(builder);
}

@@ -702,6 +720,13 @@ public static final class RequestFactoryBuilder {
*/
private boolean isMultipartUploadEnabled = true;

/**
* Timeout for uploading objects/parts.
* This will be set on data put/post operations only.
* A zero value means "no custom timeout".
*/
private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;

private RequestFactoryBuilder() {
}

@@ -799,6 +824,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled(
this.isMultipartUploadEnabled = value;
return this;
}

/**
* Timeout for uploading objects/parts.
* This will be set on data put/post operations only.
* A zero value means "no custom timeout".
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
partUploadTimeout = value;
return this;
}
}

/**
@@ -26,6 +26,7 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.function.Supplier;
import javax.annotation.Nullable;

@@ -224,6 +225,12 @@ public static abstract class BaseContentProvider<T extends InputStream>
*/
private T currentStream;

/**
* When did this upload start?
* Used in error messages.
*/
private final LocalDateTime startTime;

/**
* Constructor.
* @param size size of the data. Must be non-negative.
@@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) {
checkArgument(size >= 0, "size is negative: %s", size);
this.size = size;
this.isOpen = isOpen;
this.startTime = LocalDateTime.now();
}

/**
@@ -274,8 +282,11 @@ public final InputStream newStream() {
close();
checkOpen();
streamCreationCount++;
if (streamCreationCount > 1) {
LOG.info("Stream created more than once: {}", this);
if (streamCreationCount == 2) {
// the stream has been recreated for the first time.
// notify only once for this stream, so as not to flood
// the logs.
LOG.info("Stream recreated: {}", this);
}
return setCurrentStream(createNewStream());
}
@@ -302,6 +313,14 @@ public int getSize() {
return size;
}

/**
* When did this upload start?
* @return start time
*/
public LocalDateTime getStartTime() {
return startTime;
}

/**
* Current stream.
* When {@link #newStream()} is called, this is set to the new value,
@@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) {
public String toString() {
return "BaseContentProvider{" +
"size=" + size +
", initiated at " + startTime +
", streamCreationCount=" + streamCreationCount +
", currentStream=" + currentStream +
'}';
@@ -41,6 +41,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
public void testPutObjectDirect() throws Throwable {
final S3AFileSystem fs = getFileSystem();
try (AuditSpan span = span()) {
RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
RequestFactory factory = RequestFactoryImpl.builder()
.withBucket(fs.getBucket())
.withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
.build();
Path path = path("putDirect");
PutObjectRequest.Builder putObjectRequestBuilder =
factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
@@ -54,6 +54,7 @@
import org.apache.hadoop.util.Progressable;


import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
.withRequestPreparer(MockS3AFileSystem::prepareRequest)
.withBucket(BUCKET)
.withEncryptionSecrets(new EncryptionSecrets())
.withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ public Configuration createConfiguration() {
*/
@Override
public void setup() throws Exception {
SdkFaultInjector.resetEvaluator();
SdkFaultInjector.resetFaultInjector();
super.setup();
}

@Override
public void teardown() throws Exception {
// safety check in case the evaluation is failing any
// request needed in cleanup.
SdkFaultInjector.resetEvaluator();
SdkFaultInjector.resetFaultInjector();

super.teardown();
}