changes to execution request cleanup
anshbansal committed Jan 10, 2025
1 parent a1ba0c9 · commit 1533332
Showing 2 changed files with 23 additions and 1 deletion.
@@ -91,12 +91,20 @@ def __init__(
         self.graph = graph
         self.report = report
         self.instance_id = int(time.time())
+        self.last_print_time = 0.0
 
         if config is not None:
             self.config = config
         else:
             self.config = DatahubExecutionRequestCleanupConfig()
 
+    def _print_report(self) -> None:
+        time_taken = round(time.time() - self.last_print_time, 1)
+        # Print report every 2 minutes
+        if time_taken > 120:
+            self.last_print_time = time.time()
+            logger.info(f"\n{self.report.as_string()}")
+
     def _to_cleanup_record(self, entry: Dict) -> CleanupRecord:
         input_aspect = (
             entry.get("aspects", {})
@@ -181,6 +189,7 @@ def _scroll_garbage_records(self):
         running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000
 
         for entry in self._scroll_execution_requests():
+            self._print_report()
             self.report.ergc_records_read += 1
             key = entry.ingestion_source

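For reference, the new _print_report method implements a simple throttled-logging pattern: it tracks when the report was last emitted and logs again only after the interval has elapsed. Below is a minimal standalone sketch of that pattern; the ReportPrinter class and the report_text parameter are hypothetical illustrations, not part of this module.

import logging
import time

logger = logging.getLogger(__name__)

class ReportPrinter:
    # Hypothetical sketch of the throttling used by _print_report above.
    PRINT_INTERVAL_SECONDS = 120  # log at most once every 2 minutes

    def __init__(self) -> None:
        # Epoch seconds of the last print; 0.0 makes the first call print.
        self.last_print_time = 0.0

    def maybe_print(self, report_text: str) -> None:
        elapsed = time.time() - self.last_print_time
        if elapsed > self.PRINT_INTERVAL_SECONDS:
            self.last_print_time = time.time()
            logger.info(f"\n{report_text}")

Because last_print_time starts at 0.0, the first call always logs; after that, the per-record calls made inside the _scroll_garbage_records loop are suppressed until two minutes have passed, which keeps progress reporting from flooding the log.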
@@ -231,6 +231,15 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]:
     def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]:
         assert self.ctx.graph
         scroll_id: Optional[str] = None
+
+        batch_size = self.config.batch_size
+        if entity_type == "DATA_PROCESS_INSTANCE":
+            # Temporary workaround for a bug in DATA_PROCESS_INSTANCE querying:
+            # use a smaller batch size on the first call to avoid a giant
+            # stacktrace. This will be removed in a future version, once servers
+            # with the fix have been around for a while.
+            batch_size = 10
+
         while True:
             try:
                 result = self.ctx.graph.execute_graphql(
@@ -240,7 +249,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]:
                         "types": [entity_type],
                         "query": "*",
                         "scrollId": scroll_id if scroll_id else None,
-                        "count": self.config.batch_size,
+                        "count": batch_size,
                         "orFilters": [
                             {
                                 "and": [
@@ -263,6 +272,10 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]:
             scroll_across_entities = result.get("scrollAcrossEntities")
             if not scroll_across_entities or not scroll_across_entities.get("count"):
                 break
+            if entity_type == "DATA_PROCESS_INSTANCE":
+                # Temporary workaround; see the note at the start of this function.
+                # Restore the configured batch size once a call has succeeded.
+                batch_size = self.config.batch_size
             scroll_id = scroll_across_entities.get("nextScrollId")
             self.report.num_queries_found += scroll_across_entities.get("count")
             for query in scroll_across_entities.get("searchResults"):
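Taken together, the two hunks in this file implement a "small first page" scroll: the first request for the fragile entity type uses a batch size of 10, and once any page succeeds the configured batch size is restored. The following is a minimal sketch of that control flow under assumed names; fetch_page is a hypothetical stand-in for the execute_graphql scroll call, not part of the real module.

from typing import Callable, Iterable, List, Optional, Tuple

# fetch_page(count, scroll_id) -> (results, next_scroll_id); hypothetical
# stand-in for graph.execute_graphql(...)["scrollAcrossEntities"].
FetchPage = Callable[[int, Optional[str]], Tuple[List[str], Optional[str]]]

def scroll_all(fetch_page: FetchPage, normal_batch_size: int, fragile: bool) -> Iterable[str]:
    # For the fragile entity type, shrink only the first request so a
    # server-side failure surfaces cheaply rather than with a giant stacktrace.
    batch_size = 10 if fragile else normal_batch_size
    scroll_id: Optional[str] = None
    while True:
        results, scroll_id = fetch_page(batch_size, scroll_id)
        if not results:
            break
        # One call has succeeded, so the server can handle the query;
        # later pages go back to the configured batch size.
        batch_size = normal_batch_size
        yield from results
        if scroll_id is None:
            break

This caps the cost of hitting the server-side bug at a single ten-item request, while paying the small-page overhead only once per scroll.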
