
Parallel processing of single transform #501

Open
ptgolden opened this issue Oct 17, 2024 · 0 comments
Labels
enhancement New feature or request

Comments

@ptgolden

Is your feature request related to a problem? Please describe.
kgx's CLI allows transforming several files at once via a process pool, but it is not possible to use multiple cores when transforming a single data file. I am transforming a large jsonl file into ntriples, and was hoping to make it faster by processing it in parallel.

Describe the solution you'd like
I have not dug into every sink, but I believe the jsonl->ntriples transform is a pure function: one jsonl record goes in, and one set of ntriples comes out. That is, transforming a subsequent line never relies on any of the previous ones, which seems to make it a good candidate for parallelization. (As I said, I don't know whether this holds for every source-sink pair.)
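If the per-record transform really is pure, the simplest way to exploit that is to map it over a process pool. This is a minimal sketch, not kgx's actual API: `record_to_ntriples` is a toy stand-in for the real jsonl-to-ntriples conversion, and the URIs are illustrative.

```python
import json
from multiprocessing import Pool

def record_to_ntriples(line: str) -> str:
    # Toy stand-in for the real jsonl -> ntriples conversion:
    # each input line is independent, so this is safe to parallelize.
    record = json.loads(line)
    subj = f"<https://example.org/{record['id']}>"
    return f'{subj} <https://example.org/name> "{record["name"]}" .\n'

if __name__ == "__main__":
    lines = [
        '{"id": "n1", "name": "alpha"}',
        '{"id": "n2", "name": "beta"}',
    ]
    with Pool(2) as pool:
        # Order of results matches order of inputs, even though
        # the work is spread across processes.
        triples = pool.map(record_to_ntriples, lines)
    print("".join(triples), end="")
```

Because `Pool.map` preserves input order, the concatenated output is identical to a serial run.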

Describe alternatives you've considered
It would be possible to split the file ahead of time into N files, where N is at least the number of available processes, and then run the CLI to process all of the split files at once. That said, if there is an opportunity to parallelize computation on a single source, it may be worth doing.
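The split-ahead-of-time alternative is easy to sketch. This is a hypothetical helper (the `.partN` naming is my own convention, not kgx's); it shards a jsonl file round-robin so each shard can be handed to a separate CLI invocation. Round-robin works here only because records are independent of one another.

```python
def split_jsonl(path: str, n: int) -> list[str]:
    # Distribute lines of a jsonl file round-robin across n shard files.
    shard_paths = [f"{path}.part{i}" for i in range(n)]
    shards = [open(p, "w") for p in shard_paths]
    try:
        with open(path) as f:
            for i, line in enumerate(f):
                shards[i % n].write(line)
    finally:
        for s in shards:
            s.close()
    return shard_paths
```

Each shard can then be transformed independently, and the resulting ntriples files concatenated, since ntriples has no document-level structure.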

Here is a gist containing an experiment to get this working: https://gist.github.com/ptgolden/8d836d11b9c6b2211e5f606ed6203960

The gist works as follows:

  1. A thread is created which will receive ntriples and do something with them (the currently attached function just counts the triples, but the gist also includes a function that writes them to a file)
  2. N processes are created, each running a kgx_worker function that instantiates a sink and a source
  3. A nodes file is read line by line. After a chunk of nodes has been read, it is added to a queue. The kgx_workers read from that queue, convert each record using source.read_node, transform it using sink.write_node, and then write the result to a queue of ntriples. The thread created in (1) consumes those ntriples.
  4. Same as (3), but with an edges file.
  5. After the nodes and edges files have been consumed, the processes and threads are sent a sentinel value to terminate them.
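The steps above can be sketched as a minimal pipeline. This is illustrative only and does not use kgx's actual API: `worker` stands in for the kgx_worker (source/sink pair), `consumer` for the counting thread, and `None` is the sentinel.

```python
import json
import threading
from multiprocessing import Process, Queue

SENTINEL = None

def worker(in_q: Queue, out_q: Queue) -> None:
    # Stand-in for kgx_worker: pull chunks of records, convert each,
    # and push the resulting ntriples to the output queue.
    while True:
        chunk = in_q.get()
        if chunk is SENTINEL:
            break
        for line in chunk:
            record = json.loads(line)
            out_q.put(f"<urn:{record['id']}> <urn:seen> true .\n")

def consumer(out_q: Queue, counts: list) -> None:
    # Stand-in for the thread that receives ntriples (here it just counts).
    while True:
        triple = out_q.get()
        if triple is SENTINEL:
            break
        counts[0] += 1

if __name__ == "__main__":
    in_q, out_q = Queue(), Queue()
    counts = [0]
    t = threading.Thread(target=consumer, args=(out_q, counts))
    t.start()
    procs = [Process(target=worker, args=(in_q, out_q)) for _ in range(2)]
    for p in procs:
        p.start()
    # Feed chunks of records, then one sentinel per worker.
    for chunk in [['{"id": "a"}', '{"id": "b"}'], ['{"id": "c"}']]:
        in_q.put(chunk)
    for _ in procs:
        in_q.put(SENTINEL)
    for p in procs:
        p.join()
    out_q.put(SENTINEL)  # all workers done; stop the consumer thread
    t.join()
    print(counts[0])  # 3 triples seen
```

The sentinel ordering matters: workers must be joined before the consumer's sentinel is enqueued, otherwise the consumer could exit while triples are still in flight.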

This is admittedly a bit hacky: it relies on the fact that the RDF sink stores its file handle in its constructor. That file handle is replaced by an io.BytesIO object to intercept writes. A more robust approach would likely require that a transform not assume it is reading from or writing to a file.
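The interception trick looks roughly like this sketch, shown with a toy sink rather than kgx's actual RdfSink (the class, attribute, and URIs are all illustrative): because the sink stores its file handle at construction time, that attribute can be swapped for an in-memory buffer and the bytes read back after each write.

```python
import io

class ToySink:
    """Illustrative sink that stores its file handle at construction,
    mimicking the behavior the hack depends on."""
    def __init__(self, handle):
        self.handle = handle  # stored once, so it can be replaced later

    def write_node(self, node: dict) -> None:
        triple = f"<urn:{node['id']}> <urn:a> <urn:Node> .\n"
        self.handle.write(triple.encode("utf-8"))

# Hand the sink a BytesIO buffer instead of a real file to capture writes.
sink = ToySink(handle=io.BytesIO())
sink.write_node({"id": "n1"})
captured = sink.handle.getvalue()            # the intercepted ntriples bytes
sink.handle.seek(0); sink.handle.truncate()  # reset the buffer for the next record
```

The captured bytes can then be forwarded to the output queue, which is what makes it possible to run many sinks in parallel and merge their output in one place.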

@ptgolden ptgolden added the enhancement New feature or request label Oct 17, 2024