INGEST: Add Pipeline Processor #32473

Merged
merged 13 commits into from
Aug 29, 2018


original-brownbear
Member

Example:

Put inner and outer pipeline (with new processor):

PUT _ingest/pipeline/inner-pipeline
{
  "description" : "inner pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    },
    {
      "set" : {
        "field": "baz",
        "value": "blub"
      }
    }
  ]
}

PUT _ingest/pipeline/outer-pipeline
{
  "description" : "outer pipeline",
  "processors" : [
    {
      "pipeline" : {
        "pipeline": "inner-pipeline"
      }
    }
  ]
}

Add document:

POST /myindex/1?pipeline=outer-pipeline
{
  "field": "value"
}

Works:

POST /myindex/_search
{}

=>

{
  "took": 30,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "myindex",
        "_type": "1",
        "_id": "LSGj7GQBWAIRG1UBRP2D",
        "_score": 1,
        "_source": {
          "field": "value",
          "foo": "bar",
          "baz": "blub"
        }
      }
    ]
  }
}

* Adds Processor capable of invoking other pipelines
* Closes elastic#31842
@original-brownbear original-brownbear added >enhancement WIP :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.0.0 v6.5.0 labels Jul 30, 2018
@elasticmachine
Collaborator

Pinging @elastic/es-core-infra

@original-brownbear
Member Author

@rjernst @tsg @talevy @jakelandis

Just a suggestion for the inter-pipeline communication we talked about a while back. Maybe take a look at whether this kind of API is what we're looking for here; then I'd add tests and clean things up a bit. Functionally, I think this version already works fine :)

@rjernst
Member

rjernst commented Jul 30, 2018

I think there are a couple things needed before this could be merged:

  • Validation - we should have validation when PUTting the outer pipeline that the name of the inner pipeline exists. Similar validation should exist when deleting a pipeline, to ensure a pipeline in use by a pipeline processor cannot be deleted.
  • Ingest services consolidation - I don't like that this adds essentially a volatile global to get access to the pipelines. It has always confused me that we have IngestService being essentially a dummy service, and the real logic being split between PipelineStore and PipelineExecutionService. I think these could all be consolidated together. Then, part of the Processor.Parameters could be the IngestService, which would have a public method for executing a pipeline.
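
The two validation rules in the first bullet could be sketched roughly as follows. This is only an illustration under stated assumptions: `PipelineRegistry` and its `put`/`delete` methods are hypothetical names invented for this sketch and are not the Elasticsearch `IngestService` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry used only for illustration; not an Elasticsearch class.
final class PipelineRegistry {
    // pipeline id -> ids of the pipelines it invokes via pipeline processors
    private final Map<String, Set<String>> references = new HashMap<>();

    // On PUT: reject the pipeline if it references a pipeline that does not exist.
    void put(String id, Set<String> referenced) {
        for (String ref : referenced) {
            if (!references.containsKey(ref)) {
                throw new IllegalArgumentException(
                    "pipeline [" + id + "] references missing pipeline [" + ref + "]");
            }
        }
        references.put(id, new HashSet<>(referenced));
    }

    // On DELETE: reject the deletion if another pipeline still references this one.
    void delete(String id) {
        for (Map.Entry<String, Set<String>> e : references.entrySet()) {
            if (!e.getKey().equals(id) && e.getValue().contains(id)) {
                throw new IllegalStateException(
                    "pipeline [" + id + "] is still used by pipeline [" + e.getKey() + "]");
            }
        }
        references.remove(id);
    }

    boolean contains(String id) {
        return references.containsKey(id);
    }

    public static void main(String[] args) {
        PipelineRegistry registry = new PipelineRegistry();
        registry.put("inner-pipeline", Collections.emptySet());
        registry.put("outer-pipeline", Collections.singleton("inner-pipeline"));
        try {
            registry.delete("inner-pipeline");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping the reference sets next to the pipeline ids makes both checks a simple lookup; a real implementation would have to derive those sets from the parsed processor configuration.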

Additionally, we need to think how this can be exposed as a method in painless. I've started discussions on tweaking how SPI works to possibly allow functors to be bound to a method in painless through the whitelist, so that the method call to say runPipeline("mypipeline", ctx) could be backed by a call to the ingest service.

@original-brownbear
Member Author

@rjernst

Validation - we should have validation when PUTting the outer pipeline that the name of the inner pipeline exists. Similar validation should exist when deleting a pipeline, to ensure a pipeline in use by a pipeline processor cannot be deleted.

Sure that's a trivial if to add to the code :)

Ingest services consolidation - I don't like that this adds essentially a volatile global to get access to the pipelines. It has always confused me that we have IngestService being essentially a dummy service, and the real logic being split between PipelineStore and PipelineExecutionService. I think these could all be consolidated together. Then, part of the Processor.Parameters could be the IngestService, which would have a public method for executing a pipeline.

Def, that sounds good but would be a much larger refactoring. I just went with the most straightforward approach here for the POC. I can look into doing that refactoring if this is something we want now?

@jakelandis
Contributor

I like the approach, it is pretty intuitive.

I assume that if you have a pipeline processor in the middle of your (outer) pipeline that control will eventually come back to the original (outer) pipeline, and finish out that pipeline's execution ?

Also, we should probably detect loops, preferably at time of creation... however that will be much more complicated when we get the if conditional and ability to call other pipelines from painless.

@original-brownbear
Member Author

@jakelandis

I assume that if you have a pipeline processor in the middle of your (outer) pipeline that control will eventually come back to the original (outer) pipeline, and finish out that pipeline's execution ?

yea that should work just fine and transparently :)

Also, we should probably detect loops, preferably at time of creation... however that will be much more complicated when we get the if conditional and ability to call other pipelines from painless.

Yea this is a good one: I guess detecting loops if we also want to factor in conditionals is impossible (that's hypercomputation imo :P). I think theoretically we have the choice between not detecting this and allowing some crazy recursive pipelines or simply walking the pipeline graph and not allowing circles even if conditionals are present.
I'm strongly in favour of the latter. There are almost no reasonable use cases for the former that you couldn't also handle via Painless/scripting, and allowing recursion would really allow for a lot of "shooting yourself in the foot".
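
Walking the pipeline graph while ignoring conditionals amounts to a depth-first search for a cycle. A minimal sketch, assuming the references between pipelines are already available as a map from pipeline id to the ids it invokes (the `PipelineCycleCheck` class and that map are hypothetical, not taken from this PR):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Elasticsearch.
final class PipelineCycleCheck {

    // Returns true if following references from 'start' revisits a pipeline on the current path.
    static boolean hasCycle(String start, Map<String, List<String>> graph) {
        return visit(start, graph, new HashSet<>(), new HashSet<>());
    }

    private static boolean visit(String id, Map<String, List<String>> graph,
                                 Set<String> onPath, Set<String> done) {
        if (onPath.contains(id)) {
            return true;   // back edge: this pipeline is already being expanded
        }
        if (done.contains(id)) {
            return false;  // fully explored earlier without finding a cycle
        }
        onPath.add(id);
        for (String next : graph.getOrDefault(id, Collections.emptyList())) {
            if (visit(next, graph, onPath, done)) {
                return true;
            }
        }
        onPath.remove(id);
        done.add(id);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("outer-pipeline", Arrays.asList("inner-pipeline"));
        System.out.println(hasCycle("outer-pipeline", graph));
        graph.put("inner-pipeline", Arrays.asList("outer-pipeline"));
        System.out.println(hasCycle("outer-pipeline", graph));
    }
}
```

Because every reference is treated as always taken, this check is conservative: it would also reject graphs whose cycle could never be reached at runtime due to a conditional.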

@jakelandis
Contributor

simply walking the pipeline graph and not allowing circles even if conditionals are present
+1

We could, in addition to walking the pipeline at creation, provide runtime checks such that no single processor can execute more than N times in the context of a single document (a circuit breaker of sorts). This would, however, require maintaining some additional per-document state accessible to all processors in the pipeline. I think this may be warranted, since "shooting yourself in the foot" here could mean a full stop of all ingest.
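
One way to picture such a per-document circuit breaker is a small counter object that travels with the document. This is a sketch only: `ExecutionBudget`, `recordRun`, and the use of a string tag to identify a processor are assumptions made for illustration and do not appear in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-document state for illustration; not an Elasticsearch class.
final class ExecutionBudget {
    private final int maxRunsPerProcessor;
    private final Map<String, Integer> runs = new HashMap<>();

    ExecutionBudget(int maxRunsPerProcessor) {
        this.maxRunsPerProcessor = maxRunsPerProcessor;
    }

    // Counts one execution of the given processor and fails once the limit N is exceeded.
    void recordRun(String processorTag) {
        int count = runs.merge(processorTag, 1, Integer::sum);
        if (count > maxRunsPerProcessor) {
            throw new IllegalStateException(
                "processor [" + processorTag + "] ran more than " + maxRunsPerProcessor
                    + " times for a single document");
        }
    }

    public static void main(String[] args) {
        ExecutionBudget budget = new ExecutionBudget(2);
        budget.recordRun("set-foo");
        budget.recordRun("set-foo");
        try {
            budget.recordRun("set-foo");
        } catch (IllegalStateException e) {
            System.out.println("tripped: " + e.getMessage());
        }
    }
}
```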

@original-brownbear
Member Author

@jakelandis

provide runtime checks such that no single processor can execute more than N times in the context of a single document (a circuit breaker of sorts)

Right now that you mention it ... we have to have this imo. You can update pipelines ... so some up front check isn't sufficient here in preventing infinite loops.
I think tracking the "stacktrace" of executed pipelines isn't too hard though :) We're passing the same ingest document instance from pipeline to pipeline.
The most low-tech solution here could simply be tracking the "stack depth" of pipeline calls in an int and putting some limit on it. => Runtime checking should be easy :)
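
The "stack depth in an int" idea might look roughly like the sketch below. The class name `DepthLimitedDocument`, the `runNested` method, and the limit of 10 are all assumptions chosen for illustration; the thread does not specify a limit.

```java
// Hypothetical stand-in for the shared ingest document; not an Elasticsearch class.
final class DepthLimitedDocument {
    static final int MAX_DEPTH = 10; // assumed limit, not from the PR
    private int depth = 0;

    // Runs a nested pipeline body, failing if the nesting exceeds MAX_DEPTH.
    void runNested(Runnable pipelineBody) {
        if (depth >= MAX_DEPTH) {
            throw new IllegalStateException("pipeline call depth exceeded " + MAX_DEPTH);
        }
        depth++;
        try {
            pipelineBody.run();
        } finally {
            depth--; // always unwind, even if the nested pipeline throws
        }
    }

    int depth() {
        return depth;
    }

    public static void main(String[] args) {
        DepthLimitedDocument doc = new DepthLimitedDocument();
        Runnable[] self = new Runnable[1];
        self[0] = () -> doc.runNested(self[0]); // a pipeline that calls itself
        try {
            doc.runNested(self[0]);
        } catch (IllegalStateException e) {
            System.out.println("stopped: " + e.getMessage());
        }
    }
}
```

This works because the same document instance is passed from pipeline to pipeline, so the counter is visible at every level of nesting.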

@rjernst
Member

rjernst commented Aug 1, 2018

I don't think we should have or need stack depth limits like that. We should simply not allow recursion at all? If we use IngestDocument to track the stack, and do not allow any recursion, then an identity hash set of the processors being executed should work. When a processor starts, it adds itself to the hash set, erroring if it already existed, then removes itself when it is done.
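
The identity-hash-set approach described here can be sketched as follows. `RecursionGuard` and its `execute` method are hypothetical names for this illustration; in the PR the tracking lives on the ingest document.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical guard for illustration; not the code merged in this PR.
final class RecursionGuard {
    // Identity semantics: two distinct processor instances never collide, even if equal.
    private final Set<Object> running = Collections.newSetFromMap(new IdentityHashMap<>());

    // Adds the processor on entry, errors if it is already running, removes it on exit.
    void execute(Object processor, Runnable body) {
        if (!running.add(processor)) {
            throw new IllegalStateException("recursive invocation of processor detected");
        }
        try {
            body.run();
        } finally {
            running.remove(processor);
        }
    }

    public static void main(String[] args) {
        RecursionGuard guard = new RecursionGuard();
        Object processor = new Object();
        guard.execute(processor, () -> System.out.println("first run"));
        guard.execute(processor, () -> System.out.println("second run, not recursive"));
        try {
            guard.execute(processor, () -> guard.execute(processor, () -> { }));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Removing the entry in `finally` is what allows the same processor to run again later for the same document while still rejecting re-entry during its own execution.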

@original-brownbear
Member Author

@rjernst

We should simply not allow recursion at all?

I'm fine either way. I don't see a good use case for recursion.
Tracking by processor (I guess it's fine to just track pipeline processor calls themselves here) should work out :) Will add that to the PR.

original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Aug 3, 2018
* Moves all pipeline state into the ingest service
   * Retains the existing pipeline store and pipeline execution service as inner classes to make the review easier, they should be flattened out in the next step
   * All tests for these classes were copied (and adapted) to the ingest service tests
* This is a refactoring step to enable a clean implementation of a pipeline processor (See elastic#32473)
@tsg

tsg commented Aug 4, 2018

From the user perspective, I think this is a wonderful enhancement, enabling for example code reuse between Beats modules. It also fits nicely with the per-processor conditionals. I guess it's going to be possible to add a condition to the pipeline processor, once both features land, right?

+1 on forbidding recursion, that seems like the right decision to me.

original-brownbear added a commit that referenced this pull request Aug 21, 2018
* INGEST: Move all Pipeline State into IngestService

* Moves all pipeline state into the ingest service
   * Retains the existing pipeline store and pipeline execution service as inner classes to make the review easier, they should be flattened out in the next step
   * All tests for these classes were copied (and adapted) to the ingest service tests
* This is a refactoring step to enable a clean implementation of a pipeline processor (See #32473)
@original-brownbear
Member Author

@rjernst alright, now that #32617 (consolidated IngestService) got merged I updated this and added the recursive invocation check.
Can you maybe take a look and let me know if the approach is ok now, then I'd add some UTs (ITs are there already) and we should be good here?

Member

@rjernst rjernst left a comment

A couple comments, and yes, unit tests would be good.

import java.util.HashMap;
import java.util.Map;

public final class PipelineHolder {
Member

Was this accidentally added back? It shouldn't be necessary now right?

Member Author

@rjernst yea I apparently suck at merging :) Will remove

* @throws Exception On exception in pipeline execution
*/
public boolean executePipeline(Pipeline pipeline) throws Exception {
if (this.executedPipelines.add(pipeline) == false) {
Member

Instead of using a set, I think a call stack could be passed through, so that it doesn't need to be a member variable? I don't like the ingest document becoming more mutable than it already is. This method signature is also odd, as I would expect this to be an exception, but it looks like you are avoiding that because it would collide with exceptions that could be thrown from the pipeline itself? But this should fail the pipeline anyway, so I think it is ok to use an exception?

Member Author

@rjernst I can't really pass the callstack with the document can I? I only have the execute method available to pass things (because any called pipeline itself could contain additional pipeline processors).

The reason I made this return boolean instead of throwing right away was more of a style thing to make it clear that the exception was triggered by the pipeline processor. But in hindsight this may be a little needlessly complex :) Moving it in here.

@original-brownbear
Member Author

@rjernst questions answered, unit tests added :)

Member

@rjernst rjernst left a comment

I was originally suggesting changing the signature of execute on the ingest service to take in the stack, but I don't feel that strongly about it. LGTM.
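
For comparison, the alternative mentioned in this review, passing the stack through `execute` instead of storing it on the document, might look roughly like this. The `StackPassingExecutor` class and its `execute` signature are hypothetical and are not the `IngestService` signature.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical executor for illustration; not the IngestService API.
final class StackPassingExecutor {

    // The call stack is an explicit argument, so no mutable state is kept on the document.
    static void execute(String pipelineId, Deque<String> callStack, Consumer<Deque<String>> body) {
        if (callStack.contains(pipelineId)) {
            throw new IllegalStateException(
                "recursive invocation of pipeline [" + pipelineId + "] detected");
        }
        callStack.push(pipelineId);
        try {
            body.accept(callStack); // nested pipeline processors receive the same stack
        } finally {
            callStack.pop();
        }
    }

    public static void main(String[] args) {
        Deque<String> stack = new ArrayDeque<>();
        try {
            execute("outer-pipeline", stack,
                s1 -> execute("inner-pipeline", s1,
                    s2 -> execute("outer-pipeline", s2, s3 -> { })));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off raised in the thread still applies: every processor that can invoke a pipeline needs access to the stack, which is why it has to be threaded through the `execute` call.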

@original-brownbear
Member Author

@rjernst thanks! Will merge once CI passes :)

@original-brownbear original-brownbear merged commit f690b49 into elastic:master Aug 29, 2018
dnhatn added a commit that referenced this pull request Aug 29, 2018
* master:
  Painless: Add Bindings (#33042)
  Update version after client credentials backport
  Fix forbidden apis on FIPS (#33202)
  Remote 6.x transport BWC Layer for `_shrink` (#33236)
  Test fix - Graph HLRC tests needed another field adding to randomisation exception list
  HLRC: Add ML Get Records API (#33085)
  [ML] Fix character set finder bug with unencodable charsets (#33234)
  TESTS: Fix overly long lines (#33240)
  Test fix - Graph HLRC test was missing field name to be excluded from randomisation logic
  Remove unsupported group_shard_failures parameter (#33208)
  Update BucketUtils#suggestShardSideQueueSize signature (#33210)
  Parse PEM Key files leniantly (#33173)
  INGEST: Add Pipeline Processor (#32473)
  Core: Add java time xcontent serializers (#33120)
  Consider multi release jars when running third party audit (#33206)
  Update MSI documentation (#31950)
  HLRC: create base timed request class (#33216)
  [DOCS] Fixes command page titles
  HLRC: Move ML protocol classes into client ml package (#33203)
  Scroll queries asking for rescore are considered invalid (#32918)
  Painless: Fix Semicolon Regression (#33212)
  ingest: minor - update test to include dissect (#33211)
  Switch remaining LLREST usage to new style Requests (#33171)
  HLREST: add reindex API (#32679)
original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Sep 4, 2018
* INGEST: Add Pipeline Processor

* Adds Processor capable of invoking other pipelines
* Closes elastic#31842
original-brownbear added a commit that referenced this pull request Sep 4, 2018
* INGEST: Add Pipeline Processor (#32473)
* Adds Processor capable of invoking other pipelines
* Closes #31842
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement v6.5.0 v7.0.0-beta1

6 participants