Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸŽ‰ octavia-cli: Add ability to get existing resources #13254

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 272 additions & 48 deletions octavia-cli/README.md

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions octavia-cli/octavia_cli/apply/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@
from .yaml_loaders import EnvVarLoader


class DuplicateResourceError(click.ClickException):
pass


class NonExistingResourceError(click.ClickException):
pass

Expand Down
9 changes: 8 additions & 1 deletion octavia-cli/octavia_cli/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
from .apply import commands as apply_commands
from .check_context import check_api_health, check_is_initialized, check_workspace_exists
from .generate import commands as generate_commands
from .get import commands as get_commands
from .init import commands as init_commands
from .list import commands as list_commands
from .telemetry import TelemetryClient, build_user_agent

AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list, init_commands.init, generate_commands.generate, apply_commands.apply]
AVAILABLE_COMMANDS: List[click.Command] = [
list_commands._list,
get_commands.get,
init_commands.init,
generate_commands.generate,
apply_commands.apply,
]


def set_context_object(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ definition_version: {{ definition.docker_image_tag }}


{%- macro render_one_of(field) %}
{{ field.name }}:
{{ field.name }}:
{%- for one_of_value in field.one_of_values %}
{%- if loop.first %}
## -------- Pick one valid structure among the examples below: --------
Expand All @@ -41,17 +41,17 @@ definition_version: {{ definition.docker_image_tag }}
## -------- Another valid structure for {{ field.name }}: --------
{{- render_sub_fields(one_of_value, True)|indent(2, False) }}
{%- endif %}
{%- endfor %}
{%- endfor %}
{%- endmacro %}

{%- macro render_object_field(field) %}
{{ field.name }}:
{{- render_sub_fields(field.object_properties, is_commented=False)|indent(2, False)}}
{{ field.name }}:
{{- render_sub_fields(field.object_properties, is_commented=False)|indent(2, False)}}
{%- endmacro %}

{%- macro render_array_of_objects(field) %}
{{ field.name }}:
{{- render_array_sub_fields(field.array_items, is_commented=False)|indent(2, False)}}
{{ field.name }}:
{{- render_array_sub_fields(field.array_items, is_commented=False)|indent(2, False)}}
{%- endmacro %}

{%- macro render_root(root, is_commented) %}
Expand Down
3 changes: 3 additions & 0 deletions octavia-cli/octavia_cli/get/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
108 changes: 108 additions & 0 deletions octavia-cli/octavia_cli/get/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import uuid
from typing import List, Optional, Tuple, Type, Union

import airbyte_api_client
import click
from octavia_cli.base_commands import OctaviaCommand

from .resources import Connection, Destination, Source

COMMON_HELP_MESSAGE_PREFIX = "Get a JSON representation of a remote"


def build_help_message(resource_type: str) -> str:
"""Helper function to build help message consistently for all the commands in this module.

Args:
resource_type (str): source, destination or connection

Returns:
str: The generated help message.
"""
return f"Get a JSON representation of a remote {resource_type}."


def get_resource_id_or_name(resource: str) -> Tuple[Optional[str], Optional[str]]:
"""Helper function to detect if the resource argument passed to the CLI is a resource ID or name.

Args:
resource (str): the resource ID or name passed as an argument to the CLI.

Returns:
Tuple[Optional[str], Optional[str]]: the resource_id and resource_name, the not detected kind is set to None.
"""
resource_id, resource_name = None, None
try:
uuid.UUID(resource)
resource_id = resource
except ValueError:
resource_name = resource
return resource_id, resource_name


def get_json_representation(
api_client: airbyte_api_client.ApiClient,
workspace_id: str,
ResourceCls: Type[Union[Source, Destination, Connection]],
resource_to_get: str,
) -> str:
"""Helper function to retrieve a resource json representation and avoid repeating the same logic for Source/Destination and connection.


Args:
api_client (airbyte_api_client.ApiClient): The Airbyte API client.
workspace_id (str): Current workspace id.
ResourceCls (Type[Union[Source, Destination, Connection]]): Resource class to use
resource_to_get (str): resource name or id to get JSON representation for.

Returns:
str: The resource's JSON representation.
"""
resource_id, resource_name = get_resource_id_or_name(resource_to_get)
resource = ResourceCls(api_client, workspace_id, resource_id=resource_id, resource_name=resource_name)
return resource.to_json()


@click.group(
"get",
help=f'{build_help_message("source, destination or connection")} ID or name can be used as argument. Example: \'octavia get source "My Pokemon source"\' or \'octavia get source cb5413b2-4159-46a2-910a-dc282a439d2d\'',
)
@click.pass_context
def get(ctx: click.Context): # pragma: no cover
pass


@get.command(cls=OctaviaCommand, name="source", help=build_help_message("source"))
@click.argument("resource", type=click.STRING)
@click.pass_context
def source(ctx: click.Context, resource: str):
click.echo(get_json_representation(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], Source, resource))


