-
Notifications
You must be signed in to change notification settings - Fork 685
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
run barrier callback in BarrierPhyInstrOperand::~BarrierPhyInstrOperand #7702
Changes from 6 commits
b8f7f89
af55be9
1b3c3df
516c691
9095397
c3846cd
969ba8c
d406a30
c7c6c67
65db196
fca7b8d
b049dfc
7ed5d66
29ccd30
8509484
434b7af
3925eb4
86296cb
454f5e7
d1d9ad7
a58348d
8bb83a1
fe64379
9ca83c6
9b5cecc
f7f1fdf
72426fc
3be6f81
9f10675
da8b44d
c52bc90
cf14a1e
e3297f4
6f5d7c6
5d0c648
97cf982
a119bf0
d2b36a4
d8862b9
6e22eb4
e58bee9
df91894
93191d7
da35647
a3d45e3
1ec6dc3
3d6c7cd
f7e2241
c1afee6
7c4cb89
7daab5a
07bb2a2
47738f7
87c748f
9d5ab85
80a4542
57fff70
ce55514
c679884
53d088a
c6ecc12
204aa54
3124ecf
8c6c03e
72c30fd
5d6212f
19c18d4
002bacf
5cfcadd
aef2edc
b2d87f2
f00f991
ae9d601
89f3ce2
ad18575
b67a60e
fb8b9fa
ec2c402
45fd613
b730ef0
fbd921c
41c90aa
31538de
5fdba64
1f91eaa
6996258
f6c0b99
0fbfc80
22f220b
bcc1d8d
aec7634
c1010a1
44196a8
255c874
cd80f0c
34af282
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,8 @@ limitations under the License. | |
#include "oneflow/core/profiler/profiler.h" | ||
#include "oneflow/core/common/cpp_attribute.h" | ||
#include "oneflow/core/common/global.h" | ||
#include "oneflow/core/common/foreign_lock_helper.h" | ||
#include <typeinfo> | ||
|
||
namespace oneflow { | ||
namespace vm { | ||
|
@@ -189,19 +191,21 @@ void VirtualMachineEngine::ReleaseFinishedInstructions() { | |
// in stream->DeleteInstruction(...) | ||
intrusive::shared_ptr<InstructionMsg> instr_msg(instruction_ptr->mut_instr_msg()); | ||
stream->DeleteInstruction(LivelyInstructionListErase(instruction_ptr)); | ||
MoveInstructionMsgToGarbageMsgList(std::move(instr_msg)); | ||
static constexpr int kFlushWindowSize = 32; | ||
MoveInstructionMsgToGarbageMsgList(kFlushWindowSize, std::move(instr_msg)); | ||
} | ||
if (stream->running_instruction_list().empty()) { mut_active_stream_list()->Erase(stream); } | ||
} | ||
} | ||
|
||
void VirtualMachineEngine::MoveInstructionMsgToGarbageMsgList( | ||
intrusive::shared_ptr<InstructionMsg>&& instr_msg) { | ||
int flush_window_size, intrusive::shared_ptr<InstructionMsg>&& instr_msg) { | ||
local_garbage_msg_list_.EmplaceBack(std::move(instr_msg)); | ||
static constexpr int kWindowSize = 32; | ||
// local_garbage_msg_list_ is the cache of garbage_msg_list_. | ||
// `flush_window_size` (previously the fixed `kWindowSize`) controls how often the mutexed list is used. | ||
if (unlikely(local_garbage_msg_list_.size() > kWindowSize)) { MoveToGarbageMsgListAndNotifyGC(); } | ||
if (unlikely(local_garbage_msg_list_.size() > flush_window_size)) { | ||
MoveToGarbageMsgListAndNotifyGC(); | ||
} | ||
} | ||
|
||
void VirtualMachineEngine::MoveToGarbageMsgListAndNotifyGC() { | ||
|
@@ -495,7 +499,11 @@ void VirtualMachineEngine::TryRunBarrierInstruction() { | |
CHECK(OnSchedulerThread(stream_type)); | ||
stream_type.Run(sequnential_instruction); | ||
mut_barrier_instruction_list()->Erase(sequnential_instruction); | ||
intrusive::shared_ptr<InstructionMsg> instr_msg(sequnential_instruction->mut_instr_msg()); | ||
LivelyInstructionListErase(sequnential_instruction); | ||
sequnential_instruction->clear_instr_msg(); | ||
constexpr int kZeroWindowSize = 0; // flush immediately. | ||
MoveInstructionMsgToGarbageMsgList(kZeroWindowSize, std::move(instr_msg)); | ||
OF_PROFILER_RANGE_POP(); | ||
} | ||
|
||
|
@@ -534,7 +542,18 @@ void VirtualMachineEngine::Schedule() { | |
void VirtualMachineEngine::Callback() { | ||
InstructionMsgList garbage_msg_list; | ||
mut_garbage_msg_list()->MoveTo(&garbage_msg_list); | ||
// destruct garbage_msg_list. | ||
INTRUSIVE_FOR_EACH(garbage, &garbage_msg_list) { | ||
CHECK_JUST(Global<ForeignLockHelper>::Get()->WithScopedAcquire([&, this]() -> Maybe<void> { | ||
garbage_msg_list.Erase(garbage.Mutable()); | ||
while (garbage->ref_cnt() > 1) { | ||
// Do nothing; busy-wait until all other threads have released their references. | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. call back线程busy wait其它线程释放指令,以确认该指令在其它所有线程使用完成了 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. main线程的sync会等待这个CallBack执行完成? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个部分是需要main线程代码自行用bc来处理。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里之前不太熟悉,尝试整理一下。 1、当调用Sync时,被视为是BarrierInstruction Maybe<void> ClusterSync() {
auto bc = std::make_shared<BlockingCounter>(1);
JUST(PhysicalRun([bc](InstructionsBuilder* builder) -> Maybe<void> {
JUST(builder->ComputeGlobalFrontSeqBarrier()); // 是FrontSeq的指令,会被TryRunBarrierInstruction执行
JUST(builder->ComputeRankFrontSeqCallback([bc]() { bc->Decrease(); })); // 指令callback执行时,就bc减1
return Maybe<void>::Ok();
}));
JUST(bc->WaitUntilCntEqualZero(VirtualMachine::GetPredicatorNoMoreInstructionsFinished())); // 主线程等待bc为0,效果是阻塞主线程等待BarrierInstruction的callback执行
return Maybe<void>::Ok();
} 2、TryRunBarrierInstruction时,立即做指令的gc,触发BarrierInstruction及之前的所有指令的callback的执行 constexpr int kZeroWindowSize = 0; // flush immediately.
MoveInstructionMsgToGarbageMsgList(kZeroWindowSize, std::move(instr_msg)); 3、callback线程执行gc得到的所有指令的callback void VirtualMachine::CallbackLoop(const std::function<void()>& Initializer) {
Initializer();
auto* vm = mut_vm();
while (callback_notifier_.WaitAndClearNotifiedCnt() == kNotifierStatusSuccess) { vm->Callback(); }
} 执行有两个特点:
这样gc中的callback执行完,代表BarrierInstruction及之前的指令都执行完了,bc置0,main线程继续。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里的 busy wait 会有性能问题吗? cpu 占用率过高等问题 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
我更新了第548行的注释。这里只是处理一个非常罕见的情况,那个罕见情况会导致对象不是在callback线程里释放,这违背我们的本意。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 好的 |
||
CHECK_NOTNULL(garbage->phy_instr_operand()); | ||
CHECK_EQ(garbage->phy_instr_operand().use_count(), 1) << garbage->DebugName(); | ||
// Destruct garbage. | ||
return Maybe<void>::Ok(); | ||
})); | ||
} | ||
} | ||
|
||
void VirtualMachineEngine::NotifyCallback() { MoveToGarbageMsgListAndNotifyGC(); } | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里支持了在sync vm时,立即发送指令的GC