-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): improve `ingest deploy` command
#10944
Changes from 5 commits
52ed0a1
d1e53ce
7a129ea
8bfe7e3
2ffbb1a
bfcb59d
9a9b10d
efe175f
c833629
2914870
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,9 @@ | |
import datahub as datahub_package | ||
from datahub.cli import cli_utils | ||
from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH | ||
from datahub.configuration.common import ConfigModel | ||
from datahub.configuration.config_loader import load_config_file | ||
from datahub.emitter.mce_builder import datahub_guid | ||
from datahub.ingestion.graph.client import get_default_graph | ||
from datahub.ingestion.run.connection import ConnectionManager | ||
from datahub.ingestion.run.pipeline import Pipeline | ||
|
@@ -204,6 +206,24 @@ async def run_ingestion_and_check_upgrade() -> int: | |
# don't raise SystemExit if there's no error | ||
|
||
|
||
# Build the ingestion-source urn used by `deploy` when no explicit urn is given. | ||
# The name is wrapped in a {"name": ...} dict, passed to datahub_guid, and the | ||
# resulting guid is embedded after a "deploy-" prefix. | ||
# NOTE(review): presumably datahub_guid is deterministic, so the same name maps | ||
# to the same urn on every run - confirm against datahub.emitter.mce_builder. | ||
def _make_ingestion_urn(name: str) -> str: | ||
guid = datahub_guid( | ||
{ | ||
"name": name, | ||
} | ||
) | ||
return f"urn:li:dataHubIngestionSource:deploy-{guid}" | ||
|
||
|
||
# Deployment settings for an ingestion recipe. `deploy` either parses this from | ||
# the recipe's "deployment" section or builds it from the CLI options. | ||
class DeployOptions(ConfigModel): | ||
# Name of the ingestion source; also the input to _make_ingestion_urn. | ||
name: str | ||
# Optional description, sent as the "description" GraphQL variable. | ||
description: Optional[str] = None | ||
# Optional schedule, sent as the schedule "interval"; no schedule is sent when None. | ||
schedule: Optional[str] = None | ||
# Sent as the schedule "timezone" alongside the interval. | ||
time_zone: str = "UTC" | ||
# Sent as the "version" GraphQL variable. | ||
cli_version: Optional[str] = None | ||
# Sent as the "executorId" GraphQL variable. | ||
executor_id: str = "default" | ||
|
||
|
||
@ingest.command() | ||
@upgrade.check_upgrade | ||
@telemetry.with_telemetry() | ||
|
@@ -212,7 +232,12 @@ async def run_ingestion_and_check_upgrade() -> int: | |
"--name", | ||
type=str, | ||
help="Recipe Name", | ||
required=True, | ||
) | ||
@click.option( | ||
"--description", | ||
type=str, | ||
help="Recipe description", | ||
required=False, | ||
) | ||
@click.option( | ||
"-c", | ||
|
@@ -224,7 +249,7 @@ async def run_ingestion_and_check_upgrade() -> int: | |
@click.option( | ||
"--urn", | ||
type=str, | ||
help="Urn of recipe to update. Creates recipe if provided urn does not exist", | ||
help="Urn of recipe to update. If not specified here or in the recipe's pipeline_name, this will create a new ingestion source.", | ||
required=False, | ||
) | ||
@click.option( | ||
|
@@ -256,7 +281,8 @@ async def run_ingestion_and_check_upgrade() -> int: | |
default="UTC", | ||
) | ||
def deploy( | ||
name: str, | ||
name: Optional[str], | ||
description: Optional[str], | ||
config: str, | ||
urn: Optional[str], | ||
executor_id: str, | ||
|
@@ -280,69 +306,96 @@ def deploy( | |
resolve_env_vars=False, | ||
) | ||
|
||
deploy_options_raw = pipeline_config.pop("deployment", None) | ||
if deploy_options_raw is not None: | ||
deploy_options = DeployOptions.parse_obj(deploy_options_raw) | ||
|
||
logger.info(f"Using {repr(deploy_options)}") | ||
|
||
if urn: | ||
raise click.UsageError( | ||
"Cannot specify both --urn and deployment field in config" | ||
) | ||
elif name: | ||
raise click.UsageError( | ||
"Cannot specify both --name and deployment field in config" | ||
) | ||
else: | ||
logger.info( | ||
"The deployment field is set in the recipe, any CLI args will be ignored" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @hsheth2 IMHO this is odd behavior. Typically, command-line arguments override config options, which is probably what most would expect. It would also simplify the logic here - first initialize
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I agree - will make some tweaks. |
||
) | ||
|
||
# When urn/name is not specified, we will generate a unique urn based on the deployment name. | ||
urn = _make_ingestion_urn(deploy_options.name) | ||
logger.info(f"Will create or update a recipe with urn: {urn}") | ||
elif name: | ||
if not urn: | ||
# When the urn is not specified, generate an urn based on the name. | ||
urn = _make_ingestion_urn(name) | ||
logger.info( | ||
f"No urn was explicitly specified, will create or update the recipe with urn: {urn}" | ||
) | ||
|
||
deploy_options = DeployOptions( | ||
name=name, | ||
description=description, | ||
schedule=schedule, | ||
time_zone=time_zone, | ||
cli_version=cli_version, | ||
executor_id=executor_id, | ||
) | ||
|
||
logger.info(f"Using {repr(deploy_options)}") | ||
else: # neither deployment_name nor name is set | ||
raise click.UsageError( | ||
"Either --name must be set or deployment_name specified in the config" | ||
) | ||
|
||
# Invariant - at this point, both urn and deploy_options are set. | ||
|
||
variables: dict = { | ||
"urn": urn, | ||
"name": name, | ||
"name": deploy_options.name, | ||
"description": deploy_options.description, | ||
"type": pipeline_config["source"]["type"], | ||
"recipe": json.dumps(pipeline_config), | ||
"executorId": executor_id, | ||
"version": cli_version, | ||
"executorId": deploy_options.executor_id, | ||
"version": deploy_options.cli_version, | ||
} | ||
|
||
if schedule is not None: | ||
variables["schedule"] = {"interval": schedule, "timezone": time_zone} | ||
|
||
if urn: | ||
|
||
graphql_query: str = textwrap.dedent( | ||
""" | ||
mutation updateIngestionSource( | ||
$urn: String!, | ||
$name: String!, | ||
$type: String!, | ||
$schedule: UpdateIngestionSourceScheduleInput, | ||
$recipe: String!, | ||
$executorId: String! | ||
$version: String) { | ||
|
||
updateIngestionSource(urn: $urn, input: { | ||
name: $name, | ||
type: $type, | ||
schedule: $schedule, | ||
config: { | ||
recipe: $recipe, | ||
executorId: $executorId, | ||
version: $version, | ||
} | ||
}) | ||
} | ||
""" | ||
) | ||
else: | ||
logger.info("No URN specified recipe urn, will create a new recipe.") | ||
graphql_query = textwrap.dedent( | ||
""" | ||
mutation createIngestionSource( | ||
$name: String!, | ||
$type: String!, | ||
$schedule: UpdateIngestionSourceScheduleInput, | ||
$recipe: String!, | ||
$executorId: String!, | ||
$version: String) { | ||
|
||
createIngestionSource(input: { | ||
name: $name, | ||
type: $type, | ||
schedule: $schedule, | ||
config: { | ||
recipe: $recipe, | ||
executorId: $executorId, | ||
version: $version, | ||
} | ||
}) | ||
} | ||
""" | ||
) | ||
if deploy_options.schedule is not None: | ||
variables["schedule"] = { | ||
"interval": deploy_options.schedule, | ||
"timezone": deploy_options.time_zone, | ||
} | ||
|
||
# The updateIngestionSource endpoint can actually do upserts as well. | ||
graphql_query: str = textwrap.dedent( | ||
""" | ||
mutation updateIngestionSource( | ||
$urn: String!, | ||
$name: String!, | ||
$description: String, | ||
$type: String!, | ||
$schedule: UpdateIngestionSourceScheduleInput, | ||
$recipe: String!, | ||
$executorId: String! | ||
$version: String) { | ||
|
||
updateIngestionSource(urn: $urn, input: { | ||
name: $name, | ||
description: $description, | ||
type: $type, | ||
schedule: $schedule, | ||
config: { | ||
recipe: $recipe, | ||
executorId: $executorId, | ||
version: $version, | ||
} | ||
}) | ||
} | ||
""" | ||
) | ||
|
||
response = datahub_graph.execute_graphql(graphql_query, variables=variables) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix grammatical error.
Add a comma after "uploading" for clarity.
Committable suggestion
Tools
LanguageTool