Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #987 from zalando/ARUHA-2106
Browse files Browse the repository at this point in the history
ARUHA-2106: Added populating of kafka event key
  • Loading branch information
v-stepanov authored Jan 2, 2019
2 parents b36b05b + 0bd6cb1 commit aabbe99
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@
import org.springframework.stereotype.Component;
import org.zalando.nakadi.domain.EventCategory;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.exceptions.Try;
import org.zalando.nakadi.exceptions.runtime.InvalidPartitionKeyFieldsException;
import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.Try;
import org.zalando.nakadi.util.JsonPathAccess;
import org.zalando.nakadi.validation.JsonSchemaEnrichment;

import java.util.List;
import java.util.stream.Collectors;

import static java.lang.Math.abs;
import static org.zalando.nakadi.validation.JsonSchemaEnrichment.DATA_PATH_PREFIX;

@Component
public class HashPartitionStrategy implements PartitionStrategy {

private static final String DATA_PATH_PREFIX = JsonSchemaEnrichment.DATA_CHANGE_WRAP_FIELD + ".";

private final HashPartitionStrategyCrutch hashPartitioningCrutch;
private final StringHash stringHash;

Expand Down
24 changes: 22 additions & 2 deletions src/main/java/org/zalando/nakadi/service/EventPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.zalando.nakadi.domain.BatchItem;
import org.zalando.nakadi.domain.BatchItemResponse;
import org.zalando.nakadi.domain.CleanupPolicy;
import org.zalando.nakadi.domain.EventCategory;
import org.zalando.nakadi.domain.EventPublishResult;
import org.zalando.nakadi.domain.EventPublishingStatus;
import org.zalando.nakadi.domain.EventPublishingStep;
Expand All @@ -26,9 +27,11 @@
import org.zalando.nakadi.exceptions.runtime.PartitioningException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.partitioning.PartitionResolver;
import org.zalando.nakadi.partitioning.PartitionStrategy;
import org.zalando.nakadi.repository.db.EventTypeCache;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.util.JsonPathAccess;
import org.zalando.nakadi.validation.EventTypeValidator;
import org.zalando.nakadi.validation.ValidationError;

Expand All @@ -39,6 +42,8 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.zalando.nakadi.validation.JsonSchemaEnrichment.DATA_PATH_PREFIX;

@Component
public class EventPublisher {

Expand Down Expand Up @@ -99,7 +104,7 @@ EventPublishResult publishInternal(final String events,

validate(batch, eventType);
partition(batch, eventType);
compact(batch, eventType);
setEventKey(batch, eventType);
enrich(batch, eventType);
submit(batch, eventType);

Expand Down Expand Up @@ -169,14 +174,29 @@ private void partition(final List<BatchItem> batch, final EventType eventType)
}
}

private void compact(final List<BatchItem> batch, final EventType eventType) {
private void setEventKey(final List<BatchItem> batch, final EventType eventType) {
if (eventType.getCleanupPolicy() == CleanupPolicy.COMPACT) {
for (final BatchItem item : batch) {
final String compactionKey = item.getEvent()
.getJSONObject("metadata")
.getString("partition_compaction_key");
item.setEventKey(compactionKey);
}
} else if (PartitionStrategy.HASH_STRATEGY.equals(eventType.getPartitionStrategy())) {
final List<String> partitionKeyFields = eventType.getPartitionKeyFields();
// we will set event key only if there is exactly one partition key field,
// in other case it's not clear what should be set as event key
if (partitionKeyFields.size() == 1) {
String partitionKeyField = partitionKeyFields.get(0);
if (EventCategory.DATA.equals(eventType.getCategory())) {
partitionKeyField = DATA_PATH_PREFIX + partitionKeyField;
}
for (final BatchItem item : batch) {
final JsonPathAccess jsonPath = new JsonPathAccess(item.getEvent());
final String eventKey = jsonPath.get(partitionKeyField).toString();
item.setEventKey(eventKey);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

public class JsonSchemaEnrichment {
public static final String DATA_CHANGE_WRAP_FIELD = "data";
public static final String DATA_PATH_PREFIX = JsonSchemaEnrichment.DATA_CHANGE_WRAP_FIELD + ".";

private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
private static final String ADDITIONAL_ITEMS = "additionalItems";
Expand Down
68 changes: 66 additions & 2 deletions src/test/java/org/zalando/nakadi/service/EventPublisherTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.zalando.nakadi.service;

import com.google.common.collect.ImmutableList;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.domain.BatchItem;
Expand All @@ -15,12 +17,13 @@
import org.zalando.nakadi.domain.EventTypeBase;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.enrichment.Enrichment;
import org.zalando.nakadi.exceptions.runtime.EnrichmentException;
import org.zalando.nakadi.exceptions.runtime.PartitioningException;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.EnrichmentException;
import org.zalando.nakadi.exceptions.runtime.EventPublishingException;
import org.zalando.nakadi.exceptions.runtime.EventTypeTimeoutException;
import org.zalando.nakadi.exceptions.runtime.PartitioningException;
import org.zalando.nakadi.partitioning.PartitionResolver;
import org.zalando.nakadi.partitioning.PartitionStrategy;
import org.zalando.nakadi.repository.TopicRepository;
import org.zalando.nakadi.repository.db.EventTypeCache;
import org.zalando.nakadi.service.timeline.TimelineService;
Expand All @@ -44,6 +47,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -404,6 +408,66 @@ public void whenEnrichmentFailsThenResultIsAborted() throws Exception {
verify(topicRepository, times(0)).syncPostBatch(any(), any());
}

@Test
public void whenSinglePartitioningKeyThenEventKeyIsSet() throws Exception {
final EventType eventType = EventTypeTestBuilder.builder()
.partitionStrategy(PartitionStrategy.HASH_STRATEGY)
.partitionKeyFields(ImmutableList.of("my_field"))
.build();

final JSONArray batch = buildDefaultBatch(1);
batch.getJSONObject(0).put("my_field", "my_key");

mockSuccessfulValidation(eventType);

publisher.publish(batch.toString(), eventType.getName());

final List<BatchItem> publishedBatch = capturePublishedBatch();
assertThat(publishedBatch.get(0).getEventKey(), equalTo("my_key"));
}

@Test
public void whenMultiplePartitioningKeyThenEventKeyIsNotSet() throws Exception {
final EventType eventType = EventTypeTestBuilder.builder()
.partitionStrategy(PartitionStrategy.HASH_STRATEGY)
.partitionKeyFields(ImmutableList.of("my_field", "other_field"))
.build();

final JSONArray batch = buildDefaultBatch(1);
final JSONObject event = batch.getJSONObject(0);
event.put("my_field", "my_key");
event.put("other_field", "other_value");

mockSuccessfulValidation(eventType);

publisher.publish(batch.toString(), eventType.getName());

final List<BatchItem> publishedBatch = capturePublishedBatch();
assertThat(publishedBatch.get(0).getEventKey(), equalTo(null));
}

@Test
public void whenNoneHashPartitioningStrategyThenEventKeyIsNotSet() throws Exception {
final EventType eventType = EventTypeTestBuilder.builder()
.partitionStrategy(PartitionStrategy.RANDOM_STRATEGY)
.build();
final JSONArray batch = buildDefaultBatch(1);

mockSuccessfulValidation(eventType);

publisher.publish(batch.toString(), eventType.getName());

final List<BatchItem> publishedBatch = capturePublishedBatch();
assertThat(publishedBatch.get(0).getEventKey(), equalTo(null));
}

@SuppressWarnings("unchecked")
private List<BatchItem> capturePublishedBatch() {
final ArgumentCaptor<List> batchCaptor = ArgumentCaptor.forClass(List.class);
verify(topicRepository, atLeastOnce()).syncPostBatch(any(), batchCaptor.capture());
return (List<BatchItem>) batchCaptor.getValue();
}

@Test
public void whenEnrichmentFailsThenSubsequentItemsAreAborted() throws Exception {
final EventType eventType = buildDefaultEventType();
Expand Down

0 comments on commit aabbe99

Please sign in to comment.