diff --git a/solr/core/src/java/org/apache/solr/handler/LoggingStream.java b/solr/core/src/java/org/apache/solr/handler/LoggingStream.java
index e101ab68d9f..8839686c1e7 100644
--- a/solr/core/src/java/org/apache/solr/handler/LoggingStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/LoggingStream.java
@@ -71,6 +71,8 @@ public class LoggingStream extends TupleStream implements Expressible {
*/
private String filepath;
+ private Path filePath;
+
private int updateBatchSize;
private int batchNumber;
@@ -122,14 +124,14 @@ private void init(String filepath, TupleStream tupleSource) {
this.tupleSource = tupleSource;
}
- /** The name of the file being updated */
- protected String getFilePath() {
- return filepath;
+ /** The path of the file being logged to */
+ public Path getFilePath() {
+ return filePath;
}
@Override
public void open() throws IOException {
- Path filePath = chroot.resolve(filepath).normalize();
+ filePath = chroot.resolve(filepath).normalize();
if (!filePath.startsWith(chroot)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "file to log to must be under " + chroot);
diff --git a/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java b/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java
index ca20ddef63a..ef644fecad3 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java
@@ -230,7 +230,7 @@ public void process(ResponseBuilder rb) throws IOException {
ubiQuery.setUserQuery(params.get(USER_QUERY));
ubiQuery.setApplication(params.get(APPLICATION));
- Object queryAttributes = params.get(QUERY_ATTRIBUTES);
+ String queryAttributes = params.get(QUERY_ATTRIBUTES);
if (queryAttributes != null && queryAttributes.toString().startsWith("{")) {
// Look up the original nested JSON format, typically passed in
@@ -241,8 +241,9 @@ public void process(ResponseBuilder rb) throws IOException {
@SuppressWarnings("rawtypes")
Map paramsProperties = (Map) jsonProperties.get("params");
if (paramsProperties.containsKey(QUERY_ATTRIBUTES)) {
- queryAttributes = paramsProperties.get(QUERY_ATTRIBUTES);
- ubiQuery.setQueryAttributes(queryAttributes);
+ @SuppressWarnings("rawtypes")
+ Map queryAttributesAsMap = (Map) paramsProperties.get(QUERY_ATTRIBUTES);
+ ubiQuery.setQueryAttributes(queryAttributesAsMap);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/UBIQuery.java b/solr/core/src/java/org/apache/solr/handler/component/UBIQuery.java
index 4642f32bd97..8605142029c 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/UBIQuery.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/UBIQuery.java
@@ -34,7 +34,10 @@ public class UBIQuery {
private String application;
private String queryId;
private String userQuery;
- private Object queryAttributes;
+
+ @SuppressWarnings("rawtypes")
+ private Map queryAttributes;
+
private String docIds;
public UBIQuery(String queryId) {
@@ -69,11 +72,13 @@ public void setUserQuery(String userQuery) {
this.userQuery = userQuery;
}
- public Object getQueryAttributes() {
+ @SuppressWarnings("rawtypes")
+ public Map getQueryAttributes() {
return queryAttributes;
}
- public void setQueryAttributes(Object queryAttributes) {
+ @SuppressWarnings("rawtypes")
+ public void setQueryAttributes(Map queryAttributes) {
this.queryAttributes = queryAttributes;
}
@@ -90,9 +95,14 @@ public Map toMap() {
@SuppressWarnings({"rawtypes", "unchecked"})
Map map = new HashMap();
map.put(UBIComponent.QUERY_ID, this.queryId);
- map.put(UBIComponent.APPLICATION, this.application);
- map.put(UBIComponent.USER_QUERY, this.userQuery);
+ if (this.application != null) {
+ map.put(UBIComponent.APPLICATION, this.application);
+ }
+ if (this.userQuery != null) {
+ map.put(UBIComponent.USER_QUERY, this.userQuery);
+ }
if (this.queryAttributes != null) {
+
ObjectMapper objectMapper = new ObjectMapper();
try {
map.put(
diff --git a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentRecordingTest.java b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentRecordingTest.java
index 0a7c9dd7a82..d32483fbc20 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentRecordingTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentRecordingTest.java
@@ -91,6 +91,8 @@ public void testRecordingUBIQueries() throws Exception {
final QueryResponse queryResponse = cluster.getSolrClient().query(COLLECTION, queryParams);
final SolrDocumentList documents = queryResponse.getResults();
+
+
//
//
// final ModifiableSolrParams overrideParams = new ModifiableSolrParams();
@@ -102,6 +104,6 @@ public void testRecordingUBIQueries() throws Exception {
// .setLimit(0);
// QueryResponse queryResponse = req.process(cluster.getSolrClient(), COLLECTION);
// assertResponseFoundNumDocs(queryResponse, expectedResults);
- System.out.println(queryResponse);
+ //System.out.println(queryResponse);
}
}
diff --git a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java
index 50bee76c7f8..5ffac3bcd22 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java
@@ -1,389 +1,368 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.solr.handler.component;
-import java.io.File;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import org.apache.commons.io.input.ReversedLinesFileReader;
-import org.apache.lucene.util.IOUtils;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.io.Lang;
+import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
+import org.apache.solr.client.solrj.io.stream.SelectStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.UpdateStream;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.request.json.JsonQueryRequest;
-import org.apache.solr.client.solrj.request.json.TermsFacetMap;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.client.solrj.response.json.BucketBasedJsonFacet;
-import org.apache.solr.client.solrj.response.json.BucketJsonFacet;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.embedded.JettySolrRunner;
-import org.junit.After;
-import org.junit.AfterClass;
+import org.apache.solr.handler.LoggingStream;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Test;
-/**
- * Tests the ability for {@link UBIComponent} to stream the gathered query data to another Solr
- * index using Streaming Expressions.
- *
- *
This guy needs simplification!!!!!!!!! Needs to look more like some of the tests Joel wrote.
- */
+@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
public class UBIComponentStreamingQueriesTest extends SolrCloudTestCase {
- public static final String COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION =
- "collection_stream_ubi_queries_to_ubi_collection";
- public static final String COLLECTION_STREAM_UBI_QUERIES_TO_LOG =
- "collection_stream_ubi_queries_to_log";
- public static final String UBI_QUERIES_COLLECTION = "ubi_queries";
-
- /** One client per node */
- private static final List NODE_CLIENTS = new ArrayList<>(7);
- /**
- * clients (including cloud client) for easy randomization and looping of collection level
- * requests
- */
- private static final List CLIENTS = new ArrayList<>(7);
+ private static final String COLLECTIONORALIAS = "collection1";
+ private static final int TIMEOUT = DEFAULT_TIMEOUT;
+ private static final String id = "id";
- private static String zkHost;
+ private static boolean useAlias;
@BeforeClass
public static void setupCluster() throws Exception {
-
- final int numShards = usually() ? 2 : 1;
- final int numReplicas = usually() ? 2 : 1;
- final int numNodes = 1 + (numShards * numReplicas); // at least one node w/o any replicas
-
- // The configset ubi_enabled has the UBIComponent configured and set to log to a collection
- // called "ubi".
- // The ubi collection itself just depends on the typical _default configset.
- configureCluster(numNodes)
- .addConfig("ubi-enabled", configset("ubi-enabled"))
+ configureCluster(4)
.addConfig(
- "config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+ "conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
- zkHost = cluster.getZkServer().getZkAddress();
-
- CLIENTS.add(cluster.getSolrClient());
- for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
- final SolrClient c = getHttpSolrClient(jetty.getBaseUrl().toString());
- NODE_CLIENTS.add(c);
- CLIENTS.add(c);
+ String collection;
+ useAlias = random().nextBoolean();
+ if (useAlias) {
+ collection = COLLECTIONORALIAS + "_collection";
+ } else {
+ collection = COLLECTIONORALIAS;
}
- assertEquals(
- "failed to create collection " + COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION,
- 0,
- CollectionAdminRequest.createCollection(
- COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION,
- "ubi-enabled",
- numShards,
- numReplicas)
- .process(cluster.getSolrClient())
- .getStatus());
-
- cluster.waitForActiveCollection(
- COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION, numShards, numShards * numReplicas);
-
- assertEquals(
- "failed to create collection " + COLLECTION_STREAM_UBI_QUERIES_TO_LOG,
- 0,
- CollectionAdminRequest.createCollection(
- COLLECTION_STREAM_UBI_QUERIES_TO_LOG, "config", numShards, numReplicas)
- .process(cluster.getSolrClient())
- .getStatus());
-
- cluster.waitForActiveCollection(
- COLLECTION_STREAM_UBI_QUERIES_TO_LOG, numShards, numShards * numReplicas);
-
- assertEquals(
- "failed to create UBI queries collection",
- 0,
- CollectionAdminRequest.createCollection(
- UBI_QUERIES_COLLECTION, "_default", numShards, numReplicas)
- .process(cluster.getSolrClient())
- .getStatus());
-
- cluster.waitForActiveCollection(UBI_QUERIES_COLLECTION, numShards, numShards * numReplicas);
- }
+ CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
+ .process(cluster.getSolrClient());
+
+ cluster.waitForActiveCollection(collection, 2, 2);
- @AfterClass
- public static void closeClients() throws Exception {
- try {
- IOUtils.close(NODE_CLIENTS);
- } finally {
- NODE_CLIENTS.clear();
- CLIENTS.clear();
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish(
+ collection, cluster.getZkStateReader(), false, true, TIMEOUT);
+ if (useAlias) {
+ CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection)
+ .process(cluster.getSolrClient());
}
}
- @After
- public void clearCollection() throws Exception {
- assertEquals(
- "DBQ failed",
- 0,
- cluster
- .getSolrClient()
- .deleteByQuery(COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION, "*:*")
- .getStatus());
- assertEquals(
- "commit failed",
- 0,
- cluster
- .getSolrClient()
- .commit(COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION)
- .getStatus());
- assertEquals(
- "DBQ failed",
- 0,
- cluster.getSolrClient().deleteByQuery(UBI_QUERIES_COLLECTION, "*:*").getStatus());
- assertEquals(
- "commit failed", 0, cluster.getSolrClient().commit(UBI_QUERIES_COLLECTION).getStatus());
+ @Before
+ public void cleanIndex() throws Exception {
+ new UpdateRequest().deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTIONORALIAS);
}
- public void testCreatingStreamingExpression() {
- UBIQuery ubiQuery = new UBIQuery("5678");
- ubiQuery.setUserQuery("Apple Memory");
+ @Test
+ public void testUBIQueryStream() throws Exception {
+
+ UBIQuery ubiQuery;
+ StreamExpression expression;
+ TupleStream stream;
+ List tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
- String clause = getClause(ubiQuery);
- assertEquals(
- "Check the decoded version for ease of comparison",
- "commit(ubi,update(ubi,tuple(id=4.0,query_id=5678,user_query=Apple Memory)))",
- clause);
+ try (solrClientCache) {
+ streamContext.setSolrClientCache(solrClientCache);
+ StreamFactory factory =
+ new StreamFactory().withFunctionName("ubiQuery", UBIQueryStream.class);
+ // Basic test
+ ubiQuery = new UBIQuery("123");
+
+ expression = StreamExpressionParser.parse("ubiQuery()");
+ streamContext.put("ubi-query", ubiQuery);
+ stream = new UBIQueryStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(1, tuples.size());
+ assertFields(tuples, "query_id");
+ assertString(tuples.get(0), "query_id", "123");
+ // assertNotFields(tuples, "user_query", "event_attributes");
+
+ // Include another field to see what is returned
+ ubiQuery = new UBIQuery("234");
+ ubiQuery.setApplication("typeahead");
+
+ streamContext.put("ubi-query", ubiQuery);
+ stream = new UBIQueryStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(1, tuples.size());
+ assertFields(tuples, "query_id", "application");
+ assertString(tuples.get(0), "query_id", "234");
+ assertString(tuples.get(0), "application", "typeahead");
+
+ // Introduce event_attributes map of data
+ ubiQuery = new UBIQuery("345");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Map queryAttributes = new HashMap();
+ queryAttributes.put("attribute1", "one");
+ queryAttributes.put("attribute2", 2);
+ ubiQuery.setQueryAttributes(queryAttributes);
+
+ streamContext.put("ubi-query", ubiQuery);
+ stream = new UBIQueryStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(1, tuples.size());
+ assertFields(tuples, "query_id", "query_attributes");
+ assertString(tuples.get(0), "query_id", "345");
+ assertString(tuples.get(0), "query_attributes", "{\"attribute1\":\"one\",\"attribute2\":2}");
+ }
}
- public void testUsingStreamingExpressionDirectly() throws Exception {
+ @Test
+ public void testWritingToLogUbiQueryStream() throws Exception {
+ // Test that we can write out UBIQuery data cleanly to the jsonl file
+ UBIQuery ubiQuery = new UBIQuery("345");
+ ubiQuery.setUserQuery("Memory RAM");
+ ubiQuery.setApplication("typeahead");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Map queryAttributes = new HashMap();
+ queryAttributes.put("parsed_query", "memory OR ram");
+ queryAttributes.put("experiment", "secret");
+ queryAttributes.put("marginBoost", 2.1);
+ ubiQuery.setQueryAttributes(queryAttributes);
+
+ StreamExpression expression;
+ List tuples;
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+
+ try (solrClientCache) {
+ streamContext.setSolrClientCache(solrClientCache);
+ StreamFactory factory =
+ new StreamFactory()
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("ubiQuery", UBIQueryStream.class)
+ .withFunctionName("logging", LoggingStream.class);
+
+ expression = StreamExpressionParser.parse("logging(test.jsonl,ubiQuery())");
+ streamContext.put("ubi-query", ubiQuery);
+ streamContext.put("solr-core", findSolrCore());
+ LoggingStream stream = new LoggingStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ assertEquals(1, tuples.size());
+ assertFields(tuples, "totalIndexed");
+ assertLong(tuples.get(0), "totalIndexed", 1);
+
+ // Someday when we have parseJSON() streaming expression we can replace this.
+ Path filePath = stream.getFilePath();
+ try (ReversedLinesFileReader reader =
+ new ReversedLinesFileReader.Builder()
+ .setCharset(StandardCharsets.UTF_8)
+ .setPath(filePath)
+ .get()) {
+ String jsonLine = reader.readLine(); // Read the last line
+ assertNotNull(jsonLine);
+ ObjectMapper objectMapper = new ObjectMapper();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Map myObject = objectMapper.readValue(jsonLine, Map.class);
+ assertEquals(ubiQuery.getQueryId(), myObject.get("query_id"));
+ assertEquals(ubiQuery.getApplication(), myObject.get("application"));
+ // assertEquals(ubiQuery.getQueryAttributes(), myObject.get("query_attributes"));
+ assertEquals(
+ "{\"experiment\":\"secret\",\"marginBoost\":2.1,\"parsed_query\":\"memory OR ram\"}",
+ myObject.get("query_attributes"));
+ }
+ }
+ }
- UBIQuery ubiQuery = new UBIQuery("5678");
- ubiQuery.setUserQuery("Apple Memory");
+ @Test
+ public void testWritingToSolrUbiQueryStream() throws Exception {
+ // Test that we can write out UBIQuery, especially the queryAttributes map, to Solr collection
+ UBIQuery ubiQuery = new UBIQuery("345");
+ ubiQuery.setUserQuery("Memory RAM");
+ ubiQuery.setApplication("typeahead");
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Map queryAttributes = new HashMap();
+ queryAttributes.put("parsed_query", "memory OR ram");
+ queryAttributes.put("experiment", "secret");
+ queryAttributes.put("marginBoost", 2.1);
+ ubiQuery.setQueryAttributes(queryAttributes);
+
+ StreamExpression expression;
TupleStream stream;
List tuples;
StreamContext streamContext = new StreamContext();
SolrClientCache solrClientCache = new SolrClientCache();
- streamContext.setSolrClientCache(solrClientCache);
-
- StreamFactory streamFactory =
- new StreamFactory().withCollectionZkHost(UBI_QUERIES_COLLECTION, zkHost);
-
- Lang.register(streamFactory);
-
- String clause = getClause(ubiQuery);
- stream = streamFactory.constructStream(clause);
- stream.setStreamContext(streamContext);
- tuples = getTuples(stream);
- stream.close();
- solrClientCache.close();
-
- assertEquals("Total tuples returned", 1, tuples.size());
- Tuple tuple = tuples.get(0);
- assertEquals("1", tuple.getString(UpdateStream.BATCH_INDEXED_FIELD_NAME));
- assertEquals("1", tuple.getString("totalIndexed"));
-
- // Check the UBI collection
- final JsonQueryRequest requestFromUBICollection =
- new JsonQueryRequest().setQuery("id:4.0").setLimit(1);
-
- // Randomly grab a client, it shouldn't matter which is used to check UBI event.
- SolrClient client = getRandClient();
- final QueryResponse responseUBI =
- requestFromUBICollection.process(client, UBI_QUERIES_COLLECTION);
- try {
- assertEquals(0, responseUBI.getStatus());
- assertEquals(1, responseUBI.getResults().getNumFound());
- } catch (AssertionError e) {
- throw new AssertionError(responseUBI + " + " + client + " => " + e.getMessage(), e);
+ // String zkHost = cluster.getZkServer().getZkAddress();
+
+ try (solrClientCache) {
+ streamContext.setSolrClientCache(solrClientCache);
+ StreamFactory factory =
+ new StreamFactory()
+ .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+ .withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("update", UpdateStream.class)
+ .withFunctionName("select", SelectStream.class)
+ .withFunctionName("ubiQuery", UBIQueryStream.class);
+
+ expression =
+ StreamExpressionParser.parse(
+ "update("
+ + COLLECTIONORALIAS
+ + ", batchSize=5, select(\n"
+ + " ubiQuery(),\n"
+ + " query_id as id,\n"
+ + " application,\n"
+ + " user_query,\n"
+ + " query_attributes\n"
+ + " ))");
+ streamContext.put("ubi-query", ubiQuery);
+ stream = new UpdateStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+
+ cluster.getSolrClient().commit(COLLECTIONORALIAS);
+
+ assertEquals(1, tuples.size());
+ Tuple t = tuples.get(0);
+ assertFalse(t.EOF);
+ assertEquals(1, t.get("batchIndexed"));
+ assertEquals(1L, t.get("totalIndexed"));
+
+ // Ensure that destinationCollection actually has the new ubi query docs.
+ expression =
+ StreamExpressionParser.parse(
+ "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,*\", sort=\"id asc\")");
+ stream = new CloudSolrStream(expression, factory);
+ stream.setStreamContext(streamContext);
+ tuples = getTuples(stream);
+ assertEquals(1, tuples.size());
+
+ Tuple tuple = tuples.get(0);
+ assertEquals(ubiQuery.getQueryId(), tuple.get("id"));
+ assertEquals(ubiQuery.getApplication(), tuple.get("application"));
+ assertEquals(
+ "{\"experiment\":\"secret\",\"marginBoost\":2.1,\"parsed_query\":\"memory OR ram\"}",
+ tuple.get("query_attributes"));
}
}
- private List getTuples(TupleStream tupleStream) throws IOException {
- tupleStream.open();
+ protected List getTuples(TupleStream tupleStream) throws IOException {
List tuples = new ArrayList<>();
- for (; ; ) {
- Tuple t = tupleStream.read();
- // log.info(" ... {}", t.fields);
- if (t.EOF) {
- break;
- } else {
+
+ try (tupleStream) {
+ tupleStream.open();
+ for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
tuples.add(t);
}
}
- tupleStream.close();
return tuples;
}
- private static String getClause(UBIQuery ubiQuery) {
- return "commit("
- + UBI_QUERIES_COLLECTION
- + ",update("
- + UBI_QUERIES_COLLECTION
- + ",tuple(id=4.0,"
- + toTuple(ubiQuery)
- + ")))";
+ protected void assertOrder(List tuples, int... ids) throws Exception {
+ assertOrderOf(tuples, "id", ids);
}
- public static String toTuple(UBIQuery ubiQuery) {
- return UBIComponent.QUERY_ID
- + "="
- + ubiQuery.getQueryId()
- + ","
- + UBIComponent.USER_QUERY
- + "="
- + ubiQuery.getUserQuery()
- + ","
- + UBIComponent.APPLICATION
- + "="
- + ubiQuery.getApplication();
+ protected void assertOrderOf(List tuples, String fieldName, int... ids) throws Exception {
+ int i = 0;
+ for (int val : ids) {
+ Tuple t = tuples.get(i);
+ String tip = t.getString(fieldName);
+ if (!tip.equals(Integer.toString(val))) {
+ throw new Exception("Found value:" + tip + " expecting:" + val);
+ }
+ ++i;
+ }
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void testRandomDocs() throws Exception {
-
- final UpdateRequest ureq = new UpdateRequest();
-
- ureq.add(sdoc("id", 1, "data_s", "data_1"));
- assertEquals(
- "add failed",
- 0,
- ureq.process(getRandClient(), COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION).getStatus());
- assertEquals(
- "commit failed",
- 0,
- getRandClient().commit(COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION).getStatus());
-
- // query our collection to generate a UBI event and then confirm it was recorded.
-
- String userQuery = "hot air";
- Map queryAttributes = new HashMap();
- queryAttributes.put("results_wanted", 1);
-
- final JsonQueryRequest req =
- new JsonQueryRequest()
- .setQuery("*:*")
- .setLimit(1)
- .withParam("ubi", "true")
- .withParam("query_id", "123")
- .withParam("user_query", userQuery)
- .withParam("query_attributes", queryAttributes);
-
- // Randomly grab a client, it shouldn't matter which is used to generate the query event.
- SolrClient client = getRandClient();
- final QueryResponse rsp = req.process(client, COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION);
- try {
- assertEquals(0, rsp.getStatus());
- assertEquals(1, rsp.getResults().getNumFound());
- } catch (AssertionError e) {
- throw new AssertionError(rsp + " + " + client + " => " + e.getMessage(), e);
+ public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception {
+ String actual = (String) tuple.get(fieldName);
+
+ if (!Objects.equals(expected, actual)) {
+ throw new Exception("Longs not equal:" + expected + " : " + actual);
}
- // Check the UBI collection
- final JsonQueryRequest requestUBI = new JsonQueryRequest().setQuery("id:49").setLimit(1);
-
- // Randomly grab a client, it shouldn't matter which is used, to check UBI event was actually
- // tracked.
- client = getRandClient();
- final QueryResponse responseUBI = requestUBI.process(client, UBI_QUERIES_COLLECTION);
- // try {
- assertEquals(0, responseUBI.getStatus());
- assertEquals(1, responseUBI.getResults().getNumFound());
- // } catch (AssertionError e) {
- // throw new AssertionError(responseUBI + " + " + client + " => " + e.getMessage(), e);
- // }
+ return true;
}
- public void randomDocs() throws Exception {
-
- // index some random documents, using a mix-match of batches, to various SolrClients
-
- final int uniqueMod = atLeast(43); // the number of unique sig values expected
- final int numBatches = atLeast(uniqueMod); // we'll add at least one doc per batch
- int docCounter = 0;
- for (int batchId = 0; batchId < numBatches; batchId++) {
- final UpdateRequest ureq = new UpdateRequest();
- final int batchSize = atLeast(2);
- for (int i = 0; i < batchSize; i++) {
- docCounter++;
- ureq.add(
- sdoc( // NOTE: No 'id' field, SignatureUpdateProcessor fills it in for us
- "data_s", (docCounter % uniqueMod)));
+ public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+ long lv = (long) tuple.get(fieldName);
+ if (lv != l) {
+ throw new Exception("Longs not equal:" + l + " : " + lv);
+ }
+
+ return true;
+ }
+
+ protected void assertFields(List tuples, String... fields) throws Exception {
+ for (Tuple tuple : tuples) {
+ for (String field : fields) {
+ if (!tuple.getFields().containsKey(field)) {
+ throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
+ }
}
- assertEquals(
- "add failed",
- 0,
- ureq.process(getRandClient(), COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION)
- .getStatus());
}
- assertEquals(
- "commit failed",
- 0,
- getRandClient().commit(COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION).getStatus());
-
- assertTrue(docCounter > uniqueMod);
-
- // query our collection and confirm no duplicates on the signature field (using faceting)
- // Check every (node) for consistency...
- final JsonQueryRequest req =
- new JsonQueryRequest()
- .setQuery("*:*")
- .setLimit(0)
- .withFacet("data_facet", new TermsFacetMap("data_s").setLimit(uniqueMod + 1));
- for (SolrClient client : CLIENTS) {
- final QueryResponse rsp =
- req.process(client, COLLECTION_STREAM_UBI_QUERIES_TO_UBI_COLLECTION);
- try {
- assertEquals(0, rsp.getStatus());
- assertEquals(uniqueMod, rsp.getResults().getNumFound());
-
- final BucketBasedJsonFacet facet =
- rsp.getJsonFacetingResponse().getBucketBasedFacets("data_facet");
- assertEquals(uniqueMod, facet.getBuckets().size());
- for (BucketJsonFacet bucket : facet.getBuckets()) {
- assertEquals("Bucket " + bucket.getVal(), 1, bucket.getCount());
+ }
+
+ protected void assertNotFields(List tuples, String... fields) throws Exception {
+ for (Tuple tuple : tuples) {
+ for (String field : fields) {
+ if (tuple.getFields().containsKey(field)) {
+ throw new Exception(String.format(Locale.ROOT, "Unexpected field '%s' found", field));
}
- } catch (AssertionError e) {
- throw new AssertionError(rsp + " + " + client + " => " + e.getMessage(), e);
}
}
}
- /**
- * returns a random SolrClient -- either a CloudSolrClient, or an HttpSolrClient pointed at a node
- * in our cluster.
- */
- private static SolrClient getRandClient() {
- return CLIENTS.get(random().nextInt(CLIENTS.size()));
+ protected boolean assertGroupOrder(Tuple tuple, int... ids) throws Exception {
+ List> group = (List>) tuple.get("tuples");
+ int i = 0;
+ for (int val : ids) {
+ Map, ?> t = (Map, ?>) group.get(i);
+ Long tip = (Long) t.get("id");
+ if (tip.intValue() != val) {
+ throw new Exception("Found value:" + tip.intValue() + " expecting:" + val);
+ }
+ ++i;
+ }
+ return true;
}
- private static String readLastLineOfFile(File file) throws IOException {
- try (ReversedLinesFileReader reader =
- ReversedLinesFileReader.builder().setFile(file).setCharset(StandardCharsets.UTF_8).get()) {
- return reader.readLine();
+ private static SolrCore findSolrCore() {
+ for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
+ for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
+ if (solrCore != null) {
+ return solrCore;
+ }
+ }
}
+ throw new RuntimeException("Didn't find any valid cores.");
}
}