
feat: multithreading support for erasure coding #1087

Open
wants to merge 8 commits into master
Conversation

Contributor

@munna0908 munna0908 commented Jan 23, 2025

This PR introduces the following changes:

  • Uses TaskPool to spawn separate threads for erasure encoding and decoding.
  • Adds a new num-threads flag, enabling users to specify the number of CPU threads available for TaskPool workers.

Docs PR

@munna0908 munna0908 marked this pull request as draft January 23, 2025 17:20
@munna0908 munna0908 requested review from gmega and dryajov January 24, 2025 12:25
@munna0908 munna0908 self-assigned this Jan 24, 2025
@munna0908 munna0908 marked this pull request as ready for review January 24, 2025 12:26
@munna0908 munna0908 added the Client See https://miro.com/app/board/uXjVNZ03E-c=/ for details label Jan 24, 2025
Member

@gmega gmega left a comment

I think this is good, well done. I see one minor issue with the exception handling but otherwise looks great. We should wait for @dryajov's review on this one anyhow though.

codex/codex.nim Outdated
taskpool = Taskpool.new(numThreads = config.numThreads)
info "Threadpool started", numThreads = taskpool.numThreads
except Exception:
raise newException(Defect, "Failure in taskpool initialization.")
Member

🟡 Not sure we want to be casting everything as Defect and bubbling it up this way? I think we'd use a result type for this instead?

Contributor

@munna0908 Exception is a parent type for Defect and CatchableError, the latter of which all other "catchable" error types are derived from. Defect is not catchable, and therefore we should not be using except Exception where we can avoid it (despite its usage in other places in the codebase).

In this particular case, using except CatchableError and converting to a Defect is a valid pattern, and it also means that if a Defect was raised in the try block, it would not be caught in the except, and would be bubbled up the stack.

See https://status-im.github.io/nim-style-guide/errors.exceptions.html for more info on exception handling best practices (TL;DR using Result is recommended)

@gmega I agree that we should probably return a Result[void] type here, read the Result in the layer above, and then quit on error. However, the layer above currently quits on exception, and there are a fair few instances of raised Defects in the new proc, so converting everything to Result would require a decent amount of cascading changes. For the scope of this PR, I think that Defect is suitable since there is no recovery opportunity here.

So my suggestion would be something like:

try:
  # start taskpool
except CatchableError as parent:
  raise newException(Defect, "Failure in taskpool initialisation: " & parent.msg, parent)
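For reference, the Result-based shape the style guide recommends could look something like this (a sketch only; the startTaskpool proc name and the fatal/quit handling in the caller are illustrative, not part of the PR):

```nim
import pkg/taskpools, pkg/results

# Hypothetical helper: return a Result instead of raising a Defect.
proc startTaskpool(numThreads: int): Result[Taskpool, string] =
  try:
    ok(Taskpool.new(numThreads = numThreads))
  except CatchableError as exc:
    err("Failure in taskpool initialization: " & exc.msg)

# The layer above would then read the Result and quit on error:
#   let taskpool = startTaskpool(config.numThreads).valueOr:
#     fatal "Cannot start taskpool", err = error
#     quit QuitFailure
```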

Contributor Author

@emizzle Commit-link

@@ -184,6 +184,13 @@ type
name: "max-peers"
.}: int

numThreads* {.
desc:
"Number of worker threads (\"0\" = use as many threads as there are CPU cores available)",
Member

Suggested change
"Number of worker threads (\"0\" = use as many threads as there are CPU cores available)",
"Number of worker threads for running erasure coding jobs (\"0\" = use as many threads as there are CPU cores available)",

I prefer to be specific about what threads are those.

Contributor

This is not the only place where threads are going to be used, the taskpool is shared across several different places, so the old wording is totally accurate.

Member

@gmega gmega Jan 28, 2025

I don't like this for two reasons:

  1. "worker threads" means nothing. Someone familiar with the project might not be able to tell what those "worker threads" do (is it I/O? is it EC? is it the prover?) and, as a consequence, won't know how to set the parameter;
  2. the concurrency limits for different things might be completely different. I don't know, for instance, that you want to use the same number of worker threads for erasure coding vs. running the prover - if the prover has all-core internal parallelism, for instance, you might not want to run more than one background instance of the prover at a time.

So either way I'd improve the wording and be specific about what those threads are doing. As to sharing the taskpool, I'd only do it if the concurrency requirements for all tasks sharing that taskpool are similar; otherwise you'll need to do extra bookkeeping (e.g. semaphores) to enforce concurrency limits.

Contributor

"worker threads" means nothing.

On the contrary, this is pretty common nomenclature.

the concurrency limits for different things might be completely different.

This isn't the case, and balancing this properly is beyond the scope of what we need at this point or in the future. We aren't planning on having different thread pools for different subsystems, simply because the complexity that brings and the potential gains are not justified.

if the prover has all-core internal parallelism, for instance, you might not want to run more than one background instance of the prover at a time.

If the prover has its own parallelism mechanism (which it does), it means that we need to contend for these resources regardless of who is using the cores, whether it's the erasure coding or something else. The cores/CPUs are shared by the entire process, so splitting the threadpool makes absolutely no sense.

Member

@gmega gmega Jan 28, 2025

On the contrary, this is pretty common nomenclature.

Yes, it is common nomenclature for "background threads that do work", but in a vacuum this means nothing. These worker threads could be doing anything, and you can't make an educated guess about how to size this properly without knowing what that anything is.

If the prover has it's own parallelism mechanism (which it does), it means that we need to contend with this resources regardless of who is using the cores, the erasure coding, or something else - the cores/cpus are shared by the entire process, so splitting the threadpool makes absolutely no sense.

I understand it doesn't solve the problem of resource contention completely, but allowing such a large number of provers to run concurrently might make things much worse, including causing Codex to crash/thrash. If you need 8-16GB of RAM per prover instance, for example, you'll need 4x as much in a 4-core machine (and most machines these days have a lot more cores than that...).

Contributor

@dryajov dryajov Jan 28, 2025

I understand it doesn't solve the problem of resource contention completely, but allowing such large number of provers to run concurrently might make things much worse, including causing Codex to crash/thrash. If you need 8-16GB of RAM per prover instance, for example, you'll need 4x as much in a 4-core machine (and most machines these days have a lot more than that...).

That doesn't affect the number of dedicated cores/threads; this merely dictates how the threadpool is used by the different subsystems. For example, the erasure coding subsystem is only attempting to offload work to a single thread at a time right now, without attempting to parallelize anything; later this might change. The same goes for the prover and anything else we use: how many instances of "something" are running at the same time is not directly related to the number of worker threads in the pool, but rather to how the pool is used by the subsystem. So, if you can only run one or two instances of the prover, this should be properly synchronized at the invocation point.

Yes, it is common nomenclature for "background threads that do work", but in a vacuum this means nothing.

Again, in the context of using a thread pool, this is pretty self-explanatory. Usually you have a single threadpool per process; very rarely do you want different parallelization mechanisms across the codebase.
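The "synchronized at the invocation point" idea above could be sketched roughly like this (illustrative only: AsyncSemaphore is an assumption, e.g. the one in nim-libp2p's utils/semaphore, and proverJob is a made-up task):

```nim
import std/cpuinfo
import pkg/taskpools, pkg/chronos, pkg/chronos/threadsync

# One shared taskpool for the whole process...
let taskpool = Taskpool.new(numThreads = countProcessors())

# ...but each subsystem caps its own concurrency where it submits work.
let proverSlots = newAsyncSemaphore(1)  # at most one prover task in flight

proc runProver(signal: ThreadSignalPtr) {.async.} =
  await proverSlots.acquire()
  try:
    taskpool.spawn proverJob(signal)  # proverJob is hypothetical
    await signal.wait()               # hold the slot until the job signals completion
  finally:
    proverSlots.release()
```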

result: Atomic[bool]
encoder: EncoderBackend
blocks: ref seq[seq[byte]]
parity: ptr seq[seq[byte]]
Member

Curious: why do we use ref and not ptr as well for blocks?

Contributor Author

@gmega blocks uses ref since it's read-only input data for the encoding; parity uses ptr since the C encoding function needs direct memory access to write the parity data.

Member

OK fair enough. I guess we could've used ptr for everything but would've been pointless.

Contributor

So, this is quite unsafe (as we discussed), both for ptr and (more so) for ref.

The correct way is to pass around ptr UncheckedArray[ptr UncheckedArray[byte]] values that point to the underlying seq heap locations. Each individual buffer is retrieved with addr buffers[0][0]: the address of the first element in the seq is the address of its contiguous heap memory.

The reason this is unsafe is that with refc, the GC is not threadsafe, and any refcount increment from a foreign thread would trip the GC. It's actually surprising that this works at all, but I guess we got lucky.

The problem, however, is that converting UncheckedArray back to a seq is not safe either in this case. Essentially we end up with two GCs contending for the same memory region. Say, for example, that we have a seq allocated in the main thread, and we pass a pointer to it to the child/worker thread, which then (somehow) maps it back into its own seq. What happens when the worker thread is finished and the second seq is collected? Most likely it will try to free memory that is also owned by the main thread.

So, what to do? I think there are two ways forward.

  1. Copy the contents of ptr UncheckedArray[ptr UncheckedArray[byte]] onto a seq[seq[byte]] on the thread's local heap. This incurs four copies: one to copy the data onto the thread, another by the nim-leopard wrapper (which also has internal work buffers), and the same again on the way back.
  2. Change the interface of nim-leopard to work with ptr, so that we avoid the excessive copies.

I like option 2 because it's cheaper, plus we control the nim wrapper and can modify it as we see fit.

Option 1 is simpler, but not by much.
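To make option 2 concrete, obtaining the raw views described above could look roughly like this (a sketch only: toPtrView is a hypothetical helper, and the caller must keep both the seqs and the pointer table alive, and unresized, while the worker runs):

```nim
proc toPtrView(buffers: var seq[seq[byte]]): seq[ptr UncheckedArray[byte]] =
  ## Build a table of raw pointers into each buffer's contiguous heap payload.
  result = newSeq[ptr UncheckedArray[byte]](buffers.len)
  for i in 0 ..< buffers.len:
    # addr buffers[i][0] is the address of the seq's contiguous heap memory
    result[i] = cast[ptr UncheckedArray[byte]](addr buffers[i][0])

# At the call site, the table itself is passed as a single raw pointer:
#   var dataView = toPtrView(data)
#   let dataPtr = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](addr dataView[0])
# which matches the proposed nim-leopard interface below.
```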

If we go with option two, I would make the following changes to vendor/nim-leopard/leopard/leopard.nim:

# Current
func encode*(
  self: var LeoEncoder,
  data,
  parity: var openArray[seq[byte]]): Result[void, cstring] =
  ...

func decode*(
  self: var LeoDecoder,
  data,
  parity,
  recovered: var openArray[seq[byte]]): Result[void, cstring] =
  ...

# New

func encode*(
  self: var LeoEncoder,
  data,
  parity: ptr UncheckedArray[ptr UncheckedArray[byte]]): Result[void, cstring] =


func decode*(
  self: var LeoDecoder,
  data,
  parity,
  recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]): Result[void, cstring] =

I would leave the old encode/decode as overloads that convert the openArray[seq[byte]] to its pointer version, for backwards compatibility reasons.

decoder: DecoderBackend
data: ref seq[seq[byte]]
parity: ref seq[seq[byte]]
recovered: ptr seq[seq[byte]]
Member

Same question here.

Contributor Author

@gmega Same here: the decode function updates the recovered sequence, so we need a ptr.

Contributor

@emizzle emizzle left a comment

Nice job Rahul! I left mostly suggestions, just one question around requiring threading to be turned on and the configuration requirements/validation.


blockSize, ecK, ecM: int,
data: ref seq[seq[byte]],
parity: ptr seq[seq[byte]],
): Future[Result[void, string]] {.async.} =
Contributor

🟡 This is completely correct, just highlighting that another option would be to continue the pattern in this module of returning ?!void, which is an alias for Result[void, CatchableError], from the questionable lib. Then you can return success() or failure("something"). Again, just a suggestion.

Contributor Author

Commit-link

@@ -87,6 +90,21 @@ type
# provided.
minSize*: NBytes

EncodeTask = object
result: Atomic[bool]
Contributor

result is generally a reserved word. Not sure if we should use something else? @dryajov?

Contributor Author

@emizzle Good point, I will change this to success

Contributor Author

Commit-link

Contributor

It should be fine inside a struct/object; I don't think it will be a problem, but it might be prudent to use a different name, e.g. res or ok?

blockSize, ecK, ecM: int,
data, parity: ref seq[seq[byte]],
recovered: ptr seq[seq[byte]],
): Future[Result[void, string]] {.async.} =
Contributor

Probably a good idea to tell the compiler we don't want any exceptions leaking out of this routine, so we don't have any issues with crashing thread tasks:

Suggested change
): Future[Result[void, string]] {.async.} =
): Future[Result[void, string]] {.async: (raises:[]).} =

Contributor

Same for encodeAsync

Contributor Author

@emizzle Commit-link

@munna0908 munna0908 requested review from gmega and emizzle January 28, 2025 14:54
@munna0908 munna0908 force-pushed the feature/async-erasure branch from 3d8d133 to f1b8dfc Compare January 28, 2025 14:55
if config.numThreads == 0:
taskpool = Taskpool.new(numThreads = min(countProcessors(), 16))
elif config.numThreads < 2:
raise newException(
Contributor

I would use a raiseAssert here instead.

Contributor

Actually, I think this is better checked in conf.nim, so that the invalid value doesn't get past the cmd validation.

info "Threadpool started", numThreads = taskpool.numThreads
except CatchableError as parent:
raise
newException(Defect, "Failure in taskpool initialization:" & parent.msg, parent)
Contributor

Probably do a raiseAssert as well.

Contributor

@emizzle emizzle left a comment

Thanks for making the changes, Rahul. Apart from the minor suggestions (raiseAssert instead of raise newException(Defect...)), this LGTM. So, I'll approve now.

Contributor

@emizzle emizzle left a comment

I just noticed the CancelledError changes, which brought to mind that maybe we need to consider gracefully terminating the TaskPool in the stop routine.

Sorry for the premature approval before!

warn "Async thread error", err = exc.msg
return failure(exc.msg)
except CancelledError:
return failure("Thread cancelled")
Contributor

Here, do we want to return a failure? I thought about this a bit further... this proc executes in the master thread, correct? If so, maybe we should propagate CancelledErrors. So the raises annotation would be:

{.async: (raises: [CancelledError]).}

and the except block would re-raise:

except CancelledError as e:
    raise e

This brings up another question -- should the TaskPool be gracefully terminated in stop? Or is there no point to wait for worker threads to join master? IOW, would stopping the master thread kill the worker at the OS-level and possibly produce an out-of-band error?

Contributor

Yes, we need to properly cleanup everything, propagate the cancelation, and then make sure we don't exit prematurely at the callsite of both asyncEncode/asyncDecode.

Contributor

Btw, we can't simply exit on cancellation; we need to wait for the worker to finish, so await threadPtr.wait() should still be called.
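Put together, the cancellation handling under discussion might look like this (a sketch only; leopardEncodeTask is a placeholder task proc, and noCancel is chronos's helper for shielding an await from further cancellation, assuming a chronos version that provides it):

```nim
try:
  taskpool.spawn leopardEncodeTask(addr task)  # placeholder task proc
  await threadPtr.wait()
except CancelledError as exc:
  # The worker may still be writing into our buffers: wait for its
  # completion signal before letting the cancellation propagate.
  await noCancel threadPtr.wait()
  raise exc
```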

Contributor

Hmm, couldn't that create a deadlock situation where the task is cancelled with cancelAndWait, but cancellation has to wait on the thread to finish its operation before cancelling? Is there a way to forcefully kill the task and discard any errors or results?

warn "Async thread error", err = exc.msg
return failure(exc.msg)
except CancelledError:
return failure("Thread cancelled")
Contributor

Same comment as above.

@emizzle emizzle self-requested a review January 28, 2025 23:38
dryajov
dryajov approved these changes Jan 29, 2025
Contributor

@dryajov dryajov left a comment

I'm still reviewing these changes, so let's not merge this just yet.

parity: ptr seq[seq[byte]],
): Future[?!void] {.async: (raises: []).} =
## Create an encoder backend instance
let encoder = self.encoderProvider(blockSize, ecK, ecM)
Contributor

The backend needs to be created in the worker thread; this creates the backend on the main thread. It should also be released once done.


return failure("Thread cancelled")
finally:
# release the encoder
t.encoder.release()
Contributor

You also need to release the ThreadSignalPtr. This was actually a fun one, because it took me some time to figure out what was wrong, as I was seeing things fail arbitrarily. The issue is that ThreadSignalPtr uses pipes underneath to communicate between threads, and if you don't dispose of it properly, you'll eventually hit a limit on open file handles. So, you need to call threadPtr.close() here.
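So the cleanup would end up looking roughly like this (illustrative; the shape follows the PR's encodeAsync, but names like leopardEncodeTask are placeholders):

```nim
let threadPtr = ThreadSignalPtr.new().expect("free file descriptors")
try:
  taskpool.spawn leopardEncodeTask(addr task)  # placeholder task proc
  await threadPtr.wait()
finally:
  task.encoder.release()     # free the leopard encoder backend
  # close() tears down the pipe that ThreadSignalPtr uses internally;
  # skipping it leaks file handles on every call.
  discard threadPtr.close()
```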


dryajov
dryajov approved these changes Jan 30, 2025
Contributor

@dryajov dryajov left a comment

Great job for a first pass @munna0908!

Let me summarize what needs to be changed:

  • Rework how buffers are passed around, from ptr seq[seq[]] to ptr UncheckedArray[ptr UncheckedArray[byte]]
  • Dispose of the thread signal
  • Handle cancellations
  • Add tests to explicitly test the multithreaded functionality in testerasure.nim
