Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1073 from zalando/ARUHA-2372
Browse files Browse the repository at this point in the history
ARUHA-2372 Store event disk size stats from nakadi
  • Loading branch information
antban authored Jun 25, 2019
2 parents e55cde3 + 36ba315 commit 753677f
Show file tree
Hide file tree
Showing 14 changed files with 498 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.zalando.nakadi.repository.kafka;

import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
Expand All @@ -15,7 +13,6 @@
import org.zalando.nakadi.domain.CleanupPolicy;
import org.zalando.nakadi.domain.EventPublishingStatus;
import org.zalando.nakadi.repository.NakadiTopicConfig;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;
import org.zalando.nakadi.util.UUIDGenerator;
import org.zalando.nakadi.utils.TestUtils;
Expand Down Expand Up @@ -229,14 +226,8 @@ private Map<String, List<PartitionInfo>> getAllTopics() {
}

private KafkaTopicRepository createKafkaTopicRepository() {
final CuratorZookeeperClient zookeeperClient = mock(CuratorZookeeperClient.class);
when(zookeeperClient.getCurrentConnectionString()).thenReturn(ZOOKEEPER_URL);

final CuratorFramework curatorFramework = mock(CuratorFramework.class);
when(curatorFramework.getZookeeperClient()).thenReturn(zookeeperClient);

final ZooKeeperHolder zooKeeperHolder = mock(ZooKeeperHolder.class);
when(zooKeeperHolder.get()).thenReturn(curatorFramework);
final KafkaZookeeper kafkaZookeeper = mock(KafkaZookeeper.class);
when(kafkaZookeeper.getZookeeperConnectionString()).thenReturn(ZOOKEEPER_URL);

final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
when(consumer.partitionsFor(any())).thenReturn(new ArrayList<>());
Expand All @@ -249,7 +240,7 @@ private KafkaTopicRepository createKafkaTopicRepository() {
.when(factory)
.takeProducer();

return new KafkaTopicRepository(zooKeeperHolder,
return new KafkaTopicRepository(kafkaZookeeper,
factory,
nakadiSettings,
kafkaSettings,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.nakadi.repository;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.config.NakadiSettings;
Expand All @@ -14,6 +15,7 @@
import org.zalando.nakadi.repository.kafka.KafkaSettings;
import org.zalando.nakadi.repository.kafka.KafkaTopicConfigFactory;
import org.zalando.nakadi.repository.kafka.KafkaTopicRepository;
import org.zalando.nakadi.repository.kafka.KafkaZookeeper;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;

Expand All @@ -29,19 +31,22 @@ public class KafkaRepositoryCreator implements TopicRepositoryCreator {
private final ZookeeperSettings zookeeperSettings;
private final KafkaTopicConfigFactory kafkaTopicConfigFactory;
private final MetricRegistry metricRegistry;
private final ObjectMapper objectMapper;

@Autowired
public KafkaRepositoryCreator(
final NakadiSettings nakadiSettings,
final KafkaSettings kafkaSettings,
final ZookeeperSettings zookeeperSettings,
final KafkaTopicConfigFactory kafkaTopicConfigFactory,
final MetricRegistry metricRegistry) {
final MetricRegistry metricRegistry,
final ObjectMapper objectMapper) {
this.nakadiSettings = nakadiSettings;
this.kafkaSettings = kafkaSettings;
this.zookeeperSettings = zookeeperSettings;
this.kafkaTopicConfigFactory = kafkaTopicConfigFactory;
this.metricRegistry = metricRegistry;
this.objectMapper = objectMapper;
}

@Override
Expand All @@ -58,7 +63,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
nakadiSettings);
final KafkaFactory kafkaFactory =
new KafkaFactory(new KafkaLocationManager(zooKeeperHolder, kafkaSettings), metricRegistry);
final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zooKeeperHolder,
final KafkaZookeeper zk = new KafkaZookeeper(zooKeeperHolder, objectMapper);
final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository(zk,
kafkaFactory, nakadiSettings, kafkaSettings, zookeeperSettings, kafkaTopicConfigFactory);
// check that it does work
kafkaTopicRepository.listTopics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.zalando.nakadi.domain.PartitionEndStatistics;
import org.zalando.nakadi.domain.PartitionStatistics;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.domain.TopicPartition;
import org.zalando.nakadi.exceptions.runtime.EventPublishingException;
import org.zalando.nakadi.exceptions.runtime.InvalidCursorException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
Expand All @@ -15,6 +16,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface TopicRepository {
Expand Down Expand Up @@ -67,6 +69,12 @@ List<PartitionEndStatistics> loadTopicEndStatistics(Collection<Timeline> topics)

List<String> listPartitionNames(String topicId);

/**
* Provides estimation of disk size occupied by particular topic partition. Replicated data is not included
* @return Maximum size occupied by topic partitions.
*/
Map<TopicPartition, Long> getSizeStats();

EventConsumer.LowLevelConsumer createEventConsumer(String clientId, List<NakadiCursor> positions)
throws InvalidCursorException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.zalando.nakadi.repository.kafka;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;

public class BubukuSizeStats {
public static class TotalStats {
private final long usedKb;
private final long freeKb;

public TotalStats(
@JsonProperty("used_kb") final long usedKb,
@JsonProperty("free_kb") final long freeKb) {
this.usedKb = usedKb;
this.freeKb = freeKb;
}

public long getUsedKb() {
return usedKb;
}

public long getFreeKb() {
return freeKb;
}
}

private final TotalStats totalStats;
private final Map<String, Map<String, Long>> perPartitionStats;

public BubukuSizeStats(
@JsonProperty("disk") final TotalStats totalStats,
@JsonProperty("topics") final Map<String, Map<String, Long>> perPartitionStats) {
this.totalStats = totalStats;
this.perPartitionStats = perPartitionStats;
}

public TotalStats getTotalStats() {
return totalStats;
}

public Map<String, Map<String, Long>> getPerPartitionStats() {
return perPartitionStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.repository.NakadiTopicConfig;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;
import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -76,21 +75,21 @@ public class KafkaTopicRepository implements TopicRepository {

private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicRepository.class);

private final ZooKeeperHolder zkFactory;
private final KafkaZookeeper kafkaZookeeper;
private final KafkaFactory kafkaFactory;
private final NakadiSettings nakadiSettings;
private final KafkaSettings kafkaSettings;
private final ZookeeperSettings zookeeperSettings;
private final ConcurrentMap<String, HystrixKafkaCircuitBreaker> circuitBreakers;
private final KafkaTopicConfigFactory kafkaTopicConfigFactory;

public KafkaTopicRepository(final ZooKeeperHolder zkFactory,
public KafkaTopicRepository(final KafkaZookeeper kafkaZookeeper,
final KafkaFactory kafkaFactory,
final NakadiSettings nakadiSettings,
final KafkaSettings kafkaSettings,
final ZookeeperSettings zookeeperSettings,
final KafkaTopicConfigFactory kafkaTopicConfigFactory) {
this.zkFactory = zkFactory;
this.kafkaZookeeper = kafkaZookeeper;
this.kafkaFactory = kafkaFactory;
this.nakadiSettings = nakadiSettings;
this.kafkaSettings = kafkaSettings;
Expand Down Expand Up @@ -161,9 +160,7 @@ private static boolean hasKafkaConnectionException(final Exception exception) {

public List<String> listTopics() throws TopicRepositoryException {
try {
return zkFactory.get()
.getChildren()
.forPath("/brokers/topics");
return kafkaZookeeper.listTopics();
} catch (final Exception e) {
throw new TopicRepositoryException("Failed to list topics", e);
}
Expand Down Expand Up @@ -479,6 +476,31 @@ public List<String> listPartitionNames(final String topicId) {
.withExceptionsThatForceRetry(org.apache.kafka.common.errors.TimeoutException.class));
}

@Override
public Map<org.zalando.nakadi.domain.TopicPartition, Long> getSizeStats() {
final Map<org.zalando.nakadi.domain.TopicPartition, Long> result = new HashMap<>();

try {
final List<String> brokers = kafkaZookeeper.getBrokerIdsForSizeStats();

for (final String brokerId : brokers) {
final BubukuSizeStats stats = kafkaZookeeper.getSizeStatsForBroker(brokerId);
stats.getPerPartitionStats().forEach((topic, partitionSizes) -> {
partitionSizes.forEach((partition, size) -> {
final org.zalando.nakadi.domain.TopicPartition tp =
new org.zalando.nakadi.domain.TopicPartition(topic, partition);

result.compute(tp, (ignore, oldSize) ->
Optional.ofNullable(oldSize).map(v -> Math.max(oldSize, size)).orElse(size));
});
});
}
return result;
} catch (Exception e) {
throw new RuntimeException("Failed to acquire size statistics", e);
}
}

public List<String> listPartitionNamesInternal(final String topicId) {
final Producer<String, String> producer = kafkaFactory.takeProducer();
try {
Expand Down Expand Up @@ -578,9 +600,11 @@ private void validateCursorForNulls(final NakadiCursor cursor) throws InvalidCur
private void doWithZkUtils(final ZkUtilsAction action) throws Exception {
ZkUtils zkUtils = null;
try {
final String connectionString = zkFactory.get().getZookeeperClient().getCurrentConnectionString();
zkUtils = ZkUtils.apply(connectionString, zookeeperSettings.getZkSessionTimeoutMs(),
zookeeperSettings.getZkConnectionTimeoutMs(), false);
zkUtils = ZkUtils.apply(
kafkaZookeeper.getZookeeperConnectionString(),
zookeeperSettings.getZkSessionTimeoutMs(),
zookeeperSettings.getZkConnectionTimeoutMs(),
false);
action.execute(zkUtils);
} finally {
if (zkUtils != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.zalando.nakadi.repository.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.zalando.nakadi.repository.zookeeper.ZooKeeperHolder;

import java.util.List;

public class KafkaZookeeper {
private final ZooKeeperHolder zooKeeperHolder;
private final ObjectMapper objectMapper;

public KafkaZookeeper(
final ZooKeeperHolder zooKeeperHolder,
final ObjectMapper objectMapper) {
this.zooKeeperHolder = zooKeeperHolder;
this.objectMapper = objectMapper;
}

public List<String> listTopics() throws Exception {
return zooKeeperHolder.get()
.getChildren()
.forPath("/brokers/topics");
}

public List<String> getBrokerIdsForSizeStats() throws Exception {
return zooKeeperHolder.get().getChildren()
.forPath("/bubuku/size_stats");
}

public BubukuSizeStats getSizeStatsForBroker(final String brokerId) throws Exception {
return objectMapper.readValue(
zooKeeperHolder.get().getData().forPath("/bubuku/size_stats/" + brokerId),
BubukuSizeStats.class);
}

public String getZookeeperConnectionString() {
return zooKeeperHolder.get().getZookeeperClient().getCurrentConnectionString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void sendEventBatch(final String etName) {
}
}

void enrichAndSubmit(final String etName, final JSONObject event) {
public void enrichAndSubmit(final String etName, final JSONObject event) {
final JSONObject metadata = new JSONObject()
.put("occurred_at", Instant.now())
.put("eid", uuidGenerator.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.zalando.nakadi.service.job;


import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "nakadi.jobs.diskUsageStats")
public class DiskUsageStatsConfig {
private String authDataType;
private String authValue;
private String owningApplication;
private String eventTypeName;
private long runPeriodMs;

public String getAuthDataType() {
return authDataType;
}

public void setAuthDataType(final String authDataType) {
this.authDataType = authDataType;
}

public String getAuthValue() {
return authValue;
}

public void setAuthValue(final String authValue) {
this.authValue = authValue;
}

public String getOwningApplication() {
return owningApplication;
}

public void setOwningApplication(final String owningApplication) {
this.owningApplication = owningApplication;
}

public String getEventTypeName() {
return eventTypeName;
}

public void setEventTypeName(final String eventTypeName) {
this.eventTypeName = eventTypeName;
}

public long getRunPeriodMs() {
return runPeriodMs;
}

public void setRunPeriodMs(final long runPeriodMs) {
this.runPeriodMs = runPeriodMs;
}
}
Loading

0 comments on commit 753677f

Please sign in to comment.