Skip to content

Commit

Permalink
Merge pull request #21 from keyko-io/feature/update-to-flat-schemas
Browse files Browse the repository at this point in the history
Update framework to work with the new flattened schemas
  • Loading branch information
josepablofm78 authored Mar 25, 2020
2 parents b1a1194 + f2de07c commit e5a603f
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 54 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.keyko.monitoring</groupId>
<artifactId>web3-event-streamer</artifactId>
<version>0.1.6</version>
<version>0.2.0</version>
<name>Web3 Monitoring Event Streamer</name>
<url>https://github.com/keyko-io/web3-event-streamer</url>
<inceptionYear>2019</inceptionYear>
Expand All @@ -20,7 +20,7 @@
<org.json.version>20190722</org.json.version>
<typesafe.config.version>1.4.0</typesafe.config.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<keyko.schemas.version>0.2.5</keyko.schemas.version>
<keyko.schemas.version>0.3.1</keyko.schemas.version>
<junit.version>4.4</junit.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static KStream<String, AccountCreatedAggregation> accountDailyAggregation

KTable<Windowed<String>, Long> accountsCreatedDayTable =
accountsCreatedStream
.selectKey((key, event) -> event.getDetails().getName())
.selectKey((key, event) -> event.getEvent().getName())
.groupByKey(Grouped.with(Serdes.String(), CeloSerdes.getEventBlockSerde()))
//.windowedBy(new DailyTimeWindows(zone, windowStartHour, gracePeriod))
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
Expand Down Expand Up @@ -90,15 +90,15 @@ private static KStream<String, AccountCreatedAggregation> formatAccountCreatedAg

