Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub artifact to log #7667

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,18 @@ message NoNodesForSelectionCriteriaMsg {
NoNodesForSelectionCriteria data = 2;
}

// P - Artifacts

// P001
message PublicationArtifactAvailable {
google.protobuf.Struct pub_artifact = 1;
}

message PublicationArtifactAvailableMsg {
EventInfo info = 1;
PublicationArtifactAvailable data = 2;
}

// Q - Node execution

// Q001
Expand Down
14 changes: 14 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# | E | DB adapter |
# | I | Project parsing |
# | M | Deps generation |
# | P | Artifacts |
# | Q | Node execution |
# | W | Node testing |
# | Z | Misc |
Expand Down Expand Up @@ -1398,6 +1399,19 @@ def message(self) -> str:
# =======================================================


class PublicationArtifactAvailable(DebugLevel):
def code(self):
return "P001"

def message(self) -> str:
return "Publication artifact available"


# =======================================================
# Q - Node execution
# =======================================================


class RunningOperationCaughtError(ErrorLevel):
def code(self):
return "Q001"
Expand Down
646 changes: 325 additions & 321 deletions core/dbt/events/types_pb2.py

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
StateCheckVarsHash,
Note,
PublicationArtifactChanged,
PublicationArtifactAvailable,
)
from dbt.logger import DbtProcessState
from dbt.node_types import NodeType, AccessType
Expand Down Expand Up @@ -507,7 +508,7 @@ def load(self):

if not skip_parsing or public_nodes_changed:
# Write out the <project_name>_publication.json file for this project
write_publication_artifact(self.root_project, self.manifest)
log_publication_artifact(self.root_project, self.manifest)
# write out the fully parsed manifest
self.write_manifest_for_partial_parse()

Expand Down Expand Up @@ -1676,7 +1677,7 @@ def process_node(config: RuntimeConfig, manifest: Manifest, node: ManifestNode):
_process_docs_for_node(ctx, node)


def write_publication_artifact(root_project: RuntimeConfig, manifest: Manifest):
def log_publication_artifact(root_project: RuntimeConfig, manifest: Manifest):
# The manifest.json is written out in a task, so we're not writing it here

# build publication metadata
Expand Down Expand Up @@ -1742,10 +1743,8 @@ def write_publication_artifact(root_project: RuntimeConfig, manifest: Manifest):
public_models=public_models,
dependencies=dependencies,
)
# write out publication artifact <project_name>_publication.json
publication_file_name = f"{root_project.project_name}_publication.json"
path = os.path.join(root_project.target_path, publication_file_name)
publication.write(path)

fire_event(PublicationArtifactAvailable(pub_artifact=publication.to_dict()))


def write_manifest(manifest: Manifest, target_path: str):
Expand Down
29 changes: 22 additions & 7 deletions core/dbt/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,39 @@ def run_dbt(
# If you want the logs that are normally written to a file, you must
# start with the "--debug" flag. The structured schema log CI test
# will turn the logs into json, so you have to be prepared for that.
def run_dbt_and_capture(args: List[str] = None, expect_pass=True):
def run_dbt_and_capture(
args: List[str] = None,
expect_pass: bool = True,
publications: List[PublicationArtifact] = None,
):
try:
stringbuf = StringIO()
capture_stdout_logs(stringbuf)
res = run_dbt(args, expect_pass=expect_pass)
res = run_dbt(args, expect_pass=expect_pass, publications=publications)
stdout = stringbuf.getvalue()

finally:
stop_capture_stdout_logs()

# Json logs will have lots of escape characters which will
# make checks for strings in the logs fail, so remove those.
if '{"code":' in stdout:
stdout = stdout.replace("\\", "")

return res, stdout


def get_logging_events(log_output, event_name):
logging_events = []
for log_line in log_output.split("\n"):
# skip empty lines
if len(log_line) == 0:
continue
# The adapter logging also shows up, so skip non-json lines
if not log_line.startswith("{"):
continue
if event_name in log_line:
log_dct = json.loads(log_line)
if log_dct["info"]["name"] == event_name:
logging_events.append(log_dct)
return logging_events


# Used in test cases to get the manifest from the partial parsing file
# Note: this uses an internal version of the manifest, and in the future
# parts of it will not be supported for external use.
Expand Down
28 changes: 19 additions & 9 deletions tests/functional/multi_project/test_publication.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import pytest
import os

from dbt.tests.util import run_dbt, get_artifact, write_file
from dbt.tests.util import (
run_dbt,
write_file,
run_dbt_and_capture,
get_logging_events,
)
from dbt.contracts.publication import PublicationArtifact, PublicModel
from dbt.exceptions import (
PublicationConfigNotFound,
Expand Down Expand Up @@ -102,10 +107,11 @@ def models(self):
}

def test_publication_artifact(self, project):
results = run_dbt(["run"])
results, log_output = run_dbt_and_capture(["--debug", "--log-format=json", "run"])
assert len(results) == 3

publication_dict = get_artifact(project.project_root, "target", "test_publication.json")
pub_available_events = get_logging_events(log_output, "PublicationArtifactAvailable")
publication_dict = pub_available_events[0]["data"]["pub_artifact"]
publication = PublicationArtifact.from_dict(publication_dict)
assert publication
assert len(publication.public_models) == 2
Expand Down Expand Up @@ -134,13 +140,16 @@ def test_pub_artifacts(self, project):
# Provide publication and try again
m_pub_json = marketing_pub_json.replace("test_schema", project.test_schema)
publications = [PublicationArtifact.from_dict(json.loads(m_pub_json))]
manifest = run_dbt(["parse"], publications=publications)
manifest, log_output = run_dbt_and_capture(
["--debug", "--log-format=json", "parse"], publications=publications
)
assert manifest.publications
assert "marketing" in manifest.publications
assert "model.marketing.fct_one" in manifest.publications["marketing"].public_node_ids

# Check dependencies in publication_artifact
publication_dict = get_artifact(project.project_root, "target", "test_publication.json")
pub_available_events = get_logging_events(log_output, "PublicationArtifactAvailable")
publication_dict = pub_available_events[0]["data"]["pub_artifact"]
publication = PublicationArtifact.from_dict(publication_dict)
assert publication.dependencies == ["marketing"]

Expand Down Expand Up @@ -234,13 +243,14 @@ def test_multi_projects(self, project, project_alt):
# run the alternate project by using the alternate project root
# (There is currently a bug where project-dir requires a chdir to work.)
os.chdir(project_alt.project_root)
results = run_dbt(["run", "--project-dir", str(project_alt.project_root)])
results, log_output = run_dbt_and_capture(
["--debug", "--log-format=json", "run", "--project-dir", str(project_alt.project_root)]
)
assert len(results) == 1

# Check publication artifact
publication_dict = get_artifact(
project_alt.project_root, "target", "test_alt_publication.json"
)
pub_available_events = get_logging_events(log_output, "PublicationArtifactAvailable")
publication_dict = pub_available_events[0]["data"]["pub_artifact"]
publication = PublicationArtifact.from_dict(publication_dict)
assert len(publication.public_models) == 1

Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ def test_event_codes(self):
types.RegistryResponseMissingNestedKeys(response=""),
types.RegistryResponseExtraNestedKeys(response=""),
types.DepsSetDownloadDirectory(path=""),
# P - Artifacts ===================
types.PublicationArtifactAvailable(),
# Q - Node execution ======================
types.RunningOperationCaughtError(exc=""),
types.CompileComplete(),
Expand Down