
Draft: #756 - implement python workflow submissions #762

Merged: 51 commits into databricks:1.9.latest on Oct 10, 2024

Conversation

@kdazzle (Contributor) commented Aug 8, 2024

WIP - Stubs out implementation for #756

This pretty much implements what a workflow job submission type would look like, though I'm sure I'm missing something. Tests haven't been added yet.

Sample

Aside from the new submission method, models are unchanged. Here is what one could look like:

# my_model.py
import pyspark.sql.types as T


def model(dbt, session):
    dbt.config(
        materialized='incremental',
        submission_method='workflow_job'  # run this model as a Databricks workflow job
    )

    output_schema = T.StructType([
        T.StructField("id", T.StringType(), True),
        T.StructField("odometer_meters", T.DoubleType(), True),
        T.StructField("timestamp", T.TimestampType(), True),
    ])
    # `session` is the SparkSession that dbt passes into the model
    return session.createDataFrame(data=session.sparkContext.emptyRDD(), schema=output_schema)

The config for a model could look like (forgive my jsonification...yaml data structures still freak me out):

models:
  - name: my_model
    config:
      workflow_job_config:
        email_notifications: {
          on_failure: ["reynoldxin@databricks.com"]
        }
        max_retries: 2
        timeout_seconds: 18000
        existing_job_id: 12341234  # not part of Databricks API (+ optional)
        additional_task_settings: {  # not part of Databricks API (+ optional)
          "task_key": "my_dbt_task"
        }
        post_hook_tasks: [{  # not part of Databricks API (+ optional)
          "depends_on": [{ "task_key": "my_dbt_task" }],
          "task_key": "OPTIMIZE_AND_VACUUM",
          "notebook_task": {
            "notebook_path": "/my_notebook_path",
            "source": "WORKSPACE"
          }
        }]
        grants:  # not part of Databricks API (+ optional)
          view: [
            {"group_name": "marketing-team"}
          ]
          run: [
            {"user_name": "alighodsi@databricks.com"}
          ]
          manage: []
      job_cluster_config:
        spark_version: "15.3.x-scala2.12"
        node_type_id: "rd-fleet.2xlarge"
        runtime_engine: "STANDARD"
        data_security_mode: "SINGLE_USER"
        autoscale: {
          "min_workers": 1,
          "max_workers": 4
        }
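
For orientation, here is a rough, hypothetical sketch (a Python dict literal, not the adapter's actual code) of the kind of Jobs API 2.1 payload (POST /api/2.1/jobs/create) that a config like the one above could map to. The dbt model run becomes one task on the job cluster and post_hook_tasks are appended as extra tasks, while existing_job_id, additional_task_settings, and grants are handled on the dbt side rather than passed through to jobs/create:

# Hypothetical illustration only -- roughly what the create-job payload could look like.
job_spec = {
    "name": "dbt__my_database-my_schema-my_model",  # default name; see Explanation below
    "email_notifications": {"on_failure": ["reynoldxin@databricks.com"]},
    "max_retries": 2,
    "timeout_seconds": 18000,
    "tasks": [
        {
            # the dbt model run itself
            "task_key": "my_dbt_task",
            "new_cluster": {
                "spark_version": "15.3.x-scala2.12",
                "node_type_id": "rd-fleet.2xlarge",
                "runtime_engine": "STANDARD",
                "data_security_mode": "SINGLE_USER",
                "autoscale": {"min_workers": 1, "max_workers": 4},
            },
            "notebook_task": {"notebook_path": "<path of the uploaded model notebook>"},
        },
        # post_hook_tasks appended as provided
        {
            "depends_on": [{"task_key": "my_dbt_task"}],
            "task_key": "OPTIMIZE_AND_VACUUM",
            "notebook_task": {"notebook_path": "/my_notebook_path", "source": "WORKSPACE"},
        },
    ],
}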

Explanation

For the dbt-specific configs I added on top of the Databricks API attributes, I tried to strike a rough balance between the dbt convention of requiring minimal configuration and keeping the full flexibility of the Databricks API. Attribute names try to split the difference between the Databricks API and dbt conventions. Happy to change the approach for anything.

  • Added existing_job_id in case users want to reuse an existing workflow. If no name is provided in this config, the workflow is renamed to the default job name (currently f"dbt__{self.database}-{self.schema}-{self.identifier}")
  • Job names must be unique unless existing_job_id is also provided
  • The task key for the model-run task is hardcoded as task_a, but it can be overridden via additional_task_settings
  • Allow for "post_hook tasks"
    • Each task can use a different cluster via Databricks' new_cluster or existing_cluster_id; leaving both blank means serverless
    • post_hook might be a misnomer, because you could technically make the dbt model task depend on one of these tasks, which would turn it into a pre-hook
  • grants - allow permissions to be set on the workflow so that additional users/teams can run the job ad hoc if needed (initial runs, backfills, etc.). The owner is the user/service principal that deployed the workflow, and the format follows the Databricks API, where you specify whether the grantee is a user, group, or service principal
  • additional_task_settings adds to/overrides the default dbt model task (see the sketch after this list)
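
To make the merging and grants behavior concrete, here is a small hypothetical sketch (function and variable names are illustrative, not the code in this PR) of how the default model task could be combined with additional_task_settings and post_hook_tasks, and how grants might translate to a Databricks job permissions payload:

# Hypothetical sketch only; the adapter's real implementation may differ.
def build_job_tasks(model_notebook_path: str, job_cluster: dict, workflow_config: dict) -> list:
    default_task = {
        "task_key": "task_a",  # hardcoded default, overridable via additional_task_settings
        "notebook_task": {"notebook_path": model_notebook_path},
    }
    if job_cluster:
        default_task["new_cluster"] = job_cluster  # omit entirely for a serverless job cluster
    # additional_task_settings adds to / overrides the default dbt model task
    model_task = {**default_task, **workflow_config.get("additional_task_settings", {})}
    # post_hook_tasks are appended verbatim (the model task may also depend on them)
    return [model_task, *workflow_config.get("post_hook_tasks", [])]


def build_access_control_list(grants: dict) -> list:
    # Assumed mapping from the dbt-side grant levels to Databricks job permission levels
    levels = {"view": "CAN_VIEW", "run": "CAN_MANAGE_RUN", "manage": "CAN_MANAGE"}
    acl = []
    for grant_level, principals in grants.items():
        for principal in principals:  # e.g. {"group_name": "marketing-team"}
            acl.append({**principal, "permission_level": levels[grant_level]})
    return acl

For example, build_access_control_list({"view": [{"group_name": "marketing-team"}]}) would produce [{"group_name": "marketing-team", "permission_level": "CAN_VIEW"}], with the deploying user or service principal remaining the owner.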

Todo:

  • Reuse all_purpose_cluster attribute, similar to job_cluster_config?
  • Can I use a serverless job cluster? (by not defining any cluster)
  • Fix the run tracker
  • What happens if the workflow is already running?
    • I'd like the new dbt job run to start tracking the current Databricks workflow run rather than failing (a rough sketch of one approach follows this list)
  • Log when workflow permissions are being changed? (Kind of mimicking Terraform apply logs, which have been helpful in the past when table permissions were unexpectedly broadened)
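
For the "already running" item, one rough, hypothetical approach (not part of this PR) would be to look for an active run of the job via the Jobs API before triggering a new one, and attach to that run instead of failing:

import requests


def get_or_trigger_run(host: str, token: str, job_id: int) -> int:
    """Return the run_id of an active run for the job, or trigger a new run."""
    headers = {"Authorization": f"Bearer {token}"}
    resp = requests.get(
        f"{host}/api/2.1/jobs/runs/list",
        headers=headers,
        params={"job_id": job_id, "active_only": "true"},
    )
    resp.raise_for_status()
    active_runs = resp.json().get("runs", [])
    if active_runs:
        return active_runs[0]["run_id"]  # attach to the in-flight workflow run
    resp = requests.post(
        f"{host}/api/2.1/jobs/run-now", headers=headers, json={"job_id": job_id}
    )
    resp.raise_for_status()
    return resp.json()["run_id"]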

Checklist

  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • I have updated the CHANGELOG.md and added information about my change to the "dbt-databricks next" section.

@kdazzle kdazzle changed the title #756 - stub out implementation for python workflow submissions Draft: #756 - stub out implementation for python workflow submissions Aug 8, 2024
Kyle Valade added 2 commits August 12, 2024 14:21
Signed-off-by: Kyle Valade <kylevalade@rivian.com>
@kdazzle kdazzle changed the title Draft: #756 - stub out implementation for python workflow submissions Draft: #756 - implement for python workflow submissions Aug 14, 2024
@kdazzle kdazzle changed the title Draft: #756 - implement for python workflow submissions Draft: #756 - implement python workflow submissions Aug 14, 2024
@benc-db (Collaborator) commented Sep 27, 2024

@kdazzle can you rebase/target your PR against 1.9.latest? I have a couple of things that I need to wrap up, but I'm planning to take some version of this into the 1.9 release.

@kdazzle kdazzle changed the base branch from main to 1.9.latest September 27, 2024 21:12
@@ -247,97 +247,3 @@ def test_build_job_spec_with_post_hooks(self, mock_api_client):
assert len(result["tasks"]) == 2
assert result["tasks"][1]["task_key"] == "task_b"
assert result["tasks"][1]["new_cluster"]["spark_version"] == "14.3.x-scala2.12"

@patch("dbt.adapters.databricks.python_models.python_submissions.DatabricksApiClient")
def test_build_job_spec_with_post_hooks_serverless_job_cluster(self, mock_api_client):
@kdazzle (Contributor, Author) commented:
Removing these since the logic to muck around with the cluster settings in additional tasks was removed here

@benc-db (Collaborator) commented Oct 10, 2024

Going to merge in 1.9.latest changes (which is basically only 1.8 changes), ensure tests still pass, then merge.

@benc-db benc-db merged commit 0e821b0 into databricks:1.9.latest Oct 10, 2024
21 checks passed