Skip to content

Commit

Permalink
Add OSS sensor backcompat tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Dec 9, 2021
1 parent 61d0210 commit 2a9c76c
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import graph, op, pipeline, repository, solid
from dagster import RunRequest, graph, op, pipeline, repository, sensor, solid


@solid
Expand Down Expand Up @@ -34,6 +34,11 @@ def basic():
the_job = basic.to_job(name="the_job")


@sensor(job=the_job, minimum_interval_seconds=1)
def the_sensor():
yield RunRequest(run_key=None, run_config={})


@repository
def basic_repo():
return [the_job, the_pipeline]
return [the_job, the_pipeline, the_sensor]
103 changes: 103 additions & 0 deletions integration_tests/test_suites/backcompat-test-suite/test_backcompat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests
from dagster import file_relative_path
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.utils import merge_dicts
from dagster_graphql import DagsterGraphQLClient

DAGSTER_CURRENT_BRANCH = "current_branch"
Expand All @@ -26,6 +27,66 @@
"dagit-latest-release": [MOST_RECENT_RELEASE_PLACEHOLDER, DAGSTER_CURRENT_BRANCH],
"user-code-latest-release": [DAGSTER_CURRENT_BRANCH, MOST_RECENT_RELEASE_PLACEHOLDER],
}
GET_SENSORS_QUERY = """
query SensorsQuery($repositorySelector: RepositorySelector!) {
sensorsOrError(repositorySelector: $repositorySelector) {
__typename
... on PythonError {
message
stack
}
... on Sensors {
results {
id
name
}
}
}
}
"""
START_SENSOR_MUTATION = """
mutation($sensorSelector: SensorSelector!) {
startSensor(sensorSelector: $sensorSelector) {
__typename
... on PythonError {
message
stack
}
... on Sensor {
jobOriginId
}
}
}
"""
STOP_SENSORS_QUERY = """
mutation($jobOriginId: String!) {
stopSensor(jobOriginId: $jobOriginId) {
... on PythonError {
message
className
stack
}
... on StopSensorMutationResult {
instigationState {
status
}
}
}
}
"""
RUNS_QUERY = """
query PipelineRunsRootQuery {
pipelineRunsOrError {
__typename
... on PipelineRuns {
results {
pipelineName
status
}
}
}
}
"""


def assert_run_success(client, run_id: int):
Expand Down Expand Up @@ -138,3 +199,45 @@ def assert_runs_and_exists(client, name):
)
assert len(locations) == 1
assert locations[0].pipeline_name == name


def test_sensor_run(graphql_client):
# pylint: disable=protected-access
repo_selector = {
"repositoryLocationName": "test_repo",
"repositoryName": "basic_repo",
}
sensors = graphql_client._execute(GET_SENSORS_QUERY, {"repositorySelector": repo_selector})

assert sensors["sensorsOrError"]["__typename"] == "Sensors"

sensor_selector = merge_dicts(
repo_selector,
{
"sensorName": "the_sensor",
},
)
response = graphql_client._execute(START_SENSOR_MUTATION, {"sensorSelector": sensor_selector})
assert response["startSensor"]["__typename"] == "Sensor"
time.sleep(5)

graphql_client._execute(
STOP_SENSORS_QUERY, {"jobOriginId": response["startSensor"]["jobOriginId"]}
)

runs_list = []
while are_all_runs_complete(runs_list):
response = graphql_client._execute(RUNS_QUERY)
runs_list = response["pipelineRunsOrError"]["results"]

for run in runs_list:
assert run["status"] == "SUCCESS"


def are_all_runs_complete(runs_list):
return runs_list and all(
[
run["status"] == "SUCCESS" or run["status"] == "FAILURE" or run["status"] == "CANCELED"
for run in runs_list
]
)

0 comments on commit 2a9c76c

Please sign in to comment.