Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3AsyncClient Support #19

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.NotThreadSafe;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

Expand Down Expand Up @@ -35,6 +36,7 @@ public class PayloadStorageConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfiguration.class);

private S3Client s3;
private S3AsyncClient s3Async;
private String s3BucketName;
private int payloadSizeThreshold = 0;
private boolean alwaysThroughS3 = false;
Expand All @@ -57,6 +59,7 @@ public PayloadStorageConfiguration() {

public PayloadStorageConfiguration(PayloadStorageConfiguration other) {
this.s3 = other.getS3Client();
this.s3Async = other.getS3AsyncClient();
this.s3BucketName = other.getS3BucketName();
this.payloadSupport = other.isPayloadSupportEnabled();
this.alwaysThroughS3 = other.isAlwaysThroughS3();
Expand All @@ -82,6 +85,7 @@ public void setPayloadSupportEnabled(S3Client s3, String s3BucketName) {
LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName.");
}
this.s3 = s3;
this.s3Async = null;
this.s3BucketName = s3BucketName;
this.payloadSupport = true;
LOG.info("Payload support enabled.");
Expand All @@ -100,11 +104,48 @@ public PayloadStorageConfiguration withPayloadSupportEnabled(S3Client s3, String
return this;
}

/**
* Enables support for payloads using asynchronous storage.
*
* @param s3Async Amazon S3 client which is going to be used for storing payload.
* @param s3BucketName Name of the bucket which is going to be used for storing payload.
* The bucket must be already created and configured in s3.
*/
public void setPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to have a different method name here?

if (s3Async == null || s3BucketName == null) {
String errorMessage = "S3 client and/or S3 bucket name cannot be null.";
LOG.error(errorMessage);
throw SdkClientException.create(errorMessage);
}
if (isPayloadSupportEnabled()) {
LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName.");
}
this.s3 = null;
this.s3Async = s3Async;
this.s3BucketName = s3BucketName;
this.payloadSupport = true;
LOG.info("Payload support enabled.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we wanna add some text to distinguish standard and async s3 clients here?

}

/**
* Enables support for payload.
*
* @param s3Async Amazon S3 client which is going to be used for storing payload.
* @param s3BucketName Name of the bucket which is going to be used for storing payloads.
* The bucket must be already created and configured in s3.
* @return the updated PayloadStorageConfiguration object.
*/
public PayloadStorageConfiguration withPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) {
setPayloadSupportEnabled(s3Async, s3BucketName);
return this;
}

/**
* Disables support for payloads.
*/
public void setPayloadSupportDisabled() {
s3 = null;
s3Async = null;
s3BucketName = null;
payloadSupport = false;
LOG.info("Payload support disabled.");
Expand Down Expand Up @@ -138,6 +179,15 @@ public S3Client getS3Client() {
return s3;
}

/**
* Gets the Amazon S3 async client which is being used for storing payloads.
*
* @return Reference to the Amazon S3 async client which is being used.
*/
public S3AsyncClient getS3AsyncClient() {
return s3Async;
}

/**
* Gets the name of the S3 bucket which is being used for storing payload.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package software.amazon.payloadoffloading;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.S3Exception;

/**
* An AWS storage service that supports saving high payload sizes.
*/
public interface PayloadStoreAsync {

/**
* Stores payload in a store that has higher payload size limit than that is supported by original payload store.
* <p>
* This call is asynchronous, and so documented return values and exceptions are propagated through
* the returned {@link CompletableFuture}.
*
* @param payload
* @return future value of a pointer that must be used to retrieve the original payload later.
* @throws SdkClientException If any internal errors are encountered on the client side while
* attempting to make the request or handle the response. For example
* if a network connection is not available.
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
* either a problem with the data in the request, or a server side issue.
*/
CompletableFuture<String> storeOriginalPayload(String payload);

/**
* Stores payload in a store that has higher payload size limit than that is supported by original payload store.
* <p>
* This call is asynchronous, and so documented return values and exceptions are propagated through
* the returned {@link CompletableFuture}.
*
* @param payload
* @param s3Key
* @return future value of a pointer that must be used to retrieve the original payload later.
* @throws SdkClientException If any internal errors are encountered on the client side while
* attempting to make the request or handle the response. For example
* if a network connection is not available.
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
* either a problem with the data in the request, or a server side issue.
*/
CompletableFuture<String> storeOriginalPayload(String payload, String s3Key);

/**
* Retrieves the original payload using the given payloadPointer. The pointer must
* have been obtained using {@link #storeOriginalPayload(String)}
* <p>
* This call is asynchronous, and so documented return values and exceptions are propagated through
* the returned {@link CompletableFuture}.
*
* @param payloadPointer
* @return future value of the original payload
* @throws SdkClientException If any internal errors are encountered on the client side while
* attempting to make the request or handle the response. For example
* if payloadPointer is invalid or a network connection is not available.
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
* a server side issue.
*/
CompletableFuture<String> getOriginalPayload(String payloadPointer);

/**
* Deletes the original payload using the given payloadPointer. The pointer must
* have been obtained using {@link #storeOriginalPayload(String)}
* <p>
* This call is asynchronous, and so documented return values and exceptions are propagated through
* the returned {@link CompletableFuture}.
*
* @param payloadPointer
* @return future value that completes when the delete operation finishes
* @throws SdkClientException If any internal errors are encountered on the client side while
* attempting to make the request or handle the response to/from PayloadStore.
* For example, if payloadPointer is invalid or a network connection is not available.
* @throws S3Exception If an error response is returned by actual PayloadStore indicating
* a server side issue.
*/
CompletableFuture<Void> deleteOriginalPayload(String payloadPointer);
}
118 changes: 118 additions & 0 deletions src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package software.amazon.payloadoffloading;

import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
* Dao layer to access S3.
*/
public class S3AsyncDao {
private static final Logger LOG = LoggerFactory.getLogger(S3AsyncDao.class);
private final S3AsyncClient s3Client;
private final ServerSideEncryptionStrategy serverSideEncryptionStrategy;
private final ObjectCannedACL objectCannedACL;

public S3AsyncDao(S3AsyncClient s3Client) {
this(s3Client, null, null);
}

public S3AsyncDao(
S3AsyncClient s3Client,
ServerSideEncryptionStrategy serverSideEncryptionStrategy,
ObjectCannedACL objectCannedACL) {
this.s3Client = s3Client;
this.serverSideEncryptionStrategy = serverSideEncryptionStrategy;
this.objectCannedACL = objectCannedACL;
}

public CompletableFuture<String> getTextFromS3(String s3BucketName, String s3Key) {
GetObjectRequest getObjectRequest = GetObjectRequest.builder()
.bucket(s3BucketName)
.key(s3Key)
.build();

return s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
.thenApply(ResponseBytes::asUtf8String)
.handle((v, tIn) -> {
if (tIn != null) {
Throwable t = Util.unwrapFutureException(tIn);
if (t instanceof SdkException) {
String errorMessage = "Failed to get the S3 object which contains the payload.";
LOG.error(errorMessage, t);
throw SdkException.create(errorMessage, t);
}
if (t instanceof UncheckedIOException) {
String errorMessage = "Failure when handling the message which was read from S3 object.";
LOG.error(errorMessage, t);
throw SdkClientException.create(errorMessage, t);
}
throw new CompletionException(t);
}
return v;
});
}

public CompletableFuture<Void> storeTextInS3(String s3BucketName, String s3Key, String payloadContentStr) {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(s3BucketName)
.key(s3Key);

if (objectCannedACL != null) {
putObjectRequestBuilder.acl(objectCannedACL);
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/kms-using-sdks.html
if (serverSideEncryptionStrategy != null) {
serverSideEncryptionStrategy.decorate(putObjectRequestBuilder);
}

return s3Client.putObject(putObjectRequestBuilder.build(), AsyncRequestBody.fromString(payloadContentStr))
.handle((v, tIn) -> {
if (tIn != null) {
Throwable t = Util.unwrapFutureException(tIn);
if (t instanceof SdkException) {
String errorMessage = "Failed to store the message content in an S3 object.";
LOG.error(errorMessage, t);
throw SdkException.create(errorMessage, t);
}
throw new CompletionException(t);
}
return null;
});
}

public CompletableFuture<Void> deletePayloadFromS3(String s3BucketName, String s3Key) {
DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
.bucket(s3BucketName)
.key(s3Key)
.build();
return s3Client.deleteObject(deleteObjectRequest)
.handle((v, tIn) -> {
if (tIn != null) {
Throwable t = Util.unwrapFutureException(tIn);
if (t instanceof SdkException) {
String errorMessage = "Failed to delete the S3 object which contains the payload";
LOG.error(errorMessage, t);
throw SdkException.create(errorMessage, t);
}
throw new CompletionException(t);
}

LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
return null;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package software.amazon.payloadoffloading;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.payloadoffloading.PayloadS3Pointer;

/**
* S3 based implementation for PayloadStoreAsync.
*/
public class S3BackedPayloadStoreAsync implements PayloadStoreAsync {
private static final Logger LOG = LoggerFactory.getLogger(S3BackedPayloadStoreAsync.class);

private final String s3BucketName;
private final S3AsyncDao s3Dao;

public S3BackedPayloadStoreAsync(S3AsyncDao s3Dao, String s3BucketName) {
this.s3BucketName = s3BucketName;
this.s3Dao = s3Dao;
}

@Override
public CompletableFuture<String> storeOriginalPayload(String payload) {
String s3Key = UUID.randomUUID().toString();
return storeOriginalPayload(payload, s3Key);
}

@Override
public CompletableFuture<String> storeOriginalPayload(String payload, String s3Key) {
return s3Dao.storeTextInS3(s3BucketName, s3Key, payload)
.thenApply(v -> {
LOG.info("S3 object created, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");

// Convert S3 pointer (bucket name, key, etc) to JSON string
PayloadS3Pointer s3Pointer = new PayloadS3Pointer(s3BucketName, s3Key);

return s3Pointer.toJson();
});
}

@Override
public CompletableFuture<String> getOriginalPayload(String payloadPointer) {
try {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);

String s3BucketName = s3Pointer.getS3BucketName();
String s3Key = s3Pointer.getS3Key();

return s3Dao.getTextFromS3(s3BucketName, s3Key)
.thenApply(originalPayload -> {
LOG.info("S3 object read, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
return originalPayload;
});
} catch (Exception e) {
CompletableFuture<String> futureEx = new CompletableFuture<>();
futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
return futureEx;
}
}

@Override
public CompletableFuture<Void> deleteOriginalPayload(String payloadPointer) {
try {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);

String s3BucketName = s3Pointer.getS3BucketName();
String s3Key = s3Pointer.getS3Key();
return s3Dao.deletePayloadFromS3(s3BucketName, s3Key);
} catch (Exception e) {
CompletableFuture<Void> futureEx = new CompletableFuture<>();
futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
return futureEx;
}
}
}
9 changes: 9 additions & 0 deletions src/main/java/software/amazon/payloadoffloading/Util.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package software.amazon.payloadoffloading;

import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
Expand Down Expand Up @@ -34,4 +35,12 @@ public static long getStringSizeInBytes(String str) {
public static String getUserAgentHeader(String clientName) {
return clientName + "/" + VersionInfo.SDK_VERSION;
}

public static Throwable unwrapFutureException(Throwable t) {
if ((t instanceof CompletionException) && t.getCause() != null) {
t = t.getCause();
}
return t;
}

}
Loading