Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job in index: Datafeed node selector #34218

Merged

Conversation

davidkyle
Copy link
Member

Breaks the DatafeedNodeSelectors dependency on datafeed and job configuration defined in the cluster state.

Similar to #33994 the required configuration is added to the persistent task's parameters. TransportStartDatafeedAction now collects the required config and adds it to StartDatafeedAction.DatafeedParams. The new fields added to the params cannot be streamed in BWC safe way but this isn't an issue as TransportStartDatafeedAction is a master node action and StartDatafeedPersistentTasksExecutor.getAssignment is also only called on the master node (by the PersistentTasksClusterService).

DatafeedManager.run is changed to accept the config as a parameter rather than reading from the clusterstate, this has the additional benefit that the validated configs are used rather than re-reading the config some point after validation. However, the BWC breaks down here as the datafeed may start on a node that isn't upgraded (mixed cluster) and the required config will not have been streamed in StartDatafeedAction.DatafeedParams.

Discuss

I can think of 2 solutions to this, either re-read the missing config in DatafeedManager or prevent the datafeed from starting on an old node/in a mixed cluster state. I prefer the later as I think it will simplify the migration process. That change can sensibly be done as part of the migration/upgrade work.

@davidkyle davidkyle added >feature :ml Machine learning labels Oct 2, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/ml-core

Copy link
Contributor

@droberts195 droberts195 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can think of 2 solutions to this, either re-read the missing config in DatafeedManager or prevent the datafeed from starting on an old node/in a mixed cluster state.

I think the first of these solutions, re-read the missing config in DatafeedManager, would be more consistent with what we do today. Today, if a datafeed relocates from a failed node to another node then the latest updates to the datafeed config are picked up at the time of relocation. Putting the whole datafeed in the task params is changing this behaviour. That's not necessarily a problem as it's an edge case for which the current behaviour is undocumented. However, there are other problems with putting the whole datafeed config in the task params as I noted in the detailed comments, so I think it would be best not to.

@@ -194,6 +195,10 @@ public DatafeedParams(StreamInput in) throws IOException {
startTime = in.readVLong();
endTime = in.readOptionalLong();
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.CURRENT)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this should be a specific version, not Version.CURRENT. Once we get to a mixed version 7.2/7.3 cluster surely it would be OK for the master node to be on 7.2?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CURRENT is a placeholder. This is to be merged into a feature branch that will merge into 6.x at some point in the future into in an unknown version. There is no version constant > 6.5.0 so it's CURRENT for now. OpenJobAction is the same, I'll add a checkbox to the meta issue so this isn't forgotten

@@ -248,6 +271,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.CURRENT)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, surely this should be a specific version?

if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(datafeedConfig);
out.writeOptionalWriteable(job);
}
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems wrong that toXContent() persists different information to writeTo(). This means that the behaviour on full cluster restart is different to behaviour within a running cluster. If someone did a full cluster restart with started datafeeds I suspect that things wouldn't work on restart. (It should be possible to add a test in x-pack/qa/full-cluster-restart to confirm this.)

However, I think the solution to this problem is not to put the whole datafeed config in the X-Content representation. That creates the same problem we have now where search syntax that's been removed prevents reading the cluster state from disk.

This leads to the conclusion that instead of storing the whole datafeed config in the task params we should store just enough fields from it to validate allocation, and definitely not the search or aggregations. Then toXContent() can be consistent with writeTo().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems wrong that toXContent() persists different information to writeTo()

