Home
Streaming MASSIF uses PipeLines to combine processing components, and these components can be chained in any order. The original paper fixed the order to filtering, abstraction, and then CEP; this restriction has since been relaxed.
There are six PipeLineElements:
- Sources: define how data enters the platform, for example by setting up an HTTP server that accepts PUT requests, setting up WebSockets, or reading data from a file.
- Sinks: define how the platform reports its conclusions. HTTP POST, simple printing, and WebSocket sinks are available.
- Windowing: defines the windowing function applied to the incoming data.
- Filtering: filters the incoming data through one or more SPARQL queries.
- Abstraction: performs DL reasoning on the incoming data. DL queries can be defined in Manchester syntax.
- CEP: performs Complex Event Processing on the class assertions present in the data, using EPL definitions.
Each PipeLineElement should define its outputs, i.e. the PipeLineElements that its output is sent to. Together, these links form a processing pipeline. A simplified API for easily deploying a processing graph is also available.
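The idea that every element lists the elements its output is sent to can be illustrated with a small standalone sketch. It does not use the Streaming MASSIF API: the `Node` class, its `accept` method, and the string-based "processing" steps are hypothetical stand-ins that only show how data flows along the declared outputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class PipelineSketch {

    /** Hypothetical stand-in for a pipeline element: applies a step, then forwards the result to its outputs. */
    static class Node {
        private final String name;
        private final Function<String, String> step;
        private final List<Node> outputs;

        Node(String name, Function<String, String> step, List<Node> outputs) {
            this.name = name;
            this.step = step;
            this.outputs = outputs;
        }

        void accept(String data, List<String> log) {
            String result = step.apply(data);
            log.add(name + ":" + result);   // record what this element produced
            for (Node out : outputs) {      // forward to every declared output
                out.accept(result, log);
            }
        }
    }

    /** Wires filter -> abstraction -> cep -> sink and pushes one item through. */
    static List<String> run(String input) {
        List<String> log = new ArrayList<>();
        Node sink = new Node("sink", s -> s, Collections.<Node>emptyList());
        Node cep = new Node("cep", s -> s + "|cep", Collections.singletonList(sink));
        Node abstraction = new Node("abstraction", s -> s + "|abs", Collections.singletonList(cep));
        Node filter = new Node("filter", s -> s + "|filtered", Collections.singletonList(abstraction));
        filter.accept(input, log);
        return log;
    }

    public static void main(String[] args) {
        for (String line : run("obs1")) {
            System.out.println(line);
        }
    }
}
```

Because the outputs are a list, the same pattern covers fan-out (one element feeding several others), and reordering the elements only requires changing which outputs each one declares.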
In this simple example we define a sequential pipeline that mirrors the original Streaming MASSIF architecture: an RSP layer for filtering, an abstraction layer, and a CEP layer. Here the RSP layer is decoupled into a Window component and a Filter component:
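import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.semanticweb.owlapi.apibinding.OWLManager;
import org.semanticweb.owlapi.io.StringDocumentSource;
import org.semanticweb.owlapi.model.OWLOntology;
import org.semanticweb.owlapi.model.OWLOntologyManager;
//the Streaming MASSIF classes used below (EsperWindow, JenaFilter, ...) must be imported as well
//define window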
WindowInf window = new EsperWindow();
window.setWindowSize(1); //set the window size; as no slide parameter is given, the window is tumbling
window.start();
//define filter
String query = "PREFIX : <http://streamreasoning.org/iminds/massif/> "
+ "CONSTRUCT{?work ?pred ?type.} "
+ "WHERE { ?work ?pred ?type. }";
FilterInf filter = new JenaFilter();
int filterQueryID=filter.registerContinuousQuery(query);
filter.start();
//define the abstraction layer
AbstractionInf abstractor = new HermitAbstractionComp();
//we load the ontology used for reasoning
OWLOntologyManager manager = OWLManager.createOWLOntologyManager();
OWLOntology ontology = manager.loadOntologyFromOntologyDocument(new StringDocumentSource(ONT_STRING));
abstractor.setOntology(ontology);
// register new DL queries
String classExpressiona = "Observation and (observedProperty some Sound)";
String classExpressionb = "Observation and (observedProperty some LightIntensity)";
String newHead = "http://massif.test/EventA";
String newHead2 = "http://massif.test/EventB";
int abstractionQueryID=abstractor.registerDLQuery(newHead, classExpressiona);
int abstractionQueryID2=abstractor.registerDLQuery(newHead2, classExpressionb);
//define CEP component
String querySub = "a=EventA -> b=EventB";
//for now, all types used in the query must be listed in eventTypes; this will be fixed soon.
Set<String> eventTypes = new HashSet<String>();
eventTypes.add("EventA");
eventTypes.add("EventB");
String complexClass="http://massif.test/ComplexEvent";
CEPInf cep = new EsperCEPComp();
//indMappings defines how the complex event is composed.
//In this case, when the CEP query matches, a blank node of type complexClass is created, and indMappings
//defines how the individuals asserted to the EventA and EventB classes are connected to the created
//blank node.
Map<String,String> indMappings = new HashMap<String,String>();
indMappings.put("a","http://massif.test/hasComp");
indMappings.put("b","http://massif.test/hasComp");
int cepQueryID=cep.registerQuery(complexClass, querySub, eventTypes, indMappings);
cep.start();
// Let's compose the pipeline:
//we use a simple print sink
SinkInf printSink = new PrintSink();
//each component contains a PipeLineElement and defines a list of outputs.
PipeLineComponent sinkComp = new PipeLineComponent(printSink,null);
PipeLineComponent cepComp = new PipeLineComponent(cep,Collections.singletonList(sinkComp));
PipeLineComponent abstractionComp = new PipeLineComponent(abstractor,Collections.singletonList(cepComp));
PipeLineComponent filterComp = new PipeLineComponent(filter,Collections.singletonList(abstractionComp));
PipeLineComponent windowComp = new PipeLineComponent(window,Collections.singletonList(filterComp));
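To make the role of `indMappings` in the CEP step more concrete, the following standalone sketch builds the kind of triples described in the comments above: a blank node typed as the complex class, linked to each matched individual through the mapped property. It is not the Streaming MASSIF implementation. The `compose` helper, the blank node label, the N-Triples-style output, the individual IRIs `obs1` and `obs2`, and the choice of the blank node as the subject of each link are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ComplexEventSketch {

    static final String RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type";

    /**
     * Hypothetical helper: builds N-Triples-style lines for one pattern match.
     *
     * @param complexClass IRI of the class assigned to the new blank node
     * @param matched      pattern variable -> IRI of the individual it matched
     * @param indMappings  pattern variable -> IRI of the property linking the blank node to that individual
     * @param blankLabel   label used for the blank node
     */
    static List<String> compose(String complexClass,
                                Map<String, String> matched,
                                Map<String, String> indMappings,
                                String blankLabel) {
        List<String> triples = new ArrayList<>();
        String node = "_:" + blankLabel;
        triples.add(node + " <" + RDF_TYPE + "> <" + complexClass + "> .");
        for (Map.Entry<String, String> mapping : indMappings.entrySet()) {
            String individual = matched.get(mapping.getKey());
            if (individual != null) { // skip variables that did not match anything
                triples.add(node + " <" + mapping.getValue() + "> <" + individual + "> .");
            }
        }
        return triples;
    }

    public static void main(String[] args) {
        Map<String, String> indMappings = new LinkedHashMap<>();
        indMappings.put("a", "http://massif.test/hasComp");
        indMappings.put("b", "http://massif.test/hasComp");

        // Assumed individuals matched by "a=EventA -> b=EventB".
        Map<String, String> matched = new LinkedHashMap<>();
        matched.put("a", "http://massif.test/obs1");
        matched.put("b", "http://massif.test/obs2");

        for (String triple : compose("http://massif.test/ComplexEvent", matched, indMappings, "e1")) {
            System.out.println(triple);
        }
    }
}
```

In this sketch both pattern variables map to the same `hasComp` property, as in the example above, so the resulting blank node has two `hasComp` links, one per matched event.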