Skip to content

Commit

Permalink
Updating to the latest driver and removing flags that are no longer ne…
Browse files Browse the repository at this point in the history
…eded. (#1300)

* Updating Cosmos DB to the latest driver and removing flags for
features that are no longer used. For example, myid was needed
when id only supported hash lookup. Now id supports a range index,
so scan can be handled.

* Updating version
  • Loading branch information
voellm authored and stfeng2 committed Apr 23, 2019
1 parent 0397cb6 commit ac13d7f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 70 deletions.
22 changes: 7 additions & 15 deletions azurecosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ https://azure.microsoft.com/services/cosmos-db/

### 1. Setup
This benchmark expects you to have pre-created the database "ycsb" and
collection "usertable" before running the benchmark commands. You can
override the default database name with the azurecosmos.databaseName
configuration value.
collection "usertable" before running the benchmark commands. When
prompted for a Partition Key use id and for RUs select a value you
want to benchmark. [RUs are the measure of provisioned throughput](https://docs.microsoft.com/azure/cosmos-db/request-units)
that Azure Cosmos defines. The higher the RUs the more throughput you will
get. You can override the default database name with the
azurecosmos.databaseName configuration value for side-by-side
benchmarking.

You must set the uri and the primaryKey in the azurecosmos.properties file in the commands below.
$YCSB_HOME/bin/ycsb load azurecosmos -P workloads/workloada -P azurecosmos/conf/azurecosmos.properties
Expand Down Expand Up @@ -57,12 +61,6 @@ Optionally you can set the uri and primaryKey as follows:
- Name of the database to use.
- Default: ycsb

- azurecosmos.useSinglePartitionCollection (true | false):
- It should be true if you created the collection with a single partition. If
you created the collection with a partitioning key this value should be set
to false.
- Default: true

- azurecosmos.useUpsert (true | false):
- Set to true to allow inserts to update existing documents. If this is
false and a document already exists the insert will fail.
Expand All @@ -88,12 +86,6 @@ Optionally you can set the uri and primaryKey as follows:
- Sets the maximum timeout for retries in seconds
- Default: uses default value of azurecosmos Java SDK

- azurecosmos.useHashQueryForScan (true | false):
- This setting indicates whether SCAN operation should use hash query instead of range query.
Hash query: SELECT * FROM root r WHERE r.id = @startkey
Range query: SELECT TOP @recordcount * FROM root r WHERE r.id >= @startkey
- Default: false

- azurecosmos.maxDegreeOfParallelismForQuery < integer >
- Sets the maximum degree of parallelism for the FeedOptions used in Query operation
- Default: 0
Expand Down
63 changes: 9 additions & 54 deletions azurecosmos/src/main/java/com/yahoo/ycsb/db/AzureCosmosClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public class AzureCosmosClient extends DB {
private static final String DEFAULT_CONSISTENCY_LEVEL = "Session";
private static final String DEFAULT_DATABASE_NAME = "ycsb";
private static final String DEFAULT_CONNECTION_MODE = "DirectHttps";
private static final boolean DEFAULT_USE_SINGLE_PARTITION_COLLECTION = true;
private static final boolean DEFAULT_USE_UPSERT = false;
private static final boolean DEFAULT_USE_HASH_QUERY_FOR_SCAN = false;
private static final int DEFAULT_MAX_DEGREE_OF_PARALLELISM_FOR_QUERY = 0;
private static final boolean DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG = false;

Expand All @@ -68,12 +66,8 @@ public class AzureCosmosClient extends DB {
private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);

private static DocumentClient client;

private String queryText;
private String databaseName;
private boolean useSinglePartitionCollection;
private boolean useUpsert;
private boolean useHashQueryForScan;
private int maxDegreeOfParallelismForQuery;
private boolean includeExceptionStackInLog;

Expand All @@ -99,18 +93,10 @@ private void initAzureCosmosClient() throws DBException {
throw new DBException("Missing uri required to connect to the database.");
}

this.useSinglePartitionCollection = this.getBooleanProperty(
"azurecosmos.useSinglePartitionCollection",
DEFAULT_USE_SINGLE_PARTITION_COLLECTION);

this.useUpsert = this.getBooleanProperty(
"azurecosmos.useUpsert",
DEFAULT_USE_UPSERT);

this.useHashQueryForScan = this.getBooleanProperty(
"azurecosmos.useHashQueryForScan",
DEFAULT_USE_HASH_QUERY_FOR_SCAN);

this.databaseName = this.getStringProperty(
"azurecosmos.databaseName",
DEFAULT_DATABASE_NAME);
Expand Down Expand Up @@ -147,23 +133,16 @@ private void initAzureCosmosClient() throws DBException {
retryOptions.getMaxRetryWaitTimeInSeconds()));
connectionPolicy.setRetryOptions(retryOptions);

// Query text
this.queryText = this.getQueryText();

try {
LOGGER.info("Creating azurecosmos client {}.. connectivityMode={}, consistencyLevel={},"
+ " maxRetryAttemptsOnThrottledRequests={}, maxRetryWaitTimeInSeconds={}"
+ " useSinglePartitionCollection={}, useUpsert={}, useHashQueryForScan={}, "
+ "queryText={}",
+ " useUpsert={}",
uri,
connectionPolicy.getConnectionMode(),
consistencyLevel.toString(),
connectionPolicy.getRetryOptions().getMaxRetryAttemptsOnThrottledRequests(),
connectionPolicy.getRetryOptions().getMaxRetryWaitTimeInSeconds(),
this.useSinglePartitionCollection,
this.useUpsert,
this.useHashQueryForScan,
this.queryText);
this.useUpsert);
AzureCosmosClient.client = new DocumentClient(uri, primaryKey, connectionPolicy, consistencyLevel);
LOGGER.info("Azure Cosmos connection created: {}", uri);
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -202,12 +181,6 @@ private int getIntProperty(String propertyName, int defaultValue) {
}
}

/**
 * Chooses the SQL text issued by SCAN operations.
 *
 * @return the equality query on {@code r.id} when {@code useHashQueryForScan}
 *         is set; otherwise the {@code TOP @recordcount} range query on
 *         {@code r.myid}
 */
private String getQueryText() {
  if (this.useHashQueryForScan) {
    // Hash lookup: match exactly the document whose id equals the start key.
    return "SELECT * FROM root r WHERE r.id = @startkey";
  }
  // Range scan: up to @recordcount documents from the start key onward.
  return "SELECT TOP @recordcount * FROM root r WHERE r.myid >= @startkey";
}

/**
* Cleanup any state for this DB. Called once per DB instance; there is one DB
* instance per client thread.
Expand Down Expand Up @@ -246,7 +219,7 @@ public Status read(String table, String key, Set<String> fields, Map<String, Byt
return Status.ERROR;
}

if (null != document) {
if (document != null) {
result.putAll(extractResult(document));
}

Expand All @@ -259,11 +232,14 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f
List<Document> documents;
FeedResponse<Document> feedResponse = null;
try {
FeedOptions feedOptions = new FeedOptions();
feedOptions.setEnableCrossPartitionQuery(true);
feedOptions.setMaxDegreeOfParallelism(this.maxDegreeOfParallelismForQuery);
feedResponse = AzureCosmosClient.client.queryDocuments(getDocumentCollectionLink(this.databaseName, table),
new SqlQuerySpec(queryText,
new SqlQuerySpec("SELECT TOP @recordcount * FROM root r WHERE r.id >= @startkey",
new SqlParameterCollection(new SqlParameter("@recordcount", recordcount),
new SqlParameter("@startkey", startkey))),
getFeedOptions(startkey));
new SqlParameter("@startkey", startkey))),
feedOptions);
documents = feedResponse.getQueryIterable().toList();
} catch (Exception e) {
if (!this.includeExceptionStackInLog) {
Expand Down Expand Up @@ -392,21 +368,6 @@ private HashMap<String, ByteIterator> extractResult(Document item) {
return rItems;
}

/**
 * Builds the feed options used when querying documents for a SCAN.
 *
 * @param key the key used as the partition key when hash queries are enabled
 * @return {@code null} when {@code useSinglePartitionCollection} is set;
 *         otherwise a new {@link FeedOptions} configured either for a
 *         single-partition lookup (hash query) or for a cross-partition
 *         query with the configured degree of parallelism
 */
private FeedOptions getFeedOptions(String key) {
  // No feed options are supplied at all when the single-partition flag is on.
  if (useSinglePartitionCollection) {
    return null;
  }

  final FeedOptions options = new FeedOptions();
  final boolean hashQuery = this.useHashQueryForScan;

  // Cross-partition querying is enabled exactly when hash queries are off.
  options.setEnableCrossPartitionQuery(!hashQuery);
  if (hashQuery) {
    options.setPartitionKey(new PartitionKey(key));
  } else {
    options.setMaxDegreeOfParallelism(this.maxDegreeOfParallelismForQuery);
  }
  return options;
}

private RequestOptions getRequestOptions(String key) {
RequestOptions requestOptions = new RequestOptions();
requestOptions.setPartitionKey(new PartitionKey(key));
Expand Down Expand Up @@ -434,12 +395,6 @@ private static String getDocumentLink(String databaseName, String table, String
private Document getDocumentDefinition(String key, Map<String, ByteIterator> values) {
Document document = new Document();
document.set("id", key);
if (!this.useHashQueryForScan) {
// This field is only needed for range scans.
// Even if this field is present in the document you
// should still partition on id for simplicity of config.
document.set("myid", key);
}
for (Entry<String, ByteIterator> entry : values.entrySet()) {
document.set(entry.getKey(), entry.getValue().toString());
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ LICENSE file.
<aerospike.version>3.1.2</aerospike.version>
<arangodb.version>4.4.1</arangodb.version>
<asynchbase.version>1.7.1</asynchbase.version>
<azurecosmos.version>2.0.0</azurecosmos.version>
<azurecosmos.version>2.2.3</azurecosmos.version>
<azurestorage.version>4.0.0</azurestorage.version>
<cassandra.cql.version>3.0.0</cassandra.cql.version>
<cloudspanner.version>0.24.0-beta</cloudspanner.version>
Expand Down

0 comments on commit ac13d7f

Please sign in to comment.