From b8e079df76fd2fe022aa553d69486b59e7116b94 Mon Sep 17 00:00:00 2001 From: Laura Schanno Date: Tue, 3 Sep 2024 10:51:53 -0400 Subject: [PATCH 1/4] Replace usage of ScannerOptions Update SessionOptions to fully implement ScannerBase instead of extending ScannerOptions. This is done to avoid using ScannerOptions since it is not from Accumulo's public API. Part of work for #2443 --- .../src/main/java/datawave/util/TextUtil.java | 12 +- .../java/datawave/mr/bulk/RfileResource.java | 6 +- .../query/scheduler/PushdownFunction.java | 4 +- .../query/tables/BatchScannerSession.java | 5 +- .../datawave/query/tables/SessionOptions.java | 318 ++++++++++++++++-- 5 files changed, 308 insertions(+), 37 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/util/TextUtil.java b/warehouse/core/src/main/java/datawave/util/TextUtil.java index 00d60408648..e95c07c745b 100644 --- a/warehouse/core/src/main/java/datawave/util/TextUtil.java +++ b/warehouse/core/src/main/java/datawave/util/TextUtil.java @@ -108,11 +108,19 @@ public static String fromUtf8(byte[] bytes) { } } + /** + * Returns the bytes of the {@link Text}. This is guaranteed to return a byte array that is the full length of the text, and avoids that particular pitfall + * of {@link Text#getBytes()}. This method is more efficient than {@link Text#copyBytes()} in the case where the byte array returned by + * {@link Text#getBytes()} is already the length of the full data. + * + * @param text + * the text to return the bytes of + * @return the bytes + */ public static byte[] getBytes(Text text) { byte[] bytes = text.getBytes(); if (bytes.length != text.getLength()) { - bytes = new byte[text.getLength()]; - System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); + bytes = text.copyBytes(); } return bytes; } diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileResource.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileResource.java index 0a10cf3f876..4ec6df8b401 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileResource.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileResource.java @@ -108,12 +108,12 @@ public AccumuloResource setOptions(SessionOptions options) { if (log.isDebugEnabled()) { log.debug("Setting Options"); } - if (null != options.getConfiguration() && null != options.getConfiguration().getAccumuloPassword()) { + if (null != options.getQueryConfiguration() && null != options.getQueryConfiguration().getAccumuloPassword()) { if (log.isDebugEnabled()) { log.debug("Setting and configuration"); } - AccumuloHelper.setPassword(conf, options.getConfiguration().getAccumuloPassword().getBytes()); - BulkInputFormat.setMemoryInput(conf, getClient().whoami(), options.getConfiguration().getAccumuloPassword().getBytes(), tableName, + AccumuloHelper.setPassword(conf, options.getQueryConfiguration().getAccumuloPassword().getBytes()); + BulkInputFormat.setMemoryInput(conf, getClient().whoami(), options.getQueryConfiguration().getAccumuloPassword().getBytes(), tableName, auths.iterator().next()); ((RfileScanner) baseScanner).setConfiguration(conf); } diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java index 9f168e63e32..847d65a3c3e 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java @@ -6,7 +6,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -33,7 +32,6 @@ import datawave.core.common.connection.AccumuloConnectionFactory; import datawave.core.query.configuration.QueryData; import datawave.query.config.ShardQueryConfiguration; -import datawave.query.iterator.QueryIterator; import datawave.query.iterator.QueryOptions; import datawave.query.jexl.visitors.JexlStringBuildingVisitor; import datawave.query.planner.QueryPlan; @@ -118,7 +116,7 @@ public List apply(QueryData qd) { options.fetchColumnFamily(new Text(cf)); } - options.setQueryConfig(this.config); + options.setQueryConfiguration(this.config); chunks.add(new ScannerChunk(options, plan.getRanges(), qd, server)); } catch (Exception e) { diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java index c00e0decb3e..fd39e138e72 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java @@ -171,7 +171,7 @@ public Thread newThread(Runnable r) { } public BatchScannerSession(ScannerSession other) { - this(other.tableName, other.auths, other.sessionDelegator, other.maxResults, other.settings, other.options, other.ranges); + this(other.tableName, other.auths, other.sessionDelegator, other.maxResults, other.settings, other.ranges); } @@ -193,8 +193,7 @@ public BatchScannerSession(ScannerSession other) { * @param ranges * list of ranges */ - public BatchScannerSession(String tableName, Set auths, ResourceQueue delegator, int maxResults, Query settings, ScannerOptions options, - Collection ranges) { + public BatchScannerSession(String tableName, Set auths, ResourceQueue delegator, int maxResults, Query settings, Collection ranges) { super(tableName, auths, delegator, maxResults, settings); Preconditions.checkNotNull(delegator); diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java b/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java index 4303e13f5ff..9ff0d79e6e4 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java @@ -1,49 +1,315 @@ package datawave.query.tables; -import java.util.Collection; +import static java.util.Objects.requireNonNull; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.clientImpl.ScannerOptions; -import org.apache.accumulo.core.dataImpl.thrift.IterInfo; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; -import com.google.common.collect.Lists; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; import datawave.query.config.ShardQueryConfiguration; +import datawave.util.TextUtil; /** - * Extension to allow an open constructor - * - * Justification: constructor + * An implementation of {@link ScannerBase} to be used instead of {@link org.apache.accumulo.core.clientImpl.ScannerOptions} since that class is not part of + * Accumulo's public API. */ -public class SessionOptions extends ScannerOptions { +public class SessionOptions implements ScannerBase { + + protected List scanIterators = new ArrayList<>(); + protected Map> scanIteratorOptions = new HashMap<>(); + protected SortedSet fetchedColumns = new TreeSet<>(); + protected long retryTimeout = Long.MAX_VALUE; + protected long batchTimeout = Long.MAX_VALUE; + protected SamplerConfiguration samplerConfig = null; + protected String classLoaderContext = null; + protected Map executionHints = new HashMap<>(); + protected ConsistencyLevel consistencyLevel = ConsistencyLevel.IMMEDIATE; + protected ShardQueryConfiguration queryConfig; + + protected ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + + public SessionOptions() {} + + public SessionOptions(SessionOptions options) { + setOptions(this, options); + } + + protected static void setOptions(SessionOptions dst, SessionOptions src) { + synchronized (dst) { + synchronized (src) { + dst.scanIterators = new ArrayList<>(src.scanIterators); + if (!src.scanIteratorOptions.isEmpty()) { + dst.scanIteratorOptions = new HashMap<>(); + src.scanIteratorOptions.entrySet().forEach(e -> dst.scanIteratorOptions.put(e.getKey(), new HashMap<>(e.getValue()))); + } + dst.fetchedColumns = new TreeSet<>(src.fetchedColumns); + dst.retryTimeout = src.retryTimeout; + dst.batchTimeout = src.batchTimeout; + if (src.samplerConfig != null) { + dst.samplerConfig = new SamplerConfiguration(dst.samplerConfig.getSamplerClassName()); + dst.samplerConfig.setOptions(src.samplerConfig.getOptions()); + } + dst.classLoaderContext = src.classLoaderContext; + // executionHints is an immutable map, no copying required. + dst.executionHints = src.executionHints; + dst.consistencyLevel = src.consistencyLevel; + if (src.queryConfig != null) { + dst.queryConfig = new ShardQueryConfiguration(src.queryConfig); + } + } + } + } + + @Override + public synchronized void addScanIterator(IteratorSetting iterator) { + requireNonNull(iterator, "iterator setting is null"); + // Verify an iterator has not already been added with the same name or priority. + for (IterInfo info : scanIterators) { + if (info.name.equals(iterator.getName())) { + throw new IllegalArgumentException("Iterator name is already in use: " + iterator.getName()); + } + if (info.priority == iterator.getPriority()) { + throw new IllegalArgumentException("Iterator priority is already in use: " + iterator.getPriority()); + } + } + + scanIterators.add(new IterInfo(iterator)); + scanIteratorOptions.computeIfAbsent(iterator.getName(), name -> new HashMap<>()).putAll(iterator.getOptions()); + } + + @Override + public synchronized void removeScanIterator(String iteratorName) { + requireNonNull(iteratorName, "iterator name is null"); + if (!scanIterators.isEmpty()) { + for (IterInfo info : scanIterators) { + if (info.name.equals(iteratorName)) { + scanIterators.remove(info); + break; + } + } + scanIteratorOptions.remove(iteratorName); + } + } + + @Override + public synchronized void clearScanIterators() { + scanIterators.clear(); + scanIteratorOptions.clear(); + } + + /** + * Returns a copy of the server-side iterators in this {@link SessionOptions}. + * + * @return the iterators + */ + public List getIterators() { + // @formatter:off + return scanIterators.stream() + .map(info -> new IteratorSetting(info.priority, info.name, info.iteratorClass, scanIteratorOptions.get(info.name))) + .collect(Collectors.toList()); + // @formatter:on + } + + @Override + public void updateScanIteratorOption(String iteratorName, String key, String value) { + requireNonNull(iteratorName, "iterator name is null"); + requireNonNull(key, "key is null"); + requireNonNull(value, "value is null"); + scanIteratorOptions.computeIfAbsent(iteratorName, name -> new HashMap<>()).put(key, value); + } + + @Override + public synchronized void fetchColumnFamily(Text colFam) { + requireNonNull(colFam, "column family is null"); + Column column = new Column(TextUtil.getBytes(colFam), null, null); + fetchedColumns.add(column); + } + + @Override + public synchronized void fetchColumn(Text colFam, Text colQual) { + requireNonNull(colFam, "column family is null"); + requireNonNull(colQual, "column qualifier is null"); + Column column = new Column(TextUtil.getBytes(colFam), TextUtil.getBytes(colQual), null); + fetchedColumns.add(column); + } + + @Override + public void fetchColumn(IteratorSetting.Column column) { + requireNonNull(column, "column is null"); + fetchColumn(column.getColumnFamily(), column.getColumnQualifier()); + } + + /** + * Returns the set of columns that will be fetched. + * + * @return + */ + public synchronized SortedSet getFetchedColumns() { + return fetchedColumns; + } + + @Override + public synchronized void clearColumns() { + this.fetchedColumns.clear(); + } + + @Override + public Iterator> iterator() { + return null; + } + + @Override + public void setTimeout(long timeout, TimeUnit timeUnit) { + if (timeout < 0) { + throw new IllegalArgumentException("retry timeout must be a positive: " + timeout); + } + if (timeout == 0) { + this.retryTimeout = Long.MAX_VALUE; + } else { + this.retryTimeout = timeUnit.toMillis(timeout); + } + } + + @Override + public long getTimeout(TimeUnit timeUnit) { + return timeUnit.convert(retryTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public void close() { + // Nothing needs to be closed. + } + + @Override + public Authorizations getAuthorizations() { + throw new UnsupportedOperationException("No authorizations to return"); + } + + @Override + public synchronized void setSamplerConfiguration(SamplerConfiguration samplerConfig) { + this.samplerConfig = requireNonNull(samplerConfig, "samplerConfig is null"); + } + + @Override + public synchronized SamplerConfiguration getSamplerConfiguration() { + return samplerConfig; + } + + @Override + public synchronized void clearSamplerConfiguration() { + this.samplerConfig = null; + } - protected ShardQueryConfiguration config; + @Override + public void setBatchTimeout(long timeout, TimeUnit timeUnit) { + if (timeout < 0) { + throw new IllegalArgumentException("batch timeout must be a positive: " + timeout); + } + if (timeout == 0) { + this.batchTimeout = Long.MAX_VALUE; + } else { + this.batchTimeout = timeUnit.toMillis(timeout); + } + } + + @Override + public long getBatchTimeout(TimeUnit timeUnit) { + return timeUnit.convert(batchTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public void setClassLoaderContext(String classLoaderContext) { + this.classLoaderContext = requireNonNull(classLoaderContext, "class loader context is null"); + } + + @Override + public void clearClassLoaderContext() { + this.classLoaderContext = null; + } - public SessionOptions() { - super(); + @Override + public String getClassLoaderContext() { + return this.classLoaderContext; } - public SessionOptions(SessionOptions other) { - super(other); - this.config = other.config; + @Override + public void setExecutionHints(Map hints) { + this.executionHints = Map.copyOf(requireNonNull(hints, "hints is null")); } - public void setQueryConfig(ShardQueryConfiguration config) { - this.config = config; + @Override + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; } - public ShardQueryConfiguration getConfiguration() { - return config; + @Override + public void setConsistencyLevel(ConsistencyLevel level) { + this.consistencyLevel = requireNonNull(level, "consistency level is null"); } - public Collection getIterators() { + /** + * Returns the query configuration that will be used by this scanner. + * + * @return the configuration + */ + public ShardQueryConfiguration getQueryConfiguration() { + return queryConfig; + } + + /** + * Set the query configuration that will be used by this scanner. + * + * @param queryConfig + * the configuration + */ + public void setQueryConfiguration(ShardQueryConfiguration queryConfig) { + this.queryConfig = queryConfig; + } + + private class IterInfo { + private final String name; + private final String iteratorClass; + private final int priority; + + public IterInfo(IteratorSetting iterator) { + this.name = iterator.getName(); + this.iteratorClass = iterator.getIteratorClass(); + this.priority = iterator.getPriority(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IterInfo iterInfo = (IterInfo) o; + return priority == iterInfo.priority && Objects.equals(name, iterInfo.name) && Objects.equals(iteratorClass, iterInfo.iteratorClass); + } - Collection settings = Lists.newArrayList(); - for (IterInfo iter : serverSideIteratorList) { - IteratorSetting setting = new IteratorSetting(iter.getPriority(), iter.getIterName(), iter.getClassName()); - setting.addOptions(serverSideIteratorOptions.get(iter.getIterName())); - settings.add(setting); + @Override + public int hashCode() { + return Objects.hash(name, iteratorClass, priority); } - return settings; } } From a8398215044cf28ac8e8a8b1d62dd55696034179 Mon Sep 17 00:00:00 2001 From: Laura Schanno Date: Fri, 6 Sep 2024 10:54:09 -0400 Subject: [PATCH 2/4] Fix javadoc --- .../main/java/datawave/query/tables/BatchScannerSession.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java index fd39e138e72..641a8d7c79c 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java @@ -188,8 +188,6 @@ public BatchScannerSession(ScannerSession other) { * the max results * @param settings * the query settings - * @param options - * the scanner options * @param ranges * list of ranges */ From 012b36fc735b46d88a2323423b60c5dc102821e2 Mon Sep 17 00:00:00 2001 From: Laura Schanno Date: Mon, 4 Nov 2024 21:39:30 -0500 Subject: [PATCH 3/4] Restore LocalBatchScanner --- .../query/scanner/LocalBatchScanner.java | 161 ++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 warehouse/query-core/src/test/java/datawave/query/scanner/LocalBatchScanner.java diff --git a/warehouse/query-core/src/test/java/datawave/query/scanner/LocalBatchScanner.java b/warehouse/query-core/src/test/java/datawave/query/scanner/LocalBatchScanner.java new file mode 100644 index 00000000000..39cd561e1ad --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/scanner/LocalBatchScanner.java @@ -0,0 +1,161 @@ +package datawave.query.scanner; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.thrift.IterInfo; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder; +import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; +import org.apache.accumulo.core.security.Authorizations; + +import datawave.query.iterator.SortedListKeyValueIterator; +import datawave.query.tables.SessionOptions; + +public class LocalBatchScanner extends SessionOptions implements BatchScanner { + private SortedListKeyValueIterator itr; + private Collection ranges; + private boolean statsEnabled = false; + private StatsIterator statsIterator; + + public LocalBatchScanner(SortedListKeyValueIterator itr) { + this(itr, false); + } + + public LocalBatchScanner(SortedListKeyValueIterator itr, boolean statsEnabled) { + this.itr = itr; + this.statsEnabled = statsEnabled; + } + + public long getNextCount() { + return statsIterator == null ? -1 : statsIterator.getNextCount(); + } + + public long getSeekCount() { + return statsIterator == null ? -1 : statsIterator.getSeekCount(); + } + + @Override + public Iterator> iterator() { + Collections.sort(this.scanIterators, (o1, o2) -> { + if (o1.getPriority() < o2.getPriority()) { + return -1; + } else if (o1.getPriority() > o2.getPriority()) { + return 1; + } else { + return 0; + } + }); + + SortedKeyValueIterator base = this.itr; + IteratorEnvironment env = new LocalIteratorEnvironment(); + + if (statsEnabled) { + statsIterator = new StatsIterator(); + try { + statsIterator.init(base, Collections.emptyMap(), env); + base = statsIterator; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Make a copy of the scan iterators list using accumulo's IterInfo implementation. This should be a temporary solution until the usage of + // IteratorBuilder is removed from Datawave since it's not part of accumulo's public API. + List serverSideIteratorList = this.scanIterators.stream() + .map(info -> new org.apache.accumulo.core.dataImpl.thrift.IterInfo(info.getPriority(), info.getIteratorClass(), info.getName())) + .collect(Collectors.toList()); + + IteratorBuilder iteratorBuilder = IteratorBuilder.builder(serverSideIteratorList).opts(this.scanIteratorOptions).env(env).build(); + + List> list = new ArrayList<>(); + try { + SortedKeyValueIterator created = IteratorConfigUtil.loadIterators(base, iteratorBuilder); + List columns = new ArrayList<>(); + for (Column c : fetchedColumns) { + columns.add(new ArrayByteSequence(c.columnFamily)); + } + + for (Range range : ranges) { + created.seek(range, columns, true); + while (created.hasTop()) { + list.add(new AbstractMap.SimpleImmutableEntry<>(created.getTopKey(), created.getTopValue())); + created.next(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return list.iterator(); + } + + @Override + public void setRanges(Collection ranges) { + this.ranges = ranges; + } + + public static class LocalIteratorEnvironment implements IteratorEnvironment { + @Override + public IteratorUtil.IteratorScope getIteratorScope() { + return IteratorUtil.IteratorScope.scan; + } + + @Override + public boolean isUserCompaction() { + return false; + } + + @Override + public boolean isFullMajorCompaction() { + return false; + } + + @Override + public Authorizations getAuthorizations() { + return new Authorizations(); + } + } + + public static class StatsIterator extends WrappingIterator { + private long nextCount = 0; + private long seekCount = 0; + + @Override + public void next() throws IOException { + super.next(); + nextCount++; + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { + super.seek(range, columnFamilies, inclusive); + seekCount++; + } + + public long getNextCount() { + return nextCount; + } + + public long getSeekCount() { + return seekCount; + } + } +} From a5b5550c865b0ab3bf0adea375919606069cc4b4 Mon Sep 17 00:00:00 2001 From: Laura Schanno Date: Mon, 4 Nov 2024 23:50:01 -0500 Subject: [PATCH 4/4] Precisely replicate copy/initialize behavior from ScannerOptions --- .../datawave/query/tables/SessionOptions.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java b/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java index c3fdb47d4ed..67e741b8281 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/SessionOptions.java @@ -3,6 +3,7 @@ import static java.util.Objects.requireNonNull; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -33,14 +34,14 @@ */ public class SessionOptions implements ScannerBase { - protected List scanIterators = new ArrayList<>(); - protected Map> scanIteratorOptions = new HashMap<>(); + protected List scanIterators = Collections.emptyList(); + protected Map> scanIteratorOptions = Collections.emptyMap(); protected SortedSet fetchedColumns = new TreeSet<>(); protected long retryTimeout = Long.MAX_VALUE; protected long batchTimeout = Long.MAX_VALUE; protected SamplerConfiguration samplerConfig = null; protected String classLoaderContext = null; - protected Map executionHints = new HashMap<>(); + protected Map executionHints = Collections.emptyMap(); protected ConsistencyLevel consistencyLevel = ConsistencyLevel.IMMEDIATE; protected ShardQueryConfiguration queryConfig; @@ -56,24 +57,19 @@ protected static void setOptions(SessionOptions dst, SessionOptions src) { synchronized (dst) { synchronized (src) { dst.scanIterators = new ArrayList<>(src.scanIterators); + dst.scanIteratorOptions = new HashMap<>(); if (!src.scanIteratorOptions.isEmpty()) { - dst.scanIteratorOptions = new HashMap<>(); src.scanIteratorOptions.entrySet().forEach(e -> dst.scanIteratorOptions.put(e.getKey(), new HashMap<>(e.getValue()))); } dst.fetchedColumns = new TreeSet<>(src.fetchedColumns); dst.retryTimeout = src.retryTimeout; dst.batchTimeout = src.batchTimeout; - if (src.samplerConfig != null) { - dst.samplerConfig = new SamplerConfiguration(dst.samplerConfig.getSamplerClassName()); - dst.samplerConfig.setOptions(src.samplerConfig.getOptions()); - } + dst.samplerConfig = src.samplerConfig; dst.classLoaderContext = src.classLoaderContext; // executionHints is an immutable map, no copying required. dst.executionHints = src.executionHints; dst.consistencyLevel = src.consistencyLevel; - if (src.queryConfig != null) { - dst.queryConfig = new ShardQueryConfiguration(src.queryConfig); - } + dst.queryConfig = src.queryConfig; } } } @@ -81,6 +77,10 @@ protected static void setOptions(SessionOptions dst, SessionOptions src) { @Override public synchronized void addScanIterator(IteratorSetting iterator) { requireNonNull(iterator, "iterator setting is null"); + if (scanIterators.isEmpty()) { + scanIterators = new ArrayList<>(); + } + // Verify an iterator has not already been added with the same name or priority. for (IterInfo info : scanIterators) { if (info.name.equals(iterator.getName())) { @@ -92,6 +92,10 @@ public synchronized void addScanIterator(IteratorSetting iterator) { } scanIterators.add(new IterInfo(iterator)); + + if (scanIteratorOptions.isEmpty()) { + scanIteratorOptions = new HashMap<>(); + } scanIteratorOptions.computeIfAbsent(iterator.getName(), name -> new HashMap<>()).putAll(iterator.getOptions()); } @@ -111,8 +115,8 @@ public synchronized void removeScanIterator(String iteratorName) { @Override public synchronized void clearScanIterators() { - scanIterators.clear(); - scanIteratorOptions.clear(); + scanIterators = Collections.emptyList(); + scanIteratorOptions = Collections.emptyMap(); } /** @@ -173,7 +177,7 @@ public synchronized void clearColumns() { @Override public Iterator> iterator() { - return null; + throw new UnsupportedOperationException(); } @Override @@ -205,7 +209,7 @@ public Authorizations getAuthorizations() { @Override public synchronized void setSamplerConfiguration(SamplerConfiguration samplerConfig) { - this.samplerConfig = requireNonNull(samplerConfig, "samplerConfig is null"); + this.samplerConfig = requireNonNull(samplerConfig, "sampler config cannot be null"); } @Override @@ -237,7 +241,7 @@ public long getBatchTimeout(TimeUnit timeUnit) { @Override public void setClassLoaderContext(String classLoaderContext) { - this.classLoaderContext = requireNonNull(classLoaderContext, "class loader context is null"); + this.classLoaderContext = requireNonNull(classLoaderContext, "class loader context cannot be null"); } @Override @@ -252,7 +256,7 @@ public String getClassLoaderContext() { @Override public void setExecutionHints(Map hints) { - this.executionHints = Map.copyOf(requireNonNull(hints, "hints is null")); + this.executionHints = Map.copyOf(requireNonNull(hints, "hints cannot be null")); } public void applyExecutionHints(Map scanHints) { @@ -282,7 +286,7 @@ public ConsistencyLevel getConsistencyLevel() { @Override public void setConsistencyLevel(ConsistencyLevel level) { - this.consistencyLevel = requireNonNull(level, "consistency level is null"); + this.consistencyLevel = requireNonNull(level, "consistency level cannot be null"); } /**