public static KStream<String, AlertRecord> alertNoEpochRewardsDistributed(StreamsBuilder builder, List<String> EpochRewardsDistributedToVoters) {
return builder.stream(EpochRewardsDistributedToVoters, Consumed.with(Serdes.String(), CeloSerdes.getEventBlockSerde()))
.filter((key, event) -> ((NumberParameter) event.getDetails().getNonIndexedParameters().get(0)).getValue().equals("0"))
.filter((key, event) -> ((NumberParameter) event.getEvent().getNonIndexedParameters().get(0)).getValue().equals("0"))
.map((key, event) ->
KeyValue.pair(key,
AlertRecord.newBuilder()
.setName("alertNoEpochRewardsDistributed")
.setReference(event.getId())
.setStatus(AlertEventStatus.ERROR)
.setTimestamp(event.getDetailsBlock().getTimestamp())
.setDescription("NoEpochRewardsDistributed for group: " + ((StringParameter) event.getDetails().getIndexedParameters().get(0)).getValue())
.setTimestamp(event.getBlock().getTimestamp())
.setDescription("NoEpochRewardsDistributed for group: " + ((StringParameter) event.getEvent().getIndexedParameters().get(0)).getValue())
.build())
);
// .to("w3m-alerts");
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/keyko/monitoring/postprocessing/Output.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.keyko.monitoring.schemas.TimeSeriesRecord;
import io.keyko.monitoring.schemas.ViewBlockRecord;
import io.keyko.monitoring.serde.Web3MonitoringSerdes;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
Expand All @@ -27,7 +26,7 @@ public static void splitByEvent(KStream<String, EventBlockRecord> events) {
*/
public static void splitByEvent(KStream<String, EventBlockRecord> events, String suffix) {
events.to((key, value, recordContext) ->
"w3m-".concat(value.getDetails().getName().toLowerCase()).concat(suffix),
"w3m-".concat(value.getEvent().getName().toLowerCase()).concat(suffix),
Produced.with(Serdes.String(), Web3MonitoringSerdes.getEventBlockSerde())
);
}
Expand All @@ -39,7 +38,7 @@ public static void splitByEvent(KStream<String, EventBlockRecord> events, String
*/
public static void splitByView(KStream<String, ViewBlockRecord> views, String suffix) {
views.to((key, value, recordContext) ->
"w3m-".concat(value.getDetails().getContractName().toLowerCase()).concat("-").concat(value.getDetails().getName().toLowerCase()).concat(suffix),
"w3m-".concat(value.getView().getContractName().toLowerCase()).concat("-").concat(value.getView().getName().toLowerCase()).concat(suffix),
Produced.with(Serdes.String(), Web3MonitoringSerdes.getViewBlockSerde())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ public class Filters {

public static KStream<String, EventRecord> filterConfirmed(KStream<String, EventRecord> contractEvents) {
return contractEvents
.filter((key, event) -> event.getDetails().getStatus().toString().equalsIgnoreCase("CONFIRMED"));
.filter((key, event) -> event.getStatus().toString().equalsIgnoreCase("CONFIRMED"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@ public class Transformations {
*/
public static KStream<String, EventBlockRecord> joinEventWithBlock(KStream<String, EventRecord> eventStream, KTable<String, BlockRecord> blockStream) {
return eventStream
.selectKey((key, event) -> event.getDetails().getBlockHash())
.selectKey((key, event) -> event.getBlockHash())
.join(blockStream,
(event, block) -> {
EventBlockRecord eventblock = new EventBlockRecord();

eventblock.setDetails(event.getDetails());
eventblock.setDetailsBlock(block.getDetails());
eventblock.setEvent(event);
eventblock.setBlock(block);
eventblock.setId(event.getId());
eventblock.setRetries(event.getRetries());
eventblock.setType(event.getType());

return eventblock;
},
Joined.with(Serdes.String(), Web3MonitoringSerdes.getEventSerde(), Web3MonitoringSerdes.getBlockSerde())
Expand All @@ -48,16 +45,14 @@ public static KStream<String, EventBlockRecord> joinEventWithBlock(KStream<Strin

public static KStream<String, ViewBlockRecord> joinViewWithBlock(KStream<String, ViewRecord> viewStream, KTable<String, BlockRecord> blockStream) {
return viewStream
.selectKey((key, view) -> view.getDetails().getBlockHash())
.selectKey((key, view) -> view.getBlockHash())
.join(blockStream,
(view, block) -> {
ViewBlockRecord viewBlock = new ViewBlockRecord();

viewBlock.setDetails(view.getDetails());
viewBlock.setDetailsBlock(block.getDetails());
viewBlock.setId(view.getId());
viewBlock.setRetries(view.getRetries());
viewBlock.setType(view.getType());
viewBlock.setBlock(block);
viewBlock.setView(view);
return viewBlock;
},
Joined.with(Serdes.String(), Web3MonitoringSerdes.getViewSerde(), Web3MonitoringSerdes.getBlockSerde())
Expand All @@ -77,13 +72,13 @@ public static KStream<String, TimeSeriesRecord> transformToTimeSeries(KStream<St

return stream.mapValues(viewBlock -> {

List<Object> output = viewBlock.getDetails().getOutput();
List<Object> output = viewBlock.getView().getOutput();

TimeSeriesRecord timeSeries = new TimeSeriesRecord();
timeSeries.setContractName(viewBlock.getDetails().getContractName());
timeSeries.setMethodName(viewBlock.getDetails().getName());
timeSeries.setTimestamp(viewBlock.getDetailsBlock().getTimestamp());
timeSeries.setBlockNumber(viewBlock.getDetailsBlock().getNumber());
timeSeries.setContractName(viewBlock.getView().getContractName());
timeSeries.setMethodName(viewBlock.getView().getName());
timeSeries.setTimestamp(viewBlock.getBlock().getTimestamp());
timeSeries.setBlockNumber(viewBlock.getBlock().getNumber());

for (int i = 0; i < output.size(); i++) {

Expand Down Expand Up @@ -131,14 +126,14 @@ public static KStream<String, TimeSeriesRecord> transformToTimeSeries(KStream<St
public static KStream<String, TimeSeriesRecord> transformEventToTimeSeries(KStream<String, EventBlockRecord> stream) {
return stream.mapValues(eventBlock -> {

List<Object> output = eventBlock.getDetails().getIndexedParameters();
output.addAll(eventBlock.getDetails().getNonIndexedParameters());
List<Object> output = eventBlock.getEvent().getIndexedParameters();
output.addAll(eventBlock.getEvent().getNonIndexedParameters());

TimeSeriesRecord timeSeries = new TimeSeriesRecord();
timeSeries.setContractName(eventBlock.getDetails().getContractName());
timeSeries.setMethodName(eventBlock.getDetails().getName());
timeSeries.setTimestamp(eventBlock.getDetailsBlock().getTimestamp());
timeSeries.setBlockNumber(eventBlock.getDetailsBlock().getNumber());
timeSeries.setContractName(eventBlock.getEvent().getContractName());
timeSeries.setMethodName(eventBlock.getEvent().getName());
timeSeries.setTimestamp(eventBlock.getBlock().getTimestamp());
timeSeries.setBlockNumber(eventBlock.getBlock().getNumber());

for (int i = 0; i < output.size(); i++) {

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/keyko/monitoring/serde/Web3MonitoringSerdes.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class Web3MonitoringSerdes {
private final static SpecificAvroSerde<ViewRecord> viewSerde = new SpecificAvroSerde<>();
private final static SpecificAvroSerde<ViewBlockRecord> viewBlockSerde = new SpecificAvroSerde<>();
private final static SpecificAvroSerde<TimeSeriesRecord> timeSeriesSerde = new SpecificAvroSerde<>();
private final static SpecificAvroSerde<LogRecord> logAvroSerde = new SpecificAvroSerde<>();
private final static SpecificAvroSerde<LogRecordTopicsFlattened> logFlattenedAvroSerde = new SpecificAvroSerde<>();

protected static Map<String, String> serdeConfig;

Expand All @@ -33,6 +35,8 @@ public static void configureSerdes(String schemaRegistryUrl) {
viewSerde.configure(serdeConfig, false);
viewBlockSerde.configure(serdeConfig, false);
timeSeriesSerde.configure(serdeConfig, false);
logAvroSerde.configure(serdeConfig, false);
logFlattenedAvroSerde.configure(serdeConfig, false);
}

protected static void configureSerde(SpecificAvroSerde serde) {
Expand Down Expand Up @@ -61,6 +65,14 @@ public static SpecificAvroSerde<AlertRecord> getAlertSerde() {

public static SpecificAvroSerde<TimeSeriesRecord> getTimeSerieserde() { return timeSeriesSerde; }

public static SpecificAvroSerde<LogRecord> getLogAvroSerde() {
return logAvroSerde;
}

public static SpecificAvroSerde<LogRecordTopicsFlattened> getLogFlattenedAvroSerde() {
return logFlattenedAvroSerde;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public long extract(ConsumerRecord<Object, Object> record, long previousTimestam
if (record != null && record.value() != null) {

if (record.value() instanceof EventBlockRecord) {
return ((EventBlockRecord) record.value()).getDetailsBlock().getTimestamp();
return ((EventBlockRecord) record.value()).getBlock().getTimestamp();
}

}
Expand Down
39 changes: 18 additions & 21 deletions src/test/java/io/keyko/monitoring/ProcessorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,39 @@ public class ProcessorsTest {
private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE;


public EventDetailsRecord transferDetails = new EventDetailsRecord("Transfer", "Transfer", "filter1", "default",
public EventRecord transferEvent = new EventRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0",
"Transfer", "Transfer", "filter1", "default",
Arrays.asList(new StringParameter("from", "address", "0xcCCD3999D5b421F906c4a35c0c95bcD533e1CFBb"), new StringParameter("to", "address", "0xC8FD77490A12F46709BffbCC0FCe35740Da8D860")),
Collections.singletonList(new NumberParameter("amount", "uint256", "23873204128892815319", Long.valueOf(
"23873204128892815319".substring(0, String.valueOf(Long.MAX_VALUE).length())))), "0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf",
"0", 15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "0x5c7197E1147ebF98658A2a8Bc3D32BeBF1692829", ContractEventStatus.CONFIRMED,
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "default",
"0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0");
public EventRecord transferEvent = new EventRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0",
"CONTRACT_EVENT", transferDetails, 0);
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "default", 0);


public EventDetailsRecord oracleReportedDetails = new EventDetailsRecord("OracleReported", "OracleReported", "filter2", "default",
public EventRecord oracleReportedEvent = new EventRecord("0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0", "OracleReported", "OracleReported", "filter2", "default",
Collections.emptyList(),
Arrays.asList(new StringParameter("token", "address", "0x5c7197E1147ebF98658A2a8Bc3D32BeBF1692829"), new StringParameter("oracle", "address", "0x0d473f73AAf1C2bf7EBd2be7196C71dBa6C1724b"), new NumberParameter("timestamp", "uint256", "1576176348", 1576176348L)
, new NumberParameter("numerator", "uint256", "18299170121119875203", Long.valueOf(
"18299170121119875203".substring(0, String.valueOf(Long.MAX_VALUE).length()))), new NumberParameter("denominator", "uint256", "18446744073709551616", Long.valueOf(
"18446744073709551616".substring(0, String.valueOf(Long.MAX_VALUE).length())))), "0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf",
"0", 14750L, "0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", "0x91061bF2F509AF76aa01F46E9F3E97577a5a80BA", ContractEventStatus.CONFIRMED,
"0xdbf09271932e018b9c31e9988e4fbe3109fdd79d78f5d19a764dfb56035ed775", "default",
"0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0");
public BlockDetailsRecord blockOracleReportedDetails = new BlockDetailsRecord(14750L, "0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", 1234L, "default");
"0xdbf09271932e018b9c31e9988e4fbe3109fdd79d78f5d19a764dfb56035ed775", "default", 0
);

public BlockRecord blockOracleReported = new BlockRecord("0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", 14750L, "0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", 1234L, "default", 0);


public BlockDetailsRecord blockTransferDetails = new BlockDetailsRecord(15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", 1234L, "default");
public BlockRecord transferBlock = new BlockRecord("0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "BLOCK", blockTransferDetails, 0);
public BlockRecord transferBlock = new BlockRecord("0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", 15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", 1234L, "default", 0);

public EventBlockRecord transferEventWithBlock = new EventBlockRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0", "", transferDetails, blockTransferDetails, 0);
public EventBlockRecord transferEventWithBlock = new EventBlockRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0", transferEvent, transferBlock, 0);

public EventBlockRecord oracleReportedEventWithBlock = new EventBlockRecord("0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0", "", oracleReportedDetails, blockOracleReportedDetails, 0);
public EventBlockRecord oracleReportedEventWithBlock = new EventBlockRecord("0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0", oracleReportedEvent, blockOracleReported, 0);

public ViewDetailsRecord contractViewDetailsBalance = new ViewDetailsRecord("CeloGold-balanceOf",

public ViewRecord viewBalance = new ViewRecord("CeloGold-balanceOf-7c", "CeloGold-balanceOf",
"1", "default", "default",
Collections.singletonList(new NumberParameter("balance", "uint256", "12", 12L)),
15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "default", "default", "CeloGold-balanceOf-7c");

public ViewRecord viewBalance = new ViewRecord("CeloGold-balanceOf-7c", "CONTRACT_VIEW", contractViewDetailsBalance, 0);
15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "default", "default", 0);


final Serde<EventRecord> eventAvroSerde = Web3MonitoringSerdes.getEventSerde();
Expand Down Expand Up @@ -111,8 +108,8 @@ public void shouldJoinEventWithBlock() {
TestOutputTopic<String, EventBlockRecord> joinTopic = driver.createOutputTopic("join", new StringDeserializer(), eventBlockAvroSerde.deserializer());
EventBlockRecord result = joinTopic.readValue();

assertEquals(result.getDetailsBlock(), transferBlock.getDetails());
assertEquals(result.getDetails(), transferEvent.getDetails());
assertEquals(result.getBlock(), transferBlock);
assertEquals(result.getEvent(), transferEvent);
driver.close();
}

Expand Down Expand Up @@ -157,8 +154,8 @@ public void shouldJoinViewWithBlock() {
TestOutputTopic<String, ViewBlockRecord> joinTopic = driver.createOutputTopic("join", new StringDeserializer(), viewBlockAvroSerde.deserializer());
ViewBlockRecord result = joinTopic.readValue();

assertEquals(result.getDetailsBlock(), transferBlock.getDetails());
assertEquals(result.getDetails(), viewBalance.getDetails());
assertEquals(result.getBlock(), transferBlock);
assertEquals(result.getView(), viewBalance);
driver.close();
}

Expand Down

0 comments on commit e5a603f

Please sign in to comment.