From c906c6ea148bf70d671341e78e0cf333b258d8d7 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Fri, 21 Jun 2019 12:00:15 +0200 Subject: [PATCH 1/5] ARUHA-2372 Create job to publish event type stats report --- .../repository/KafkaRepositoryCreator.java | 9 +- .../nakadi/repository/TopicRepository.java | 8 ++ .../repository/kafka/BubukuSizeStats.java | 45 +++++++ .../kafka/KafkaTopicRepository.java | 33 ++++- .../nakadi/service/EventsProcessor.java | 2 +- .../service/job/DiskUsageStatsConfig.java | 55 +++++++++ .../nakadi/service/job/DiskUsageStatsJob.java | 114 ++++++++++++++++++ src/main/resources/application.yml | 6 + src/main/resources/disk_usage_event_type.json | 52 ++++++++ 9 files changed, 320 insertions(+), 4 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/repository/kafka/BubukuSizeStats.java create mode 100644 src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java create mode 100644 src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java create mode 100644 src/main/resources/disk_usage_event_type.json diff --git a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java index 5929f92a32..7186ba9814 100644 --- a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java +++ b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java @@ -1,6 +1,7 @@ package org.zalando.nakadi.repository; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.zalando.nakadi.config.NakadiSettings; @@ -29,6 +30,7 @@ public class KafkaRepositoryCreator implements TopicRepositoryCreator { private final ZookeeperSettings zookeeperSettings; private final KafkaTopicConfigFactory kafkaTopicConfigFactory; private final MetricRegistry metricRegistry; + private final ObjectMapper objectMapper; @Autowired public KafkaRepositoryCreator( @@ -36,12 +38,14 @@ public KafkaRepositoryCreator( final KafkaSettings kafkaSettings, final ZookeeperSettings zookeeperSettings, final KafkaTopicConfigFactory kafkaTopicConfigFactory, - final MetricRegistry metricRegistry) { + final MetricRegistry metricRegistry, + final ObjectMapper objectMapper) { this.nakadiSettings = nakadiSettings; this.kafkaSettings = kafkaSettings; this.zookeeperSettings = zookeeperSettings; this.kafkaTopicConfigFactory = kafkaTopicConfigFactory; this.metricRegistry = metricRegistry; + this.objectMapper = objectMapper; } @Override @@ -59,7 +63,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic final KafkaFactory kafkaFactory = new KafkaFactory(new KafkaLocationManager(zooKeeperHolder, kafkaSettings), metricRegistry); final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zooKeeperHolder, - kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory); + kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory, + objectMapper); // check that it does work kafkaTopicRepository.listTopics(); return kafkaTopicRepository; diff --git a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java index 02033448ee..e83d3cbc80 100644 --- a/src/main/java/org/zalando/nakadi/repository/TopicRepository.java +++ 
b/src/main/java/org/zalando/nakadi/repository/TopicRepository.java @@ -5,6 +5,7 @@ import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.TopicPartition; import org.zalando.nakadi.exceptions.runtime.EventPublishingException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; @@ -15,6 +16,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; public interface TopicRepository { @@ -67,6 +69,12 @@ List<PartitionEndStatistics> loadTopicEndStatistics(Collection<Timeline> topics) List<String> listPartitionNames(String topicId); + /** + * Provides an estimation of the disk size occupied by each topic partition; replicated data is not included. + * @return Maximum size occupied by each topic partition across brokers. + */ + Map<TopicPartition, Long> getSizeStats(); + EventConsumer.LowLevelConsumer createEventConsumer(String clientId, List<NakadiCursor> positions) throws InvalidCursorException; diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/BubukuSizeStats.java b/src/main/java/org/zalando/nakadi/repository/kafka/BubukuSizeStats.java new file mode 100644 index 0000000000..06b625dfbc --- /dev/null +++ b/src/main/java/org/zalando/nakadi/repository/kafka/BubukuSizeStats.java @@ -0,0 +1,45 @@ +package org.zalando.nakadi.repository.kafka; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +public class BubukuSizeStats { + public static class TotalStats { + private final long usedKb; + private final long freeKb; + + public TotalStats( + @JsonProperty("used_kb") final long usedKb, + @JsonProperty("free_kb") final long freeKb) { + this.usedKb = usedKb; + this.freeKb = freeKb; + } + + public long getUsedKb() { + return usedKb; + } + + public long getFreeKb() { + return freeKb; + } + } + + private final TotalStats totalStats; + private final Map<String, Map<String, Long>> perPartitionStats; + + public BubukuSizeStats( + @JsonProperty("disk") final TotalStats totalStats, + @JsonProperty("topics") final Map<String, Map<String, Long>> perPartitionStats) { + this.totalStats = totalStats; + this.perPartitionStats = perPartitionStats; + } + + public TotalStats getTotalStats() { + return totalStats; + } + + public Map<String, Map<String, Long>> getPerPartitionStats() { + return perPartitionStats; + } +} diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 5275fb4464..df5d24a1cb 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.repository.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import kafka.admin.AdminUtils; import kafka.server.ConfigType; @@ -83,13 +84,15 @@ public class KafkaTopicRepository implements TopicRepository { private final ZookeeperSettings zookeeperSettings; private final ConcurrentMap circuitBreakers; private final KafkaTopicConfigFactory kafkaTopicConfigFactory; + private final ObjectMapper objectMapper; public KafkaTopicRepository(final ZooKeeperHolder zkFactory, final KafkaFactory kafkaFactory, final NakadiSettings nakadiSettings, final KafkaSettings kafkaSettings, final ZookeeperSettings zookeeperSettings, - final KafkaTopicConfigFactory kafkaTopicConfigFactory) { + final KafkaTopicConfigFactory
kafkaTopicConfigFactory, + final ObjectMapper objectMapper) { this.zkFactory = zkFactory; this.kafkaFactory = kafkaFactory; this.nakadiSettings = nakadiSettings; @@ -97,6 +100,7 @@ public KafkaTopicRepository(final ZooKeeperHolder zkFactory, this.zookeeperSettings = zookeeperSettings; this.kafkaTopicConfigFactory = kafkaTopicConfigFactory; this.circuitBreakers = new ConcurrentHashMap<>(); + this.objectMapper = objectMapper; } private CompletableFuture publishItem( @@ -479,6 +483,33 @@ public List<String> listPartitionNames(final String topicId) { .withExceptionsThatForceRetry(org.apache.kafka.common.errors.TimeoutException.class)); } + @Override + public Map<org.zalando.nakadi.domain.TopicPartition, Long> getSizeStats() { + final Map<org.zalando.nakadi.domain.TopicPartition, Long> result = new HashMap<>(); + + try { + final List<String> brokers = zkFactory.get().getChildren().forPath("/bubuku/size_stats"); + + for (final String broker : brokers) { + final BubukuSizeStats stats = objectMapper.readValue( + zkFactory.get().getData().forPath("/bubuku/size_stats/" + broker), + BubukuSizeStats.class); + stats.getPerPartitionStats().forEach((topic, partitionSizes) -> { + partitionSizes.forEach((partition, size) -> { + final org.zalando.nakadi.domain.TopicPartition tp = + new org.zalando.nakadi.domain.TopicPartition(topic, partition); + + result.compute(tp, (ignore, oldSize) -> + Optional.ofNullable(oldSize).map(v -> Math.max(oldSize, size)).orElse(size)); + }); + }); + } + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to acquire size statistics", e); + } + } + public List<String> listPartitionNamesInternal(final String topicId) { final Producer producer = kafkaFactory.takeProducer(); try { diff --git a/src/main/java/org/zalando/nakadi/service/EventsProcessor.java b/src/main/java/org/zalando/nakadi/service/EventsProcessor.java index 2a54c13fe8..3e1e111124 100644 --- a/src/main/java/org/zalando/nakadi/service/EventsProcessor.java +++ b/src/main/java/org/zalando/nakadi/service/EventsProcessor.java @@ -91,7 +91,7 @@ private void sendEventBatch(final String etName) { } } - void enrichAndSubmit(final String etName, final JSONObject event) { + public void enrichAndSubmit(final String etName, final JSONObject event) { final JSONObject metadata = new JSONObject() .put("occurred_at", Instant.now()) .put("eid", uuidGenerator.randomUUID()) diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java new file mode 100644 index 0000000000..f90fc2d29f --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java @@ -0,0 +1,55 @@ +package org.zalando.nakadi.service.job; + + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties(prefix = "nakadi.jobs.diskUsageStats") +public class DiskUsageStatsConfig { + private String authDataType; + private String authValue; + private String owningApplication; + private String eventTypeName; + private long runPeriodMs; + + public String getAuthDataType() { + return authDataType; + } + + public void setAuthDataType(String authDataType) { + this.authDataType = authDataType; + } + + public String getAuthValue() { + return authValue; + } + + public void setAuthValue(String authValue) { + this.authValue = authValue; + } + + public String getOwningApplication() { + return owningApplication; + } + + public void setOwningApplication(String owningApplication) { + this.owningApplication = owningApplication; + } + + public String
getEventTypeName() { + return eventTypeName; + } + + public void setEventTypeName(String eventTypeName) { + this.eventTypeName = eventTypeName; + } + + public long getRunPeriodMs() { + return runPeriodMs; + } + + public void setRunPeriodMs(long runPeriodMs) { + this.runPeriodMs = runPeriodMs; + } +} diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java new file mode 100644 index 0000000000..28875c3f2b --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java @@ -0,0 +1,114 @@ +package org.zalando.nakadi.service.job; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.TopicPartition; +import org.zalando.nakadi.repository.TopicRepositoryHolder; +import org.zalando.nakadi.repository.db.TimelineDbRepository; +import org.zalando.nakadi.service.EventsProcessor; +import org.zalando.nakadi.service.SystemEventTypeInitializer; + +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class DiskUsageStatsJob { + private static final String JOB_NAME = "disk-usage"; + + private final ExclusiveJobWrapper wrapper; + private final TimelineDbRepository timelineDbRepository; + private final TopicRepositoryHolder topicRepositoryHolder; + private final EventsProcessor eventsProcessor; + private final SystemEventTypeInitializer systemEventTypeInitializer; + private final DiskUsageStatsConfig config; + + private static final Logger LOG = LoggerFactory.getLogger(DiskUsageStatsJob.class); + + public DiskUsageStatsJob( + final JobWrapperFactory jobWrapperFactory, + final TimelineDbRepository timelineDbRepository, + final TopicRepositoryHolder topicRepositoryHolder, + final EventsProcessor eventsProcessor, + final SystemEventTypeInitializer systemEventTypeInitializer, + final DiskUsageStatsConfig config) { + this.timelineDbRepository = timelineDbRepository; + this.topicRepositoryHolder = topicRepositoryHolder; + this.eventsProcessor = eventsProcessor; + this.systemEventTypeInitializer = systemEventTypeInitializer; + this.config = config; + this.wrapper = jobWrapperFactory.createExclusiveJobWrapper(JOB_NAME, config.getRunPeriodMs()); + } + + @PostConstruct + public void prepareEventType() throws IOException { + final Map<String, String> nameReplacements = new HashMap<>(); + nameReplacements.put("event_type_name_placeholder", config.getEventTypeName()); + nameReplacements.put("owning_application_placeholder", config.getOwningApplication()); + nameReplacements.put("auth_data_type_placeholder", config.getAuthDataType()); + nameReplacements.put("auth_value_placeholder", config.getAuthValue()); + this.systemEventTypeInitializer.createEventTypesFromResource( + "disk_usage_event_type.json", + nameReplacements); + } + + @Scheduled( + fixedDelayString = "${nakadi.jobs.checkRunMs}", + initialDelayString = "${random.int(${nakadi.jobs.checkRunMs})}") + public void dumpDiskStats() { + try { + wrapper.runJobLocked(this::dumpDiskStatsLocked); + } catch (RuntimeException ex) { + LOG.warn("Disk stats dump failed", ex); + } + } + + private void dumpDiskStatsLocked() { + final Map<String, Map<String, String>> storageTopicToEventType = new HashMap<>(); + final Map<String, Storage> storages = new
HashMap<>(); + final List<Timeline> allTimelines = timelineDbRepository.listTimelinesOrdered(); + for (Timeline t : allTimelines) { + if (t.isDeleted()) { + continue; + } + storages.put(t.getStorage().getId(), t.getStorage()); + storageTopicToEventType + .computeIfAbsent(t.getStorage().getId(), v -> new HashMap<>()) + .put(t.getTopic(), t.getEventType()); + } + + final Map<String, Long> eventTypeSize = new HashMap<>(); + + for (Map.Entry<String, Map<String, String>> storageEntry : storageTopicToEventType.entrySet()) { + final Map<TopicPartition, Long> data = topicRepositoryHolder + .getTopicRepository(storages.get(storageEntry.getKey())) + .getSizeStats(); + data.entrySet().stream() + .filter(v -> storageEntry.getValue().containsKey(v.getKey().getTopic())) + .forEach(item -> { + final String eventType = storageEntry.getValue().get(item.getKey().getTopic()); + eventTypeSize.compute(eventType, (et, oldSize) -> null == oldSize ? item.getValue() : (oldSize + item.getValue())); + }); + } + // Stats are here. + publishSizeStats(eventTypeSize); + } + + private void publishSizeStats(Map<String, Long> eventTypeSizes) { + eventTypeSizes.entrySet().stream() + .map(x -> { + final JSONObject obj = new JSONObject(); + obj.put("event_type", x.getKey()); + obj.put("size_bytes", x.getValue() * 1024); + return obj; + }) + .forEach(item -> eventsProcessor.enrichAndSubmit(config.getEventTypeName(), item)); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d0223fe91a..354c5792af 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -102,6 +102,12 @@ nakadi: timelineCleanup: runPeriodMs: 3600000 # 1 hour deletionDelayMs: 2000 # 2 seconds, to be on the safe side + diskUsageStats: + runPeriodMs: 3600000 # 1 hour + authDataType: "*" + authValue: "*" + owningApplication: "stups_nakadi" + eventTypeName: "nakadi.disk.usage" consumerNodesCleanup.runPeriodMs: 21600000 # 6 hours http.pool.connection: max.total: 20 diff --git a/src/main/resources/disk_usage_event_type.json b/src/main/resources/disk_usage_event_type.json new file mode 100644 index 0000000000..c0d22345df --- /dev/null +++ b/src/main/resources/disk_usage_event_type.json @@ -0,0 +1,52 @@ +[ + { + "name": "event_type_name_placeholder", + "owning_application": "owning_application_placeholder", + "category": "business", + "enrichment_strategies": [ + "metadata_enrichment" + ], + "partition_strategy": "random", + "partition_key_fields": [], + "cleanup_policy": "delete", + "ordering_key_fields": [], + "ordering_instance_ids": [], + "schema": { + "type": "json_schema", + "schema": "{\n \"description\": \"Nakadi statistics of disk usage per event-type\",\n \"type\": \"object\",\n \"properties\": {\n \"event_type\": {\n \"type\": \"string\"\n },\n \"size_bytes\": {\n \"type\": \"number\"\n }\n },\n \"required\": [\n \"event_type\",\n \"size_bytes\"\n ]\n}", + "version": "1.0.0", + "created_at": "2018-11-28T16:48:13.043Z" + }, + "default_statistic": { + "messages_per_minute": 100, + "message_size": 100, + "read_parallelism": 1, + "write_parallelism": 1 + }, + "options": { + "retention_time": 345600000 + }, + "compatibility_mode": "forward", + "audience": "company-internal", + "authorization": { + "admins": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "readers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "writers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ] + } + } +] \ No newline at end of file From
4598dcf8b3de5b6259b3227da1a77d0ffb49bd03 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Fri, 21 Jun 2019 13:26:48 +0200 Subject: [PATCH 2/5] ARUHA-2372 Create job to publish event type stats report --- .../nakadi/repository/kafka/KafkaRepositoryAT.java | 4 +++- .../nakadi/service/job/DiskUsageStatsConfig.java | 10 +++++----- .../zalando/nakadi/service/job/DiskUsageStatsJob.java | 10 ++++++---- .../repository/kafka/KafkaTopicRepositoryTest.java | 6 ++++-- .../nakadi/service/CursorOperationsServiceTest.java | 4 +++- 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 556b525919..52d74db9cc 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.repository.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.framework.CuratorFramework; import org.apache.kafka.clients.consumer.Consumer; @@ -254,7 +255,8 @@ private KafkaTopicRepository createKafkaTopicRepository() { nakadiSettings, kafkaSettings, zookeeperSettings, - kafkaTopicConfigFactory); + kafkaTopicConfigFactory, + new ObjectMapper()); } } diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java index f90fc2d29f..61e8ea1c25 100644 --- a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsConfig.java @@ -17,7 +17,7 @@ public String getAuthDataType() { return authDataType; } - public void setAuthDataType(String authDataType) { + public void setAuthDataType(final String authDataType) { this.authDataType = authDataType; } @@ -25,7 +25,7 @@ public String getAuthValue() { return authValue; } - public void setAuthValue(String authValue) { + public void setAuthValue(final String authValue) { this.authValue = authValue; } @@ -33,7 +33,7 @@ public String getOwningApplication() { return owningApplication; } - public void setOwningApplication(String owningApplication) { + public void setOwningApplication(final String owningApplication) { this.owningApplication = owningApplication; } @@ -41,7 +41,7 @@ public String getEventTypeName() { return eventTypeName; } - public void setEventTypeName(String eventTypeName) { + public void setEventTypeName(final String eventTypeName) { this.eventTypeName = eventTypeName; } @@ -49,7 +49,7 @@ public long getRunPeriodMs() { return runPeriodMs; } - public void setRunPeriodMs(long runPeriodMs) { + public void setRunPeriodMs(final long runPeriodMs) { this.runPeriodMs = runPeriodMs; } } diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java index 28875c3f2b..4fdcd05538 100644 --- a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java @@ -74,7 +74,7 @@ private void dumpDiskStatsLocked() { final Map<String, Map<String, String>> storageTopicToEventType = new HashMap<>(); final Map<String, Storage> storages = new HashMap<>(); final List<Timeline> allTimelines = timelineDbRepository.listTimelinesOrdered(); - for (Timeline t : allTimelines) { + for (final Timeline t : allTimelines) {
if (t.isDeleted()) { continue; } @@ -86,7 +86,7 @@ private void dumpDiskStatsLocked() { final Map<String, Long> eventTypeSize = new HashMap<>(); - for (Map.Entry<String, Map<String, String>> storageEntry : storageTopicToEventType.entrySet()) { + for (final Map.Entry<String, Map<String, String>> storageEntry : storageTopicToEventType.entrySet()) { final Map<TopicPartition, Long> data = topicRepositoryHolder .getTopicRepository(storages.get(storageEntry.getKey())) .getSizeStats(); @@ -94,14 +94,16 @@ private void dumpDiskStatsLocked() { .filter(v -> storageEntry.getValue().containsKey(v.getKey().getTopic())) .forEach(item -> { final String eventType = storageEntry.getValue().get(item.getKey().getTopic()); - eventTypeSize.compute(eventType, (et, oldSize) -> null == oldSize ? item.getValue() : (oldSize + item.getValue())); + eventTypeSize.compute( + eventType, + (et, oldSize) -> null == oldSize ? item.getValue() : (oldSize + item.getValue())); }); } // Stats are here. publishSizeStats(eventTypeSize); } - private void publishSizeStats(Map<String, Long> eventTypeSizes) { + private void publishSizeStats(final Map<String, Long> eventTypeSizes) { eventTypeSizes.entrySet().stream() .map(x -> { final JSONObject obj = new JSONObject(); diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index 6ec57d914a..680cb6ac96 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.repository.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.GetChildrenBuilder; @@ -23,8 +24,8 @@ import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Timeline; -import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.exceptions.runtime.EventPublishingException; +import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; import org.zalando.nakadi.view.Cursor; @@ -359,7 +360,8 @@ private KafkaTopicRepository createKafkaRepository(final KafkaFactory kafkaFacto nakadiSettings, kafkaSettings, zookeeperSettings, - kafkaTopicConfigFactory); + kafkaTopicConfigFactory, + new ObjectMapper()); } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java index 66ca27ee74..b439d29f98 100644 --- a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.service; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import org.junit.Test; import org.zalando.nakadi.config.NakadiSettings; @@ -325,7 +326,8 @@ private Timeline mockTimeline(final int order, @Nullable final Long latestOffset mock(NakadiSettings.class), mock(KafkaSettings.class), mock(ZookeeperSettings.class), - mock(KafkaTopicConfigFactory.class)); + mock(KafkaTopicConfigFactory.class), + mock(ObjectMapper.class));
when(timelineService.getTopicRepository(timeline)).thenReturn(repository); return timeline; } From 4212a36a9b2b04b419da7525a26f5369923a56ee Mon Sep 17 00:00:00 2001 From: dsorokin Date: Fri, 21 Jun 2019 15:22:31 +0200 Subject: [PATCH 3/5] ARUHA-2372 Create unit test for data size job --- .../nakadi/service/job/DiskUsageStatsJob.java | 11 ++- src/main/resources/disk_usage_event_type.json | 2 +- .../service/job/DiskUsageStatsJobTest.java | 82 +++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java index 4fdcd05538..a7a9cc80ab 100644 --- a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.service.job; +import com.google.common.annotations.VisibleForTesting; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +72,13 @@ public void dumpDiskStats() { } private void dumpDiskStatsLocked() { + final Map<String, Long> eventTypeSize = loadDiskUsage(); + // Stats are here. + publishSizeStats(eventTypeSize); + } + + @VisibleForTesting + Map<String, Long> loadDiskUsage() { final Map<String, Map<String, String>> storageTopicToEventType = new HashMap<>(); final Map<String, Storage> storages = new HashMap<>(); final List<Timeline> allTimelines = timelineDbRepository.listTimelinesOrdered(); @@ -99,8 +107,7 @@ private void dumpDiskStatsLocked() { (et, oldSize) -> null == oldSize ? item.getValue() : (oldSize + item.getValue())); }); } - // Stats are here. - publishSizeStats(eventTypeSize); + return eventTypeSize; } private void publishSizeStats(final Map<String, Long> eventTypeSizes) { diff --git a/src/main/resources/disk_usage_event_type.json b/src/main/resources/disk_usage_event_type.json index c0d22345df..6d05ba0b02 100644 --- a/src/main/resources/disk_usage_event_type.json +++ b/src/main/resources/disk_usage_event_type.json @@ -49,4 +49,4 @@ ] } } -] \ No newline at end of file +] diff --git a/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java b/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java new file mode 100644 index 0000000000..df05c7c113 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java @@ -0,0 +1,82 @@ +package org.zalando.nakadi.service.job; + +import org.junit.Test; +import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.TopicPartition; +import org.zalando.nakadi.repository.TopicRepository; +import org.zalando.nakadi.repository.TopicRepositoryHolder; +import org.zalando.nakadi.repository.db.TimelineDbRepository; + +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DiskUsageStatsJobTest { + + @Test + public void testLoadDiskUsage() { + + final TimelineDbRepository timelineDbRepository = mock(TimelineDbRepository.class); + final TopicRepositoryHolder topicRepositoryHolder = mock(TopicRepositoryHolder.class); + final DiskUsageStatsJob job = new DiskUsageStatsJob( + mock(JobWrapperFactory.class), + timelineDbRepository, + topicRepositoryHolder, + null, + null, + new DiskUsageStatsConfig());
+ + // Et | Top | Partitions | presence in several storages + // et1 | t1 | 1 | false + // et2 | t2 | 2 | false + // et3 | t31, t32 | 1 | true + // et4 | t41, t42 | 2 | true + + final Storage storage1 = new Storage("id1", Storage.Type.KAFKA); + final Storage storage2 = new Storage("id2", Storage.Type.KAFKA); + + final Timeline t1 = new Timeline("et1", 0, storage1, "t1", new Date()); + final Timeline t2 = new Timeline("et2", 0, storage1, "t2", new Date()); + final Timeline t31 = new Timeline("et3", 0, storage1, "t31", new Date()); + final Timeline t32 = new Timeline("et3", 0, storage2, "t32", new Date()); + final Timeline t41 = new Timeline("et4", 0, storage1, "t41", new Date()); + final Timeline t42 = new Timeline("et4", 0, storage2, "t42", new Date()); + + when(timelineDbRepository.listTimelinesOrdered()).thenReturn(Arrays.asList(t1, t2, t31, t32, t41, t42)); + + final TopicRepository storage1TopicRepo = mock(TopicRepository.class); + when(topicRepositoryHolder.getTopicRepository(eq(storage1))).thenReturn(storage1TopicRepo); + final Map<TopicPartition, Long> storage1SizeStats = new HashMap<>(); + storage1SizeStats.put(new TopicPartition("t1", "0"), 5L); + storage1SizeStats.put(new TopicPartition("t2", "0"), 7L); + storage1SizeStats.put(new TopicPartition("t2", "1"), 11L); + storage1SizeStats.put(new TopicPartition("t31", "0"), 13L); + storage1SizeStats.put(new TopicPartition("t41", "0"), 17L); + storage1SizeStats.put(new TopicPartition("t41", "1"), 19L); + when(storage1TopicRepo.getSizeStats()).thenReturn(storage1SizeStats); + + final TopicRepository storage2TopicRepo = mock(TopicRepository.class); + when(topicRepositoryHolder.getTopicRepository(eq(storage2))).thenReturn(storage2TopicRepo); + final Map<TopicPartition, Long> storage2SizeStats = new HashMap<>(); + storage2SizeStats.put(new TopicPartition("t32", "0"), 500L); + storage2SizeStats.put(new TopicPartition("t42", "0"), 700L); + storage2SizeStats.put(new TopicPartition("t42", "1"), 1100L); + when(storage2TopicRepo.getSizeStats()).thenReturn(storage2SizeStats); + + final Map<String, Long> expectedResult = new HashMap<>(); + expectedResult.put("et1", 5L); + expectedResult.put("et2", 18L); + expectedResult.put("et3", 513L); + expectedResult.put("et4", 1836L); + + final Map<String, Long> actualResult = job.loadDiskUsage(); + assertEquals(expectedResult, actualResult); + } +} \ No newline at end of file From 99de3d740f2632fbe570d9879591e761e260c8d9 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Mon, 24 Jun 2019 11:31:22 +0200 Subject: [PATCH 4/5] ARUHA-2372 Add tests as requested on review --- .../repository/kafka/KafkaRepositoryAT.java | 19 ++---- .../repository/KafkaRepositoryCreator.java | 7 ++- .../kafka/KafkaTopicRepository.java | 33 ++++------ .../repository/kafka/KafkaZookeeper.java | 39 ++++++++++++ .../nakadi/service/job/DiskUsageStatsJob.java | 1 - .../kafka/KafkaTopicRepositoryTest.java | 62 ++++++++++++------- .../service/CursorOperationsServiceTest.java | 8 +-- 7 files changed, 104 insertions(+), 65 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/repository/kafka/KafkaZookeeper.java diff --git a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 52d74db9cc..7a5e4e1ef2 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -1,8 +1,5 @@ package org.zalando.nakadi.repository.kafka; -import
com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.curator.CuratorZookeeperClient; -import org.apache.curator.framework.CuratorFramework; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -16,7 +13,6 @@ import org.zalando.nakadi.domain.CleanupPolicy; import org.zalando.nakadi.domain.EventPublishingStatus; import org.zalando.nakadi.repository.NakadiTopicConfig; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; import org.zalando.nakadi.util.UUIDGenerator; import org.zalando.nakadi.utils.TestUtils; @@ -230,14 +226,8 @@ private Map<String, List<PartitionInfo>> getAllTopics() { } private KafkaTopicRepository createKafkaTopicRepository() { - final CuratorZookeeperClient zookeeperClient = mock(CuratorZookeeperClient.class); - when(zookeeperClient.getCurrentConnectionString()).thenReturn(ZOOKEEPER_URL); - - final CuratorFramework curatorFramework = mock(CuratorFramework.class); - when(curatorFramework.getZookeeperClient()).thenReturn(zookeeperClient); - - final ZooKeeperHolder zooKeeperHolder = mock(ZooKeeperHolder.class); - when(zooKeeperHolder.get()).thenReturn(curatorFramework); + final KafkaZookeeper kafkaZookeeper = mock(KafkaZookeeper.class); + when(kafkaZookeeper.getZookeeperConnectionString()).thenReturn(ZOOKEEPER_URL); final Consumer consumer = mock(Consumer.class); when(consumer.partitionsFor(any())).thenReturn(new ArrayList<>()); @@ -250,13 +240,12 @@ private KafkaTopicRepository createKafkaTopicRepository() { .when(factory) .takeProducer(); - return new KafkaTopicRepository(zooKeeperHolder, + return new KafkaTopicRepository(kafkaZookeeper, factory, nakadiSettings, kafkaSettings, zookeeperSettings, - kafkaTopicConfigFactory, - new ObjectMapper()); + kafkaTopicConfigFactory); } } diff --git a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java index 7186ba9814..bec0616082 100644 --- a/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java +++ b/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java @@ -15,6 +15,7 @@ import org.zalando.nakadi.repository.kafka.KafkaSettings; import org.zalando.nakadi.repository.kafka.KafkaTopicConfigFactory; import org.zalando.nakadi.repository.kafka.KafkaTopicRepository; +import org.zalando.nakadi.repository.kafka.KafkaZookeeper; import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; @@ -62,9 +63,9 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic nakadiSettings); final KafkaFactory kafkaFactory = new KafkaFactory(new KafkaLocationManager(zooKeeperHolder, kafkaSettings), metricRegistry); - final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zooKeeperHolder, - kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory, - objectMapper); + final KafkaZookeeper zk = new KafkaZookeeper(zooKeeperHolder, objectMapper); + final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zk, + kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory); // check that it does work kafkaTopicRepository.listTopics(); return kafkaTopicRepository; diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index df5d24a1cb..197bc3162b 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.repository.kafka; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import kafka.admin.AdminUtils; import kafka.server.ConfigType; @@ -39,7 +38,6 @@ import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.NakadiTopicConfig; import org.zalando.nakadi.repository.TopicRepository; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; import javax.annotation.Nullable; @@ -77,30 +75,27 @@ public class KafkaTopicRepository implements TopicRepository { private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicRepository.class); - private final ZooKeeperHolder zkFactory; + private final KafkaZookeeper kafkaZookeeper; private final KafkaFactory kafkaFactory; private final NakadiSettings nakadiSettings; private final KafkaSettings kafkaSettings; private final ZookeeperSettings zookeeperSettings; private final ConcurrentMap circuitBreakers; private final KafkaTopicConfigFactory kafkaTopicConfigFactory; - private final ObjectMapper objectMapper; - public KafkaTopicRepository(final ZooKeeperHolder zkFactory, + public KafkaTopicRepository(final KafkaZookeeper kafkaZookeeper, final KafkaFactory kafkaFactory, final NakadiSettings nakadiSettings, final KafkaSettings kafkaSettings, final ZookeeperSettings zookeeperSettings, - final KafkaTopicConfigFactory kafkaTopicConfigFactory, - final ObjectMapper objectMapper) { - this.zkFactory = zkFactory; + final KafkaTopicConfigFactory kafkaTopicConfigFactory) { + this.kafkaZookeeper = kafkaZookeeper; this.kafkaFactory = kafkaFactory; this.nakadiSettings = nakadiSettings; this.kafkaSettings = kafkaSettings; this.zookeeperSettings = zookeeperSettings; this.kafkaTopicConfigFactory = kafkaTopicConfigFactory; this.circuitBreakers = new ConcurrentHashMap<>(); - this.objectMapper = objectMapper; } private CompletableFuture publishItem( @@ -165,9 +160,7 @@ private static boolean hasKafkaConnectionException(final Exception exception) { public List<String> listTopics() throws TopicRepositoryException { try { - return zkFactory.get() - .getChildren() - .forPath("/brokers/topics"); + return kafkaZookeeper.listTopics(); } catch (final Exception e) { throw new TopicRepositoryException("Failed to list topics", e); } @@ -488,12 +481,10 @@ public Map<org.zalando.nakadi.domain.TopicPartition, Long> getSizeStats() { final Map<org.zalando.nakadi.domain.TopicPartition, Long> result = new HashMap<>(); try { - final List<String> brokers = zkFactory.get().getChildren().forPath("/bubuku/size_stats"); + final List<String> brokers = kafkaZookeeper.getBrokerIdsForSizeStats(); - for (final String broker : brokers) { - final BubukuSizeStats stats = objectMapper.readValue( - zkFactory.get().getData().forPath("/bubuku/size_stats/" + broker), - BubukuSizeStats.class); + for (final String brokerId : brokers) { + final BubukuSizeStats stats = kafkaZookeeper.getSizeStatsForBroker(brokerId); stats.getPerPartitionStats().forEach((topic, partitionSizes) -> { partitionSizes.forEach((partition, size) -> { final org.zalando.nakadi.domain.TopicPartition tp = @@ -609,9 +600,11 @@ private void validateCursorForNulls(final NakadiCursor cursor) throws InvalidCur private void doWithZkUtils(final ZkUtilsAction action) throws Exception { ZkUtils zkUtils
= null; try { - final String connectionString = zkFactory.get().getZookeeperClient().getCurrentConnectionString(); - zkUtils = ZkUtils.apply(connectionString, zookeeperSettings.getZkSessionTimeoutMs(), - zookeeperSettings.getZkConnectionTimeoutMs(), false); + zkUtils = ZkUtils.apply( + kafkaZookeeper.getZookeeperConnectionString(), + zookeeperSettings.getZkSessionTimeoutMs(), + zookeeperSettings.getZkConnectionTimeoutMs(), + false); action.execute(zkUtils); } finally { if (zkUtils != null) { diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaZookeeper.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaZookeeper.java new file mode 100644 index 0000000000..0f134e9e42 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaZookeeper.java @@ -0,0 +1,39 @@ +package org.zalando.nakadi.repository.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; + +import java.util.List; + +public class KafkaZookeeper { + private final ZooKeeperHolder zooKeeperHolder; + private final ObjectMapper objectMapper; + + public KafkaZookeeper( + final ZooKeeperHolder zooKeeperHolder, + final ObjectMapper objectMapper) { + this.zooKeeperHolder = zooKeeperHolder; + this.objectMapper = objectMapper; + } + + public List<String> listTopics() throws Exception { + return zooKeeperHolder.get() + .getChildren() + .forPath("/brokers/topics"); + } + + public List<String> getBrokerIdsForSizeStats() throws Exception { + return zooKeeperHolder.get().getChildren() + .forPath("/bubuku/size_stats"); + } + + public BubukuSizeStats getSizeStatsForBroker(final String brokerId) throws Exception { + return objectMapper.readValue( + zooKeeperHolder.get().getData().forPath("/bubuku/size_stats/" + brokerId), + BubukuSizeStats.class); + } + + public String getZookeeperConnectionString() { + return zooKeeperHolder.get().getZookeeperClient().getCurrentConnectionString(); + } +} diff --git a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java index a7a9cc80ab..3fa7ccacea 100644 --- a/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java +++ b/src/main/java/org/zalando/nakadi/service/job/DiskUsageStatsJob.java @@ -73,7 +73,6 @@ public void dumpDiskStats() { private void dumpDiskStatsLocked() { final Map<String, Long> eventTypeSize = loadDiskUsage(); - // Stats are here.
publishSizeStats(eventTypeSize); } diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index 680cb6ac96..b6e40067b5 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -1,9 +1,6 @@ package org.zalando.nakadi.repository.kafka; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -24,17 +21,20 @@ import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.PartitionStatistics; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.domain.TopicPartition; import org.zalando.nakadi.exceptions.runtime.EventPublishingException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; import org.zalando.nakadi.view.Cursor; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -50,6 +50,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyVararg; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -349,38 +350,57 @@ public void whenKafkaPublishTimeoutThenCircuitIsOpened() { .count() >= 1); } + @Test + public void testGetSizeStatsWorksProperly() throws Exception { + final KafkaZookeeper kz = mock(KafkaZookeeper.class); + when(kz.getBrokerIdsForSizeStats()).thenReturn(Arrays.asList("1", "2")); + final Map<String, Map<String, Long>> stats1 = new HashMap<>(); + stats1.put("t1", new HashMap<>()); + stats1.get("t1").put("0", 1234L); + stats1.get("t1").put("1", 321L); + stats1.put("t2", new HashMap<>()); + stats1.get("t2").put("0", 111L); + when(kz.getSizeStatsForBroker(eq("1"))).thenReturn(new BubukuSizeStats(null, stats1)); + final Map<String, Map<String, Long>> stats2 = new HashMap<>(); + stats2.put("t1", new HashMap<>()); + stats2.get("t1").put("0", 4321L); + stats2.get("t1").put("1", 123L); + stats2.put("t3", new HashMap<>()); + stats2.get("t3").put("0", 222L); + when(kz.getSizeStatsForBroker(eq("2"))).thenReturn(new BubukuSizeStats(null, stats2)); + + final KafkaTopicRepository ktr = new KafkaTopicRepository(kz, null, null, null, null, null); + + final Map<TopicPartition, Long> result = ktr.getSizeStats(); + + Assert.assertEquals(4, result.size()); + Assert.assertEquals(new Long(4321L), result.get(new TopicPartition("t1", "0"))); + Assert.assertEquals(new Long(321L), result.get(new TopicPartition("t1", "1"))); + Assert.assertEquals(new Long(111L), result.get(new TopicPartition("t2", "0"))); + Assert.assertEquals(new Long(222L), result.get(new TopicPartition("t3", "0"))); + } + private static Cursor cursor(final String partition, final String offset) { return new
Cursor(partition, offset); } private KafkaTopicRepository createKafkaRepository(final KafkaFactory kafkaFactory) { try { - return new KafkaTopicRepository(createZooKeeperHolder(), + return new KafkaTopicRepository(createKafkaZookeeper(), kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, - kafkaTopicConfigFactory, - new ObjectMapper()); + kafkaTopicConfigFactory); } catch (final Exception e) { throw new RuntimeException(e); } } - private ZooKeeperHolder createZooKeeperHolder() throws Exception { - // GetChildrenBuilder - final GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath("/brokers/topics")).thenReturn(allTopics()); - - // Curator Framework - final CuratorFramework curatorFramework = mock(CuratorFramework.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - // ZooKeeperHolder - final ZooKeeperHolder zkHolder = mock(ZooKeeperHolder.class); - when(zkHolder.get()).thenReturn(curatorFramework); - - return zkHolder; + private KafkaZookeeper createKafkaZookeeper() throws Exception { + final KafkaZookeeper result = mock(KafkaZookeeper.class); + when(result.listTopics()).thenReturn(allTopics()); + return result; } private static List<String> allTopics() { diff --git a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java index b439d29f98..1743ef1638 100644 --- a/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.service; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import org.junit.Test; import org.zalando.nakadi.config.NakadiSettings; @@ -14,7 +13,7 @@ import org.zalando.nakadi.repository.kafka.KafkaSettings; import org.zalando.nakadi.repository.kafka.KafkaTopicConfigFactory; import org.zalando.nakadi.repository.kafka.KafkaTopicRepository; -import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder; +import org.zalando.nakadi.repository.kafka.KafkaZookeeper; import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; import org.zalando.nakadi.service.timeline.TimelineService; @@ -321,13 +320,12 @@ private Timeline mockTimeline(final int order, @Nullable final Long latestOffset when(timeline.isActive()).thenReturn(null == latestOffset); final TopicRepository repository = new KafkaTopicRepository( - mock(ZooKeeperHolder.class), + mock(KafkaZookeeper.class), mock(KafkaFactory.class), mock(NakadiSettings.class), mock(KafkaSettings.class), mock(ZookeeperSettings.class), - mock(KafkaTopicConfigFactory.class), - mock(ObjectMapper.class)); + mock(KafkaTopicConfigFactory.class)); when(timelineService.getTopicRepository(timeline)).thenReturn(repository); return timeline; } From 36ba315afe99da0629ba3b040fb4fe28f7ccc2eb Mon Sep 17 00:00:00 2001 From: dsorokin Date: Mon, 24 Jun 2019 14:04:40 +0200 Subject: [PATCH 5/5] ARUHA-2372 Remove absence of newline )) --- .../org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java b/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java index df05c7c113..67fd86e9ed 100644 --- a/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java +++ b/src/test/java/org/zalando/nakadi/service/job/DiskUsageStatsJobTest.java @@
-79,4 +79,4 @@ public void testLoadDiskUsage() { final Map<String, Long> actualResult = job.loadDiskUsage(); assertEquals(expectedResult, actualResult); } -} \ No newline at end of file +}
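
A note for reviewers on the end-to-end data flow, since it is split across KafkaZookeeper, KafkaTopicRepository.getSizeStats() and DiskUsageStatsJob: the job consumes the per-broker size statistics that Bubuku writes under /bubuku/size_stats in ZooKeeper. Going by the @JsonProperty mappings in BubukuSizeStats, the payload of a broker node such as /bubuku/size_stats/1 is shaped as below; sizes are in KB, and the broker id and numbers here are illustrative, not taken from a real cluster:

    {
      "disk": {"used_kb": 104857600, "free_kb": 524288000},
      "topics": {
        "t1": {"0": 1234, "1": 321},
        "t2": {"0": 111}
      }
    }

getSizeStats() keeps the maximum size reported for each partition across brokers, so replicated copies are not double-counted; loadDiskUsage() sums those per-partition maxima into a total per event type; publishSizeStats() multiplies by 1024 to convert KB to bytes. An event submitted to the configured event type (nakadi.disk.usage by default) for an event type occupying 5 KB would therefore look roughly like:

    {"event_type": "et1", "size_bytes": 5120}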