@get.command(cls=OctaviaCommand, name="destination", help=build_help_message("destination"))
@click.argument("resource", type=click.STRING)
@click.pass_context
def destination(ctx: click.Context, resource: str):
click.echo(get_json_representation(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], Destination, resource))


@get.command(cls=OctaviaCommand, name="connection", help=build_help_message("connection"))
@click.argument("resource", type=click.STRING)
@click.pass_context
def connection(ctx: click.Context, resource: str):
click.echo(get_json_representation(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], Connection, resource))


AVAILABLE_COMMANDS: List[click.Command] = [source, destination, connection]


def add_commands_to_list():
for command in AVAILABLE_COMMANDS:
get.add_command(command)


add_commands_to_list()
193 changes: 193 additions & 0 deletions octavia-cli/octavia_cli/get/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import abc
import json
from typing import Optional, Union

import airbyte_api_client
import click
from airbyte_api_client.api import destination_api, source_api, web_backend_api
from airbyte_api_client.model.destination_id_request_body import DestinationIdRequestBody
from airbyte_api_client.model.destination_read import DestinationRead
from airbyte_api_client.model.source_id_request_body import SourceIdRequestBody
from airbyte_api_client.model.source_read import SourceRead
from airbyte_api_client.model.web_backend_connection_read import WebBackendConnectionRead
from airbyte_api_client.model.web_backend_connection_request_body import WebBackendConnectionRequestBody
from airbyte_api_client.model.workspace_id_request_body import WorkspaceIdRequestBody


class DuplicateResourceError(click.ClickException):
pass


class ResourceNotFoundError(click.ClickException):
pass


class BaseResource(abc.ABC):
@property
@abc.abstractmethod
def api(
self,
): # pragma: no cover
pass

@property
@abc.abstractmethod
def name(
self,
) -> str: # pragma: no cover
pass

@property
@abc.abstractmethod
def get_function_name(
self,
) -> str: # pragma: no cover
pass

@property
def _get_fn(self):
return getattr(self.api, self.get_function_name)

@property
@abc.abstractmethod
def get_payload(
self,
): # pragma: no cover
pass

@property
@abc.abstractmethod
def list_for_workspace_function_name(
self,
) -> str: # pragma: no cover
pass

@property
def _list_for_workspace_fn(self):
return getattr(self.api, self.list_for_workspace_function_name)

@property
def list_for_workspace_payload(
self,
):
return WorkspaceIdRequestBody(workspace_id=self.workspace_id)

def __init__(
self,
api_client: airbyte_api_client.ApiClient,
workspace_id: str,
resource_id: Optional[str] = None,
resource_name: Optional[str] = None,
):
if resource_id is None and resource_name is None:
raise ValueError("resource_id and resource_name keyword arguments can't be both None.")
if resource_id is not None and resource_name is not None:
raise ValueError("resource_id and resource_name keyword arguments can't be both set.")
self.resource_id = resource_id
self.resource_name = resource_name
self.api_instance = self.api(api_client)
self.workspace_id = workspace_id

