From 1e888f43611916412fb047780f0cceb5de15aa7e Mon Sep 17 00:00:00 2001 From: Maxim Kolchin Date: Sat, 28 Jun 2014 13:56:45 +0400 Subject: [PATCH] Merge changes from r196. Removed an unused import and dependency --- pom.xml | 12 +- .../org/deri/cqels/engine/CQELSEngine.java | 4 +- .../cqels/engine/IndexedTripleRouter.java | 1 - src/test/java/org/deri/cqels/QueryTest.java | 406 ++++++++++-------- 4 files changed, 231 insertions(+), 192 deletions(-) diff --git a/pom.xml b/pom.xml index 329714a..08c94e9 100644 --- a/pom.xml +++ b/pom.xml @@ -13,11 +13,6 @@ - - org.apache.jena - jena-arq - 2.9.3 - org.apache.jena jena-tdb @@ -26,18 +21,13 @@ com.sleepycat je - 5.0.73 + 4.1.10 com.espertech esper 4.2.0 - - org.hamcrest - hamcrest-core - 1.3 - junit junit diff --git a/src/main/java/org/deri/cqels/engine/CQELSEngine.java b/src/main/java/org/deri/cqels/engine/CQELSEngine.java index 1729a79..4fd8ba9 100644 --- a/src/main/java/org/deri/cqels/engine/CQELSEngine.java +++ b/src/main/java/org/deri/cqels/engine/CQELSEngine.java @@ -102,12 +102,12 @@ private String matchPattern(Quad quad) { private EnQuad encode(Quad quad) { return new EnQuad(encode(quad.getGraph()), encode(quad.getSubject()), - encode(quad.getSubject()), encode(quad.getObject())); + encode(quad.getPredicate()), encode(quad.getObject())); } private EnQuad encode(Node graph,Triple triple) { return new EnQuad(encode(graph), encode(triple.getSubject()), - encode(triple.getSubject()), encode(triple.getObject())); + encode(triple.getPredicate()), encode(triple.getObject())); } public long encode(Node node) { diff --git a/src/main/java/org/deri/cqels/engine/IndexedTripleRouter.java b/src/main/java/org/deri/cqels/engine/IndexedTripleRouter.java index e5d56dd..690bd2c 100644 --- a/src/main/java/org/deri/cqels/engine/IndexedTripleRouter.java +++ b/src/main/java/org/deri/cqels/engine/IndexedTripleRouter.java @@ -9,7 +9,6 @@ import org.deri.cqels.data.Mapping; import org.deri.cqels.engine.iterator.MappingIterator; import org.deri.cqels.lang.cqels.OpStream; -import org.hamcrest.core.IsInstanceOf; import com.hp.hpl.jena.graph.Node; import com.hp.hpl.jena.sparql.algebra.op.OpTriple; diff --git a/src/test/java/org/deri/cqels/QueryTest.java b/src/test/java/org/deri/cqels/QueryTest.java index f0227bd..44ffa7e 100644 --- a/src/test/java/org/deri/cqels/QueryTest.java +++ b/src/test/java/org/deri/cqels/QueryTest.java @@ -1,178 +1,228 @@ -package org.deri.cqels; - -import com.hp.hpl.jena.graph.Node; -import com.hp.hpl.jena.graph.Triple; -import com.hp.hpl.jena.sparql.core.Var; -import static com.jayway.awaitility.Awaitility.*; -import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.Callable; -import org.deri.cqels.data.Mapping; -import org.deri.cqels.engine.ContinuousListener; -import org.deri.cqels.engine.ContinuousSelect; -import org.deri.cqels.engine.ExecContext; -import org.deri.cqels.engine.RDFStream; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; -import org.junit.BeforeClass; -import org.junit.Test; - -public class QueryTest { - - private static final String STREAM_ID_PREFIX = "http://example.org/simpletest/test"; - private static final String CQELS_HOME = "cqels_home"; - private static ExecContext context; - - @BeforeClass - public static void beforeClass() { - File home = new File(CQELS_HOME); - if (!home.exists()) { - home.mkdir(); - } - context = new ExecContext(CQELS_HOME, true); - } - - @Test(timeout = 5000) - public void simple() { - final String STREAM_ID = STREAM_ID_PREFIX + "_1"; - RDFStream stream = new DefaultRDFStream(context, STREAM_ID); - - ContinuousSelect query = context.registerSelect("" - + "SELECT ?x ?y ?z WHERE {" - + "STREAM <" + STREAM_ID + "> [NOW] {?x ?y ?z}" - + "}"); - AssertListener listener = new AssertListener(); - query.register(listener); - - stream.stream(new Triple( - Node.createURI("http://example.org/resource/1"), - Node.createURI("http://example.org/ontology#hasValue"), - Node.createLiteral("123"))); - - List mappings = await().until(listener, hasSize(1)); - List nodes = toNodeList(mappings.get(0)); - assertEquals("http://example.org/resource/1", nodes.get(0).getURI()); - assertEquals("http://example.org/ontology#hasValue", - nodes.get(1).getURI()); - assertEquals("123", nodes.get(2).getLiteralValue()); - } - - @Test(timeout = 5000) - public void streamURIAsVar() { - final String STREAM_ID = STREAM_ID_PREFIX + "_1"; - RDFStream stream = new DefaultRDFStream(context, STREAM_ID); - - context.loadDefaultDataset( - "src/test/resources/org/deri/cqels/dataset.ttl"); - - ContinuousSelect query = context.registerSelect("" - + "SELECT ?x ?y ?z WHERE {" - + "STREAM ?stream [NOW] {?x ?y ?z}" - + " ?stream ." - + "}"); - AssertListener listener = new AssertListener(); - query.register(listener); - - stream.stream(new Triple( - Node.createURI("http://example.org/resource/1"), - Node.createURI("http://example.org/ontology#hasValue"), - Node.createLiteral("123"))); - - List mappings = await().until(listener, hasSize(1)); - List nodes = toNodeList(mappings.get(0)); - assertEquals(3, nodes.size()); - assertEquals("http://example.org/resource/1", nodes.get(0).getURI()); - assertEquals("http://example.org/ontology#hasValue", - nodes.get(1).getURI()); - assertEquals("123", nodes.get(2).getLiteralValue()); - } - - @Test - public void severalStreamsAsVarsFromDataset() throws InterruptedException { - RDFStream stream_1 = new DefaultRDFStream(context, - STREAM_ID_PREFIX + "_1"); - RDFStream stream_2 = new DefaultRDFStream(context, - STREAM_ID_PREFIX + "_2"); - - context.loadDefaultDataset( - "src/test/resources/org/deri/cqels/dataset.ttl"); - - ContinuousSelect query = context.registerSelect("" - + "SELECT ?x ?y ?z WHERE {" - + "STREAM ?stream [NOW] {?x ?y ?z}" - + "[] ?stream ." - + "}"); - AssertListener listener = new AssertListener(); - query.register(listener); - - stream_1.stream(new Triple( - Node.createURI("http://example.org/resource/1"), - Node.createURI("http://example.org/ontology#hasValue"), - Node.createLiteral("123"))); - stream_2.stream(new Triple( - Node.createURI("http://example.org/resource/2"), - Node.createURI("http://example.org/ontology#hasValue"), - Node.createLiteral("321"))); - - List mappings = await().until(listener, hasSize(2)); - - List nodes = toNodeList(mappings.get(0)); - assertEquals("http://example.org/resource/1", nodes.get(0).getURI()); - assertEquals("http://example.org/ontology#hasValue", - nodes.get(1).getURI()); - assertEquals("123", nodes.get(2).getLiteralValue()); - - nodes = toNodeList(mappings.get(1)); - assertEquals("http://example.org/resource/2", nodes.get(0).getURI()); - assertEquals("http://example.org/ontology#hasValue", - nodes.get(1).getURI()); - assertEquals("321", nodes.get(2).getLiteralValue()); - } - - private List toNodeList(Mapping mapping) { - List nodes = new ArrayList(); - for (Iterator vars = mapping.vars(); vars.hasNext();) { - final long id = mapping.get(vars.next()); - if (id > 0) { - nodes.add(context.engine().decode(id)); - } else { - nodes.add(null); - } - } - return nodes; - } - - private class AssertListener - implements ContinuousListener, Callable> { - - private final List mapping = Collections.synchronizedList( - new ArrayList()); - - @Override - public void update(Mapping mapping) { - this.mapping.add(mapping); - } - - @Override - public List call() throws Exception { - return mapping; - } - - } - - private class DefaultRDFStream extends RDFStream { - - public DefaultRDFStream(ExecContext context, String uri) { - super(context, uri); - } - - @Override - public void stop() { - } - - } - -} +package org.deri.cqels; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Var; +import static com.jayway.awaitility.Awaitility.*; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import org.deri.cqels.data.Mapping; +import org.deri.cqels.engine.ConstructListener; +import org.deri.cqels.engine.ContinuousConstruct; +import org.deri.cqels.engine.ContinuousListener; +import org.deri.cqels.engine.ContinuousSelect; +import org.deri.cqels.engine.ExecContext; +import org.deri.cqels.engine.RDFStream; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; +import org.junit.BeforeClass; +import org.junit.Test; + +public class QueryTest { + + private static final String STREAM_ID_PREFIX = "http://example.org/simpletest/test"; + private static final String CQELS_HOME = "cqels_home"; + private static ExecContext context; + + @BeforeClass + public static void beforeClass() { + File home = new File(CQELS_HOME); + if (!home.exists()) { + home.mkdir(); + } + context = new ExecContext(CQELS_HOME, true); + } + + @Test(timeout = 5000) + public void simpleSelect() { + final String STREAM_ID = STREAM_ID_PREFIX + "_1"; + RDFStream stream = new DefaultRDFStream(context, STREAM_ID); + + ContinuousSelect query = context.registerSelect("" + + "SELECT ?x ?y ?z WHERE {" + + "STREAM <" + STREAM_ID + "> [NOW] {?x ?y ?z}" + + "}"); + SelectAssertListener listener = new SelectAssertListener(); + query.register(listener); + + stream.stream(new Triple( + Node.createURI("http://example.org/resource/1"), + Node.createURI("http://example.org/ontology#hasValue"), + Node.createLiteral("123"))); + + List mappings = await().until(listener, hasSize(1)); + List nodes = toNodeList(mappings.get(0)); + assertEquals("http://example.org/resource/1", nodes.get(0).getURI()); + assertEquals("http://example.org/ontology#hasValue", + nodes.get(1).getURI()); + assertEquals("123", nodes.get(2).getLiteralValue()); + } + + @Test(timeout = 5000) + public void streamURIAsVar() { + final String STREAM_ID = STREAM_ID_PREFIX + "_1"; + RDFStream stream = new DefaultRDFStream(context, STREAM_ID); + + context.loadDefaultDataset( + "src/test/resources/org/deri/cqels/dataset.ttl"); + + ContinuousSelect query = context.registerSelect("" + + "SELECT ?x ?y ?z WHERE {" + + "STREAM ?stream [NOW] {?x ?y ?z}" + + " ?stream ." + + "}"); + SelectAssertListener listener = new SelectAssertListener(); + query.register(listener); + + stream.stream(new Triple( + Node.createURI("http://example.org/resource/1"), + Node.createURI("http://example.org/ontology#hasValue"), + Node.createLiteral("123"))); + + List mappings = await().until(listener, hasSize(1)); + List nodes = toNodeList(mappings.get(0)); + assertEquals(3, nodes.size()); + assertEquals("http://example.org/resource/1", nodes.get(0).getURI()); + assertEquals("http://example.org/ontology#hasValue", + nodes.get(1).getURI()); + assertEquals("123", nodes.get(2).getLiteralValue()); + } + + @Test + public void severalStreamsAsVarsFromDataset() throws InterruptedException { + RDFStream stream_1 = new DefaultRDFStream(context, + STREAM_ID_PREFIX + "_1"); + RDFStream stream_2 = new DefaultRDFStream(context, + STREAM_ID_PREFIX + "_2"); + + context.loadDefaultDataset( + "src/test/resources/org/deri/cqels/dataset.ttl"); + + ContinuousSelect query = context.registerSelect("" + + "SELECT ?x ?y ?z WHERE {" + + "STREAM ?stream [NOW] {?x ?y ?z}" + + "[] ?stream ." + + "}"); + SelectAssertListener listener = new SelectAssertListener(); + query.register(listener); + + stream_1.stream(new Triple( + Node.createURI("http://example.org/resource/1"), + Node.createURI("http://example.org/ontology#hasValue"), + Node.createLiteral("123"))); + stream_2.stream(new Triple( + Node.createURI("http://example.org/resource/2"), + Node.createURI("http://example.org/ontology#hasValue"), + Node.createLiteral("321"))); + + List mappings = await().until(listener, hasSize(2)); + + List nodes = toNodeList(mappings.get(0)); + assertEquals("http://example.org/resource/1", nodes.get(0).getURI()); + assertEquals("http://example.org/ontology#hasValue", + nodes.get(1).getURI()); + assertEquals("123", nodes.get(2).getLiteralValue()); + + nodes = toNodeList(mappings.get(1)); + assertEquals("http://example.org/resource/2", nodes.get(0).getURI()); + assertEquals("http://example.org/ontology#hasValue", + nodes.get(1).getURI()); + assertEquals("321", nodes.get(2).getLiteralValue()); + } + + @Test(timeout = 10000) + public void simpleConstruct() { + final String STREAM_ID = STREAM_ID_PREFIX + "_1"; + RDFStream stream = new DefaultRDFStream(context, STREAM_ID); + + ContinuousConstruct query = context.registerConstruct("" + + "CONSTRUCT{?x ?y ?z} WHERE {" + + "STREAM <" + STREAM_ID + "> [NOW] {?x ?y ?z}" + + "}"); + ConstructAssertListener listener = new ConstructAssertListener( + context, STREAM_ID); + query.register(listener); + + stream.stream(new Triple( + Node.createURI("http://example.org/resource/1"), + Node.createURI("http://example.org/ontology#hasValue"), + Node.createLiteral("123"))); + + List graph = await().until(listener, hasSize(1)); + assertEquals("http://example.org/resource/1", + graph.get(0).getSubject().getURI()); + assertEquals("http://example.org/ontology#hasValue", + graph.get(0).getPredicate().getURI()); + assertEquals("123", graph.get(0).getObject().getLiteralLexicalForm()); + } + + private List toNodeList(Mapping mapping) { + List nodes = new ArrayList(); + for (Iterator vars = mapping.vars(); vars.hasNext();) { + final long id = mapping.get(vars.next()); + if (id > 0) { + nodes.add(context.engine().decode(id)); + } else { + nodes.add(null); + } + } + return nodes; + } + + private class SelectAssertListener + implements ContinuousListener, + Callable> { + + private final List mapping = Collections.synchronizedList( + new ArrayList()); + + @Override + public void update(Mapping mapping) { + this.mapping.add(mapping); + } + + @Override + public List call() throws Exception { + return mapping; + } + + } + + private class ConstructAssertListener extends ConstructListener + implements Callable> { + + private final List graph = Collections.synchronizedList( + new ArrayList()); + + public ConstructAssertListener(ExecContext context, String streamUri) { + super(context, streamUri); + } + + @Override + public void update(List graph) { + this.graph.addAll(graph); + } + + @Override + public List call() throws Exception { + return graph; + } + } + + private class DefaultRDFStream extends RDFStream { + + public DefaultRDFStream(ExecContext context, String uri) { + super(context, uri); + } + + @Override + public void stop() { + } + + } + +}