From 978a6cc21325bcd30fdc919d9567c39b6b426c01 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 8 Nov 2024 00:02:42 +0100 Subject: [PATCH] Fix for non-PostHog team --- .../data_imports/pipelines/pipeline_sync.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/pipeline_sync.py b/posthog/temporal/data_imports/pipelines/pipeline_sync.py index 77c8dfc581878..17aecea88dab9 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline_sync.py +++ b/posthog/temporal/data_imports/pipelines/pipeline_sync.py @@ -154,14 +154,15 @@ def _iter_chunks(self, lst: list[Any], n: int) -> Iterator[list[Any]]: yield lst[i : i + n] # Monkey patch to fix large memory consumption until https://github.com/dlt-hub/dlt/pull/2031 gets merged in - FilesystemDestinationClientConfiguration.delta_jobs_per_write = 1 - FilesystemClient.create_table_chain_completed_followup_jobs = create_table_chain_completed_followup_jobs # type: ignore - FilesystemClient._iter_chunks = _iter_chunks # type: ignore - - dlt.config["data_writer.file_max_items"] = 500_000 - dlt.config["data_writer.file_max_bytes"] = 500_000_000 # 500 MB - dlt.config["loader_parallelism_strategy"] = "table-sequential" - dlt.config["delta_jobs_per_write"] = 1 + if self._incremental or is_posthog_team(self.inputs.team_id): + FilesystemDestinationClientConfiguration.delta_jobs_per_write = 1 + FilesystemClient.create_table_chain_completed_followup_jobs = create_table_chain_completed_followup_jobs # type: ignore + FilesystemClient._iter_chunks = _iter_chunks # type: ignore + + dlt.config["data_writer.file_max_items"] = 500_000 + dlt.config["data_writer.file_max_bytes"] = 500_000_000 # 500 MB + dlt.config["loader_parallelism_strategy"] = "table-sequential" + dlt.config["delta_jobs_per_write"] = 1 dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True