Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs generate RPC task (#1781) #1801

Merged
merged 2 commits into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dbt.contracts.graph.manifest import CompileResultNode
from dbt.contracts.graph.unparsed import Time, FreshnessStatus
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.contracts.util import Writable
from dbt.contracts.util import Writable, Replaceable
from dbt.logger import LogMessage
from hologram.helpers import StrEnum
from hologram import JsonSchemaMixin
Expand All @@ -10,7 +10,7 @@

from dataclasses import dataclass, field
from datetime import datetime
from typing import Union, Dict, List, Optional, Any
from typing import Union, Dict, List, Optional, Any, NamedTuple
from numbers import Real


Expand Down Expand Up @@ -227,3 +227,72 @@ class ResultTable(JsonSchemaMixin):
@dataclass
class RemoteRunResult(RemoteCompileResult):
table: ResultTable


Primitive = Union[bool, str, float, None]

CatalogKey = NamedTuple(
'CatalogKey',
[('database', str), ('schema', str), ('name', str)]
)


@dataclass
class StatsItem(JsonSchemaMixin):
id: str
label: str
value: Primitive
description: str
include: bool


StatsDict = Dict[str, StatsItem]


@dataclass
class ColumnMetadata(JsonSchemaMixin):
type: str
comment: Optional[str]
index: int
name: str


ColumnMap = Dict[str, ColumnMetadata]


@dataclass
class TableMetadata(JsonSchemaMixin):
type: str
database: str
schema: str
name: str
comment: Optional[str]
owner: Optional[str]


@dataclass
class CatalogTable(JsonSchemaMixin, Replaceable):
metadata: TableMetadata
columns: ColumnMap
stats: StatsDict
# the same table with two unique IDs will just be listed two times
unique_id: Optional[str] = None

def key(self) -> CatalogKey:
return CatalogKey(
self.metadata.database.lower(),
self.metadata.schema.lower(),
self.metadata.name.lower(),
)


@dataclass
class CatalogResults(JsonSchemaMixin, Writable):
nodes: Dict[str, CatalogTable]
generated_at: datetime
_compile_results: Optional[Any] = None


@dataclass
class RemoteCatalogResults(CatalogResults):
logs: List[LogMessage] = field(default_factory=list)
10 changes: 8 additions & 2 deletions core/dbt/rpc/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
from queue import Empty
from typing import Optional, Any, Union

from dbt.contracts.results import RemoteCompileResult, RemoteExecutionResult
from dbt.contracts.results import (
RemoteCompileResult, RemoteExecutionResult, RemoteCatalogResults
)
from dbt.exceptions import InternalException
from dbt.utils import restrict_to


RemoteCallableResult = Union[RemoteCompileResult, RemoteExecutionResult]
RemoteCallableResult = Union[
RemoteCompileResult,
RemoteExecutionResult,
RemoteCatalogResults,
]


class QueueMessageType(StrEnum):
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/rpc/response_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def _get_responses(cls, requests, dispatcher):
# to_dict
if hasattr(output, 'result'):
if isinstance(output.result, JsonSchemaMixin):
output.result = output.result.to_dict(omit_empty=False)
output.result = output.result.to_dict(omit_none=False)
yield output

@classmethod
Expand Down
21 changes: 21 additions & 0 deletions core/dbt/rpc/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
RemoteCompileResult,
RemoteRunResult,
RemoteExecutionResult,
RemoteCatalogResults,
)
from dbt.logger import LogMessage
from dbt.rpc.error import dbt_error, RPCException
Expand Down Expand Up @@ -210,6 +211,24 @@ def from_result(cls, status, base):
)


@dataclass
class PollCatalogSuccessResult(PollResult, RemoteCatalogResults):
status: TaskHandlerState = field(
metadata=restrict_to(TaskHandlerState.Success),
default=TaskHandlerState.Success
)

@classmethod
def from_result(cls, status, base):
return cls(
status=status,
nodes=base.nodes,
generated_at=base.generated_at,
_compile_results=base._compile_results,
logs=base.logs,
)


def poll_success(status, logs, result):
if status != TaskHandlerState.Success:
raise dbt.exceptions.InternalException(
Expand All @@ -223,6 +242,8 @@ def poll_success(status, logs, result):
return PollRunSuccessResult.from_result(status=status, base=result)
elif isinstance(result, RemoteCompileResult):
return PollCompileSuccessResult.from_result(status=status, base=result)
elif isinstance(result, RemoteCatalogResults):
return PollCatalogSuccessResult.from_result(status=status, base=result)
else:
raise dbt.exceptions.InternalException(
'got invalid result in poll_success: {}'.format(result)
Expand Down
Loading