
[FEATURE] Natively support PPL inside Spark #30

Closed

YANG-DB opened this issue Sep 6, 2023 · 0 comments
Labels
enhancement New feature or request

YANG-DB commented Sep 6, 2023

Is your feature request related to a problem?

The purpose of this issue is to propose a way to query Spark using the PPL query language. Introducing this functionality serves the following goals:

  • Transforming PPL to become OpenSearch's default query language (specifically for logs/traces/metrics signals)
  • Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language
  • Seamlessly interacting with different data sources (S3 / Prometheus / data lake) from within OpenSearch
  • Improving and promoting PPL to become an extensible, general-purpose query language adopted by the community

Spark is an excellent conduit for promoting these goals and for showcasing PPL's ability to interact with and federate data across multiple sources and domains.

Another byproduct of introducing PPL on Spark would be the much-anticipated JOIN capability that emerges from using the Spark compute engine.

What solution would you like?

PPL should become a library that is simple to import and extend: a PPL client (a thin API layer) that can interact with and provide a generic query-composition framework for any type of application, independently of the OpenSearch plugins.

PPL endpoint

As depicted in the PPL endpoint diagram above, the protocol and AST (ANTLR-based language traversal) verticals should be detached and composed into a self-contained component that can be imported independently of the OpenSearch plugins.


PPL On Spark

Running PPL on Spark aims to enable simple adoption of the PPL query language, and to simplify the Flint project by allowing visualization of federated queries using the Observability dashboards capabilities.

Background

In Apache Spark, the DataFrame API serves as a programmatic interface for data manipulation and queries, allowing the construction of complex operations using a chain of method calls. This API can work in tandem with other query languages like SQL or PPL.

For instance, if you have a PPL query and a translator, you can convert it into DataFrame operations to generate an optimized execution plan. Spark's underlying Catalyst optimizer will convert these DataFrame transformations and actions into an optimized physical plan executed over RDDs or Datasets.
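As a rough illustration, such a translator can be sketched as a function from a PPL AST to DataFrame transformations. The node types below are hypothetical, simplified placeholders, not the actual PPL AST classes:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical, simplified PPL AST nodes (illustration only)
sealed trait PplNode
case class Search(source: String) extends PplNode
case class Where(child: PplNode, condition: String) extends PplNode

// Map each PPL node onto the equivalent DataFrame operation
def translate(node: PplNode, spark: SparkSession): DataFrame = node match {
  case Search(source)          => spark.table(source)                        // 'search source=t'
  case Where(child, condition) => translate(child, spark).filter(condition)  // '| where a=1'
}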

The following sections describe the two main options for translating a PPL query (via its logical plan) into the corresponding Spark component (either the DataFrame API or the Spark logical plan).

Translation Process

Using the DataFrame API

The following PPL query:
search source=t | where a=1

Would produce the following chain of DataFrame API calls:

// Equivalent of 'search source=t'
val df = spark.read.format("some_format").load("some_path")
// Equivalent of '| where a=1'
val filteredDf = df.filter("a = 1")
filteredDf.show()

The following PPL query:
source=t | stats count(a) by b

Would produce the following chain of DataFrame API calls:

import org.apache.spark.sql.functions.count

// Equivalent of 'source=t'
val df = spark.read.format("some_format").load("some_path")
// Equivalent of '| stats count(a) by b'
val aggregatedDf = df.groupBy("b").agg(count("a"))

Using Catalyst Logical Plan Grammar
Another option for translation is to target Catalyst's logical plan grammar directly, mapping each step of the PPL logical plan onto the corresponding Catalyst node. Examples of such translations follow.

Our goal would be to translate the PPL query into an unresolved logical plan, so that the analysis phase behaves the same way as it does for a SQL-originated query.

(Diagram: Spark execution process)
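For comparison, this is roughly how a SQL query reaches the same entry point: Spark's SQL parser emits an unresolved logical plan, which the analyzer then resolves. A PPL translator would hand the analyzer an equivalent unresolved plan directly (a sketch, assuming an active SparkSession named spark):

// SQL path: query text -> unresolved logical plan -> analysis
val unresolvedFromSql = spark.sessionState.sqlParser.parsePlan("SELECT * FROM t WHERE a = 1")
// A PPL translator would skip the SQL parser and produce an equivalent
// unresolved plan, relying on the same analysis phase from this point on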

The following PPL query:
search source=t | where a=1

Translates into the PPL logical plan:
Relation(tableName=t, alias=null), Compare(operator==, left=Field(field=a, fieldArgs=[]), right=1)

Which would be transformed into the following Catalyst plan:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Filter

// Create an UnresolvedRelation for the table 't'
val table = UnresolvedRelation(TableIdentifier("t"))
// Create an EqualTo expression for "a = 1"
val equalToCondition = EqualTo(UnresolvedAttribute("a"), Literal(1))
// Create a Filter LogicalPlan
val filterPlan = Filter(equalToCondition, table)

The following PPL query:
source=t | stats count(a) by b

Would produce the following PPL logical plan:

Aggregation(aggExprList=[Alias(name=count(a), delegated=count(Field(field=a, fieldArgs=[])), alias=null)], 
sortExprList=[], groupExprList=[Alias(name=b, delegated=Field(field=b, fieldArgs=[]), alias=null)], span=null, argExprList=[Argument(argName=partitions, value=1), Argument(argName=allnum, value=false), Argument(argName=delim, value= ), Argument(argName=dedupsplit, value=false)], child=[Relation(tableName=t, alias=null)])

Which would be transformed into the following Catalyst plan:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

// Create an UnresolvedRelation for the table 't'
val table = UnresolvedRelation(TableIdentifier("t"))
// Create an Alias for the aggregation expression 'count(a)'
val aggExpr = Alias(Count(UnresolvedAttribute("a")), "count(a)")()
// Create an Alias for the grouping expression 'b'
val groupExpr = Alias(UnresolvedAttribute("b"), "b")()
// Create an Aggregate LogicalPlan
val aggregatePlan = Aggregate(Seq(groupExpr), Seq(groupExpr, aggExpr), table)

Design Options

In general, when translating between two query languages, we have the following options:

1) Source Grammar Tree to Destination DataFrame API Translation
This option uses the syntax tree to translate directly from the source language's grammar tree into the target language's (DataFrame) API, eliminating a separate parsing phase and creating a strongly validated process that can be verified and tested with a high degree of confidence.
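As a minimal sketch of this option, a visitor over the PPL grammar tree can emit DataFrame calls directly; the trait and method names below are illustrative, not the actual PPL visitor API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.count

// Hypothetical visitor over the PPL grammar tree; each method returns
// the DataFrame built so far
trait PplToDataFrameVisitor {
  def visitSearch(source: String): DataFrame
  def visitWhere(input: DataFrame, condition: String): DataFrame =
    input.filter(condition)                          // '| where ...'
  def visitStats(input: DataFrame, aggField: String, byField: String): DataFrame =
    input.groupBy(byField).agg(count(aggField))      // '| stats count(a) by b'
}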

Advantages :

  • A simpler solution to develop, since the abstract structure of the query language is straightforward to transform into the DataFrame API using the built-in traversal/visitor API (as in the sketch above).
  • Optimization potential, by leveraging specific knowledge of the original language and directly using its grammar functions and commands.

Disadvantages :

  • Fully dependent on the source code of the target language, potentially including the internal structure of its grammatical components. In Spark's case this is not a severe disadvantage, since the DataFrame API is very well known and well structured.
  • Not sufficiently portable, since this approach is coupled to Spark's DataFrame API.

2) Source Logical Plan to Destination Logical Plan (Catalyst) [Preferred Option]
This option translates directly from the source language's logical plan into the target language's logical plan (grammar tree to grammar tree), eliminating a separate parsing phase and creating a strongly validated process that can be verified and tested with a high degree of confidence.

Once the target plan is created, it can be analyzed and executed separately from the translation process (or location):

SparkSession spark = SparkSession.builder()
        .appName("SparkExecuteLogicalPlan")
        .master("local")
        .getOrCreate();

// Catalyst logical plan - translated from the PPL logical plan
Seq<NamedExpression> scalaProjectList = // ... your project list
LogicalPlan unresolvedTable = // ... your unresolved table
LogicalPlan projectNode = new Project(scalaProjectList, unresolvedTable);

// Analyze and execute (note: the Analyzer constructor below matches the
// pre-Spark-3 signature; Spark 3.x constructs the Analyzer from a CatalogManager)
Analyzer analyzer = new Analyzer(spark.sessionState().catalog(), spark.sessionState().conf());
LogicalPlan analyzedPlan = analyzer.execute(projectNode);
LogicalPlan optimizedPlan = spark.sessionState().optimizer().execute(analyzedPlan);

QueryExecution qe = spark.sessionState().executePlan(optimizedPlan);
Dataset<Row> result = new Dataset<>(spark, qe, RowEncoder.apply(qe.analyzed().schema()));

Advantages :

  • A little more complex to develop than the first option, but still relatively simple, since the abstract structure of the query language is straightforward to transform into another language's syntax grammar tree.

  • Optimization potential, by leveraging specific knowledge of the original language and directly using its grammar functions and commands.

Disadvantages :

  • Fully dependent on the source code of the target language, potentially including the internal structure of its grammatical components. In Spark's case this is not a severe disadvantage, since the logical plan grammar is very well known and well structured.
  • Adds an additional phase: analyzing the logical plan, generating the physical plan, and executing it.

3) Source Grammar Tree to Destination Query Translation
This option uses the syntax tree to translate from the original query language into the target query language (SQL in our case). This is a more generalized solution that may be utilized for additional purposes, such as querying an RDBMS server directly.
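A sketch of this approach, reusing the hypothetical PPL AST nodes from the earlier sketch: the tree is rendered to SQL text, which the target engine must then re-parse and validate:

// Hypothetical PPL AST nodes, as in the earlier sketch
sealed trait PplNode
case class Search(source: String) extends PplNode
case class Where(child: PplNode, condition: String) extends PplNode

// Render the tree to SQL text; the result must be re-parsed and verified
// by the target engine (e.g. via spark.sql(...)), which is the extra
// complexity noted below
def toSql(node: PplNode): String = node match {
  case Search(source)          => s"SELECT * FROM $source"
  case Where(child, condition) => s"${toSql(child)} WHERE $condition"
}
// toSql(Where(Search("t"), "a = 1"))  ==>  "SELECT * FROM t WHERE a = 1"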

Advantages :

  • A general-purpose solution that may be utilized for other SQL-compliant servers

Disadvantages :

  • This is a more complicated use case, since it requires an additional layer of complexity to correctly translate the original syntax tree into a textual representation of the target language, which then has to be parsed and verified.
  • The SQL plugin already supports SQL out of the box, so it is not clear what advantage translating PPL back to SQL would provide.

Do you have any additional context?