This is because the params do not have a lenient parser (see #33950), in 7 this won't be the case and the new fields can be written.

I suspect that things wouldn't work on restart

The feature branch isn't at a point where the QA tests can be run yet but true. This will have to be handled by the migration process in a future PR but given that the persistent task params cannot be modified the only solution may be to stop and restart the job & datafeed.

I choose to put the datafeed in the params as once the datafeed has been validated the same config should be used but this does add weight to the cluster state and is not good for the reasons mentioned above especially deprecated search & aggs config. I've changed it to use the minimal set of fields (job id & datafeed indices).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration process will only move datafeeds from cluster state to index when the entire cluster has been upgraded to whatever version this goes into (6.6 or 6.7). So if we can find the minimum node version in the cluster in toXContent() then we can write the extra fields into the X-Content representation only after the entire cluster has been upgraded. That will make full cluster restarts work in the case where the entire cluster is on 6.6/6.7 (and it is essential this works because some people will run 6.7 for a year after 7.0 is released).

So if job_id is null after a full cluster restart then that implies the cluster was not completely upgraded to 6.6/6.7, and hence the migration will not have started, and hence the information can be obtained from the MlMetadata in cluster state.

Copy link
Member Author

@davidkyle davidkyle Oct 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning is thus:

  • The job id is required to find the job task related to the datafeed and hence make the allocation decision
  • If job_id is null then the task was not started from a migrated config/upgraded node. i.e. it is a running task from a full cluster restart or rolling upgrade
  • In that case the datafeed config should be present in the clusterstate and the job_id can be found that way
  • It is not possible to modify the persistent task parameters
  • The persistent task may migrate once again before the task is stopped due to node failure and once again job_id will have to be found via the datafeed config in the clusterstate
  • Therefore config cannot be removed from the clusterstate until the task has stopped
  • Migration should duplicate the config in an index document and remove it from clusterstate on datafeed stopping

That seems reasonable enough until we hit a datafeed job created in 6.4 which is not stopped before 8

So if we can find the minimum node version in the cluster in toXContent() ...

I can't see a way of doing this and it won't help with full cluster restarts from 6.5

I could add the code to read job_id from the datafeed config in the clusterstate in this PR but I have no mechanism of testing it yet and have not started on the migration code. My preference is to finish the work on making jobs run from the new config and make it stable then tackle this issue (and others) in the migration work.

@davidkyle
Copy link
Member Author

I made a change to read and re-validate the datafeed & job config in DatafeedJobBuilder.build so that config does not have to be part of the task params. There remains the problem that the job id will not be available on a full cluster restart and this will have to be dealt with in the migration process.

Copy link
Contributor

@droberts195 droberts195 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@davidkyle davidkyle merged commit 7697e34 into elastic:feature/jindex-6.x Oct 17, 2018
@davidkyle davidkyle deleted the datafeed-selector-jindex branch October 17, 2018 12:50
davidkyle added a commit to davidkyle/elasticsearch that referenced this pull request Oct 29, 2018
davidkyle added a commit that referenced this pull request Dec 18, 2018
* [ML] Job and datafeed mappings with index template (#32719)

Index mappings for the configuration documents

* [ML] Job config document CRUD operations (#32738)

* [ML] Datafeed config CRUD operations (#32854)

* [ML] Change JobManager to work with Job config in index  (#33064)

* [ML] Change Datafeed actions to read config from the config index (#33273)

* [ML] Allocate jobs based on JobParams rather than cluster state config (#33994)

* [ML] Return missing job error when .ml-config is does not exist (#34177)

* [ML] Close job in index (#34217)

* [ML] Adjust finalize job action to work with documents (#34226)

* [ML] Job in index: Datafeed node selector (#34218)

* [ML] Job in Index: Stop and preview datafeed (#34605)

* [ML] Delete job document (#34595)

* [ML] Convert job data remover to work with index configs (#34532)

* [ML] Job in index: Get datafeed and job stats from index (#34645)

* [ML] Job in Index: Convert get calendar events to index docs (#34710)

* [ML] Job in index: delete filter action (#34642)

This changes the delete filter action to search
for jobs using the filter to be deleted in the index
rather than the cluster state.

* [ML] Job in Index: Enable integ tests (#34851)

Enables the ml integration tests excluding the rolling upgrade tests and a lot of fixes to
make the tests pass again.

* [ML] Reimplement established model memory (#35500)

This is the 7.0 implementation of a master node service to
keep track of the native process memory requirement of each ML
job with an associated native process.

The new ML memory tracker service works when the whole cluster
is upgraded to at least version 6.6. For mixed version clusters
the old mechanism of established model memory stored on the job
in cluster state was used. This means that the old (and complex)
code to keep established model memory up to date on the job object
has been removed in 7.0.

Forward port of #35263

* [ML] Need to wait for shards to replicate in distributed test (#35541)

Because the cluster was expanded from 1 node to 3 indices would
initially start off with 0 replicas.  If the original node was
killed before auto-expansion to 1 replica was complete then
the test would fail because the indices would be unavailable.

* [ML] DelayedDataCheckConfig index mappings (#35646)

* [ML] JIndex: Restore finalize job action (#35939)

* [ML] Replace Version.CURRENT in streaming functions (#36118)

* [ML] Use 'anomaly-detector' in job config doc name (#36254)

* [ML] Job In Index: Migrate config from the clusterstate (#35834)

Migrate ML configuration from clusterstate to index for closed jobs
only once all nodes are v6.6.0 or higher

* [ML] Check groups against job Ids on update (#36317)

* [ML] Adapt to periodic persistent task refresh (#36633)

* [ML] Adapt to periodic persistent task refresh

If https://github.com/elastic/elasticsearch/pull/36069/files is
merged then the approach for reallocating ML persistent tasks
after refreshing job memory requirements can be simplified.
This change begins the simplification process.

* Remove AwaitsFix and implement TODO

* [ML] Default search size for configs

* Fix TooManyJobsIT.testMultipleNodes

Two problems:

1. Stack overflow during async iteration when lots of
   jobs on same machine
2. Not effectively setting search size in all cases

* Use execute() instead of submit() in MlMemoryTracker

We don't need a Future to wait for completion

* [ML][TEST] Fix NPE in JobManagerTests

* [ML] JIindex: Limit the size of bulk migrations (#36481)

* [ML] Prevent updates and upgrade tests (#36649)

* [FEATURE][ML] Add cluster setting that enables/disables config  migration (#36700)

This commit adds a cluster settings called `xpack.ml.enable_config_migration`.
The setting is `true` by default. When set to `false`, no config migration will
be attempted and non-migrated resources (e.g. jobs, datafeeds) will be able
to be updated normally.

Relates #32905

* [ML] Snapshot ml configs before migrating (#36645)

* [FEATURE][ML] Split in batches and migrate all jobs and datafeeds (#36716)

Relates #32905

* SQL: Fix translation of LIKE/RLIKE keywords (#36672)

* SQL: Fix translation of LIKE/RLIKE keywords

Refactor Like/RLike functions to simplify internals and improve query
 translation when chained or within a script context.

Fix #36039
Fix #36584

* Fixing line length for EnvironmentTests and RecoveryTests (#36657)

Relates #34884

* Add back one line removed by mistake regarding java version check and
COMPAT jvm parameter existence

* Do not resolve addresses in remote connection info (#36671)

The remote connection info API leads to resolving addresses of seed
nodes when invoked. This is problematic because if a hostname fails to
resolve, we would not display any remote connection info. Yet, a
hostname not resolving can happen across remote clusters, especially in
the modern world of cloud services with dynamically chaning
IPs. Instead, the remote connection info API should be providing the
configured seed nodes. This commit changes the remote connection info to
display the configured seed nodes, avoiding a hostname resolution. Note
that care was taken to preserve backwards compatibility with previous
versions that expect the remote connection info to serialize a transport
address instead of a string representing the hostname.

* [Painless] Add boxed type to boxed type casts for method/return (#36571)

This adds implicit boxed type to boxed types casts for non-def types to create asymmetric casting relative to the def type when calling methods or returning values. This means that a user calling a method taking an Integer can call it with a Byte, Short, etc. legally which matches the way def works. This creates consistency in the casting model that did not previously exist.

* SNAPSHOTS: Adjust BwC Versions in Restore Logic (#36718)

* Re-enables bwc tests with adjusted version conditions now that #36397 enables concurrent snapshots in 6.6+

* ingest: fix on_failure with Drop processor (#36686)

This commit allows a document to be dropped when a Drop processor
is used in the on_failure fork of the processor chain.

Fixes #36151

* Initialize startup `CcrRepositories` (#36730)

Currently, the CcrRepositoryManger only listens for settings updates
and installs new repositories. It does not install the repositories that
are in the initial settings. This commit, modifies the manager to
install the initial repositories. Additionally, it modifies the ccr
integration test to configure the remote leader node at startup, instead
of using a settings update.

* [TEST] fix float comparison in RandomObjects#getExpectedParsedValue

This commit fixes a test bug introduced with #36597. This caused some
test failure as stored field values comparisons would not work when CBOR
xcontent type was used.

Closes #29080

* [Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (#35320)

This commit  exposes lucene's LatLonShape field as the
default type in GeoShapeFieldMapper. To use the new 
indexing approach, simply set "type" : "geo_shape" in 
the mappings without setting any of the strategy, precision, 
tree_levels, or distance_error_pct parameters. Note the 
following when using the new indexing approach:

* geo_shape query does not support querying by 
MULTIPOINT.
* LINESTRING and MULTILINESTRING queries do not 
yet support WITHIN relation.
* CONTAINS relation is not yet supported.
The tree, precision, tree_levels, distance_error_pct, 
and points_only parameters are deprecated.

* TESTS:Debug Log. IndexStatsIT#testFilterCacheStats

* ingest: support default pipelines + bulk upserts (#36618)

This commit adds support to enable bulk upserts to use an index's
default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert
are all supported.

However, bulk script_as_upsert has slightly surprising behavior since
the pipeline is executed _before_ the script is evaluated. This means
that the pipeline only has access the data found in the upsert field
of the script_as_upsert. The non-bulk script_as_upsert (existing behavior)
runs the pipeline _after_ the script is executed. This commit
does _not_ attempt to consolidate the bulk and non-bulk behavior for
script_as_upsert.

This commit also adds additional testing for the non-bulk behavior,
which remains unchanged with this commit.

fixes #36219

* Fix duplicate phrase in shrink/split error message (#36734)

This commit removes a duplicate "must be a" from the shrink/split error
messages.

* Deprecate types in get_source and exist_source (#36426)

This change adds a new untyped endpoint `{index}/_source/{id}` for both the
GET and the HEAD methods to get the source of a document or check for its
existance. It also adds deprecation warnings to RestGetSourceAction that emit
a warning when the old deprecated "type" parameter is still used. Also updating
documentation and tests where appropriate.

Relates to #35190

* Revert "[Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (#35320)"

This reverts commit 5bc7822.

* Enhance Invalidate Token API (#35388)

This change:

- Adds functionality to invalidate all (refresh+access) tokens for all users of a realm
- Adds functionality to invalidate all (refresh+access)tokens for a user in all realms
- Adds functionality to invalidate all (refresh+access) tokens for a user in a specific realm
- Changes the response format for the invalidate token API to contain information about the 
   number of the invalidated tokens and possible errors that were encountered.
- Updates the API Documentation

After back-porting to 6.x, the `created` field will be removed from master as a field in the 
response

Resolves: #35115
Relates: #34556

* Add raw sort values to SearchSortValues transport serialization (#36617)

In order for CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present but they are formatted according to the provided `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one.

This commit adds such information to the `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.

* Watcher: Ensure all internal search requests count hits (#36697)

In previous commits only the stored toXContent version of a search
request was using the old format. However an executed search request was
already disabling hit counts. In 7.0 hit counts will stay enabled by
default to allow for proper migration.

Closes #36177

* [TEST] Ensure shard follow tasks have really stopped.

Relates to #36696

* Ensure MapperService#getAllMetaFields elements order is deterministic (#36739)

MapperService#getAllMetaFields returns an array, which is created out of
an `ObjectHashSet`. Such set does not guarantee deterministic hash
ordering. The array returned by its toArray may be sorted differently
at each run. This caused some repeatability issues in our tests (see #29080)
as we pick random fields from the array of possible metadata fields,
but that won't be repeatable if the input array is sorted differently at
every run. Once setting the tests seed, hppc picks that up and the sorting is
deterministic, but failures don't repeat with the seed that gets printed out
originally (as a seed was not originally set).
See also https://issues.carrot2.org/projects/HPPC/issues/HPPC-173.

With this commit, we simply create a static sorted array that is used for
`getAllMetaFields`. The change is in production code but really affects
only testing as the only production usage of this method was to iterate
through all values when parsing fields in the high-level REST client code.
Anyways, this seems like a good change as returning an array would imply
that it's deterministically sorted.

* Expose Sequence Number based Optimistic Concurrency Control in the rest layer (#36721)

Relates #36148 
Relates #10708

* [ML] Mute MlDistributedFailureIT
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
>feature :ml Machine learning
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants