Skip to content

Commit

Permalink
Merge remote-tracking branch 'elastic/master' into pr/31285
Browse files Browse the repository at this point in the history
* elastic/master: (29 commits)
  [DOC] Extend SQL docs
  Immediately flush channel after writing to buffer (elastic#31301)
  [DOCS] Shortens ML API intros
  Use quotes in the call invocation (elastic#31249)
  move security ingest processors to a sub ingest directory (elastic#31306)
  Add 5.6.11 version constant.
  Fix version detection.
  SQL: Whitelist SQL utility class for better scripting (elastic#30681)
  [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (elastic#31299)
  CCS: don't proxy requests for already connected node (elastic#31273)
  Mute ScriptedMetricAggregatorTests testSelfReferencingAggStateAfterMap
  [test] opensuse packaging turn up debug logging
  Add unreleased version 6.3.1
  Removes experimental tag from scripted_metric aggregation (elastic#31298)
  [Rollup] Metric config parser must use builder so validation runs (elastic#31159)
  [ML] Check licence when datafeeds use cross cluster search  (elastic#31247)
  Add notion of internal index settings (elastic#31286)
  Test: Remove broken yml test feature (elastic#31255)
  REST hl client: cluster health to default to cluster level (elastic#31268)
  [ML] Update test thresholds to account for changes to memory control (elastic#31289)
  ...
  • Loading branch information
jasontedor committed Jun 14, 2018
2 parents 9d82a67 + 870a913 commit 3770e05
Show file tree
Hide file tree
Showing 185 changed files with 2,835 additions and 2,460 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ task verifyVersions {
new URL('https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch/maven-metadata.xml').openStream().withStream { s ->
xml = new XmlParser().parse(s)
}
Set<Version> knownVersions = new TreeSet<>(xml.versioning.versions.version.collect { it.text() }.findAll { it ==~ /\d\.\d\.\d/ }.collect { Version.fromString(it) })
Set<Version> knownVersions = new TreeSet<>(xml.versioning.versions.version.collect { it.text() }.findAll { it ==~ /\d+\.\d+\.\d+/ }.collect { Version.fromString(it) })

// Limit the known versions to those that should be index compatible, and are not future versions
knownVersions = knownVersions.findAll { it.major >= bwcVersions.currentVersion.major - 1 && it.before(VersionProperties.elasticsearch) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class PrecommitTasks {
configProperties = [
suppressions: checkstyleSuppressions
]
toolVersion = 7.5
toolVersion = '8.10.1'
}

project.tasks.withType(Checkstyle) { task ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ public class AntFixture extends AntTask implements Fixture {
}

// the process is started (has a pid) and is bound to a network interface
// so now wait undil the waitCondition has been met
// so now evaluates if the waitCondition is successful
// TODO: change this to a loop?
boolean success
try {
success = waitCondition(this, ant) == false
success = waitCondition(this, ant)
} catch (Exception e) {
String msg = "Wait condition caught exception for ${name}"
logger.error(msg, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void testClusterHealthYellowClusterLevel() throws IOException {
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.CLUSTER);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertYellowShards(response);
Expand Down Expand Up @@ -170,6 +169,7 @@ public void testClusterHealthYellowSpecificIndex() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest("index");
request.level(ClusterHealthRequest.Level.SHARDS);
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ public void testClusterHealth() {
healthRequest.level(level);
expectedParams.put("level", level.name().toLowerCase(Locale.ROOT));
} else {
expectedParams.put("level", "shards");
expectedParams.put("level", "cluster");
}
if (randomBoolean()) {
Priority priority = randomFrom(Priority.values());
Expand Down
2 changes: 1 addition & 1 deletion distribution/src/bin/elasticsearch-cli.bat
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ call "%~dp0elasticsearch-env.bat" || exit /b 1

if defined ES_ADDITIONAL_SOURCES (
for %%a in ("%ES_ADDITIONAL_SOURCES:;=","%") do (
call %~dp0%%a
call "%~dp0%%a"
)
)

Expand Down
1 change: 1 addition & 0 deletions docs/java-rest/high-level/cluster/health.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wai
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level]
--------------------------------------------------
<1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value.
Default value is `cluster`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
[[search-aggregations-metrics-scripted-metric-aggregation]]
=== Scripted Metric Aggregation

experimental[]

A metric aggregation that executes using scripts to provide a metric output.

Example:
Expand Down
9 changes: 8 additions & 1 deletion docs/reference/migration/migrate_7_0/restclient.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,11 @@ header, e.g. `client.index(indexRequest)` becomes
`client.index(indexRequest, RequestOptions.DEFAULT)`.
In case you are specifying headers
e.g. `client.index(indexRequest, new Header("name" "value"))` becomes
`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());`
`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());`

==== Cluster Health API default to `cluster` level

The Cluster Health API used to default to `shards` level to ease migration
from transport client that doesn't support the `level` parameter and always
returns information including indices and shards details. The level default
value has been aligned with the Elasticsearch default level: `cluster`.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) {
* @throws IOException during channel / context close
*/
public void closeFromSelector() throws IOException {
if (closeContext.isDone() == false) {
if (isOpen()) {
try {
rawChannel.close();
closeContext.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ protected void listenerException(Exception exception) {
}

/**
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
* channel.
* This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
*
* @param context that was handled
*/
Expand Down
43 changes: 24 additions & 19 deletions libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* this selector.
* <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality.
*/
public class NioSelector implements Closeable {

Expand All @@ -65,7 +62,7 @@ public NioSelector(EventHandler eventHandler) throws IOException {
this(eventHandler, Selector.open());
}

public NioSelector(EventHandler eventHandler, Selector selector) throws IOException {
public NioSelector(EventHandler eventHandler, Selector selector) {
this.selector = selector;
this.eventHandler = eventHandler;
}
Expand Down Expand Up @@ -165,7 +162,7 @@ void singleLoop() {
}

void cleanupAndCloseChannels() {
cleanup();
cleanupPendingWrites();
channelsToClose.addAll(channelsToRegister);
channelsToRegister.clear();
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
Expand Down Expand Up @@ -234,16 +231,6 @@ void preSelect() {
handleQueuedWrites();
}

/**
* Called once as the selector is being closed.
*/
void cleanup() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}

/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
* api available for non-selector threads to schedule writes.
Expand Down Expand Up @@ -284,20 +271,31 @@ public void scheduleForRegistration(NioChannel channel) {
}

/**
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
* by the selector thread. As a result, this method should only be called by the selector thread.
* Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
* already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
* thread. As a result, this method should only be called by the selector thread. If this channel does
* not have pending writes already, the channel will be flushed.
*
* @param writeOperation to be queued in a channel's buffer
*/
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
public void writeToChannel(WriteOperation writeOperation) {
assertOnSelectorThread();
SocketChannelContext context = writeOperation.getChannel();
// If the channel does not currently have anything that is ready to flush, we should flush after
// the write operation is queued.
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
try {
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
context.queueWriteOperation(writeOperation);
} catch (Exception e) {
shouldFlushAfterQueuing = false;
executeFailedListener(writeOperation.getListener(), e);
}

if (shouldFlushAfterQueuing) {
handleWrite(context);
eventHandler.postHandling(context);
}
}

/**
Expand Down Expand Up @@ -332,6 +330,13 @@ public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Excepti
}
}

private void cleanupPendingWrites() {
WriteOperation op;
while ((op = queuedWrites.poll()) != null) {
executeFailedListener(op.getListener(), new ClosedSelectorException());
}
}

private void wakeup() {
// TODO: Do we need the wakeup optimizations that some other libraries use?
selector.wakeup();
Expand Down Expand Up @@ -394,7 +399,7 @@ private void handleQueuedWrites() {
WriteOperation writeOperation;
while ((writeOperation = queuedWrites.poll()) != null) {
if (writeOperation.getChannel().isOpen()) {
queueWriteInChannelBuffer(writeOperation);
writeToChannel(writeOperation);
} else {
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
return;
}

selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);
}

public void queueWriteOperation(WriteOperation writeOperation) {
Expand Down Expand Up @@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() {
@Override
public void closeFromSelector() throws IOException {
getSelector().assertOnSelectorThread();
if (channel.isOpen()) {
if (isOpen()) {
ArrayList<IOException> closingExceptions = new ArrayList<>(3);
try {
super.closeFromSelector();
Expand Down
26 changes: 23 additions & 3 deletions libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception {
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);

assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));

selector.queueWriteInChannelBuffer(writeOperation);
when(channelContext.readyForFlush()).thenReturn(true);
selector.writeToChannel(writeOperation);

verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}

public void testShouldFlushIfNoPendingFlushes() throws Exception {
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);

assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));

when(channelContext.readyForFlush()).thenReturn(false);
selector.writeToChannel(writeOperation);

verify(channelContext).queueWriteOperation(writeOperation);
verify(eventHandler).handleWrite(channelContext);
verify(eventHandler).postHandling(channelContext);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
}

Expand All @@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws
CancelledKeyException cancelledKeyException = new CancelledKeyException();

when(channelContext.getSelectionKey()).thenReturn(selectionKey);
when(channelContext.readyForFlush()).thenReturn(false);
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
selector.queueWriteInChannelBuffer(writeOperation);
selector.writeToChannel(writeOperation);

verify(channelContext, times(0)).queueWriteOperation(writeOperation);
verify(eventHandler, times(0)).handleWrite(channelContext);
verify(eventHandler, times(0)).postHandling(channelContext);
verify(listener).accept(null, cancelledKeyException);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() {
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
context.sendMessage(buffers, listener);

verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture());
verify(selector).writeToChannel(writeOpCaptor.capture());
WriteOperation writeOp = writeOpCaptor.getValue();

assertSame(writeOperation, writeOp);
Expand Down
Loading

0 comments on commit 3770e05

Please sign in to comment.