
unify caching across workers to maximize cache hits #977

Merged
merged 15 commits into aklivity:develop
May 10, 2024

Conversation

ankitk-me
Contributor

Fixes #937

@@ -54,11 +57,13 @@ public class KarapaceCatalogHandler implements CatalogHandler
private final long maxAgeMillis;
private final KarapaceEventContext event;
private final long catalogId;
private final ConcurrentHashMap<Integer, CompletableFuture<String>> cache;
Contributor

Suggested change
private final ConcurrentHashMap<Integer, CompletableFuture<String>> cache;
private final ConcurrentMap<Integer, CompletableFuture<String>> cachedSchemas;


public KarapaceCatalogHandler(
KarapaceOptionsConfig config,
EngineContext context,
long catalogId)
long catalogId,
ConcurrentHashMap<Integer, CompletableFuture<String>> cache)
Contributor

Suggested change
ConcurrentHashMap<Integer, CompletableFuture<String>> cache)
ConcurrentMap<Integer, CompletableFuture<String>> cachedSchemas)

@@ -69,24 +74,41 @@ public KarapaceCatalogHandler(
this.maxAgeMillis = config.maxAge.toMillis();
this.event = new KarapaceEventContext(context);
this.catalogId = catalogId;
this.cache = cache;
Contributor

Suggested change
this.cache = cache;
this.cachedSchemas = cachedSchemas;

Comment on lines 91 to 111
CompletableFuture<String> future = cache.get(schemaId);
if (future == null)
{
schemas.put(schemaId, schema);
future = CompletableFuture.supplyAsync(() ->
{
String response = sendHttpRequest(MessageFormat.format(SCHEMA_PATH, schemaId));
return response != null ? request.resolveSchemaResponse(response) : null;
});
}
try
{
schema = future.get();
if (schema != null)
{
cache.put(schemaId, future);
schemas.put(schemaId, schema);
}
}
catch (ExecutionException | InterruptedException e)
{
// TODO: log an event
Contributor

There is still a race here, between cache.get(schemaId) and cache.put(schemaId, future).

To solve the race, we need to use one of the methods on ConcurrentMap that is not available on Map, such as:

CompletableFuture<String> future = cache.get(schemaId);
if (future == null)
{
    CompletableFuture<String> newFuture = new CompletableFuture<>();
    future = cache.putIfAbsent(schemaId, newFuture);
    if (future == null)
    {
        // send request and complete newFuture
        future = newFuture;
    }
}
assert future != null;
schema = future.get();
if (schema != null)
{
    schemas.put(schemaId, schema);
}

Contributor

Note, we also need to prevent unnecessary concurrent requests to resolve the schemaIds, such as latest, same as for resolving the schemas themselves from schemaIds.
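
For illustration, a minimal sketch of how the same race-free pattern could apply to schemaId resolution, assuming a cachedSchemaIds map keyed by the subject/version schemaKey (names are illustrative and exception handling is elided; not part of this change):

CompletableFuture<CachedSchemaId> future = cachedSchemaIds.get(schemaKey);
if (future == null)
{
    CompletableFuture<CachedSchemaId> newFuture = new CompletableFuture<>();
    future = cachedSchemaIds.putIfAbsent(schemaKey, newFuture);
    if (future == null)
    {
        // this caller won the race: send the subject/version request and complete newFuture
        future = newFuture;
    }
}
// all other callers wait on the same future instead of issuing duplicate requests
CachedSchemaId cachedSchemaId = future.get();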

@@ -113,6 +135,10 @@ public int resolve(
{
schemaIds.put(checkSum, new CachedSchemaId(System.currentTimeMillis(), schemaId));
}
else if (schemaIds.containsKey(checkSum))
Contributor

Suggested change
else if (schemaIds.containsKey(checkSum))
else if (schemaIds.containsKey(schemaKey))

Comment on lines 139 to 141
{
schemaId = schemaIds.get(checkSum).id;
}
Contributor

Is this needed to handle the case where remote access fails, but we can serve from (stale) cache instead?

Contributor Author

yes.

@@ -72,7 +73,7 @@ public void shouldResolveSchemaViaSchemaId() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L);
KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L, new ConcurrentHashMap<>());
Contributor

Suggest adding a convenience constructor that calls this(config, context, catalogId, new ConcurrentHashMap<>()) instead of requiring the test to handle the complexity.
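
For illustration, a minimal sketch of such a convenience constructor, delegating to the four-argument constructor shown above:

public KarapaceCatalogHandler(
    KarapaceOptionsConfig config,
    EngineContext context,
    long catalogId)
{
    // default to a fresh map so tests and simple callers need not supply one
    this(config, context, catalogId, new ConcurrentHashMap<>());
}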

Comment on lines 110 to 115
newFuture = CompletableFuture.supplyAsync(() ->
{
String response = sendHttpRequest(MessageFormat.format(SCHEMA_PATH, schemaId));
return response != null ? request.resolveSchemaResponse(response) : null;
});
future = newFuture;
Contributor

I don't think we can reassign newFuture here, as the value being lost is the one that already made it into the concurrent map, so if we reassign it then the one in the map would not be completed, agree?
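
For illustration, a minimal sketch of the intent (assuming newFuture is the instance already stored in the map): send the request and complete that same instance, rather than replacing the local variable with a new future:

// complete the future that other workers may already be waiting on via the map
String response = sendHttpRequest(MessageFormat.format(SCHEMA_PATH, schemaId));
newFuture.complete(response != null ? request.resolveSchemaResponse(response) : null);
future = newFuture;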

Comment on lines 157 to 163
newFuture = CompletableFuture.supplyAsync(() ->
{
String response = sendHttpRequest(MessageFormat.format(SUBJECT_VERSION_PATH, subject, version));
return new CachedSchemaId(System.currentTimeMillis(),
response != null ? request.resolveResponse(response) : NO_SCHEMA_ID);
});
future = newFuture;
Contributor

Same issue here as above, agree?

Comment on lines 177 to 195
CompletableFuture<CachedSchemaId> newFuture =
cachedSchemaIds.computeIfPresent(schemaKey, (key, existingFuture) ->
{
if (existingFuture.isDone() || existingFuture.isCompletedExceptionally())
{
CompletableFuture<CachedSchemaId> id = CompletableFuture.supplyAsync(() ->
{
String response = sendHttpRequest(MessageFormat.format(SUBJECT_VERSION_PATH,
subject, version));
return new CachedSchemaId(System.currentTimeMillis(),
response != null ? request.resolveResponse(response) : NO_SCHEMA_ID);
});
return id;
}
else
{
return existingFuture;
}
});
Contributor

I think we can perhaps simplify the logic by creating the candidate newFuture and calling:

CompletableFuture<CachedSchemaId> newFuture = new CompletableFuture<>();
CompletableFuture<CachedSchemaId> future = cachedSchemaIds.merge(schemaKey, newFuture, (v1, v2) -> v1 != null && !v1.isDone() ? v1 : v2);
if (future == newFuture)
{
    // send request
    // complete future
}
cachedSchemaId = future.get();
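
With this shape, the reference comparison future == newFuture tells the caller whether it won the race, and therefore whether it is responsible for sending the request and completing the future that other workers are waiting on.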

@@ -26,19 +25,21 @@
public class KarapaceCatalogContext implements CatalogContext
{
private final EngineContext context;
private final ConcurrentHashMap<Integer, CompletableFuture<String>> cache;
private final ConcurrentMap<Long, KarapaceCache> cache;
Contributor

Suggested change
private final ConcurrentMap<Long, KarapaceCache> cache;
private final ConcurrentMap<Long, KarapaceCache> cachesById;

Comment on lines 42 to 43
KarapaceCache karapaceCache = cache.computeIfAbsent(catalog.id, id -> new KarapaceCache());
return new KarapaceCatalogHandler(KarapaceOptionsConfig.class.cast(catalog.options), context, catalog.id, karapaceCache);
Contributor

Suggested change
KarapaceCache karapaceCache = cache.computeIfAbsent(catalog.id, id -> new KarapaceCache());
return new KarapaceCatalogHandler(KarapaceOptionsConfig.class.cast(catalog.options), context, catalog.id, karapaceCache);
KarapaceCache cache = cachesById.computeIfAbsent(catalog.id, id -> new KarapaceCache());
return new KarapaceCatalogHandler(KarapaceOptionsConfig.class.cast(catalog.options), context, catalog.id, cache);

Comment on lines 23 to 24
public final ConcurrentMap<Integer, CompletableFuture<String>> cachedSchemas;
public final ConcurrentMap<Integer, CompletableFuture<CachedSchemaId>> cachedSchemaIds;
Contributor

Suggested change
public final ConcurrentMap<Integer, CompletableFuture<String>> cachedSchemas;
public final ConcurrentMap<Integer, CompletableFuture<CachedSchemaId>> cachedSchemaIds;
public final ConcurrentMap<Integer, CompletableFuture<String>> schemas;
public final ConcurrentMap<Integer, CompletableFuture<CachedSchemaId>> schemaIds;

Probably don't need the cached prefix now that these are fields of KarapaceCache.

@@ -47,6 +50,8 @@ public KarapaceEventContext(
{
this.karapaceTypeId = context.supplyTypeId(KarapaceCatalog.NAME);
this.remoteAccessRejectedEventId = context.supplyEventId("catalog.karapace.remote.access.rejected");
this.futureCompletedExceptionallyId = context.supplyEventId("catalog.karapace.future.completed.exceptionally");
Contributor

The concept of a future is an internal implementation detail. Let's change this so that it still makes sense from an external perspective, for someone observing the logs with no knowledge of the implementation internals.

result = String.format(FUTURE_COMPLETED_EXCEPTIONALLY, asString(ex.error()));
break;
}
case STALE_SCHEMA_SERVED:
Contributor

How many times is this going to be logged while Karapace remains unreachable?

Perhaps we need to reconsider what events would convey the information necessary without overwhelming the logs when things are not operating perfectly.

The state transition that has occurred is that Karapace has been unreachable for long enough that we are forced to serve stale schemas, and the next state transition would be that connectivity to Karapace has been reestablished, so that stale schemas are no longer being served. Do we think these transition log events are sufficient, or do we need something more verbose?
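
For illustration, one possible shape for transition-only events, sketched with an assumed per-handler AtomicBoolean flag (the flag and event method names here are illustrative, not part of this change):

private final AtomicBoolean servingStaleSchemas = new AtomicBoolean();

// on falling back to a stale schema because Karapace is unreachable
if (servingStaleSchemas.compareAndSet(false, true))
{
    event.staleSchemaServed(catalogId, schemaId);
}

// on the next successful remote retrieval
if (servingStaleSchemas.compareAndSet(true, false))
{
    event.schemaRetrievalRestored(catalogId, schemaId);
}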

Comment on lines 21 to 23
REMOTE_ACCESS_REJECTED (1),
FUTURE_COMPLETED_EXCEPTIONALLY (2),
STALE_SCHEMA_SERVED (3)
Contributor

How do these differ in the information they convey to the log observer?

Comment on lines 24 to 26
public String schema;
public AtomicInteger event;

Contributor

If these are both unmodifiable, then they should be marked final too.

public class CachedSchema
{
public static final String SCHEMA_PLACEHOLDER = "schema";
public static final CachedSchema IN_PROGRESS = new CachedSchema(SCHEMA_PLACEHOLDER, new AtomicInteger(0));
Contributor

No need to explicitly pass 0 to AtomicInteger constructor.

Comment on lines 27 to 33
public CachedSchema(
String schema,
AtomicInteger event)
{
this.schema = schema;
this.event = event;
}
Contributor

Does it make sense to have a convenience constructor with just schema, defaulting to new AtomicInteger()?
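
For illustration, a minimal sketch of that convenience constructor:

public CachedSchema(
    String schema)
{
    // default the counter, per the suggestion above
    this(schema, new AtomicInteger());
}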

public static final CachedSchema IN_PROGRESS = new CachedSchema(SCHEMA_PLACEHOLDER, new AtomicInteger(0));

public String schema;
public AtomicInteger event;
Contributor

Not sure why this is called event.
Isn't this just tracking the number of retries, which stays at 0 when everything is working normally?

Comment on lines 25 to 28
public long timestamp;
public int id;
public AtomicInteger event;
public long retry;
Contributor

Should some or all of these be final?

this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void remoteAccessRejected(
public void unretrievableSchemaSubjectVersion(
Contributor

I think it might be more readable if these methods were named onXxx instead of xxx.

So in the calling context it would be something like events.onUnretrievableSchemaSubjectVersion(...) (not event...), wdyt?

Comment on lines 49 to 51
public static final long RESET_RETRY_DELAY_MS_DEFAULT = 0L;
public static final long RETRY_INITIAL_DELAY_MS_DEFAULT = 1000L;
public static final int RETRY_MULTIPLER_DEFAULT = 2;
Contributor

Can these be private?

Comment on lines 219 to 221
long current = cachedSchemaId.retry;
long update = current * RETRY_MULTIPLER_DEFAULT;
retry = update < maxAgeMillis ? update : current;
Contributor

This logic is trying to prevent the value from increasing if it would end up larger than maxAgeMillis, but a value equal to maxAgeMillis is acceptable too.

Suggest it is easier to follow if the value maxes out at maxAgeMillis instead of staying at current.

Suggested change
long current = cachedSchemaId.retry;
long update = current * RETRY_MULTIPLER_DEFAULT;
retry = update < maxAgeMillis ? update : current;
retryAfter = Math.min(cachedSchemaId.retryAfter << 1, maxAgeMillis);
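
For example, presumably starting from RETRY_INITIAL_DELAY_MS_DEFAULT of 1000 ms, the delay would then double on each failed attempt (2000, 4000, 8000, ...) and stay pinned at maxAgeMillis once the doubling reaches that cap.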

@@ -64,22 +64,17 @@ public void shouldLogEvents() throws Exception
}

@Test
public void shouldLogFutureCompletedExceptionally()
public void shouldLogRemoteRegistryEvents()
{
EngineContext context = mock(EngineContext.class);
when(context.clock()).thenReturn(Clock.systemUTC());
when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class));
KarapaceEventContext event = new KarapaceEventContext(context);
Contributor

Suggest we call this events instead of event.

if (schemaId != NO_SCHEMA_ID)
{
schemaIds.put(schemaKey, cachedSchemaId);
if (cachedSchemaId.retryAttempts.compareAndSet(1, 2))
Contributor

This doesn't look right to me, because the value of retryAttempts should be independent of serving stale schema, right?

Comment on lines 351 to 352
boolean success = httpResponse.statusCode() == 200;
String responseBody = success ? httpResponse.body() : null;
if (!success)
{
event.remoteAccessRejected(catalogId, httpRequest, httpResponse.statusCode());
}
return responseBody;
responseBody = success ? httpResponse.body() : null;
Contributor

We don't really need success any more, right?

responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null;

Comment on lines 65 to 79

@Test
public void shouldLogRemoteRegistryEvents()
{
EngineContext context = mock(EngineContext.class);
when(context.clock()).thenReturn(Clock.systemUTC());
when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class));
KarapaceEventContext events = new KarapaceEventContext(context);
events.onUnretrievableSchemaSubjectVersion(0L, "subject", "version");
events.onRetrievableSchemaSubjectVersion(0L, "subject", "version");
events.onUnretrievableSchemaSubjectVersionStaleSchema(0L, "subject", "version", 1);
events.onUnretrievableSchemaId(0L, 1);
events.onRetrievableSchemaId(0L, 1);
}

Contributor

This should be done as an IT with scripts verifying each different possible behavior of the remote Karapace system.

Is anything missing from the test binding that would be needed to test the karapace catalog that way?

Note: please see filesystem catalog EventIT.

Contributor

Prefer this IT be done via scripts, not mocks.

Note: if using mocks then it is a unit test, not an integration test. Unit tests with mocks are not stable across a refactor of the code, since they also need to be changed, whereas integration tests remain stable, providing better confidence after changing the implementation.

Contributor

There should be no need to drive any direct calls to KarapaceCatalogHandler via code and mocks.

Instead, this should be driven by test binding in zilla.yaml for the test. It may be necessary to add support for expected schema in test binding options so that it can verify the retrieved schema matches expectations.

Note: this would be for test binding only, as other bindings have no need to validate expectations.


else
{
handler.resolve(catalog.id);
}
Contributor

This is excellent. As mentioned above, we may need to add support for verifying the expected schema here, then reject the initial begin with a reset if the schema doesn't match expectations, as that will cause the IT to fail.

The expected schemas may be a separate test binding option, independent of catalog, so as not to interfere with that generic definition used by other bindings that don't need to validate expectations.

- catalog0
catalog:
catalog0:
- id: 1
Contributor

It would seem more readable and descriptive of the intent to put subject: not-subject1 instead of id: 1 here, agree?

Comment on lines 43 to 45
catalog-assertion:
- schema: null
- schema: '{"fields":[{"name":"id","type":"string"},{"name":"status","type":"string"}],"name":"Event","namespace":"io.aklivity.example","type":"record"}'
Contributor

This looks great!

Suggest we generalize the concept of assertions as we'll potentially need that later for vault, etc.

Suggested change
catalog-assertion:
- schema: null
- schema: '{"fields":[{"name":"id","type":"string"},{"name":"status","type":"string"}],"name":"Event","namespace":"io.aklivity.example","type":"record"}'
assertions:
catalog:
catalog0:
- schema: null
- schema: '{"fields":[{"name":"id","type":"string"},{"name":"status","type":"string"}],"name":"Event","namespace":"io.aklivity.example","type":"record"}'

{
"type": ["string", "null"]
},
"interval":
Contributor

Perhaps delay instead of interval since it is not repeating.

catalog0:
- id: 9
catalog-assertion:
- schema: '{"fields":[{"name":"id","type":"string"},{"name":"status","type":"string"}],"name":"Event","namespace":"io.aklivity.example","type":"record"}'
Contributor

Is it possible to use YAML syntax magic to let this be pretty-printed JSON in the file, but compact format when parsed?

Contributor Author

I wasn't able to find a way to implement this feedback. I tried using > (folded block scalar) instead of |, but the result still differed from the expected schema string.

jfallows merged commit d0356d5 into aklivity:develop on May 10, 2024
5 checks passed
Successfully merging this pull request may close these issues.

Resiliently handle karapace catalog unreachable