From 4aa9119fa25118f1de2814849ca3387053dddf6c Mon Sep 17 00:00:00 2001 From: eruizgar91 Date: Wed, 25 Mar 2020 09:17:10 +0100 Subject: [PATCH 1/2] Update framework to work with the new flattened schemas --- pom.xml | 4 +- .../examples/celo/CeloProcessor.java | 8 ++-- .../monitoring/postprocessing/Output.java | 5 +-- .../monitoring/preprocessing/Filters.java | 2 +- .../preprocessing/Transformations.java | 39 ++++++++----------- .../time/EventBlockTimestampExtractor.java | 2 +- .../io/keyko/monitoring/ProcessorsTest.java | 39 +++++++++---------- 7 files changed, 45 insertions(+), 54 deletions(-) diff --git a/pom.xml b/pom.xml index 2766ccb..793fe96 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 io.keyko.monitoring web3-event-streamer - 0.1.6 + 0.2.0 Web3 Monitoring Event Streamer https://github.com/keyko-io/web3-event-streamer 2019 @@ -20,7 +20,7 @@ 20190722 1.4.0 UTF-8 - 0.2.5 + 0.3.1 4.4 diff --git a/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java b/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java index 3ddbbd5..2006249 100644 --- a/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java +++ b/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java @@ -38,7 +38,7 @@ public static KStream accountDailyAggregation KTable, Long> accountsCreatedDayTable = accountsCreatedStream - .selectKey((key, event) -> event.getDetails().getName()) + .selectKey((key, event) -> event.getEvent().getName()) .groupByKey(Grouped.with(Serdes.String(), CeloSerdes.getEventBlockSerde())) //.windowedBy(new DailyTimeWindows(zone, windowStartHour, gracePeriod)) .windowedBy(TimeWindows.of(Duration.ofSeconds(60))) @@ -90,15 +90,15 @@ private static KStream formatAccountCreatedAg public static KStream alertNoEpochRewardsDistributed(StreamsBuilder builder, List EpochRewardsDistributedToVoters) { return builder.stream(EpochRewardsDistributedToVoters, Consumed.with(Serdes.String(), CeloSerdes.getEventBlockSerde())) - .filter((key, event) -> ((NumberParameter) event.getDetails().getNonIndexedParameters().get(0)).getValue().equals("0")) + .filter((key, event) -> ((NumberParameter) event.getEvent().getNonIndexedParameters().get(0)).getValue().equals("0")) .map((key, event) -> KeyValue.pair(key, AlertRecord.newBuilder() .setName("alertNoEpochRewardsDistributed") .setReference(event.getId()) .setStatus(AlertEventStatus.ERROR) - .setTimestamp(event.getDetailsBlock().getTimestamp()) - .setDescription("NoEpochRewardsDistributed for group: " + ((StringParameter) event.getDetails().getIndexedParameters().get(0)).getValue()) + .setTimestamp(event.getBlock().getTimestamp()) + .setDescription("NoEpochRewardsDistributed for group: " + ((StringParameter) event.getEvent().getIndexedParameters().get(0)).getValue()) .build()) ); // .to("w3m-alerts"); diff --git a/src/main/java/io/keyko/monitoring/postprocessing/Output.java b/src/main/java/io/keyko/monitoring/postprocessing/Output.java index e6d0d38..a208e98 100644 --- a/src/main/java/io/keyko/monitoring/postprocessing/Output.java +++ b/src/main/java/io/keyko/monitoring/postprocessing/Output.java @@ -4,7 +4,6 @@ import io.keyko.monitoring.schemas.TimeSeriesRecord; import io.keyko.monitoring.schemas.ViewBlockRecord; import io.keyko.monitoring.serde.Web3MonitoringSerdes; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; @@ -27,7 +26,7 @@ public static void splitByEvent(KStream events) { */ public static void splitByEvent(KStream events, String suffix) { events.to((key, value, recordContext) -> - "w3m-".concat(value.getDetails().getName().toLowerCase()).concat(suffix), + "w3m-".concat(value.getEvent().getName().toLowerCase()).concat(suffix), Produced.with(Serdes.String(), Web3MonitoringSerdes.getEventBlockSerde()) ); } @@ -39,7 +38,7 @@ public static void splitByEvent(KStream events, String */ public static void splitByView(KStream views, String suffix) { views.to((key, value, recordContext) -> - "w3m-".concat(value.getDetails().getContractName().toLowerCase()).concat("-").concat(value.getDetails().getName().toLowerCase()).concat(suffix), + "w3m-".concat(value.getView().getContractName().toLowerCase()).concat("-").concat(value.getView().getName().toLowerCase()).concat(suffix), Produced.with(Serdes.String(), Web3MonitoringSerdes.getViewBlockSerde()) ); } diff --git a/src/main/java/io/keyko/monitoring/preprocessing/Filters.java b/src/main/java/io/keyko/monitoring/preprocessing/Filters.java index 4625ea6..10a5347 100644 --- a/src/main/java/io/keyko/monitoring/preprocessing/Filters.java +++ b/src/main/java/io/keyko/monitoring/preprocessing/Filters.java @@ -7,6 +7,6 @@ public class Filters { public static KStream filterConfirmed(KStream contractEvents) { return contractEvents - .filter((key, event) -> event.getDetails().getStatus().toString().equalsIgnoreCase("CONFIRMED")); + .filter((key, event) -> event.getStatus().toString().equalsIgnoreCase("CONFIRMED")); } } diff --git a/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java b/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java index 4fa4010..d1586ca 100644 --- a/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java +++ b/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java @@ -20,17 +20,14 @@ public class Transformations { */ public static KStream joinEventWithBlock(KStream eventStream, KTable blockStream) { return eventStream - .selectKey((key, event) -> event.getDetails().getBlockHash()) + .selectKey((key, event) -> event.getBlockHash()) .join(blockStream, (event, block) -> { EventBlockRecord eventblock = new EventBlockRecord(); - - eventblock.setDetails(event.getDetails()); - eventblock.setDetailsBlock(block.getDetails()); + eventblock.setEvent(event); + eventblock.setBlock(block); eventblock.setId(event.getId()); eventblock.setRetries(event.getRetries()); - eventblock.setType(event.getType()); - return eventblock; }, Joined.with(Serdes.String(), Web3MonitoringSerdes.getEventSerde(), Web3MonitoringSerdes.getBlockSerde()) @@ -48,16 +45,14 @@ public static KStream joinEventWithBlock(KStream joinViewWithBlock(KStream viewStream, KTable blockStream) { return viewStream - .selectKey((key, view) -> view.getDetails().getBlockHash()) + .selectKey((key, view) -> view.getBlockHash()) .join(blockStream, (view, block) -> { ViewBlockRecord viewBlock = new ViewBlockRecord(); - - viewBlock.setDetails(view.getDetails()); - viewBlock.setDetailsBlock(block.getDetails()); viewBlock.setId(view.getId()); viewBlock.setRetries(view.getRetries()); - viewBlock.setType(view.getType()); + viewBlock.setBlock(block); + viewBlock.setView(view); return viewBlock; }, Joined.with(Serdes.String(), Web3MonitoringSerdes.getViewSerde(), Web3MonitoringSerdes.getBlockSerde()) @@ -77,13 +72,13 @@ public static KStream transformToTimeSeries(KStream { - List output = viewBlock.getDetails().getOutput(); + List output = viewBlock.getView().getOutput(); TimeSeriesRecord timeSeries = new TimeSeriesRecord(); - timeSeries.setContractName(viewBlock.getDetails().getContractName()); - timeSeries.setMethodName(viewBlock.getDetails().getName()); - timeSeries.setTimestamp(viewBlock.getDetailsBlock().getTimestamp()); - timeSeries.setBlockNumber(viewBlock.getDetailsBlock().getNumber()); + timeSeries.setContractName(viewBlock.getView().getContractName()); + timeSeries.setMethodName(viewBlock.getView().getName()); + timeSeries.setTimestamp(viewBlock.getBlock().getTimestamp()); + timeSeries.setBlockNumber(viewBlock.getBlock().getNumber()); for (int i = 0; i < output.size(); i++) { @@ -131,14 +126,14 @@ public static KStream transformToTimeSeries(KStream transformEventToTimeSeries(KStream stream) { return stream.mapValues(eventBlock -> { - List output = eventBlock.getDetails().getIndexedParameters(); - output.addAll(eventBlock.getDetails().getNonIndexedParameters()); + List output = eventBlock.getEvent().getIndexedParameters(); + output.addAll(eventBlock.getEvent().getNonIndexedParameters()); TimeSeriesRecord timeSeries = new TimeSeriesRecord(); - timeSeries.setContractName(eventBlock.getDetails().getContractName()); - timeSeries.setMethodName(eventBlock.getDetails().getName()); - timeSeries.setTimestamp(eventBlock.getDetailsBlock().getTimestamp()); - timeSeries.setBlockNumber(eventBlock.getDetailsBlock().getNumber()); + timeSeries.setContractName(eventBlock.getEvent().getContractName()); + timeSeries.setMethodName(eventBlock.getEvent().getName()); + timeSeries.setTimestamp(eventBlock.getBlock().getTimestamp()); + timeSeries.setBlockNumber(eventBlock.getBlock().getNumber()); for (int i = 0; i < output.size(); i++) { diff --git a/src/main/java/io/keyko/monitoring/time/EventBlockTimestampExtractor.java b/src/main/java/io/keyko/monitoring/time/EventBlockTimestampExtractor.java index a6b970d..dab08ee 100644 --- a/src/main/java/io/keyko/monitoring/time/EventBlockTimestampExtractor.java +++ b/src/main/java/io/keyko/monitoring/time/EventBlockTimestampExtractor.java @@ -11,7 +11,7 @@ public long extract(ConsumerRecord record, long previousTimestam if (record != null && record.value() != null) { if (record.value() instanceof EventBlockRecord) { - return ((EventBlockRecord) record.value()).getDetailsBlock().getTimestamp(); + return ((EventBlockRecord) record.value()).getBlock().getTimestamp(); } } diff --git a/src/test/java/io/keyko/monitoring/ProcessorsTest.java b/src/test/java/io/keyko/monitoring/ProcessorsTest.java index 6740af3..931e808 100644 --- a/src/test/java/io/keyko/monitoring/ProcessorsTest.java +++ b/src/test/java/io/keyko/monitoring/ProcessorsTest.java @@ -28,42 +28,39 @@ public class ProcessorsTest { private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE; - public EventDetailsRecord transferDetails = new EventDetailsRecord("Transfer", "Transfer", "filter1", "default", + public EventRecord transferEvent = new EventRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0", + "Transfer", "Transfer", "filter1", "default", Arrays.asList(new StringParameter("from", "address", "0xcCCD3999D5b421F906c4a35c0c95bcD533e1CFBb"), new StringParameter("to", "address", "0xC8FD77490A12F46709BffbCC0FCe35740Da8D860")), Collections.singletonList(new NumberParameter("amount", "uint256", "23873204128892815319", Long.valueOf( "23873204128892815319".substring(0, String.valueOf(Long.MAX_VALUE).length())))), "0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf", "0", 15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "0x5c7197E1147ebF98658A2a8Bc3D32BeBF1692829", ContractEventStatus.CONFIRMED, - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "default", - "0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0"); - public EventRecord transferEvent = new EventRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0", - "CONTRACT_EVENT", transferDetails, 0); + "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "default", 0); - public EventDetailsRecord oracleReportedDetails = new EventDetailsRecord("OracleReported", "OracleReported", "filter2", "default", + public EventRecord oracleReportedEvent = new EventRecord("0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0", "OracleReported", "OracleReported", "filter2", "default", Collections.emptyList(), Arrays.asList(new StringParameter("token", "address", "0x5c7197E1147ebF98658A2a8Bc3D32BeBF1692829"), new StringParameter("oracle", "address", "0x0d473f73AAf1C2bf7EBd2be7196C71dBa6C1724b"), new NumberParameter("timestamp", "uint256", "1576176348", 1576176348L) , new NumberParameter("numerator", "uint256", "18299170121119875203", Long.valueOf( "18299170121119875203".substring(0, String.valueOf(Long.MAX_VALUE).length()))), new NumberParameter("denominator", "uint256", "18446744073709551616", Long.valueOf( "18446744073709551616".substring(0, String.valueOf(Long.MAX_VALUE).length())))), "0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf", "0", 14750L, "0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", "0x91061bF2F509AF76aa01F46E9F3E97577a5a80BA", ContractEventStatus.CONFIRMED, - "0xdbf09271932e018b9c31e9988e4fbe3109fdd79d78f5d19a764dfb56035ed775", "default", - "0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0"); - public BlockDetailsRecord blockOracleReportedDetails = new BlockDetailsRecord(14750L, "0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", 1234L, "default"); + "0xdbf09271932e018b9c31e9988e4fbe3109fdd79d78f5d19a764dfb56035ed775", "default", 0 + ); + + public BlockRecord blockOracleReported = new BlockRecord("0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", 14750L, "0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d", 1234L, "default", 0); - public BlockDetailsRecord blockTransferDetails = new BlockDetailsRecord(15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", 1234L, "default"); - public BlockRecord transferBlock = new BlockRecord("0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "BLOCK", blockTransferDetails, 0); + public BlockRecord transferBlock = new BlockRecord("0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", 15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", 1234L, "default", 0); - public EventBlockRecord transferEventWithBlock = new EventBlockRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0", "", transferDetails, blockTransferDetails, 0); + public EventBlockRecord transferEventWithBlock = new EventBlockRecord("0x294d73910e7c1e7cd8f0bf341e513c0269a089b36c22c2ac006269eb59e6e6bf-0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a-0", transferEvent, transferBlock, 0); - public EventBlockRecord oracleReportedEventWithBlock = new EventBlockRecord("0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0", "", oracleReportedDetails, blockOracleReportedDetails, 0); + public EventBlockRecord oracleReportedEventWithBlock = new EventBlockRecord("0x27bc3eda4e3eaae838dd44f4a9fd4564f4455c51e336daa4232afd4ea190f0f1-0x73090d8e7bb7b2a2b550474c2c90e8059d9bfdcd752c5fc55af18f54debfb88d-0", oracleReportedEvent, blockOracleReported, 0); - public ViewDetailsRecord contractViewDetailsBalance = new ViewDetailsRecord("CeloGold-balanceOf", + + public ViewRecord viewBalance = new ViewRecord("CeloGold-balanceOf-7c", "CeloGold-balanceOf", "1", "default", "default", Collections.singletonList(new NumberParameter("balance", "uint256", "12", 12L)), - 15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "default", "default", "CeloGold-balanceOf-7c"); - - public ViewRecord viewBalance = new ViewRecord("CeloGold-balanceOf-7c", "CONTRACT_VIEW", contractViewDetailsBalance, 0); + 15129L, "0x8ce40858181dccf410331c4b3edf0187ac7b887aeb5c6e0bce2dbc09635f470a", "default", "default", 0); final Serde eventAvroSerde = Web3MonitoringSerdes.getEventSerde(); @@ -111,8 +108,8 @@ public void shouldJoinEventWithBlock() { TestOutputTopic joinTopic = driver.createOutputTopic("join", new StringDeserializer(), eventBlockAvroSerde.deserializer()); EventBlockRecord result = joinTopic.readValue(); - assertEquals(result.getDetailsBlock(), transferBlock.getDetails()); - assertEquals(result.getDetails(), transferEvent.getDetails()); + assertEquals(result.getBlock(), transferBlock); + assertEquals(result.getEvent(), transferEvent); driver.close(); } @@ -157,8 +154,8 @@ public void shouldJoinViewWithBlock() { TestOutputTopic joinTopic = driver.createOutputTopic("join", new StringDeserializer(), viewBlockAvroSerde.deserializer()); ViewBlockRecord result = joinTopic.readValue(); - assertEquals(result.getDetailsBlock(), transferBlock.getDetails()); - assertEquals(result.getDetails(), viewBalance.getDetails()); + assertEquals(result.getBlock(), transferBlock); + assertEquals(result.getView(), viewBalance); driver.close(); } From f2de07cfc0e1e8d24fd302b50c500d8cf331d364 Mon Sep 17 00:00:00 2001 From: eruizgar91 Date: Wed, 25 Mar 2020 09:44:01 +0100 Subject: [PATCH 2/2] Add log schemas to serdes --- .../keyko/monitoring/serde/Web3MonitoringSerdes.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/java/io/keyko/monitoring/serde/Web3MonitoringSerdes.java b/src/main/java/io/keyko/monitoring/serde/Web3MonitoringSerdes.java index 8d6c082..5001f1a 100644 --- a/src/main/java/io/keyko/monitoring/serde/Web3MonitoringSerdes.java +++ b/src/main/java/io/keyko/monitoring/serde/Web3MonitoringSerdes.java @@ -18,6 +18,8 @@ public class Web3MonitoringSerdes { private final static SpecificAvroSerde viewSerde = new SpecificAvroSerde<>(); private final static SpecificAvroSerde viewBlockSerde = new SpecificAvroSerde<>(); private final static SpecificAvroSerde timeSeriesSerde = new SpecificAvroSerde<>(); + private final static SpecificAvroSerde logAvroSerde = new SpecificAvroSerde<>(); + private final static SpecificAvroSerde logFlattenedAvroSerde = new SpecificAvroSerde<>(); protected static Map serdeConfig; @@ -33,6 +35,8 @@ public static void configureSerdes(String schemaRegistryUrl) { viewSerde.configure(serdeConfig, false); viewBlockSerde.configure(serdeConfig, false); timeSeriesSerde.configure(serdeConfig, false); + logAvroSerde.configure(serdeConfig, false); + logFlattenedAvroSerde.configure(serdeConfig, false); } protected static void configureSerde(SpecificAvroSerde serde) { @@ -61,6 +65,14 @@ public static SpecificAvroSerde getAlertSerde() { public static SpecificAvroSerde getTimeSerieserde() { return timeSeriesSerde; } + public static SpecificAvroSerde getLogAvroSerde() { + return logAvroSerde; + } + + public static SpecificAvroSerde getLogFlattenedAvroSerde() { + return logFlattenedAvroSerde; + } + }