diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 33b68dd39fd..889379250c1 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -25,7 +25,7 @@ files="(.*).java"/> + files="(.*).java"/> diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java index 5304ee6b0ae..6f6a30b5d9d 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java @@ -19,6 +19,7 @@ import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import io.confluent.kafka.schemaregistry.utils.QualifiedSubject; @@ -72,6 +73,8 @@ public class CachedSchemaRegistryClient implements SchemaRegistryClient { private final Map> idToSchemaCache; private final Map> schemaToVersionCache; private final Map> versionToSchemaCache; + private final Cache latestVersionCache; + private final Cache latestWithMetadataCache; private final Cache missingSchemaCache; private final Cache missingIdCache; private final Cache missingVersionCache; @@ -198,6 +201,25 @@ public CachedSchemaRegistryClient( this.restService = restService; this.ticker = ticker; + long latestTTL = SchemaRegistryClientConfig.getLatestTTL(configs); + + CacheBuilder latestVersionBuilder = CacheBuilder.newBuilder() + .maximumSize(cacheCapacity) + .ticker(ticker); + if (latestTTL >= 0) { + latestVersionBuilder = latestVersionBuilder.expireAfterWrite( + latestTTL, TimeUnit.SECONDS); + } + this.latestVersionCache = latestVersionBuilder.build(); + CacheBuilder latestWithMetadataBuilder = CacheBuilder.newBuilder() + .maximumSize(cacheCapacity) + .ticker(ticker); + if (latestTTL >= 0) { + latestWithMetadataBuilder = latestWithMetadataBuilder.expireAfterWrite( + latestTTL, TimeUnit.SECONDS); + } + this.latestWithMetadataCache = latestWithMetadataBuilder.build(); + long missingIdTTL = SchemaRegistryClientConfig.getMissingIdTTL(configs); long missingVersionTTL = SchemaRegistryClientConfig.getMissingVersionTTL(configs); long missingSchemaTTL = SchemaRegistryClientConfig.getMissingSchemaTTL(configs); @@ -578,17 +600,32 @@ public SchemaMetadata getSchemaMetadata(String subject, int version, boolean loo @Override public SchemaMetadata getLatestSchemaMetadata(String subject) throws IOException, RestClientException { + SchemaMetadata schema = latestVersionCache.getIfPresent(subject); + if (schema != null) { + return schema; + } + io.confluent.kafka.schemaregistry.client.rest.entities.Schema response = restService.getLatestVersion(subject); - return new SchemaMetadata(response); + schema = new SchemaMetadata(response); + latestVersionCache.put(subject, schema); + return schema; } @Override public SchemaMetadata getLatestWithMetadata(String subject, Map metadata, boolean lookupDeletedSchema) throws IOException, RestClientException { + SubjectAndMetadata subjectAndMetadata = new SubjectAndMetadata(subject, metadata); + SchemaMetadata schema = latestWithMetadataCache.getIfPresent(subjectAndMetadata); + if (schema != null) { + return schema; + } + io.confluent.kafka.schemaregistry.client.rest.entities.Schema response = restService.getLatestWithMetadata(subject, metadata, lookupDeletedSchema); - return new SchemaMetadata(response); + schema = new SchemaMetadata(response); + latestWithMetadataCache.put(subjectAndMetadata, schema); + return schema; } @Override @@ -684,6 +721,8 @@ public synchronized List deleteSubject( idToSchemaCache.remove(subject); schemaToIdCache.remove(subject); schemaToResponseCache.remove(subject); + latestVersionCache.invalidate(subject); + latestWithMetadataCache.invalidateAll(); return restService.deleteSubject(requestProperties, subject, isPermanent); } @@ -709,6 +748,8 @@ public synchronized Integer deleteSchemaVersion( .getOrDefault(subject, Collections.emptyMap()) .remove(Integer.valueOf(version)); } + latestVersionCache.invalidate(subject); + latestWithMetadataCache.invalidateAll(); return restService.deleteSchemaVersion(requestProperties, subject, version, isPermanent); } @@ -819,6 +860,8 @@ public synchronized void reset() { idToSchemaCache.clear(); schemaToVersionCache.clear(); versionToSchemaCache.clear(); + latestVersionCache.invalidateAll(); + latestWithMetadataCache.invalidateAll(); missingSchemaCache.invalidateAll(); missingIdCache.invalidateAll(); missingVersionCache.invalidateAll(); @@ -943,4 +986,44 @@ public String toString() { return "SubjectAndId{" + "subject='" + subject + '\'' + ", id=" + id + '}'; } } + + static class SubjectAndMetadata { + private final String subject; + private final Map metadata; + + public SubjectAndMetadata(String subject, Map metadata) { + this.subject = subject; + this.metadata = ImmutableMap.copyOf(metadata); + } + + public String subject() { + return subject; + } + + public Map metadata() { + return metadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SubjectAndMetadata that = (SubjectAndMetadata) o; + return Objects.equals(subject, that.subject) && Objects.equals(metadata, that.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(subject, metadata); + } + + @Override + public String toString() { + return "SubjectAndMetadata{" + "subject='" + subject + '\'' + ", metadata=" + metadata + '}'; + } + } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java index 16c75334cef..32ed87f2536 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java @@ -44,6 +44,9 @@ public class SchemaRegistryClientConfig { public static final String PROXY_HOST = "proxy.host"; public static final String PROXY_PORT = "proxy.port"; + public static final String LATEST_CACHE_TTL_CONFIG = "latest.cache.ttl.sec"; + public static final long LATEST_CACHE_TTL_DEFAULT = 60; + public static final String MISSING_CACHE_SIZE_CONFIG = "missing.cache.size"; public static final String MISSING_ID_CACHE_TTL_CONFIG = "missing.id.cache.ttl.sec"; public static final String MISSING_VERSION_CACHE_TTL_CONFIG = "missing.version.cache.ttl.sec"; @@ -124,6 +127,17 @@ public static Integer getHttpReadTimeoutMs(Map configs) { } } + public static long getLatestTTL(Map configs) { + if (configs != null && configs.containsKey(LATEST_CACHE_TTL_CONFIG)) { + Object latestVal = configs.get(LATEST_CACHE_TTL_CONFIG); + return latestVal instanceof String + ? Long.parseLong((String) latestVal) + : ((Number) latestVal).longValue(); + } else { + return LATEST_CACHE_TTL_DEFAULT; + } + } + public static long getMissingIdTTL(Map configs) { return configs != null && configs.containsKey(MISSING_ID_CACHE_TTL_CONFIG) ? (Long) configs.get(MISSING_ID_CACHE_TTL_CONFIG) diff --git a/client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java b/client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java index 18f8f55e978..9fbf9549caf 100644 --- a/client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java +++ b/client/src/test/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClientTest.java @@ -15,6 +15,7 @@ */ package io.confluent.kafka.schemaregistry.client; +import com.google.common.collect.ImmutableMap; import com.google.common.testing.FakeTicker; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; @@ -549,6 +550,85 @@ public void testGetSchemasEmptyReturn() throws Exception { assertEquals(0, parsedSchemas.size()); } + @Test + public void testLatestVersionCache() throws Exception { + Map configs = new HashMap<>(); + configs.put(SchemaRegistryClientConfig.LATEST_CACHE_TTL_CONFIG, 60L); + + FakeTicker fakeTicker = new FakeTicker(); + client = new CachedSchemaRegistryClient( + restService, + CACHE_CAPACITY, + null, + configs, + null, + fakeTicker + ); + + expect(restService.getLatestVersion(eq(SUBJECT_0))) + .andReturn( + new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1, + ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0)) + .andReturn( + new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1, + ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0)); + + replay(restService); + + SchemaMetadata schemaMetadata = client.getLatestSchemaMetadata(SUBJECT_0); + assertEquals(ID_25, schemaMetadata.getId()); + + fakeTicker.advance(59, TimeUnit.SECONDS); + + // Should hit the cache + schemaMetadata = client.getLatestSchemaMetadata(SUBJECT_0); + assertEquals(ID_25, schemaMetadata.getId()); + + fakeTicker.advance(2, TimeUnit.SECONDS); + Thread.sleep(100); + assertNotNull(client.getLatestSchemaMetadata(SUBJECT_0)); + } + + @Test + public void testLatestWithMetadataCache() throws Exception { + Map configs = new HashMap<>(); + configs.put(SchemaRegistryClientConfig.LATEST_CACHE_TTL_CONFIG, 60L); + + FakeTicker fakeTicker = new FakeTicker(); + client = new CachedSchemaRegistryClient( + restService, + CACHE_CAPACITY, + null, + configs, + null, + fakeTicker + ); + + expect(restService.getLatestWithMetadata(eq(SUBJECT_0), anyObject(), eq(false))) + .andReturn( + new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1, + ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0)) + .andReturn( + new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1, + ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0)); + + replay(restService); + + Map metadata = ImmutableMap.of("key", "value"); + SchemaMetadata schemaMetadata = client.getLatestWithMetadata(SUBJECT_0, metadata, false); + assertEquals(ID_25, schemaMetadata.getId()); + + fakeTicker.advance(59, TimeUnit.SECONDS); + + // Should hit the cache + schemaMetadata = client.getLatestWithMetadata(SUBJECT_0, metadata, false); + assertEquals(ID_25, schemaMetadata.getId()); + + fakeTicker.advance(2, TimeUnit.SECONDS); + Thread.sleep(100); + assertNotNull(client.getLatestWithMetadata(SUBJECT_0, metadata, false)); + } + @Test public void testMissingIdCache() throws Exception { Map configs = new HashMap<>(); diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java index eddc77407eb..6e141a6142f 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDeConfig.java @@ -121,7 +121,7 @@ public class AbstractKafkaSchemaSerDeConfig extends AbstractConfig { public static final String LATEST_CACHE_SIZE_DOC = "The maximum size for caches holding latest schemas"; - public static final String LATEST_CACHE_TTL = "latest.cache.ttl.sec"; + public static final String LATEST_CACHE_TTL = SchemaRegistryClientConfig.LATEST_CACHE_TTL_CONFIG; public static final int LATEST_CACHE_TTL_DEFAULT = -1; public static final String LATEST_CACHE_TTL_DOC = "The TTL for caches holding latest schemas, or -1 for no TTL";