diff --git a/pom.xml b/pom.xml
index 2766ccb..793fe96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
io.keyko.monitoring
web3-event-streamer
- 0.1.6
+ 0.2.0
Web3 Monitoring Event Streamer
https://github.com/keyko-io/web3-event-streamer
2019
@@ -20,7 +20,7 @@
20190722
1.4.0
UTF-8
- 0.2.5
+ 0.3.1
4.4
diff --git a/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java b/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java
index 3ddbbd5..2006249 100644
--- a/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java
+++ b/src/main/java/io/keyko/monitoring/examples/celo/CeloProcessor.java
@@ -38,7 +38,7 @@ public static KStream accountDailyAggregation
KTable, Long> accountsCreatedDayTable =
accountsCreatedStream
- .selectKey((key, event) -> event.getDetails().getName())
+ .selectKey((key, event) -> event.getEvent().getName())
.groupByKey(Grouped.with(Serdes.String(), CeloSerdes.getEventBlockSerde()))
//.windowedBy(new DailyTimeWindows(zone, windowStartHour, gracePeriod))
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
@@ -90,15 +90,15 @@ private static KStream formatAccountCreatedAg
public static KStream alertNoEpochRewardsDistributed(StreamsBuilder builder, List EpochRewardsDistributedToVoters) {
return builder.stream(EpochRewardsDistributedToVoters, Consumed.with(Serdes.String(), CeloSerdes.getEventBlockSerde()))
- .filter((key, event) -> ((NumberParameter) event.getDetails().getNonIndexedParameters().get(0)).getValue().equals("0"))
+ .filter((key, event) -> ((NumberParameter) event.getEvent().getNonIndexedParameters().get(0)).getValue().equals("0"))
.map((key, event) ->
KeyValue.pair(key,
AlertRecord.newBuilder()
.setName("alertNoEpochRewardsDistributed")
.setReference(event.getId())
.setStatus(AlertEventStatus.ERROR)
- .setTimestamp(event.getDetailsBlock().getTimestamp())
- .setDescription("NoEpochRewardsDistributed for group: " + ((StringParameter) event.getDetails().getIndexedParameters().get(0)).getValue())
+ .setTimestamp(event.getBlock().getTimestamp())
+ .setDescription("NoEpochRewardsDistributed for group: " + ((StringParameter) event.getEvent().getIndexedParameters().get(0)).getValue())
.build())
);
// .to("w3m-alerts");
diff --git a/src/main/java/io/keyko/monitoring/postprocessing/Output.java b/src/main/java/io/keyko/monitoring/postprocessing/Output.java
index e6d0d38..a208e98 100644
--- a/src/main/java/io/keyko/monitoring/postprocessing/Output.java
+++ b/src/main/java/io/keyko/monitoring/postprocessing/Output.java
@@ -4,7 +4,6 @@
import io.keyko.monitoring.schemas.TimeSeriesRecord;
import io.keyko.monitoring.schemas.ViewBlockRecord;
import io.keyko.monitoring.serde.Web3MonitoringSerdes;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
@@ -27,7 +26,7 @@ public static void splitByEvent(KStream events) {
*/
public static void splitByEvent(KStream events, String suffix) {
events.to((key, value, recordContext) ->
- "w3m-".concat(value.getDetails().getName().toLowerCase()).concat(suffix),
+ "w3m-".concat(value.getEvent().getName().toLowerCase()).concat(suffix),
Produced.with(Serdes.String(), Web3MonitoringSerdes.getEventBlockSerde())
);
}
@@ -39,7 +38,7 @@ public static void splitByEvent(KStream events, String
*/
public static void splitByView(KStream views, String suffix) {
views.to((key, value, recordContext) ->
- "w3m-".concat(value.getDetails().getContractName().toLowerCase()).concat("-").concat(value.getDetails().getName().toLowerCase()).concat(suffix),
+ "w3m-".concat(value.getView().getContractName().toLowerCase()).concat("-").concat(value.getView().getName().toLowerCase()).concat(suffix),
Produced.with(Serdes.String(), Web3MonitoringSerdes.getViewBlockSerde())
);
}
diff --git a/src/main/java/io/keyko/monitoring/preprocessing/Filters.java b/src/main/java/io/keyko/monitoring/preprocessing/Filters.java
index 4625ea6..10a5347 100644
--- a/src/main/java/io/keyko/monitoring/preprocessing/Filters.java
+++ b/src/main/java/io/keyko/monitoring/preprocessing/Filters.java
@@ -7,6 +7,6 @@ public class Filters {
public static KStream filterConfirmed(KStream contractEvents) {
return contractEvents
- .filter((key, event) -> event.getDetails().getStatus().toString().equalsIgnoreCase("CONFIRMED"));
+ .filter((key, event) -> event.getStatus().toString().equalsIgnoreCase("CONFIRMED"));
}
}
diff --git a/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java b/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java
index 4fa4010..d1586ca 100644
--- a/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java
+++ b/src/main/java/io/keyko/monitoring/preprocessing/Transformations.java
@@ -20,17 +20,14 @@ public class Transformations {
*/
public static KStream joinEventWithBlock(KStream eventStream, KTable blockStream) {
return eventStream
- .selectKey((key, event) -> event.getDetails().getBlockHash())
+ .selectKey((key, event) -> event.getBlockHash())
.join(blockStream,
(event, block) -> {
EventBlockRecord eventblock = new EventBlockRecord();
-
- eventblock.setDetails(event.getDetails());
- eventblock.setDetailsBlock(block.getDetails());
+ eventblock.setEvent(event);
+ eventblock.setBlock(block);
eventblock.setId(event.getId());
eventblock.setRetries(event.getRetries());
- eventblock.setType(event.getType());
-
return eventblock;
},
Joined.with(Serdes.String(), Web3MonitoringSerdes.getEventSerde(), Web3MonitoringSerdes.getBlockSerde())
@@ -48,16 +45,14 @@ public static KStream joinEventWithBlock(KStream joinViewWithBlock(KStream viewStream, KTable blockStream) {
return viewStream
- .selectKey((key, view) -> view.getDetails().getBlockHash())
+ .selectKey((key, view) -> view.getBlockHash())
.join(blockStream,
(view, block) -> {
ViewBlockRecord viewBlock = new ViewBlockRecord();
-
- viewBlock.setDetails(view.getDetails());
- viewBlock.setDetailsBlock(block.getDetails());
viewBlock.setId(view.getId());
viewBlock.setRetries(view.getRetries());
- viewBlock.setType(view.getType());
+ viewBlock.setBlock(block);
+ viewBlock.setView(view);
return viewBlock;
},
Joined.with(Serdes.String(), Web3MonitoringSerdes.getViewSerde(), Web3MonitoringSerdes.getBlockSerde())
@@ -77,13 +72,13 @@ public static KStream transformToTimeSeries(KStream {
- List