
feat(data-warehouse): New pipeline WIP #26341

Draft · wants to merge 1 commit into master
Conversation

Gilbert09
Member

WIP

Problem

Changes

👉 Stay up-to-date with PostHog coding conventions for a smoother review.

Does this work well for both Cloud and self-hosted?

How did you test this code?

@EDsCODE EDsCODE self-requested a review November 21, 2024 18:29
Member

@EDsCODE EDsCODE left a comment


Looks great. Some nits, but this flow is very nice to reason through. I've added a PR with the charts config for the new workers in a comment below. Won't approve just yet until you move this out of WIP.

raise ValueError(f"No default value defined for type: {pyarrow_type}")


def _update_incrementality(schema: ExternalDataSchema | None, table: pa.Table, logger: FilteringBoundLogger) -> None:
Member

NIT: _update_increment_state?

schema.update_incremental_field_last_value(last_value)


def _update_job_row_count(job_id: str, count: int, logger: FilteringBoundLogger) -> None:
Member

Why is this outside of the Pipeline class?

@@ -425,12 +433,18 @@ def _run(
schema: ExternalDataSchema,
reset_pipeline: bool,
):
table_row_counts = DataImportPipelineSync(job_inputs, source, logger, reset_pipeline, schema.is_incremental).run()
total_rows_synced = sum(table_row_counts.values())
if settings.DEBUG:
Member

Can base this off the env var: `settings.TEMPORAL_TASK_QUEUE == "v2-data-warehouse-task-queue"`.

https://github.com/PostHog/charts/pull/2389
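The suggestion above, sketched out: gate the new-pipeline behavior on the Temporal task queue name instead of `settings.DEBUG`. The helper name and the hard-coded queue string are taken from the comment; how settings are read is an assumption:

```python
# Hypothetical sketch of the reviewer's suggestion: route on the task queue
# name rather than DEBUG, so v2 workers get the new pipeline everywhere.
V2_QUEUE = "v2-data-warehouse-task-queue"


def use_new_pipeline(task_queue: str) -> bool:
    """Return True when running on the v2 data-warehouse task queue."""
    return task_queue == V2_QUEUE
```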

if not primary_keys or len(primary_keys) == 0:
raise Exception("Primary key required for incremental syncs")

delta_table.merge(
Member

Will `merge` work on an empty delta table? Asking because it'd clean up this logic a bunch if we could just handle `delta_table is None` before this entire `if` block.
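The cleanup being suggested, as a sketch: handle the "table doesn't exist yet" case once, up front, so the merge path never has to branch on it. `write_new_table` is a hypothetical stand-in for whatever first-write path the pipeline uses, and the `merge` call mirrors the deltalake-style `source`/`predicate`/alias arguments:

```python
# Sketch only: consolidate None-handling before the merge, per the review
# comment. The primary-key check is the one shown in the diff.
def upsert(delta_table, rows, primary_keys, write_new_table):
    if not primary_keys or len(primary_keys) == 0:
        raise Exception("Primary key required for incremental syncs")
    if delta_table is None:
        # First sync: no table yet, so just create it and skip merge logic.
        return write_new_table(rows)
    predicate = " AND ".join(f"source.{key} = target.{key}" for key in primary_keys)
    return delta_table.merge(
        source=rows,
        predicate=predicate,
        source_alias="source",
        target_alias="target",
    )
```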

for column_name in table.column_names:
column = table.column(column_name)
if pa.types.is_struct(column.type) or pa.types.is_list(column.type):
json_column = pa.array([json.dumps(row.as_py()) if row.as_py() is not None else None for row in column])
Member

Out of scope here, but an issue I just discovered that might be addressable here: ClickHouse's S3 reader can't deserialize a list like `["test"]`.

Member Author

Do you have more context on this? A support ticket maybe?