def _find_by_resource_name(
self,
) -> Union[WebBackendConnectionRead, SourceRead, DestinationRead]:
"""Retrieve a remote resource from its name by listing the available resources on the Airbyte instance.

Raises:
ResourceNotFoundError: Raised if no resource was found with the current resource_name.
DuplicateResourceError: Raised if multiple resources were found with the current resource_name.

Returns:
Union[WebBackendConnectionRead, SourceRead, DestinationRead]: The remote resource model instance.
"""

api_response = self._list_for_workspace_fn(self.api_instance, self.list_for_workspace_payload)
matching_resources = []
for resource in getattr(api_response, f"{self.name}s"):
if resource.name == self.resource_name:
matching_resources.append(resource)
if not matching_resources:
raise ResourceNotFoundError(f"The {self.name} {self.resource_name} was not found in your current Airbyte workspace.")
if len(matching_resources) > 1:
raise DuplicateResourceError(
f"{len(matching_resources)} {self.name}s with the name {self.resource_name} were found in your current Airbyte workspace."
)
return matching_resources[0]

def _find_by_resource_id(
self,
) -> Union[WebBackendConnectionRead, SourceRead, DestinationRead]:
"""Retrieve a remote resource from its id by calling the get endpoint of the resource type.

Returns:
Union[WebBackendConnectionRead, SourceRead, DestinationRead]: The remote resource model instance.
"""
return self._get_fn(self.api_instance, self.get_payload)

def get_remote_resource(self) -> Union[WebBackendConnectionRead, SourceRead, DestinationRead]:
"""Retrieve a remote resource with a resource_name or a resource_id

Returns:
Union[WebBackendConnectionRead, SourceRead, DestinationRead]: The remote resource model instance.
"""
if self.resource_id is not None:
return self._find_by_resource_id()
else:
return self._find_by_resource_name()

def to_json(self) -> str:
"""Get the JSON representation of the remote resource model instance.

Returns:
str: The JSON representation of the remote resource model instance.
"""
return json.dumps(self.get_remote_resource().to_dict())


class Source(BaseResource):
name = "source"
api = source_api.SourceApi
get_function_name = "get_source"
list_for_workspace_function_name = "list_sources_for_workspace"

@property
def get_payload(self) -> Optional[SourceIdRequestBody]:
"""Defines the payload to retrieve the remote source according to its resource_id.
Returns:
SourceIdRequestBody: The SourceIdRequestBody payload.
"""
return SourceIdRequestBody(self.resource_id)


class Destination(BaseResource):
name = "destination"
api = destination_api.DestinationApi
get_function_name = "get_destination"
list_for_workspace_function_name = "list_destinations_for_workspace"

@property
def get_payload(self) -> Optional[DestinationIdRequestBody]:
"""Defines the payload to retrieve the remote destination according to its resource_id.
Returns:
DestinationIdRequestBody: The DestinationIdRequestBody payload.
"""
return DestinationIdRequestBody(self.resource_id)


class Connection(BaseResource):
name = "connection"
api = web_backend_api.WebBackendApi
get_function_name = "web_backend_get_connection"
list_for_workspace_function_name = "web_backend_list_connections_for_workspace"

@property
def get_payload(self) -> Optional[WebBackendConnectionRequestBody]:
"""Defines the payload to retrieve the remote connection according to its resource_id.
Returns:
WebBackendConnectionRequestBody: The WebBackendConnectionRequestBody payload.
"""
return WebBackendConnectionRequestBody(with_refreshed_catalog=False, connection_id=self.resource_id)
1 change: 1 addition & 0 deletions octavia-cli/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def test_not_implemented_commands(command):
def test_available_commands():
assert entrypoint.AVAILABLE_COMMANDS == [
entrypoint.list_commands._list,
entrypoint.get_commands.get,
entrypoint.init_commands.init,
entrypoint.generate_commands.generate,
entrypoint.apply_commands.apply,
Expand Down
3 changes: 3 additions & 0 deletions octavia-cli/unit_tests/test_get/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
Loading