Skip to content

Commit

Permalink
Let migrate-tables command run as collection (#2654)
Browse files Browse the repository at this point in the history
## Changes
Let the `migrate-tables` command run as a collection, i.e. across all workspaces in the account that have UCX installed.

### Linked issues

Resolves #2610 

### Functionality

- [x] modified existing command: `databricks labs ucx migrate-tables`

### Tests

- [x] manually tested
- [x] added unit tests
- [ ] ~added integration tests~ : To be covered after #2507
  • Loading branch information
JCZuurmond authored Sep 18, 2024
1 parent fad0c3c commit bef3afd
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 43 deletions.
7 changes: 5 additions & 2 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,11 @@ commands:

- name: migrate-tables
description: |
Trigger the migrate-tables workflow and, optionally, migrate-external-hiveserde-tables-in-place-experimental
workflow and migrate-external-tables-ctas workflow.
Trigger the `migrate-tables` workflow and, optionally, `migrate-external-hiveserde-tables-in-place-experimental`
workflow and `migrate-external-tables-ctas` workflow.
flags:
- name: run-as-collection
description: Run the command for the collection of workspaces with ucx installed. Default is False.

- name: migrate-acls
description: |
Expand Down
60 changes: 36 additions & 24 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,34 +519,46 @@ def assign_metastore(


@ucx.command
def migrate_tables(w: WorkspaceClient, prompts: Prompts, *, ctx: WorkspaceContext | None = None):
def migrate_tables(
w: WorkspaceClient,
prompts: Prompts,
*,
ctx: WorkspaceContext | None = None,
run_as_collection: bool = False,
a: AccountClient | None = None,
) -> None:
"""
Trigger the migrate-tables workflow and, optionally, the migrate-external-hiveserde-tables-in-place-experimental
workflow and migrate-external-tables-ctas.
"""
if ctx is None:
ctx = WorkspaceContext(w)
deployed_workflows = ctx.deployed_workflows
deployed_workflows.run_workflow("migrate-tables")

tables = ctx.tables_crawler.snapshot()
hiveserde_tables = [table for table in tables if table.what == What.EXTERNAL_HIVESERDE]
if len(hiveserde_tables) > 0:
percentage_hiveserde_tables = len(hiveserde_tables) / len(tables) * 100
if prompts.confirm(
f"Found {len(hiveserde_tables)} ({percentage_hiveserde_tables:.2f}%) hiveserde tables, do you want to run "
f"the migrate-external-hiveserde-tables-in-place-experimental workflow?"
):
deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental")

external_ctas_tables = [table for table in tables if table.what == What.EXTERNAL_NO_SYNC]
if len(external_ctas_tables) > 0:
percentage_external_ctas_tables = len(external_ctas_tables) / len(tables) * 100
if prompts.confirm(
f"Found {len(external_ctas_tables)} ({percentage_external_ctas_tables:.2f}%) external tables which cannot be migrated using sync"
f", do you want to run the migrate-external-tables-ctas workflow?"
):
deployed_workflows.run_workflow("migrate-external-tables-ctas")
if ctx:
workspace_contexts = [ctx]
else:
workspace_contexts = _get_workspace_contexts(w, a, run_as_collection)
for workspace_context in workspace_contexts:
deployed_workflows = workspace_context.deployed_workflows
deployed_workflows.run_workflow("migrate-tables")

tables = workspace_context.tables_crawler.snapshot()
hiveserde_tables = [table for table in tables if table.what == What.EXTERNAL_HIVESERDE]
if len(hiveserde_tables) > 0:
percentage_hiveserde_tables = len(hiveserde_tables) / len(tables) * 100
if prompts.confirm(
f"Found {len(hiveserde_tables)} ({percentage_hiveserde_tables:.2f}%) hiveserde tables in "
f"{workspace_context.workspace_client.config.host}, do you want to run "
f"the `migrate-external-hiveserde-tables-in-place-experimental` workflow?"
):
deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental")

external_ctas_tables = [table for table in tables if table.what == What.EXTERNAL_NO_SYNC]
if len(external_ctas_tables) > 0:
percentage_external_ctas_tables = len(external_ctas_tables) / len(tables) * 100
if prompts.confirm(
f"Found {len(external_ctas_tables)} ({percentage_external_ctas_tables:.2f}%) external tables which "
f"cannot be migrated using sync in {workspace_context.workspace_client.config.host}, do you want to "
"run the `migrate-external-tables-ctas` workflow?"
):
deployed_workflows.run_workflow("migrate-external-tables-ctas")


@ucx.command
Expand Down
64 changes: 47 additions & 17 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,30 +718,52 @@ def test_assign_metastore(acc_client, caplog):
assign_metastore(acc_client, "123")


def test_migrate_tables(ws):
ws.jobs.wait_get_run_job_terminated_or_skipped.return_value = Run(
state=RunState(result_state=RunResultState.SUCCESS), start_time=0, end_time=1000, run_duration=1000
@pytest.mark.parametrize("run_as_collection", [False, True])
def test_migrate_tables_calls_migrate_table_job_run_now(
run_as_collection,
workspace_clients,
acc_client,
) -> None:
if not run_as_collection:
workspace_clients = [workspace_clients[0]]
run = Run(
state=RunState(result_state=RunResultState.SUCCESS),
start_time=0,
end_time=1000,
run_duration=1000,
)
prompts = MockPrompts({})
migrate_tables(ws, prompts)
ws.jobs.run_now.assert_called_with(456)
ws.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once()
for workspace_client in workspace_clients:
workspace_client.jobs.wait_get_run_job_terminated_or_skipped.return_value = run

migrate_tables(workspace_clients[0], MockPrompts({}), run_as_collection=run_as_collection, a=acc_client)

for workspace_client in workspace_clients:
workspace_client.jobs.run_now.assert_called_with(456)
workspace_client.jobs.wait_get_run_job_terminated_or_skipped.assert_called_once()


def test_migrate_external_hiveserde_tables_in_place(ws):
def test_migrate_tables_calls_external_hiveserde_tables_job_run_now(ws) -> None:
# TODO: Test for running on a collection when context injection for multiple workspaces is supported.
tables_crawler = create_autospec(TablesCrawler)
table = Table(
catalog="hive_metastore", database="test", name="hiveserde", object_type="UNKNOWN", table_format="HIVE"
catalog="hive_metastore",
database="test",
name="hiveserde",
object_type="UNKNOWN",
table_format="HIVE",
)
tables_crawler.snapshot.return_value = [table]
ctx = WorkspaceContext(ws).replace(tables_crawler=tables_crawler)
ws.jobs.wait_get_run_job_terminated_or_skipped.return_value = Run(
state=RunState(result_state=RunResultState.SUCCESS), start_time=0, end_time=1000, run_duration=1000
state=RunState(result_state=RunResultState.SUCCESS),
start_time=0,
end_time=1000,
run_duration=1000,
)

prompt = (
"Found 1 (.*) hiveserde tables, do you want to run the "
"migrate-external-hiveserde-tables-in-place-experimental workflow?"
"Found 1 (.*) hiveserde tables in https://localhost, do you want to run the "
"`migrate-external-hiveserde-tables-in-place-experimental` workflow?"
)
prompts = MockPrompts({prompt: "Yes"})

Expand All @@ -751,20 +773,28 @@ def test_migrate_external_hiveserde_tables_in_place(ws):
ws.jobs.wait_get_run_job_terminated_or_skipped.call_count = 2


def test_migrate_external_tables_ctas(ws):
def test_migrate_tables_calls_external_tables_ctas_job_run_now(ws) -> None:
# TODO: Test for running on a collection when context injection for multiple workspaces is supported.
tables_crawler = create_autospec(TablesCrawler)
table = Table(
catalog="hive_metastore", database="test", name="externalctas", object_type="UNKNOWN", table_format="EXTERNAL"
catalog="hive_metastore",
database="test",
name="externalctas",
object_type="UNKNOWN",
table_format="EXTERNAL",
)
tables_crawler.snapshot.return_value = [table]
ctx = WorkspaceContext(ws).replace(tables_crawler=tables_crawler)
ws.jobs.wait_get_run_job_terminated_or_skipped.return_value = Run(
state=RunState(result_state=RunResultState.SUCCESS), start_time=0, end_time=1000, run_duration=1000
state=RunState(result_state=RunResultState.SUCCESS),
start_time=0,
end_time=1000,
run_duration=1000,
)

prompt = (
"Found 1 (.*) external tables which cannot be migrated using sync, do you want to run the "
"migrate-external-tables-ctas workflow?"
"Found 1 (.*) external tables which cannot be migrated using sync in https://localhost, do you want to run the "
"`migrate-external-tables-ctas` workflow?"
)

prompts = MockPrompts({prompt: "Yes"})
Expand Down

0 comments on commit bef3afd

Please sign in to comment.