
Scheduling erasure coding on another thread #716

Merged (5 commits, Mar 23, 2024)
Conversation


@tbekas tbekas commented Feb 21, 2024

No description provided.

@tbekas tbekas requested review from elcritch and dryajov February 21, 2024 16:43
build.nims (outdated, resolved)
tests/testthreading.nim (outdated, resolved)
@tbekas tbekas self-assigned this Feb 28, 2024
@tbekas tbekas force-pushed the scheduling-computation branch 3 times, most recently from 52dc6dd to 4045c6d on March 13, 2024 14:43
@tbekas tbekas changed the title Scheduling computations on another thread Scheduling erasure coding on another thread Mar 13, 2024
@tbekas tbekas force-pushed the scheduling-computation branch from 4045c6d to c4626e1 on March 13, 2024 14:45
@tbekas tbekas marked this pull request as ready for review March 13, 2024 15:16
@tbekas tbekas requested review from dryajov and elcritch March 13, 2024 15:39
@elcritch elcritch left a comment

Looks good. Would be nice to clean up the manual results array into a proper type, but that can wait.

I think we should change the seq inits, though.

codex/erasure/asyncbackend.nim (two review threads, resolved)
@elcritch elcritch left a comment

Ok, looks good. I read through the copyMem and indexing a couple of times to double check.

@benbierens

The image built from this branch passes dist-tests. 👍

@dryajov dryajov left a comment

This is much closer to what we need, but it's still a bit broken. For one, the erasure coding backend isn't thread-safe, and it requires either a) locking per invocation or b) a new thread-local instance each time.

The reason is that the internal work buffers are shared by the same thread, as are the internal tables (but those are read-only, so that should be fine).

I also see a lot of unnecessary copying happening between the main thread and the execution thread, but this is a lesser issue.

The main point to keep in mind when working with threads under refc and its memory model is that objects in the owning thread with a lifetime greater than the execution thread's can be safely shared. This means that when reading from an owning thread, it should be safe to share objects between threads provided that they are read-only, unmodified after creation, and not de-allocated too soon. There are a few tricks one can use to ensure this, for example holding an object in a closure if the stack is going to vanish too soon.

In this case, when encoding/decoding, the original data can be passed by pointer without copying, but the encoded/decoded data, which is held by the worker thread's GC, will have to be copied out on completion.
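
A minimal sketch of that pattern with taskpools, under the stated assumption that the input outlives the task; the proc and variable names here are illustrative, not the PR's code. The worker only reads the caller-owned input through a ptr and copies its locally produced result into a caller-owned buffer before completing:

import pkg/taskpools

proc encodeBlocks(input: ptr seq[seq[byte]], output: ptr seq[byte]): bool =
  # Read-only access to data owned by the spawning thread.
  var parity = newSeq[byte](output[].len)   # worker-local result buffer
  for i, blk in input[]:
    if i < parity.len and blk.len > 0:
      parity[i] = blk[0]                    # stand-in for real encoding work
  # Copy the worker-held result out into the caller-owned buffer on completion.
  copyMem(addr output[][0], addr parity[0], parity.len)
  true

var tp = Taskpool.new(num_threads = 2)
var data = @[@[byte 1, 2, 3], @[byte 4, 5, 6]]  # owned and kept alive by the main thread
var parityOut = newSeq[byte](2)

let done = tp.spawn encodeBlocks(addr data, addr parityOut)
discard sync(done)   # the owning thread keeps `data` alive at least until here
tp.shutdown()

Whether passing addr data like this stays safe hinges exactly on the lifetime rule above: the seq must not be resized, reassigned, or collected while the task is running.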

@elcritch

The reason is that the internal work buffers are shared by the same thread, as are the internal tables (but those are read-only, so that should be fine).

Ah, that'd be a problem. That'd be for the Encoder and Decoder backends?

There are a few tricks one can use to ensure this, for example holding an object in a closure if the stack is going to vanish too soon.

Chronos v4 has a nice noCancel which would prevent the async closure from being GCed in the event of a cancellation.
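
For illustration, a small sketch of that idea, assuming Chronos v4's noCancel; encodeJob here is a hypothetical async wrapper around the threaded encode, not code from this PR:

import pkg/chronos

proc encodeJob(): Future[void] {.async.} =
  # Stand-in for awaiting the threaded encode via a signal or Flowvar.
  await sleepAsync(10.milliseconds)

proc runJob() {.async.} =
  # Even if runJob's future gets cancelled, noCancel keeps the inner await
  # (and anything its closure captures) alive until encodeJob completes.
  await noCancel(encodeJob())

waitFor runJob()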

  args: EncodeTaskArgs,
  data: ref seq[seq[byte]]
): Flowvar[EncodeTaskResult] =
  tp.spawn encodeTask(args, data[])
Contributor

It's a bit weird that we're doing it like this here; as mentioned, this is one place where we can pass a pointer to the data. In fact, a seq is nothing but a pointer and a length, with the data being allocated on the heap (AFAIK always, but there might be exceptions), so really you are already exposed to all the potential concurrency issues...

So two things to keep in mind: a) we aren't really making a copy of the data, just the seq descriptor, and b) this is something to keep in mind when debugging potential concurrency issues.

And again, I'm really not a huge fan of the ref thing here in general, but I couldn't find a way of making a mutable seq in this context.

Contributor

So two things to keep in mind: a) we aren't really making a copy of the data, just the seq descriptor, and b) this is something to keep in mind when debugging potential concurrency issues.

Generally that's the case. Taskpools wraps arguments in Isolate, which creates a deep copy unless it can do a move. However, that may not alleviate the concurrency issues with refc, since it's not clear whether ownership of the copied seq data is transferred to the other thread. @tbekas and I tested that it is doing a deep copy, but didn't determine who ends up owning the seq.

  let taskNode = new(TaskNode, workerContext.currentTask) do:
    type
      ScratchObj_838861196 = object
        data: seq[seq[char]]
        sig: ThreadSignalPtr

    let scratch_838861188 = cast[ptr ScratchObj_838861196](c_calloc(csize_t(1),
        csize_t(16)))
    if isNil(scratch_838861188):
      raise
        (ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil)
    block:
      var isoTemp_838861192 = isolate(data)
      scratch_838861188.data = extract(isoTemp_838861192)
      var isoTemp_838861194 = isolate(sig)
      scratch_838861188.sig = extract(isoTemp_838861194)
    proc worker_838861197(args`gensym27: pointer) {.gcsafe, nimcall, raises: [].} =
      let objTemp_838861191 = cast[ptr ScratchObj_838861196](args`gensym27)
      let data_838861193 = objTemp_838861191.data
      let sig_838861195 = objTemp_838861191.sig
      worker(data_838861193, sig_838861195)

    proc destroyScratch_838861198(args`gensym27: pointer) {.gcsafe, nimcall,
        raises: [].} =
      let obj_838861199 = cast[ptr ScratchObj_838861196](args`gensym27)
      `=destroy`(obj_838861199[])

    Task(callback: worker_838861197, args: scratch_838861188,
         destroy: destroyScratch_838861198)
  schedule(workerContext, taskNode)

Contributor

All of that to say that using ptr seq[seq[byte]] like @dryajov suggests shouldn't cause us any more issues under refc than using seq[seq[byte]], since it's unclear how refc mixes with move. In the best case, @tbekas's setup using seq[seq[byte]] would move the data to the foreign thread like it would under arc.

Contributor

Though I'm still not sure why we're not getting the foreign-GC heap contamination issue like we were previously, @dryajov. It seems like seqs don't cause that corruption to occur anymore. Perhaps the compiler uses allocShared now instead? But I've run a bunch of tests with seqs and don't seem to run into the GC-heap contamination issue anymore.

Contributor Author

@dryajov

we aren't really making a copy of the data, just the seq descriptor

If that were the case, the following test would fail, but it doesn't.

import std/unittest
import pkg/taskpools

var tp = Taskpool.new(num_threads = 2)

proc runTask(zeroes: seq[byte]) =
  # Overwrite the supposedly shared buffer from the worker thread.
  var ones = @[byte 1, 1, 1, 1]
  copyMem(unsafeAddr zeroes[0], addr ones[0], 4)

test "some test":
  var zeroes = newSeq[byte](4)

  tp.spawn runTask(zeroes)

  tp.syncAll()

  # If both threads referred to the same backing memory, this would now be all ones.
  check:
    zeroes == @[byte 0, 0, 0, 0]

Contributor

What I don't understand is why you're claiming that it would fail at all, and how is this relevant?

Because you said that:

we aren't really making a copy of the data, just the seq descriptor

which implies that we refer to the same memory region from both threads. My example shows that each thread refers to its own copy of both the descriptor and the data. If you meant something else by this statement, please explain what you meant exactly.

No, your example doesn't show that at all; that is the point I'm making in #716 (comment). For reference, you can see the transformed version of the encode/decode tasks below.

I annotated the encode task with comments at the relevant places; decode follows the same pattern, but it's here for completeness.

Encode:

  let fut = newFlowVar(typeof(EncodeTaskResult))
  proc taskpool_encodeTask(args: EncodeTaskArgs; data: seq[seq[byte]];
                           fut: Flowvar[EncodeTaskResult]) {.nimcall.} =
    let res`gensym105 = encodeTask(args, data)
    readyWith(fut, res`gensym105)

  let task =
    type
      ScratchObj_822084605 = object
        args: EncodeTaskArgs
        data: seq[seq[byte]]
        fut: Flowvar[EncodeTaskResult]

    let scratch_822084595 = cast[ptr ScratchObj_822084605](c_calloc(csize_t(1),
        csize_t(48)))
    if isNil(scratch_822084595):
      raise
        (ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil)
    block:
      var isoTemp_822084599 = isolate(args)
      scratch_822084595.args = extract(isoTemp_822084599)
      var isoTemp_822084601 = isolate(data[])
      scratch_822084595.data = extract(isoTemp_822084601) # <---- the seq gets moved over, `extract` does `move` on the contained isolate
      var isoTemp_822084603 = isolate(fut)
      scratch_822084595.fut = extract(isoTemp_822084603)
    proc taskpool_encodeTask_822084606(args`gensym129: pointer) {.gcsafe,
        nimcall.} =
      let objTemp_822084598 = cast[ptr ScratchObj_822084605](args`gensym129)
      let args_822084600 = objTemp_822084598.args
      let data_822084602 = objTemp_822084598.data
      let fut_822084604 = objTemp_822084598.fut
      taskpool_encodeTask(args_822084600, data_822084602, fut_822084604)

    proc destroyScratch_822084607(args`gensym129: pointer) {.gcsafe, nimcall.} =
      let obj_822084608 = cast[ptr ScratchObj_822084605](args`gensym129)
      `=destroy`(obj_822084608[])

    Task(callback: taskpool_encodeTask_822084606, args: scratch_822084595,
         destroy: destroyScratch_822084607)
  let taskNode = new(TaskNode, workerContext.currentTask, task)


  schedule(workerContext, taskNode) # this is where everything gets scheduled on the worker thread, everything else happens in the owning thread

Decode:

  let fut = newFlowVar(typeof(DecodeTaskResult))
  proc taskpool_decodeTask(args: DecodeTaskArgs; data: seq[seq[byte]];
                           parity: seq[seq[byte]];
                           fut: Flowvar[DecodeTaskResult]) {.nimcall.} =
    let res`gensym133 = decodeTask(args, data, parity)
    readyWith(fut, res`gensym133)

  let task =
    type
      ScratchObj_822084876 = object
        args: DecodeTaskArgs
        data: seq[seq[byte]]
        parity: seq[seq[byte]]
        fut: Flowvar[DecodeTaskResult]

    let scratch_822084862 = cast[ptr ScratchObj_822084876](c_calloc(csize_t(1),
        csize_t(56)))
    if isNil(scratch_822084862):
      raise
        (ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil)
    block:
      var isoTemp_822084868 = isolate(args)
      scratch_822084862.args = extract(isoTemp_822084868)
      var isoTemp_822084870 = isolate(data[])
      scratch_822084862.data = extract(isoTemp_822084870)
      var isoTemp_822084872 = isolate(parity[])
      scratch_822084862.parity = extract(isoTemp_822084872)
      var isoTemp_822084874 = isolate(fut)
      scratch_822084862.fut = extract(isoTemp_822084874)
    proc taskpool_decodeTask_822084877(args`gensym138: pointer) {.gcsafe,
        nimcall.} =
      let objTemp_822084867 = cast[ptr ScratchObj_822084876](args`gensym138)
      let args_822084869 = objTemp_822084867.args
      let data_822084871 = objTemp_822084867.data
      let parity_822084873 = objTemp_822084867.parity
      let fut_822084875 = objTemp_822084867.fut
      taskpool_decodeTask(args_822084869, data_822084871, parity_822084873, fut_822084875)

    proc destroyScratch_822084878(args`gensym138: pointer) {.gcsafe, nimcall.} =
      let obj_822084879 = cast[ptr ScratchObj_822084876](args`gensym138)
      `=destroy`(obj_822084879[])

    Task(callback: taskpool_decodeTask_822084877, args: scratch_822084862,
         destroy: destroyScratch_822084878)
  let taskNode = new(TaskNode, workerContext.currentTask, task)
  schedule(workerContext, taskNode)

Contributor

I guess, after looking at this more closely, we aren't making a copy per se, but the data is still owned by the main thread, not the worker thread. So, maybe this is fine the way it is :)

Contributor Author

In your code sample:

var isoTemp_822084601 = isolate(data[])
scratch_822084595.data = extract(isoTemp_822084601) # <---- the seq gets moved over, `extract` does `move` on the contained isolate

The second line may do the move, but the first one does the copy.

@elcritch elcritch Mar 20, 2024

IIRC, var isoTemp_822084601 = isolate(data[]) performs a deep copy. I've used Isolate before, and currently it's conservative and clones the data. Sometimes you can get it to move the data into the Isolated[T].
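
A tiny self-contained illustration of that behavior with std/isolation (which taskpools builds on); whether a copy or a move happens depends on how the value is passed:

import std/isolation

var data = @[@[byte 1, 2, 3]]

# Passing an lvalue: the sink parameter of isolate() generally forces a copy,
# so `data` remains usable afterwards.
var isoCopy = isolate(data)

# Passing `move data` lets the value be moved into the Isolated[T] instead.
var isoMove = isolate(move data)

var extracted = extract(isoMove)   # extract() moves the value back out
assert extracted == @[@[byte 1, 2, 3]]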

Contributor

@elcritch

All of that to say that using ptr seq[seq[byte]] like @dryajov suggests shouldn't cause us any more issues under refc than using seq[seq[byte]], since it's unclear how refc mixes with move. In the best case, @tbekas's setup using seq[seq[byte]] would move the data to the foreign thread like it would under arc.

I don't think the move happens across GCs/threads; this is a move onto the holder object, which still lives on the owning thread, not the worker thread?

@dryajov, for refc I believe you're correct, assuming move under refc doesn't transfer ownership of the seq's data to the new thread. I think that makes sense, but then in theory we should be getting "heap contamination".

I was digging into the seq memory handling in refc with moves but didn't get too far.

parity = newSeqWith[seq[byte]](args.ecM, newSeq[byte](args.blockSize))

try:
  let res = args.backend[].encode(data[], parity)
Contributor

I think this is probably the biggest issue right now: the backend isn't really thread-safe, and it uses internal work (read/write) buffers that will be shared by multiple threads.

Contributor Author

There's an interface EncoderProvider* = proc(size, blocks, parity: int): EncoderBackend that's implemented here:

func leoEncoderProvider*(

This creates an encoder instance per encode job, so the encoder instance is not shared among threads.

Contributor

Ah, I forgot about that one :)

Contributor

I took another look at this, and the way the encode provider is being used right now is still not thread-safe.

Take a look at https://github.com/codex-storage/nim-codex/pull/716/files#diff-47e9a8d861097fbe82af91a1cb564c1c7d878b5aa46a553c610f75d26d7151efL275: it creates an instance per encoding/decoding session, but this instance is then shared by all the threads spawned by that session. We might be getting away with it because access is properly synchronized/serialized at the main-thread level, but it isn't properly guarded at each individual thread level. Perhaps an instance per thread is a better solution; this would require passing the encode/decode providers to the backend and having each thread create its own instance. That, or a lock per encoding/decoding session shared by all the workers?

For now, I think this is fine, but it seems a bit brittle.
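
For what it's worth, a hedged, self-contained sketch of the "instance per thread/task" option; EncoderBackend and newBackend are stand-ins here, not the codex leopard backend API:

import pkg/taskpools

type
  EncoderBackend = object
    workBuf: seq[byte]   # the kind of internal buffer that must not be shared

proc newBackend(blockSize, blocks, parity: int): EncoderBackend =
  # Plays the role of the provider: a fresh instance per call.
  EncoderBackend(workBuf: newSeq[byte](blockSize * (blocks + parity)))

proc encodeTaskLocal(blockSize, blocks, parity: int): bool =
  # Each spawned task constructs its own backend, so its work buffers stay
  # local to whichever worker thread ends up running the task.
  var backend = newBackend(blockSize, blocks, parity)
  backend.workBuf[0] = 1   # stand-in for the real encode call
  true

var tp = Taskpool.new(num_threads = 2)
let ok = tp.spawn encodeTaskLocal(64, 4, 2)
discard sync(ok)
tp.shutdown()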

@dryajov dryajov commented Mar 19, 2024

I think we can still avoid the copies altogether when passing data to encode/decode and only copy the results. Also, deep copy will be quite expensive, and it isn't even enabled by default in 2.0, so I would take a closer look at that.

@tbekas tbekas force-pushed the scheduling-computation branch 3 times, most recently from f3ebfac to 3e02cba on March 19, 2024 21:50
@dryajov dryajov commented Mar 19, 2024

There are some sleepAsync calls that can be removed now in https://github.com/codex-storage/nim-codex/blob/master/codex/erasure/erasure.nim#L291 and https://github.com/codex-storage/nim-codex/blob/master/codex/erasure/erasure.nim#L396

proc new*(
  T: type CodexNodeRef,
  switch: Switch,
  networkStore: NetworkStore,
  engine: BlockExcEngine,
  discovery: Discovery,
  prover = Prover.none,
- contracts = Contracts.default): CodexNodeRef =
+ contracts = Contracts.default,
+ taskpool = Taskpool.new(num_threads = countProcessors())): CodexNodeRef =
Contributor

I think this needs to be created in codex.nim, because it will be shared by more than one component.

Contributor

Yeah, I'd agree. The taskpool will probably want to be shared. It might make sense to overschedule threads and have multiple taskpools at some point, but I can't envision a good reason currently.
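
A short sketch of what that sharing could look like, assuming a single pool constructed at setup time; the CodexNodeRef call is only indicated in a comment because its exact signature is the one shown in the diff above:

import std/cpuinfo
import pkg/taskpools

# One Taskpool for the whole process, created once (codex.nim-style) and
# handed to every component that schedules work on it.
let taskpool = Taskpool.new(num_threads = countProcessors())

# ... later, when wiring up components (illustrative):
# let node = CodexNodeRef.new(switch, networkStore, engine, discovery,
#                             taskpool = taskpool)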

@dryajov dryajov left a comment

I think this is overall mergeable, with the exception of the sleepAsync and the tp initialization.

A TL;DR for this thread would be:

  • The async backend is created per dataset encoding/decoding session and then shared by all the worker threads. This seems to be properly serialized at the main-thread level, but not at the worker-thread level, and it might turn out to be a bit brittle; something to keep an eye on
  • There doesn't seem to be any excessive copying going on. The data flow is as follows:
    • The main thread sets up a task object, with all the data properly moved onto that object by way of an isolate
    • The main thread owns the task object and passes a pointer to the worker thread
    • The worker thread has to be careful not to modify this object directly, as it is owned by a foreign GC, but read access should be fine
    • This doesn't eliminate potential issues with object lifetimes and cancellations, but it's probably OK in general. I suspect that the issue @elcritch is talking about with passing the pointer around, instead of a properly isolated object, is due to the fact that the GC doesn't know that another thread is holding that object; in the case of the isolate, the lifetime of the containing object is bound to the task object, which outlives the cancellation. When using pointers, it might be required to manage the lifetime of the object with GC_ref/GC_unref to prevent these issues (see the sketch below). All in all, this solution is better and safer than the pointer approach.
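
A minimal sketch of the GC_ref/GC_unref idea from the last bullet, under refc; the payload type and task are illustrative, not the PR's code:

import pkg/taskpools

type
  PayloadObj = object
    data: seq[byte]
  Payload = ref PayloadObj

proc work(p: ptr PayloadObj): bool =
  p.data.len > 0              # read-only access from the worker thread

var tp = Taskpool.new(num_threads = 2)
var payload = Payload(data: @[byte 1, 2, 3])

GC_ref(payload)               # pin: the owning GC must not collect it early
let fut = tp.spawn work(addr payload[])
discard sync(fut)
GC_unref(payload)             # unpin once the worker is guaranteed to be done
tp.shutdown()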

@elcritch

  • The worker thread has to be careful not to modify this object directly, as it is owned by a foreign GC, but read access should be fine

Another benefit of passing via seq is that it helps enforce this.

This doesn't eliminate potential issues with object lifetimes and cancellations, but it's probably OK in general. I suspect that the issue @elcritch is talking about with passing the pointer around, instead of a properly isolated object, is due to the fact that the GC doesn't know that another thread is holding that object; in the case of the isolate, the lifetime of the containing object is bound to the task object, which outlives the cancellation.

That's a good summary. I just did a few more tests to verify and it appears to work this way with Nim 1.6.18. Passing a ptr to the data can cause lifetime issues in the event of cancellations. That can be avoided by using a properly isolated type or by using the upcoming noCancel.

@tbekas tbekas force-pushed the scheduling-computation branch from fd311dd to d163e5d on March 21, 2024 10:55
@tbekas tbekas force-pushed the scheduling-computation branch from d163e5d to b6bd063 on March 21, 2024 11:14
@tbekas tbekas added this pull request to the merge queue Mar 23, 2024
Merged via the queue into master with commit 59d9439 Mar 23, 2024
10 checks passed
@tbekas tbekas deleted the scheduling-computation branch March 23, 2024 10:59