Skip to content

Commit

Permalink
POC/unit test code to use painless scripts to do lease acquisition.
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Jun 10, 2024
1 parent 8d0c599 commit dd4b1d1
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 3 deletions.
3 changes: 1 addition & 2 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ dependencies {


// Integration tests
testImplementation group: 'org.opensearch', name: 'opensearch-testcontainers'
testImplementation group: 'org.testcontainers', name: 'testcontainers'
testImplementation group: 'org.hamcrest', name: 'hamcrest'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package com.rfs.cms;


import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.opensearch.testcontainers.OpensearchContainer;

import java.time.Instant;

/**
* The contract here is that the first request in will acquire a lease for the duration that was requested.
*
* Once the work is complete, the worker will mark it as such and as long as the workerId matches what was set,
* the work will be marked for completion and no other lease requests will be granted.
*
* When a lease has NOT been acquired, the update request will return a noop. If it was created,
* the expiration period will be equal to the original timestamp that the client sent + the expiration window.
*
* In case there was an expired lease and this worker has acquired the lease, the result will be 'updated'.
* The client will need to retrieve the document to find out what the expiration value was. That means that
* in all non-contentious cases, clients only need to make one call per work item. Multiple calls are only
* required when a lease has expired and a new one is being granted since the worker/client needs to make the
* GET call to find out the new expiration value.
*/
@Slf4j
public class TransactionalOpenSearchDataStoreTest {
private static final String INDEX_NAME = ".migrations_working_state";

final static OpensearchContainer<?> container =
new OpensearchContainer<>("opensearchproject/opensearch:2.11.0");

@BeforeAll
static void setupOpenSearchContainer() throws Exception {
// Start the container. This step might take some time...
container.start();

var body = "{\n" +
" \"settings\": {\n" +
" \"index\": {" +
" \"number_of_shards\": 1,\n" +
" \"number_of_replicas\": 1\n" +
" }\n" +
" }\n" +
"}\n";
final var httpCreateIndex = new HttpPut(container.getHttpHostAddress() + "/" + INDEX_NAME);
httpCreateIndex.setHeader("Content-Type", "application/json");
httpCreateIndex.setHeader("Accept-Encoding", "identity");
httpCreateIndex.setEntity(new StringEntity(body));

try (CloseableHttpClient client = HttpClients.createDefault()) {
var responseBody1 = client.execute(httpCreateIndex, r -> {
Assertions.assertEquals(HttpStatus.SC_OK, r.getCode());
return r.getEntity();
});
}
}

static String updateLeaseBodyTemplate = "{\n" +
" \"scripted_upsert\": true,\n" +
" \"upsert\": {\n" +
" \"scriptVersion\": \"{SCRIPT_VERSION}\",\n" +
" \"expiration\": 0,\n" +
" \"workerId\": \"{WORKER_ID}\",\n" +
" \"numAttempts\": 0\n" +
" },\n" +
" \"script\": {\n" +
" \"lang\": \"painless\",\n" +
" \"params\": { \n" +
" \"clientTimestamp\": {CLIENT_TIMESTAMP},\n" +
" \"expirationWindow\": {EXPIRATION_WINDOW},\n" +
" \"workerId\": \"{WORKER_ID}\"\n" +
" },\n" +
" \"source\": \"" +
" if (ctx._source.scriptVersion != \\\"{SCRIPT_VERSION}\\\") {" +
" throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);" +
" } " +
" long serverTimeSeconds = System.currentTimeMillis() / 1000;" +
" if (Math.abs(params.clientTimestamp - serverTimeSeconds) > {CLOCK_DEVIATION_SECONDS_THRESHOLD}) {" +
" throw new IllegalArgumentException(\\\"The current times indicated between the client and server are too different.\\\");" +
" }" +
" long newExpiration = params.clientTimestamp + (((long)Math.pow(2, ctx._source.numAttempts)) * params.expirationWindow);" +
" if (ctx._source.completedAt == null && " + // not completed
" (ctx._source.expiration == 0 ||" + // first time
" ctx._source.expiration < serverTimeSeconds) && " + // expired lease
" ctx._source.expiration < newExpiration) {" + // sanity check
" ctx._source.expiration = newExpiration;" +
" ctx._source.workerId = params.workerId;" +
" ctx._source.numAttempts += 1;" +
" } else {" +
" ctx.op = \\\"noop\\\";" +
" }" +
"\"\n" +
" }\n" + // close script
"}"; // close top-level

String makeAcquireLeasePayload(String workerId, int tolerableClockShiftSeconds,
Instant currentTime, int expirationWindowSeconds) {
// the notion of 'now' isn't supported with painless scripts
// https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-datetime.html#_datetime_now
var body = updateLeaseBodyTemplate
.replace("{SCRIPT_VERSION}", "poc")
.replace("{WORKER_ID}", workerId)
.replace("{CLIENT_TIMESTAMP}", Long.toString(currentTime.toEpochMilli()/1000))
.replace("{EXPIRATION_WINDOW}", Integer.toString(expirationWindowSeconds))
.replace("{CLOCK_DEVIATION_SECONDS_THRESHOLD}", Integer.toString(tolerableClockShiftSeconds));
log.info("Update body: "+ body);
return body;
}

static String markWorkAsCompleteBodyTemplate = "{\n" +
" \"script\": {\n" +
" \"lang\": \"painless\",\n" +
" \"params\": { \n" +
" \"clientTimestamp\": {CLIENT_TIMESTAMP},\n" +
" \"workerId\": \"{WORKER_ID}\"\n" +
" },\n" +
" \"source\": \"" +
" if (ctx._source.scriptVersion != \\\"{SCRIPT_VERSION}\\\") {" +
" throw new IllegalArgumentException(\\\"scriptVersion mismatch. Not all participants are using the same script: sourceVersion=\\\" + ctx.source.scriptVersion);" +
" } " +
" if (ctx._source.workerId != params.workerId) {" +
" throw new IllegalArgumentException(\\\"work item was owned by \\\" + ctx._source.workerId + \\\" not \\\" + params.workerId);" +
" } else {" +
" ctx._source.completedAt = System.currentTimeMillis() / 1000;" +
" }" +
"\"\n" +
" }\n" +
"}";

String makeCompleteWorkPayload(String workerId, Instant currentTime) {
var body = markWorkAsCompleteBodyTemplate
.replace("{SCRIPT_VERSION}", "poc")
.replace("{WORKER_ID}", workerId)
.replace("{CLIENT_TIMESTAMP}", Long.toString(currentTime.toEpochMilli()/1000));
log.info("Mark complete body: "+ body);
return body;
}

@NotNull
private HttpPost makePostRequest(String documentId, String payload) {
final HttpPost httpPost = new HttpPost(container.getHttpHostAddress() + "/" + INDEX_NAME + "/_update/" + documentId);

httpPost.setEntity(new StringEntity(payload));
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("Accept-Encoding", "identity");
return httpPost;
}

private HttpGet makeGetDocument(String documentId) {
final HttpGet httpGet = new HttpGet(container.getHttpHostAddress() + "/" + INDEX_NAME + "/_doc/" + documentId);
httpGet.setHeader("Accept-Encoding", "identity");
return httpGet;
}

@NotNull
private HttpPost makeUpdateRequest(String document, String workerId, Instant currentTime,
int expirationWindowSeconds) {
return makePostRequest(document, makeAcquireLeasePayload(workerId, 5,
currentTime, expirationWindowSeconds));
}

private HttpPost makeCompletionRequest(String document, String workerId, Instant currentTime) {
final HttpPost httpPost = new HttpPost(container.getHttpHostAddress() + "/" + INDEX_NAME + "/_update/" + document);

httpPost.setEntity(new StringEntity(makeCompleteWorkPayload(workerId, currentTime)));
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("Accept-Encoding", "identity");
return httpPost;
}

@Test
void testCreateOrUpdateOrReturnAsIsRequest() throws Exception {
var objMapper = new ObjectMapper();
try (CloseableHttpClient client = HttpClients.createDefault()) {
var response1 = client.execute(
makeUpdateRequest("A", "node_1", Instant.now(), 2),
r -> {
Assertions.assertEquals(HttpStatus.SC_CREATED, r.getCode());
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals("created", response1.get("result").textValue());
var doc1 = client.execute(makeGetDocument("A"), r -> {
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals(1, doc1.get("_source").get("numAttempts").longValue());
var response2 = client.execute(
makeUpdateRequest("A", "node_1", Instant.now(), 2),
r -> {
Assertions.assertEquals(HttpStatus.SC_OK, r.getCode());
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals("noop", response2.get("result").textValue());
var doc2 = client.execute(makeGetDocument("A"), r -> {
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals(1, doc2.get("_source").get("numAttempts").longValue());

Thread.sleep(2500);

var response3 = client.execute(
makeUpdateRequest("A", "node_1", Instant.now(), 2),
r -> {
Assertions.assertEquals(HttpStatus.SC_OK, r.getCode());
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals("updated", response3.get("result").textValue());
var doc3 = client.execute(makeGetDocument("A"), r -> {
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals(2, doc3.get("_source").get("numAttempts").longValue());
Assertions.assertTrue(
doc2.get("_source").get("expiration").longValue() <
doc3.get("_source").get("expiration").longValue());

var response4 = client.execute(
makeCompletionRequest("A", "node_1", Instant.now()), r -> {
Assertions.assertEquals(HttpStatus.SC_OK, r.getCode());
return objMapper.readTree(r.getEntity().getContent());
});
var doc4 = client.execute(makeGetDocument("A"), r -> {
return objMapper.readTree(r.getEntity().getContent());
});
Assertions.assertEquals("updated", response4.get("result").textValue());
Assertions.assertTrue(doc4.get("_source").get("completedAt").longValue() > 0);
log.info("doc4="+doc4);
}
}
}
5 changes: 4 additions & 1 deletion commonDependencyVersionConstraints/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ dependencies {
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: jackson
api group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-smile', version: jackson

def jupiter = '5.9.3'
def jupiter = '5.10.2'
api group: 'org.junit.jupiter', name:'junit-jupiter-api', version: jupiter
api group: 'org.junit.jupiter', name:'junit-jupiter-params', version: jupiter
api group: 'org.junit.jupiter', name:'junit-jupiter-engine', version: jupiter
Expand Down Expand Up @@ -90,6 +90,9 @@ dependencies {
api group: 'org.testcontainers', name: 'testcontainers', version: testcontainers
api group: 'org.testcontainers', name: 'toxiproxy', version: testcontainers

// make sure that this is compatible with the testcontainers version
api group: 'org.opensearch', name: 'opensearch-testcontainers', version: '2.0.1'

def mockito = '5.11.0'
api group: 'org.mockito', name:'mockito-core', version: mockito
api group: 'org.mockito', name:'mockito-junit-jupiter', version: mockito
Expand Down

0 comments on commit dd4b1d1

Please sign in to comment.