Skip to content

Commit

Permalink
Optimizing the driver to remove unneeded work and (#1298)
Browse files Browse the repository at this point in the history
fixing a bug in update.
  • Loading branch information
voellm authored and stfeng2 committed Apr 20, 2019
1 parent a90a5a7 commit 0397cb6
Showing 1 changed file with 35 additions and 40 deletions.
75 changes: 35 additions & 40 deletions azurecosmos/src/main/java/com/yahoo/ycsb/db/AzureCosmosClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ public class AzureCosmosClient extends DB {
private static final int DEFAULT_MAX_DEGREE_OF_PARALLELISM_FOR_QUERY = 0;
private static final boolean DEFAULT_INCLUDE_EXCEPTION_STACK_IN_LOG = false;

private static final int NS_IN_US = 1000;
private static final String NA_STRING = "N/A";

private static final Logger LOGGER = LoggerFactory.getLogger(AzureCosmosClient.class);

/**
Expand Down Expand Up @@ -236,8 +233,7 @@ public Status read(String table, String key, Set<String> fields, Map<String, Byt
String documentLink = getDocumentLink(this.databaseName, table, key);

ResourceResponse<Document> readResource = null;
Document document;
long startTime = System.nanoTime();
Document document = null;

try {
readResource = AzureCosmosClient.client.readDocument(documentLink, getRequestOptions(key));
Expand All @@ -248,23 +244,18 @@ public Status read(String table, String key, Set<String> fields, Map<String, Byt
}
LOGGER.error("Failed to read key {} in collection {} in database {}", key, table, this.databaseName, e);
return Status.ERROR;
} finally {
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
LOGGER.debug("Read key {} in {}us - ActivityID: {}", key, elapsed,
readResource != null ? readResource.getActivityId() : NA_STRING);
}

if (null != document) {
result.putAll(extractResult(document));
LOGGER.trace("Read result: {}", document);
}

return Status.OK;
}

@Override
public Status scan(String table, String startkey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
long startTime = System.nanoTime();
List<Document> documents;
FeedResponse<Document> feedResponse = null;
try {
Expand All @@ -280,11 +271,6 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f
}
LOGGER.error("Failed to scan with startKey={}, recordCount={}", startkey, recordcount, e);
return Status.ERROR;
} finally {
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
LOGGER.debug("Queried {} records for key {} in {}us - ActivityID: {}",
recordcount, startkey, elapsed,
feedResponse != null ? feedResponse.getActivityId() : NA_STRING);
}

if (documents != null) {
Expand All @@ -298,20 +284,41 @@ public Status scan(String table, String startkey, int recordcount, Set<String> f

@Override
public Status update(String table, String key, Map<String, ByteIterator> values) {
// Azure Cosmos does not have patch support. Until then we need to read
// the document, update in place, and then write back.
// This could actually be made more efficient by using a stored procedure
// and doing the read/modify write on the server side. Perhaps
// that will be a future improvement.

String documentLink = getDocumentLink(this.databaseName, table, key);
Document document = getDocumentDefinition(key, values);
ResourceResponse<Document> updatedResource = null;
ResourceResponse<Document> readResouce = null;
RequestOptions reqOptions = null;
Document document = null;

try {
reqOptions = getRequestOptions(key);
readResouce = AzureCosmosClient.client.readDocument(documentLink, reqOptions);
document = readResouce.getResource();
} catch (DocumentClientException e) {
if (!this.includeExceptionStackInLog) {
e = null;
}
LOGGER.error("Failed to read key {} in collection {} in database {} during update operation",
key, table, this.databaseName, e);
return Status.ERROR;
}

RequestOptions reqOptions = getRequestOptions(key);
if (reqOptions == null) {
reqOptions = new RequestOptions();
// Update values
for (Entry<String, ByteIterator> entry : values.entrySet()) {
document.set(entry.getKey(), entry.getValue().toString());
}

AccessCondition accessCondition = new AccessCondition();
accessCondition.setCondition(document.getETag());
accessCondition.setType(AccessConditionType.IfMatch);
reqOptions.setAccessCondition(accessCondition);

ResourceResponse<Document> updatedResource = null;
long startTime = System.nanoTime();
try {
updatedResource = AzureCosmosClient.client.replaceDocument(documentLink, document, reqOptions);
} catch (DocumentClientException e) {
Expand All @@ -320,30 +327,27 @@ public Status update(String table, String key, Map<String, ByteIterator> values)
}
LOGGER.error("Failed to update key {}", key, e);
return Status.ERROR;
} finally {
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
LOGGER.debug("Updated key {} in {}us - ActivityID: {}", key, elapsed,
updatedResource != null ? updatedResource.getActivityId() : NA_STRING);
}

return Status.OK;
}

@Override
public Status insert(String table, String key, Map<String, ByteIterator> values) {
Document documentDefinition = getDocumentDefinition(key, values);
ResourceResponse<Document> resourceResponse = null;
long startTime = System.nanoTime();
RequestOptions requestOptions = getRequestOptions(key);

try {
if (this.useUpsert) {
resourceResponse = AzureCosmosClient.client.upsertDocument(getDocumentCollectionLink(this.databaseName, table),
documentDefinition,
getRequestOptions(key),
requestOptions,
true);
} else {
resourceResponse = AzureCosmosClient.client.createDocument(getDocumentCollectionLink(this.databaseName, table),
documentDefinition,
getRequestOptions(key),
requestOptions,
true);
}

Expand All @@ -353,10 +357,6 @@ public Status insert(String table, String key, Map<String, ByteIterator> values)
}
LOGGER.error("Failed to insert key {} to collection {} in database {}", key, table, this.databaseName, e);
return Status.ERROR;
} finally {
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
LOGGER.debug("Inserted key {} in {}us - ActivityID: {}", key, elapsed,
resourceResponse != null ? resourceResponse.getActivityId() : NA_STRING);
}

return Status.OK;
Expand All @@ -365,7 +365,7 @@ public Status insert(String table, String key, Map<String, ByteIterator> values)
@Override
public Status delete(String table, String key) {
ResourceResponse<Document> deletedResource = null;
long startTime = System.nanoTime();

try {
deletedResource = AzureCosmosClient.client.deleteDocument(getDocumentLink(this.databaseName, table, key),
getRequestOptions(key));
Expand All @@ -375,10 +375,6 @@ public Status delete(String table, String key) {
}
LOGGER.error("Failed to delete key {} in collection {} in database {}", key, table, this.databaseName, e);
return Status.ERROR;
} finally {
long elapsed = (System.nanoTime() - startTime) / NS_IN_US;
LOGGER.debug("Deleted key {} in {}us - ActivityID: {}", key, elapsed,
deletedResource != null ? deletedResource.getActivityId() : NA_STRING);
}

return Status.OK;
Expand All @@ -391,7 +387,6 @@ private HashMap<String, ByteIterator> extractResult(Document item) {
HashMap<String, ByteIterator> rItems = new HashMap<>(item.getHashMap().size());

for (Entry<String, Object> attr : item.getHashMap().entrySet()) {
LOGGER.trace("Result- key: {}, value: {}", attr.getKey(), attr.getValue().toString());
rItems.put(attr.getKey(), new StringByteIterator(attr.getValue().toString()));
}
return rItems;
Expand Down

0 comments on commit 0397cb6

Please sign in to comment.