Large activity parameter support #764

Open
wants to merge 5 commits into
base: master
78 changes: 78 additions & 0 deletions src/main/java/com/uber/cadence/largeblob/Configuration.java
@@ -0,0 +1,78 @@
package com.uber.cadence.largeblob;

import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.converter.JsonDataConverter;

import java.time.Duration;

public class Configuration {

private Storage storage;
private DataConverter dataConverter;
private Long maxBytes;
private Duration ttl;

private Configuration() {
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {

private Storage storage;
private DataConverter dataConverter = JsonDataConverter.getInstance();
private Duration ttl;
private Long maxBytes = 4096L;

public Configuration build() {
if (storage == null || dataConverter == null || maxBytes == null) {
throw new IllegalArgumentException("storage, dataConverter and maxBytes must be provided");
}

Configuration configuration = new Configuration();
configuration.storage = this.storage;
configuration.dataConverter = this.dataConverter;
configuration.ttl = this.ttl;
configuration.maxBytes = this.maxBytes;
return configuration;
}

public Builder setDataConverter(DataConverter dataConverter) {
this.dataConverter = dataConverter;
return this;
}

public Builder setStorage(Storage storage) {
this.storage = storage;
return this;
}

public Builder setTtl(Duration ttl) {
this.ttl = ttl;
return this;
}

public Builder setMaxBytes(Long maxBytes) {
this.maxBytes = maxBytes;
return this;
}
}

public Storage getStorage() {
return storage;
}

public DataConverter getDataConverter() {
return dataConverter;
}

public Duration getTtl() {
return ttl;
}

public Long getMaxBytes() {
return maxBytes;
}
}
89 changes: 89 additions & 0 deletions src/main/java/com/uber/cadence/largeblob/Future.java
@@ -0,0 +1,89 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.largeblob;

import com.uber.cadence.converter.DataConverterException;

import java.io.IOException;

/**
* Future is used for passing potentially large parameters between activities. Workflow code may forward a Future but must never call get() on it, because reading from external
* storage would make the workflow nondeterministic. Small amounts of data are stored in the Future instance itself, while larger amounts are stored in external storage.
*/
public class Future<T> {

private byte[] encoded;
private String url;
private final Configuration config;
private final Class<T> clazz;

public Future(Configuration config, Class<T> clazz, byte[] encoded) {
this.config = config;
this.encoded = encoded;
this.clazz = clazz;
}

public Future(Configuration config, Class<T> clazz, String url) {
this.url = url;
this.config = config;
this.clazz = clazz;
}

public Future(T obj, Configuration configuration) throws IOException {
byte[] bytes;
try {
bytes = configuration.getDataConverter().toData(obj);
} catch (DataConverterException e) {
throw new IOException(e);
}

this.config = configuration;
this.clazz = (Class<T>) obj.getClass();
if (bytes.length <= configuration.getMaxBytes()) {
this.encoded = bytes;
} else {
this.url = configuration.getTtl() == null ? configuration.getStorage().put(bytes) : configuration.getStorage().put(bytes, configuration.getTtl());
}
}

public T get() throws IOException {
if (encoded != null) {
return config.getDataConverter().fromData(encoded, clazz, clazz);
}

if (url != null) {
return config.getDataConverter().fromData(config.getStorage().get(url), clazz, clazz);
}

return null;
}

public void delete() throws IOException {
if (this.url != null) {
config.getStorage().delete(url);
this.url = null;
}
this.encoded = null;
}

public byte[] getEncoded() {
return encoded;
}

public String getUrl() {
return url;
}
}
19 changes: 19 additions & 0 deletions src/main/java/com/uber/cadence/largeblob/README.md
@@ -0,0 +1,19 @@
This package contains an Activity-oriented Future, which can semi-transparently upload large amounts of data to
external stores, effectively avoiding Cadence's per-event / per-workflow size limits.

Multiple Futures can be used in a single Activity's arguments or response values, e.g. in separate fields or
lists, but each Future will be unaware of the others. The max-size limit you choose is not shared between
Futures (each has its own limit), so the cumulative size of your response may be much larger than that limit.
Take care to keep the `setMaxBytes` limit low enough for all values combined, and be aware that the URLs that replace
the data also take space: dozens are fine, thousands are probably not.
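
The per-Future limit can be illustrated with a minimal, self-contained sketch. It does not use this package's classes: `PerFutureLimitSketch`, `Handle`, and `totalPayload` are hypothetical stand-ins, the external store is a plain `Map`, and a random UUID string plays the role of the URL. The only value taken from this package is the 4096-byte default limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Stand-in sketch, not this package's API: each handle applies the size limit on its own.
final class PerFutureLimitSketch {

  // Hypothetical handle: holds either the bytes inline or a key into the external store.
  static final class Handle {
    final byte[] inline; // non-null when the value fit under the limit
    final String url;    // non-null when the value was offloaded

    Handle(byte[] bytes, long maxBytes, Map<String, byte[]> store) {
      if (bytes.length <= maxBytes) {
        this.inline = bytes;
        this.url = null;
      } else {
        this.url = UUID.randomUUID().toString();
        store.put(this.url, bytes);
        this.inline = null;
      }
    }

    // Bytes this handle contributes to the activity payload.
    int payloadSize() {
      return inline != null ? inline.length : url.getBytes(StandardCharsets.UTF_8).length;
    }
  }

  // Sums what a list of handles would add to a single activity result.
  static int totalPayload(List<Handle> handles) {
    int total = 0;
    for (Handle handle : handles) {
      total += handle.payloadSize();
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, byte[]> store = new HashMap<>();
    long maxBytes = 4096;
    List<Handle> handles = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      handles.add(new Handle(new byte[4000], maxBytes, store));
    }
    // Each value is under the limit on its own, so nothing is offloaded.
    System.out.println(totalPayload(handles) + " bytes inline, " + store.size() + " stored externally");
  }
}
```

In this sketch three 4000-byte values all stay inline, so the payload carries 12000 bytes even though the limit is 4096. A lower limit would have offloaded all three and left only three short references in the payload.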

# Caveats

This tool comes with some semi-severe caveats, but if they are acceptable for your use, it may allow you to easily
reduce your Workflow's history's data use:

## Workflows will not have access to data

By design, this tool does not allow you to access wrapped data in your workflows. You can use the type to forward data
between activities, but not inspect the contents - they may not exist, and you cannot safely perform the download in
your workflow.
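
The forward-only rule can be sketched as follows. This is an illustration under stated assumptions, not code from this package: `ForwardOnlySketch`, `Ref`, `produce`, `countLines`, and `workflow` are hypothetical names, the "activities" are plain static methods, and the external store is an in-process `Map`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: the workflow forwards a reference, only activities touch the data.
final class ForwardOnlySketch {

  // Stand-in for external storage; real storage would involve network I/O.
  static final Map<String, byte[]> STORE = new HashMap<>();

  // Opaque reference: workflow code can hold it and pass it on, nothing more.
  static final class Ref {
    private final String url;

    Ref(String url) {
      this.url = url;
    }
  }

  // "Activity": allowed to do I/O, so it uploads the data and returns a reference.
  static Ref produce(String report) {
    String url = "blob-" + STORE.size();
    STORE.put(url, report.getBytes(StandardCharsets.UTF_8));
    return new Ref(url);
  }

  // "Activity": resolves the reference and works on the contents.
  static int countLines(Ref ref) {
    String text = new String(STORE.get(ref.url), StandardCharsets.UTF_8);
    return text.split("\n").length;
  }

  // "Workflow": receives the reference from one activity and hands it to the next.
  // It never reads STORE, so its behavior does not depend on external state.
  static int workflow() {
    Ref ref = produce("a\nb\nc");
    return countLines(ref);
  }

  public static void main(String[] args) {
    System.out.println(workflow());
  }
}
```

The workflow method only sees small results that activities return explicitly, here the line count, and never the stored bytes themselves.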
63 changes: 63 additions & 0 deletions src/main/java/com/uber/cadence/largeblob/Storage.java
@@ -0,0 +1,63 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.largeblob;

import java.io.IOException;
import java.time.Duration;

/**
* Storage is an abstraction for storing large parameters to access inside of activities.
*/
public interface Storage {

/**
* Gets the data stored at the provided uri.
* @param uri the uri returned by a previous put call.
* @return the data as a byte array.
* @throws IOException should be thrown in any implementation class in case of problems accessing the datastore.
*/
byte[] get(String uri) throws IOException;

/**
* Stores data and returns the uri under which it can be retrieved.
* @param bytes the data to store.
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
*/
String put(byte[] bytes) throws IOException;

/**
* Stores data with a time to live and returns the uri under which it can be retrieved.
* @param bytes the data to store.
* @param ttl used by storages like s3 to define the total time to keep the object.
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
*/
String put(byte[] bytes, Duration ttl) throws IOException;

/**
* Stores data under the provided key and returns the uri under which it can be retrieved.
* @param key the key of the data.
* @param bytes the data to store.
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
*/
String put(String key, byte[] bytes) throws IOException;

/**
* Deletes the data stored at the provided uri.
* @param uri the uri of the data to delete.
* @throws IOException should be thrown in any implementation class in case of problems with the datastore
*/
void delete(String uri) throws IOException;
}
@@ -0,0 +1,56 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.largeblob.impl;

import com.uber.cadence.largeblob.Storage;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class InMemoryStorage implements Storage {

private final Map<String, byte[]> storage = new HashMap<>();

@Override
public byte[] get(String uri) throws IOException {
return storage.get(uri);
}

@Override
public String put(byte[] bytes) throws IOException {
String uuid = UUID.randomUUID().toString();
storage.put(uuid, bytes);
return uuid;
}

@Override
public String put(byte[] bytes, Duration ttl) throws IOException {
return put(bytes);
}

@Override
public String put(String key, byte[] bytes) throws IOException {
storage.put(key, bytes);
return key;
}

@Override
public void delete(String uri) throws IOException {
storage.remove(uri);
}
}
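
The in-memory implementation above ignores the `ttl` argument and uses a plain `HashMap`. As a rough sketch of what honoring the TTL could look like, the standalone class below expires entries on read. It is not part of this PR and does not implement the `Storage` interface: `ExpiringStoreSketch` and its injected millisecond clock are assumptions made so the example can run on its own.

```java
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical TTL-aware in-memory store; the clock is injected so expiry can be tested.
final class ExpiringStoreSketch {

  private static final class Entry {
    final byte[] bytes;
    final long expiresAtMillis; // Long.MAX_VALUE means the entry never expires

    Entry(byte[] bytes, long expiresAtMillis) {
      this.bytes = bytes;
      this.expiresAtMillis = expiresAtMillis;
    }
  }

  private final Map<String, Entry> entries = new ConcurrentHashMap<>();
  private final LongSupplier clockMillis;

  ExpiringStoreSketch(LongSupplier clockMillis) {
    this.clockMillis = clockMillis;
  }

  // Stores a copy of the bytes and returns a generated uri; a null ttl means no expiry.
  String put(byte[] bytes, Duration ttl) {
    String uri = UUID.randomUUID().toString();
    long expiresAt = ttl == null ? Long.MAX_VALUE : clockMillis.getAsLong() + ttl.toMillis();
    entries.put(uri, new Entry(bytes.clone(), expiresAt));
    return uri;
  }

  // Returns a copy of the stored bytes, or null if the uri is unknown or the entry has expired.
  byte[] get(String uri) {
    Entry entry = entries.get(uri);
    if (entry == null) {
      return null;
    }
    if (clockMillis.getAsLong() >= entry.expiresAtMillis) {
      entries.remove(uri); // lazily drop expired data
      return null;
    }
    return entry.bytes.clone();
  }

  void delete(String uri) {
    entries.remove(uri);
  }

  public static void main(String[] args) {
    long[] now = {0L};
    ExpiringStoreSketch store = new ExpiringStoreSketch(() -> now[0]);
    String uri = store.put(new byte[] {1, 2, 3}, Duration.ofSeconds(10));
    System.out.println("before expiry: " + (store.get(uri) != null));
    now[0] = 10_000L;
    System.out.println("after expiry: " + (store.get(uri) != null));
  }
}
```

Expiry on read keeps the sketch short; a store that must also reclaim memory for entries nobody reads again would need a background sweep as well.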