fixed pickling error in python 3.8+
toluaina committed Mar 1, 2022
1 parent fdaf415 commit 7f4e088
Showing 1 changed file with 7 additions and 9 deletions.
bin/parallel_sync: 16 changes (7 additions & 9 deletions)
@@ -20,7 +20,7 @@ tool to populate Elasticsearch with the majority of the records.
 We can then revert to the native sync process.
 This approach uses the Tuple identifier record of the table columns.
-Each table contains a hidden system column - "ctid" of type "tid" that
+Each table contains a hidden system column - "ctid" of type "tid" that
 identifies the page record and row number in each block.
 We use this to paginate the sync process.
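
PostgreSQL's "ctid" is a hidden system column of type "tid" whose value is a (block, offset) pair: the heap page number and the tuple's slot within that page, which is what makes page-by-page pagination possible. Below is a rough, illustrative sketch of that idea; the "book" table, the psycopg2 connection string, and the default 8kB block size are assumptions for the example and are not taken from this commit.

# Illustrative only: read a table one heap page at a time via ctid.
import psycopg2

conn = psycopg2.connect("dbname=example")  # hypothetical DSN

with conn, conn.cursor() as cur:
    # Number of heap pages currently allocated to the table (default 8kB block size).
    cur.execute("SELECT pg_relation_size('book') / 8192")
    total_pages = cur.fetchone()[0]

    for page in range(total_pages):
        # Fetch every row slot on this page. An 8kB page holds at most 291
        # heap tuples, so offsets 1..291 cover all possible positions.
        cur.execute(
            """
            SELECT ctid, *
            FROM book
            WHERE ctid = ANY (
                ARRAY(
                    SELECT ('(' || %s || ',' || s || ')')::tid
                    FROM generate_series(1, 291) AS s
                )
            )
            """,
            (page,),
        )
        rows = cur.fetchall()  # process this page's rows
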
@@ -64,28 +64,26 @@ class Task:
     txmin: Optional[int] = None
     txmax: Optional[int] = None
 
-    def __call__(self, sync: Sync) -> None:
-        sync.es.bulk(
-            sync.index,
-            sync.sync(ctid=self.ctid, txmin=self.txmin, txmax=self.txmax),
-        )
-
 
 class Worker(multiprocessing.Process):
     def __init__(self, queue: multiprocessing.JoinableQueue, doc: dict):
         multiprocessing.Process.__init__(self)
         self.queue: multiprocessing.JoinableQueue = queue
-        self.sync: Sync = Sync(doc)
+        self.doc: dict = doc
 
     def run(self) -> None:
+        sync: Sync = Sync(self.doc)
         while True:
             task: Task = self.queue.get()
             if task is None:
                 sys.stdout.write(f"{self.name}: Exiting...\n")
                 sys.stdout.flush()
                 self.queue.task_done()
                 break
-            task(self.sync)
+            sync.es.bulk(
+                sync.index,
+                sync.sync(ctid=task.ctid, txmin=task.txmin, txmax=task.txmax),
+            )
             self.queue.task_done()
         return
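
Why this change fixes the pickling error: starting with Python 3.8 the default multiprocessing start method on macOS is "spawn", so a Process subclass is pickled before the child process starts. A Sync object kept as an instance attribute in __init__ therefore has to be picklable, which it presumably is not because it carries connection and client state; storing only the plain doc dict and constructing the Sync inside run(), which executes in the child process, sidesteps the problem. The following is a minimal, self-contained sketch of that pattern; the Resource class, the queue payloads, and the "book" index name are illustrative stand-ins, not code from pgsync.

# Sketch of the "build unpicklable state in run(), not __init__" pattern.
import multiprocessing
import threading


class Resource:
    """Stand-in for Sync: holds something that cannot be pickled."""

    def __init__(self, doc: dict):
        self.doc = doc
        self.lock = threading.Lock()  # thread locks cannot be pickled

    def handle(self, item: int) -> None:
        with self.lock:
            print(f"processed {item} for {self.doc['index']}")


class Worker(multiprocessing.Process):
    def __init__(self, queue: multiprocessing.JoinableQueue, doc: dict):
        super().__init__()
        self.queue = queue
        self.doc = doc  # plain dict: safe to pickle under "spawn"
        # self.resource = Resource(doc)  # would fail: cannot pickle '_thread.lock' object

    def run(self) -> None:
        resource = Resource(self.doc)  # built in the child process instead
        while True:
            item = self.queue.get()
            if item is None:
                self.queue.task_done()
                break
            resource.handle(item)
            self.queue.task_done()


if __name__ == "__main__":
    queue = multiprocessing.JoinableQueue()
    worker = Worker(queue, {"index": "book"})
    worker.start()
    for i in range(3):
        queue.put(i)
    queue.put(None)  # poison pill so the worker exits
    queue.join()
    worker.join()
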

