Skip to content

Commit

Permalink
Add ESQL operator (#1791)
Browse files Browse the repository at this point in the history
This pull request introduces the ES|QL operator. This operator, for now, 
supports only two main parameters: query and filter.
  • Loading branch information
dnhatn authored Oct 18, 2023
1 parent aba239a commit d82295a
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 36 deletions.
34 changes: 34 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3132,6 +3132,40 @@ Meta-data

The operation returns no meta-data.

esql
~~~~~~~~~~~~~

With the operation type ``esql`` you can execute `ES|QL query <https://www.elastic.co/guide/en/elasticsearch/reference/master/esql.html>`_.

Properties
""""""""""

* ``query`` (mandatory): An ES|QL query starts with a source command followed processing commands.
* ``filter`` (optional): A query filter defined in `Elasticsearch query DSL <https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_.
* ``body`` (optional): The query body.

Example::

{
"name": "default",
"operation-type": "esql",
"query": "FROM logs-* | STATS count=count(*) BY agent.hostname | SORT count DESC | LIMIT 20",
"filter": {
"range": {
"timestamp": {
"gte": "now-1d/d",
"lte": "now/d"
}
}
}
}

Meta-data
"""""""""

* ``weight``: "weight" of an operation, in this case the number of retrieved pages.
* ``unit``: The unit in which to interpret ``weight``, in this case ``pages``.
* ``success``: A boolean indicating whether the query has succeeded.

.. _track_dependencies:

Expand Down
23 changes: 23 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def register_default_runners(config=None):
register_runner(track.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True)
register_runner(track.OperationType.Sql, Sql(), async_runner=True)
register_runner(track.OperationType.FieldCaps, FieldCaps(), async_runner=True)
register_runner(track.OperationType.Esql, Esql(), async_runner=True)

# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(track.OperationType.Sleep, Sleep(), async_runner=True)
Expand Down Expand Up @@ -2833,6 +2834,28 @@ def __repr__(self, *args, **kwargs):
return "field-caps"


class Esql(Runner):
async def __call__(self, es, params):
params, request_params, transport_params, headers = self._transport_request_params(params)
es = es.options(**transport_params)
query = mandatory(params, "query", self)
body = params.get("body", {})
body["query"] = query
query_filter = params.get("filter")
if query_filter:
body["filter"] = query_filter
if not bool(headers):
# counter-intuitive, but preserves prior behavior
headers = None
# disable eager response parsing - responses might be huge thus skewing results
es.return_raw_response()
await es.perform_request(method="POST", path="/_query", headers=headers, body=body, params=request_params)
return {"success": True, "unit": "ops", "weight": 1}

def __repr__(self, *args, **kwargs):
return "esql"


class RequestTiming(Runner, Delegator):
def __init__(self, delegate):
super().__init__(delegate=delegate)
Expand Down
75 changes: 39 additions & 36 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,44 +701,45 @@ class OperationType(Enum):
CompositeAgg = (18, AdminStatus.No, serverless.Status.Public)
WaitForCurrentSnapshotsCreate = (19, AdminStatus.No, serverless.Status.Internal)
Downsample = (20, AdminStatus.No, serverless.Status.Internal)
Esql = (21, AdminStatus.No, serverless.Status.Blocked)

# administrative actions
ForceMerge = (21, AdminStatus.Yes, serverless.Status.Internal)
ClusterHealth = (22, AdminStatus.Yes, serverless.Status.Internal)
PutPipeline = (23, AdminStatus.Yes, serverless.Status.Public)
Refresh = (24, AdminStatus.Yes, serverless.Status.Public)
CreateIndex = (25, AdminStatus.Yes, serverless.Status.Public)
DeleteIndex = (26, AdminStatus.Yes, serverless.Status.Public)
CreateIndexTemplate = (27, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked)
ShrinkIndex = (29, AdminStatus.Yes, serverless.Status.Blocked)
CreateMlDatafeed = (30, AdminStatus.Yes, serverless.Status.Public)
DeleteMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public)
StartMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public)
StopMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public)
CreateMlJob = (34, AdminStatus.Yes, serverless.Status.Public)
DeleteMlJob = (35, AdminStatus.Yes, serverless.Status.Public)
OpenMlJob = (36, AdminStatus.Yes, serverless.Status.Public)
CloseMlJob = (37, AdminStatus.Yes, serverless.Status.Public)
Sleep = (38, AdminStatus.Yes, serverless.Status.Public)
DeleteSnapshotRepository = (39, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshot = (41, AdminStatus.Yes, serverless.Status.Internal)
RestoreSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal)
PutSettings = (43, AdminStatus.Yes, serverless.Status.Internal)
CreateTransform = (44, AdminStatus.Yes, serverless.Status.Public)
StartTransform = (45, AdminStatus.Yes, serverless.Status.Public)
WaitForTransform = (46, AdminStatus.Yes, serverless.Status.Public)
DeleteTransform = (47, AdminStatus.Yes, serverless.Status.Public)
CreateDataStream = (48, AdminStatus.Yes, serverless.Status.Public)
DeleteDataStream = (49, AdminStatus.Yes, serverless.Status.Public)
CreateComposableTemplate = (50, AdminStatus.Yes, serverless.Status.Public)
DeleteComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public)
CreateComponentTemplate = (52, AdminStatus.Yes, serverless.Status.Public)
DeleteComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public)
TransformStats = (54, AdminStatus.Yes, serverless.Status.Public)
CreateIlmPolicy = (55, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked)
ForceMerge = (22, AdminStatus.Yes, serverless.Status.Internal)
ClusterHealth = (23, AdminStatus.Yes, serverless.Status.Internal)
PutPipeline = (24, AdminStatus.Yes, serverless.Status.Public)
Refresh = (25, AdminStatus.Yes, serverless.Status.Public)
CreateIndex = (26, AdminStatus.Yes, serverless.Status.Public)
DeleteIndex = (27, AdminStatus.Yes, serverless.Status.Public)
CreateIndexTemplate = (28, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIndexTemplate = (29, AdminStatus.Yes, serverless.Status.Blocked)
ShrinkIndex = (30, AdminStatus.Yes, serverless.Status.Blocked)
CreateMlDatafeed = (31, AdminStatus.Yes, serverless.Status.Public)
DeleteMlDatafeed = (32, AdminStatus.Yes, serverless.Status.Public)
StartMlDatafeed = (33, AdminStatus.Yes, serverless.Status.Public)
StopMlDatafeed = (34, AdminStatus.Yes, serverless.Status.Public)
CreateMlJob = (35, AdminStatus.Yes, serverless.Status.Public)
DeleteMlJob = (36, AdminStatus.Yes, serverless.Status.Public)
OpenMlJob = (37, AdminStatus.Yes, serverless.Status.Public)
CloseMlJob = (38, AdminStatus.Yes, serverless.Status.Public)
Sleep = (39, AdminStatus.Yes, serverless.Status.Public)
DeleteSnapshotRepository = (40, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshotRepository = (41, AdminStatus.Yes, serverless.Status.Internal)
CreateSnapshot = (42, AdminStatus.Yes, serverless.Status.Internal)
RestoreSnapshot = (43, AdminStatus.Yes, serverless.Status.Internal)
PutSettings = (44, AdminStatus.Yes, serverless.Status.Internal)
CreateTransform = (45, AdminStatus.Yes, serverless.Status.Public)
StartTransform = (46, AdminStatus.Yes, serverless.Status.Public)
WaitForTransform = (47, AdminStatus.Yes, serverless.Status.Public)
DeleteTransform = (48, AdminStatus.Yes, serverless.Status.Public)
CreateDataStream = (49, AdminStatus.Yes, serverless.Status.Public)
DeleteDataStream = (50, AdminStatus.Yes, serverless.Status.Public)
CreateComposableTemplate = (51, AdminStatus.Yes, serverless.Status.Public)
DeleteComposableTemplate = (52, AdminStatus.Yes, serverless.Status.Public)
CreateComponentTemplate = (53, AdminStatus.Yes, serverless.Status.Public)
DeleteComponentTemplate = (54, AdminStatus.Yes, serverless.Status.Public)
TransformStats = (55, AdminStatus.Yes, serverless.Status.Public)
CreateIlmPolicy = (56, AdminStatus.Yes, serverless.Status.Blocked)
DeleteIlmPolicy = (57, AdminStatus.Yes, serverless.Status.Blocked)

def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status):
self.id = id
Expand Down Expand Up @@ -870,6 +871,8 @@ def from_hyphenated_string(cls, v):
return OperationType.FieldCaps
elif v == "downsample":
return OperationType.Downsample
elif v == "esql":
return OperationType.Esql
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
38 changes: 38 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7537,3 +7537,41 @@ async def test_field_caps_with_index_filter(self, es):

expected_body = {"index_filter": index_filter}
es.field_caps.assert_awaited_once_with(index="_all", fields="time-*", body=expected_body, params=None)


class TestEsqlRunner:
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_esql_without_query_filter(self, es):
es.options.return_value = es
es.perform_request = mock.AsyncMock()
esql = runner.Esql()
result = await esql(es, params={"query": "from logs-* | stats c = count(*)"})
assert result == {"weight": 1, "unit": "ops", "success": True}
expected_body = {"query": "from logs-* | stats c = count(*)"}
es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_esql_with_query_filter(self, es):
es.options.return_value = es
es.perform_request = mock.AsyncMock()
esql = runner.Esql()
query_filter = {"range": {"@timestamp": {"gte": "2023"}}}
result = await esql(es, params={"query": "from * | limit 1", "filter": query_filter})
assert result == {"weight": 1, "unit": "ops", "success": True}
expected_body = {"query": "from * | limit 1", "filter": query_filter}
es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_esql_with_body(self, es):
es.options.return_value = es
es.perform_request = mock.AsyncMock()
esql = runner.Esql()
pragma = {"data_partitioning": "doc"}
result = await esql(es, params={"query": "from * | limit 1", "body": {"pragma": pragma}})
assert result == {"weight": 1, "unit": "ops", "success": True}

expected_body = {"pragma": pragma, "query": "from * | limit 1"}
es.perform_request.assert_awaited_once_with(method="POST", path="/_query", headers=None, body=expected_body, params={})

0 comments on commit d82295a

Please sign in to comment.