diff --git a/pxr/usd/usd/crateData.cpp b/pxr/usd/usd/crateData.cpp
index eee827c266..ba0605ee29 100644
--- a/pxr/usd/usd/crateData.cpp
+++ b/pxr/usd/usd/crateData.cpp
@@ -35,9 +35,10 @@
 #include "pxr/base/tf/typeInfoMap.h"
 #include "pxr/base/trace/trace.h"
 
-#include "pxr/base/work/arenaDispatcher.h"
-#include "pxr/base/work/utils.h"
+#include "pxr/base/work/dispatcher.h"
 #include "pxr/base/work/loops.h"
+#include "pxr/base/work/utils.h"
+#include "pxr/base/work/withScopedParallelism.h"
 
 #include "pxr/usd/sdf/payload.h"
 #include "pxr/usd/sdf/schema.h"
@@ -849,136 +850,141 @@ class Usd_CrateDataImpl
         // Ensure we start from a clean slate.
         _ClearSpecData();
 
-        WorkArenaDispatcher dispatcher;
-
-        // Pull all the data out of the crate file structure that we'll consume.
-        vector<CrateFile::Spec> specs;
-        vector<CrateFile::Field> fields;
-        vector<FieldIndex> fieldSets;
-        _crateFile->RemoveStructuralData(specs, fields, fieldSets);
-
-        // Remove any target specs, we do not store target specs in Usd, but old
-        // files could contain them.
-        specs.erase(
-            remove_if(
-                specs.begin(), specs.end(),
-                [this](CrateFile::Spec const &spec) {
-                    return _crateFile->GetPath(spec.pathIndex).IsTargetPath();
-                }),
-            specs.end());
-
-        // Sort by path fast-less-than, need same order that _Table will
-        // store.
-        dispatcher.Run([this, &specs]() {
-            tbb::parallel_sort(
-                specs.begin(), specs.end(),
-                [this](CrateFile::Spec const &l, CrateFile::Spec const &r) {
-                    SdfPath::FastLessThan flt;
-                    return flt(_crateFile->GetPath(l.pathIndex),
-                               _crateFile->GetPath(r.pathIndex));
-                });
-        });
-        dispatcher.Wait();
+        WorkWithScopedParallelism([this]() {
+
+            WorkDispatcher dispatcher;
+
+            // Pull all the data out of the crate file structure that we'll
+            // consume.
+            vector<CrateFile::Spec> specs;
+            vector<CrateFile::Field> fields;
+            vector<FieldIndex> fieldSets;
+            _crateFile->RemoveStructuralData(specs, fields, fieldSets);
+
+            // Remove any target specs, we do not store target specs in Usd,
+            // but old files could contain them.
+            specs.erase(
+                remove_if(
+                    specs.begin(), specs.end(),
+                    [this](CrateFile::Spec const &spec) {
+                        return _crateFile->GetPath(
+                            spec.pathIndex).IsTargetPath();
+                    }),
+                specs.end());
+
+            // Sort by path fast-less-than, need same order that _Table will
+            // store.
+            dispatcher.Run([this, &specs]() {
+                tbb::parallel_sort(
+                    specs.begin(), specs.end(),
+                    [this](CrateFile::Spec const &l, CrateFile::Spec const &r) {
+                        SdfPath::FastLessThan flt;
+                        return flt(_crateFile->GetPath(l.pathIndex),
+                                   _crateFile->GetPath(r.pathIndex));
+                    });
+            });
+            dispatcher.Wait();
 
-        // This function object just turns a CrateFile::Spec into the spec data
-        // type that we want to store in _flatData. It has to be a function
-        // object instead of a lambda because boost::transform_iterator requires
-        // the function object be copy/assignable.
-        struct _SpecToPair {
-            using result_type = _FlatMap::value_type;
-            explicit _SpecToPair(CrateFile *crateFile) : crateFile(crateFile) {}
-            result_type operator()(CrateFile::Spec const &spec) const {
-                result_type r(crateFile->GetPath(spec.pathIndex),
-                              _FlatSpecData(Usd_EmptySharedTag));
-                TF_AXIOM(!r.first.IsTargetPath());
-                return r;
-            }
-            CrateFile *crateFile;
-        };
-
-        {
-            TfAutoMallocTag tag2("Usd_CrateDataImpl main hash table");
-            _SpecToPair s2p(_crateFile.get());
-            decltype(_flatData)(
-                boost::container::ordered_unique_range,
-                boost::make_transform_iterator(specs.begin(), s2p),
-                boost::make_transform_iterator(specs.end(), s2p)).swap(
-                    _flatData);
-        }
-
-        // Allocate all the spec data structures in the hashtable first, so we
-        // can populate fields in parallel without locking.
-        vector<_FlatSpecData *> specDataPtrs;
-
-        // Create all the specData entries and store pointers to them.
-        dispatcher.Run([this, &specs, &specDataPtrs]() {
-            // XXX Won't need first two tags when bug #132031 is addressed
-            TfAutoMallocTag2 tag("Usd", "Usd_CrateDataImpl::Open");
-            TfAutoMallocTag tag2("Usd_CrateDataImpl main hash table");
-            specDataPtrs.resize(specs.size());
-            for (size_t i = 0; i != specs.size(); ++i) {
-                specDataPtrs[i] = &(_flatData.begin()[i].second);
-            }
-        });
-
-        // Create the specType array.
-        dispatcher.Run([this, &specs]() {
-            // XXX Won't need first two tags when bug #132031 is addressed
-            TfAutoMallocTag2 tag("Usd", "Usd_CrateDataImpl::Open");
-            TfAutoMallocTag tag2("Usd_CrateDataImpl main hash table");
-            _flatTypes.resize(specs.size());
-        });
-
-        typedef Usd_Shared<_FieldValuePairVector> SharedFieldValuePairVector;
-
-        unordered_map<
-            FieldSetIndex, SharedFieldValuePairVector, _Hasher> liveFieldSets;
-
-        for (auto fsBegin = fieldSets.begin(),
-                 fsEnd = find(fsBegin, fieldSets.end(), FieldIndex());
-             fsBegin != fieldSets.end();
-             fsBegin = fsEnd + 1,
-                 fsEnd = find(fsBegin, fieldSets.end(), FieldIndex())) {
-
-            // Add this range to liveFieldSets.
-            TfAutoMallocTag tag2("field data");
-            auto &fieldValuePairs =
-                liveFieldSets[FieldSetIndex(fsBegin-fieldSets.begin())];
-
-            dispatcher.Run(
-                [this, fsBegin, fsEnd, &fields, &fieldValuePairs]() mutable {
-                    // XXX Won't need first two tags when bug #132031 is
-                    // addressed
-                    TfAutoMallocTag2 tag("Usd", "Usd_CrateDataImpl::Open");
-                    TfAutoMallocTag tag2("field data");
-                    auto &pairs = fieldValuePairs.GetMutable();
-                    pairs.resize(fsEnd-fsBegin);
-                    for (size_t i = 0; fsBegin != fsEnd; ++fsBegin, ++i) {
-                        auto const &field = fields[fsBegin->value];
-                        pairs[i].first = _crateFile->GetToken(field.tokenIndex);
-                        pairs[i].second = _UnpackForField(field.valueRep);
-                    }
-                });
-        }
-
-        dispatcher.Wait();
-
-        dispatcher.Run(
-            [this, &specs, &specDataPtrs, &liveFieldSets]() {
-                tbb::parallel_for(
-                    static_cast<size_t>(0), static_cast<size_t>(specs.size()),
-                    [this, &specs, &specDataPtrs, &liveFieldSets]
-                    (size_t specIdx) {
-                        auto const &s = specs[specIdx];
-                        auto *specData = specDataPtrs[specIdx];
-                        _flatTypes[specIdx].type = s.specType;
-                        specData->fields =
-                            liveFieldSets.find(s.fieldSetIndex)->second;
-                    });
-            });
+            // This function object just turns a CrateFile::Spec into the
+            // spec data type that we want to store in _flatData. It has to
+            // be a function object instead of a lambda because
+            // boost::transform_iterator requires the function object be
+            // copy/assignable.
+            struct _SpecToPair {
+                using result_type = _FlatMap::value_type;
+                explicit _SpecToPair(CrateFile *crateFile) : crateFile(crateFile) {}
+                result_type operator()(CrateFile::Spec const &spec) const {
+                    result_type r(crateFile->GetPath(spec.pathIndex),
+                                  _FlatSpecData(Usd_EmptySharedTag));
+                    TF_AXIOM(!r.first.IsTargetPath());
+                    return r;
+                }
+                CrateFile *crateFile;
+            };
+
+            {
+                TfAutoMallocTag tag2("Usd_CrateDataImpl main hash table");
+                _SpecToPair s2p(_crateFile.get());
+                decltype(_flatData)(
+                    boost::container::ordered_unique_range,
+                    boost::make_transform_iterator(specs.begin(), s2p),
+                    boost::make_transform_iterator(specs.end(), s2p)).swap(
+                        _flatData);
+            }
+
+            // Allocate all the spec data structures in the hashtable first,
+            // so we can populate fields in parallel without locking.
+            vector<_FlatSpecData *> specDataPtrs;
+
+            // Create all the specData entries and store pointers to them.
+            dispatcher.Run([this, &specs, &specDataPtrs]() {
+                // XXX Won't need first two tags when bug #132031 is addressed
+                TfAutoMallocTag2 tag("Usd", "Usd_CrateDataImpl::Open");
+                TfAutoMallocTag tag2("Usd_CrateDataImpl main hash table");
+                specDataPtrs.resize(specs.size());
+                for (size_t i = 0; i != specs.size(); ++i) {
+                    specDataPtrs[i] = &(_flatData.begin()[i].second);
+                }
+            });
+
+            // Create the specType array.
+            dispatcher.Run([this, &specs]() {
+                // XXX Won't need first two tags when bug #132031 is addressed
+                TfAutoMallocTag2 tag("Usd", "Usd_CrateDataImpl::Open");
+                TfAutoMallocTag tag2("Usd_CrateDataImpl main hash table");
+                _flatTypes.resize(specs.size());
+            });
+
+            typedef Usd_Shared<_FieldValuePairVector> SharedFieldValuePairVector;
+
+            unordered_map<
+                FieldSetIndex, SharedFieldValuePairVector, _Hasher> liveFieldSets;
+
+            for (auto fsBegin = fieldSets.begin(),
+                     fsEnd = find(fsBegin, fieldSets.end(), FieldIndex());
+                 fsBegin != fieldSets.end();
+                 fsBegin = fsEnd + 1,
+                     fsEnd = find(fsBegin, fieldSets.end(), FieldIndex())) {
+
+                // Add this range to liveFieldSets.
+                TfAutoMallocTag tag2("field data");
+                auto &fieldValuePairs =
+                    liveFieldSets[FieldSetIndex(fsBegin-fieldSets.begin())];
+
+                dispatcher.Run(
+                    [this, fsBegin, fsEnd, &fields, &fieldValuePairs]() mutable {
+                        // XXX Won't need first two tags when bug #132031 is
+                        // addressed
+                        TfAutoMallocTag2 tag("Usd", "Usd_CrateDataImpl::Open");
+                        TfAutoMallocTag tag2("field data");
+                        auto &pairs = fieldValuePairs.GetMutable();
+                        pairs.resize(fsEnd-fsBegin);
+                        for (size_t i = 0; fsBegin != fsEnd; ++fsBegin, ++i) {
+                            auto const &field = fields[fsBegin->value];
+                            pairs[i].first = _crateFile->GetToken(field.tokenIndex);
+                            pairs[i].second = _UnpackForField(field.valueRep);
+                        }
+                    });
+            }
+
+            dispatcher.Wait();
+
+            dispatcher.Run(
+                [this, &specs, &specDataPtrs, &liveFieldSets]() {
+                    tbb::parallel_for(
+                        static_cast<size_t>(0), static_cast<size_t>(specs.size()),
+                        [this, &specs, &specDataPtrs, &liveFieldSets]
+                        (size_t specIdx) {
+                            auto const &s = specs[specIdx];
+                            auto *specData = specDataPtrs[specIdx];
+                            _flatTypes[specIdx].type = s.specType;
+                            specData->fields =
+                                liveFieldSets.find(s.fieldSetIndex)->second;
+                        });
+                });
+
+            dispatcher.Wait();
+        });
-
-        dispatcher.Wait();
 
         return true;
     }
diff --git a/pxr/usd/usd/crateFile.cpp b/pxr/usd/usd/crateFile.cpp
index 0949f21b5a..565cacfcdf 100644
--- a/pxr/usd/usd/crateFile.cpp
+++ b/pxr/usd/usd/crateFile.cpp
@@ -63,9 +63,10 @@
 #include "pxr/base/trace/trace.h"
 #include "pxr/base/vt/dictionary.h"
 #include "pxr/base/vt/value.h"
-#include "pxr/base/work/arenaDispatcher.h"
+#include "pxr/base/work/dispatcher.h"
 #include "pxr/base/work/singularTask.h"
 #include "pxr/base/work/utils.h"
+#include "pxr/base/work/withScopedParallelism.h"
 #include "pxr/usd/ar/asset.h"
 #include "pxr/usd/ar/resolvedPath.h"
 #include "pxr/usd/ar/resolver.h"
@@ -870,7 +871,7 @@ class CrateFile::_BufferedOutput
     // Queue of pending write operations.
     tbb::concurrent_queue<_WriteOp> _writeQueue;
 
-    WorkArenaDispatcher _dispatcher;
+    WorkDispatcher _dispatcher;
     WorkSingularTask _writeTask;
 };
 
@@ -893,61 +894,68 @@ struct CrateFile::_PackingContext
     // Populate this context with everything we need from \p crate in order
     // to do deduplication, etc.
-        WorkArenaDispatcher wd;
-
-        // Read in any unknown sections so we can rewrite them later.
-        wd.Run([this, crate]() {
-            for (auto const &sec: crate->_toc.sections) {
-                if (!_IsKnownSection(sec.name)) {
-                    unknownSections.emplace_back(
-                        sec.name, _ReadSectionBytes(sec, crate), sec.size);
-                }
-            }
-        });
-
-        // Ensure that pathToPathIndex is correctly populated.
-        wd.Run([this, crate]() {
-            for (size_t i = 0; i != crate->_paths.size(); ++i)
-                pathToPathIndex[crate->_paths[i]] = PathIndex(i);
-        });
-
-        // Ensure that fieldToFieldIndex is correctly populated.
-        wd.Run([this, crate]() {
-            for (size_t i = 0; i != crate->_fields.size(); ++i)
-                fieldToFieldIndex[crate->_fields[i]] = FieldIndex(i);
-        });
-
-        // Ensure that fieldsToFieldSetIndex is correctly populated.
-        auto const &fsets = crate->_fieldSets;
-        wd.Run([this, &fsets]() {
-            vector<FieldIndex> fieldIndexes;
-            for (auto fsBegin = fsets.begin(),
-                     fsEnd = find(fsBegin, fsets.end(), FieldIndex());
-                 fsBegin != fsets.end();
-                 fsBegin = fsEnd + 1,
-                     fsEnd = find(fsBegin, fsets.end(), FieldIndex())) {
-                fieldIndexes.assign(fsBegin, fsEnd);
-                fieldsToFieldSetIndex[fieldIndexes] =
-                    FieldSetIndex(fsBegin - fsets.begin());
-            }
-        });
-
-        // Ensure that tokenToTokenIndex is correctly populated.
-        wd.Run([this, crate]() {
-            for (size_t i = 0; i != crate->_tokens.size(); ++i)
-                tokenToTokenIndex[crate->_tokens[i]] = TokenIndex(i);
-        });
-
-        // Ensure that stringToStringIndex is correctly populated.
-        wd.Run([this, crate]() {
-            for (size_t i = 0; i != crate->_strings.size(); ++i)
-                stringToStringIndex[
-                    crate->GetString(StringIndex(i))] = StringIndex(i);
-        });
+        WorkWithScopedParallelism([this, crate]() {
+            WorkDispatcher wd;
+
+            // Read in any unknown sections so we can rewrite them later.
+            wd.Run([this, crate]() {
+                for (auto const &sec: crate->_toc.sections) {
+                    if (!_IsKnownSection(sec.name)) {
+                        unknownSections.emplace_back(
+                            sec.name, _ReadSectionBytes(sec, crate),
+                            sec.size);
+                    }
+                }
+            });
+
+            // Ensure that pathToPathIndex is correctly populated.
+            wd.Run([this, crate]() {
+                for (size_t i = 0; i != crate->_paths.size(); ++i)
+                    pathToPathIndex[crate->_paths[i]] = PathIndex(i);
+            });
+
+            // Ensure that fieldToFieldIndex is correctly populated.
+            wd.Run([this, crate]() {
+                for (size_t i = 0; i != crate->_fields.size(); ++i)
+                    fieldToFieldIndex[
+                        crate->_fields[i]] = FieldIndex(i);
+            });
+
+            // Ensure that fieldsToFieldSetIndex is correctly populated.
+            auto const &fsets = crate->_fieldSets;
+            wd.Run([this, &fsets]() {
+                vector<FieldIndex> fieldIndexes;
+                for (auto fsBegin = fsets.begin(),
+                         fsEnd = find(
+                             fsBegin, fsets.end(), FieldIndex());
+                     fsBegin != fsets.end();
+                     fsBegin = fsEnd + 1,
+                         fsEnd = find(
+                             fsBegin, fsets.end(), FieldIndex())) {
+                    fieldIndexes.assign(fsBegin, fsEnd);
+                    fieldsToFieldSetIndex[fieldIndexes] =
+                        FieldSetIndex(fsBegin - fsets.begin());
+                }
+            });
+
+            // Ensure that tokenToTokenIndex is correctly populated.
+            wd.Run([this, crate]() {
+                for (size_t i = 0; i != crate->_tokens.size(); ++i)
+                    tokenToTokenIndex[
+                        crate->_tokens[i]] = TokenIndex(i);
+            });
+
+            // Ensure that stringToStringIndex is correctly populated.
+            wd.Run([this, crate]() {
+                for (size_t i = 0; i != crate->_strings.size(); ++i)
+                    stringToStringIndex[
+                        crate->GetString(StringIndex(i))] =
+                        StringIndex(i);
+            });
+        });
 
         // Set file pos to start of the structural sections in the current TOC.
         bufferedOutput.Seek(crate->_toc.GetMinimumSectionStart());
-        wd.Wait();
     }
 
     // Destructively move the output file out of this context.
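Note: every hunk in this patch applies the same transformation: a
WorkArenaDispatcher, which dispatched work into its own task arena, becomes
a plain WorkDispatcher constructed inside a WorkWithScopedParallelism call,
so that a thread blocking in Wait() cannot pick up unrelated outer tasks.
A minimal sketch of the pattern, using only the two Work headers added in
this patch (the function name and task bodies are placeholders):

    #include "pxr/base/work/dispatcher.h"
    #include "pxr/base/work/withScopedParallelism.h"

    PXR_NAMESPACE_USING_DIRECTIVE

    static void DoParallelWork()
    {
        WorkWithScopedParallelism([]() {
            // The dispatcher lives entirely inside the scoped-parallelism
            // region, so its tasks cannot leak into waits performed by
            // unrelated callers.
            WorkDispatcher dispatcher;
            dispatcher.Run([]() { /* task A */ });
            dispatcher.Run([]() { /* task B */ });
            // Wait() blocks until both tasks finish; the destructor also
            // waits, so falling off the end of the lambda is safe too.
            dispatcher.Wait();
        });
    }
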
@@ -3349,25 +3357,26 @@ CrateFile::_ReadTokens(Reader reader)
     _tokens.clear();
     _tokens.resize(numTokens);
 
-    WorkArenaDispatcher wd;
-    struct MakeToken {
-        void operator()() const { (*tokens)[index] = TfToken(str); }
-        vector<TfToken> *tokens;
-        size_t index;
-        char const *str;
-    };
-    size_t i = 0;
-    for (; p < charsEnd && i != numTokens; ++i) {
-        MakeToken mt { &_tokens, i, p };
-        wd.Run(mt);
-        p += strlen(p) + 1;
-    }
-    wd.Wait();
-
-    if (i != numTokens) {
-        TF_RUNTIME_ERROR("Crate file claims %zu tokens, found %zu",
-                         numTokens, i);
-    }
+    WorkWithScopedParallelism([this, &p, charsEnd, numTokens]() {
+        WorkDispatcher wd;
+        struct MakeToken {
+            void operator()() const { (*tokens)[index] = TfToken(str); }
+            vector<TfToken> *tokens;
+            size_t index;
+            char const *str;
+        };
+        size_t i = 0;
+        for (; p < charsEnd && i != numTokens; ++i) {
+            MakeToken mt { &_tokens, i, p };
+            wd.Run(mt);
+            p += strlen(p) + 1;
+        }
+        wd.Wait();
+        if (i != numTokens) {
+            TF_RUNTIME_ERROR("Crate file claims %zu tokens, found %zu",
+                             numTokens, i);
+        }
+    });
 
     WorkSwapDestroyAsync(chars);
 }
@@ -3388,25 +3397,25 @@ CrateFile::_ReadPaths(Reader reader)
     _paths.resize(reader.template Read<uint64_t>());
     std::fill(_paths.begin(), _paths.end(), SdfPath());
 
-    WorkArenaDispatcher dispatcher;
-    // VERSIONING: PathItemHeader changes size from 0.0.1 to 0.1.0.
-    Version fileVer(_boot);
-    if (fileVer == Version(0,0,1)) {
-        _ReadPathsImpl<_PathItemHeader_0_0_1>(reader, dispatcher);
-    } else if (fileVer < Version(0,4,0)) {
-        _ReadPathsImpl<_PathItemHeader>(reader, dispatcher);
-    } else {
-        // 0.4.0 has compressed paths.
-        _ReadCompressedPaths(reader, dispatcher);
-    }
-
-    dispatcher.Wait();
+    WorkWithScopedParallelism([this, &reader]() {
+        WorkDispatcher dispatcher;
+        // VERSIONING: PathItemHeader changes size from 0.0.1 to 0.1.0.
+        Version fileVer(_boot);
+        if (fileVer == Version(0,0,1)) {
+            _ReadPathsImpl<_PathItemHeader_0_0_1>(reader, dispatcher);
+        } else if (fileVer < Version(0,4,0)) {
+            _ReadPathsImpl<_PathItemHeader>(reader, dispatcher);
+        } else {
+            // 0.4.0 has compressed paths.
+            _ReadCompressedPaths(reader, dispatcher);
+        }
+    });
 }
 
 template <class Header, class Reader>
 void
 CrateFile::_ReadPathsImpl(Reader reader,
-                          WorkArenaDispatcher &dispatcher,
+                          WorkDispatcher &dispatcher,
                           SdfPath parentPath)
 {
     bool hasChild = false, hasSibling = false;
@@ -3458,7 +3467,7 @@ CrateFile::_ReadPathsImpl(Reader reader,
 
 template <class Reader>
 void
 CrateFile::_ReadCompressedPaths(Reader reader,
-                                WorkArenaDispatcher &dispatcher)
+                                WorkDispatcher &dispatcher)
 {
     // Read compressed data first.
     vector<uint32_t> pathIndexes;
@@ -3496,7 +3505,7 @@ CrateFile::_BuildDecompressedPathsImpl(
     vector<uint32_t> const &pathIndexes,
     vector<int32_t> const &elementTokenIndexes,
     vector<int32_t> const &jumps,
     size_t curIndex,
     SdfPath parentPath,
-    WorkArenaDispatcher &dispatcher)
+    WorkDispatcher &dispatcher)
 {
     bool hasChild = false, hasSibling = false;
     do {
diff --git a/pxr/usd/usd/crateFile.h b/pxr/usd/usd/crateFile.h
index c9950cdd1e..6d8e1a747d 100644
--- a/pxr/usd/usd/crateFile.h
+++ b/pxr/usd/usd/crateFile.h
@@ -34,7 +34,7 @@
 #include "pxr/base/tf/token.h"
 #include "pxr/base/vt/array.h"
 #include "pxr/base/vt/value.h"
-#include "pxr/base/work/arenaDispatcher.h"
+#include "pxr/base/work/dispatcher.h"
 #include "pxr/usd/ar/asset.h"
 #include "pxr/usd/sdf/assetPath.h"
 #include "pxr/usd/sdf/path.h"
@@ -802,18 +802,18 @@ class CrateFile
     template <class Reader> void _ReadPaths(Reader src);
     template <class Header, class Reader>
     void _ReadPathsImpl(Reader reader,
-                        WorkArenaDispatcher &dispatcher,
+                        WorkDispatcher &dispatcher,
                         SdfPath parentPath=SdfPath());
     template <class Reader>
     void _ReadCompressedPaths(Reader reader,
-                              WorkArenaDispatcher &dispatcher);
+                              WorkDispatcher &dispatcher);
 
     void _BuildDecompressedPathsImpl(
         std::vector<uint32_t> const &pathIndexes,
         std::vector<int32_t> const &elementTokenIndexes,
         std::vector<int32_t> const &jumps,
         size_t curIndex,
         SdfPath parentPath,
-        WorkArenaDispatcher &dispatcher);
+        WorkDispatcher &dispatcher);
 
     void _ReadRawBytes(int64_t start, int64_t size, char *buf) const;
diff --git a/pxr/usd/usd/prim.cpp b/pxr/usd/usd/prim.cpp
index 300d0cfb21..d4535cbc80 100644
--- a/pxr/usd/usd/prim.cpp
+++ b/pxr/usd/usd/prim.cpp
@@ -44,9 +44,10 @@
 #include "pxr/usd/sdf/primSpec.h"
 
 #include "pxr/base/plug/registry.h"
-#include "pxr/base/work/arenaDispatcher.h"
+#include "pxr/base/work/dispatcher.h"
 #include "pxr/base/work/loops.h"
 #include "pxr/base/work/singularTask.h"
+#include "pxr/base/work/withScopedParallelism.h"
 
 #include "pxr/base/tf/ostreamMethods.h"
 
@@ -895,16 +896,12 @@ struct UsdPrim_TargetFinder
     void _Find() {
         TF_PY_ALLOW_THREADS_IN_SCOPE();
 
-        _dispatcher.Run([this]() { _VisitSubtree(_prim); });
-        _dispatcher.Wait();
-
-        // (We run this parallel_sort in the arena dispatcher to avoid the TBB
-        // deadlock issue).
-        _dispatcher.Run([this]() {
+        WorkWithScopedParallelism([this]() {
+            _VisitSubtree(_prim);
+            _dispatcher.Wait();
             tbb::parallel_sort(_result.begin(), _result.end(),
                                SdfPath::FastLessThan());
         });
-        _dispatcher.Wait();
 
         _result.erase(unique(_result.begin(), _result.end()), _result.end());
     }
@@ -917,7 +914,7 @@ struct UsdPrim_TargetFinder
     }
 
     UsdPrim _prim;
-    WorkArenaDispatcher _dispatcher;
+    WorkDispatcher _dispatcher;
     WorkSingularTask _consumerTask;
     Predicate const &_predicate;
     tbb::concurrent_queue<UsdPrim> _workQueue;
diff --git a/pxr/usd/usd/schemaRegistry.cpp b/pxr/usd/usd/schemaRegistry.cpp
index aba31f5d8c..176cf1bf3b 100644
--- a/pxr/usd/usd/schemaRegistry.cpp
+++ b/pxr/usd/usd/schemaRegistry.cpp
@@ -47,6 +47,7 @@
 #include "pxr/base/tf/token.h"
 #include "pxr/base/tf/type.h"
 #include "pxr/base/work/loops.h"
+#include "pxr/base/work/withScopedParallelism.h"
 
 #include
 #include
 
@@ -805,9 +806,7 @@ UsdSchemaRegistry::_FindAndAddPluginSchema()
 
     // For each plugin, if it has generated schema, add it to the schematics.
     std::vector<SdfLayerRefPtr> generatedSchemas(plugins.size());
-    {
-        WorkArenaDispatcher dispatcher;
-        dispatcher.Run([&plugins, &generatedSchemas]() {
+    WorkWithScopedParallelism([&plugins, &generatedSchemas]() {
             WorkParallelForN(
                 plugins.size(),
                 [&plugins, &generatedSchemas](size_t begin, size_t end) {
                     for (; begin != end; ++begin) {
                         generatedSchemas[begin] =
                             _GetGeneratedSchema(plugins[begin]);
                     }
                 });
-        });
-    }
+        });
 
     SdfChangeBlock block;
     TfToken::HashSet appliedAPISchemaNames = _GetAppliedAPISchemaNames();
diff --git a/pxr/usd/usd/stage.cpp b/pxr/usd/usd/stage.cpp
index 493f03b869..c4e05d5db2 100644
--- a/pxr/usd/usd/stage.cpp
+++ b/pxr/usd/usd/stage.cpp
@@ -90,9 +90,10 @@
 #include "pxr/base/tf/span.h"
 #include "pxr/base/tf/stl.h"
 #include "pxr/base/tf/stringUtils.h"
-#include "pxr/base/work/arenaDispatcher.h"
+#include "pxr/base/work/dispatcher.h"
 #include "pxr/base/work/loops.h"
 #include "pxr/base/work/utils.h"
+#include "pxr/base/work/withScopedParallelism.h"
 
 #include
 #include
 
@@ -508,37 +509,44 @@ UsdStage::_Close()
     TF_PY_ALLOW_THREADS_IN_SCOPE();
 
-    WorkArenaDispatcher wd;
+    WorkWithScopedParallelism([this]() {
 
-    // Stop listening for notices.
-    wd.Run([this]() {
-        for (auto &p: _layersAndNoticeKeys)
-            TfNotice::Revoke(p.second);
-    });
-
-    // Destroy prim structure.
-    vector<SdfPath> primsToDestroy;
-    if (_pseudoRoot) {
-        // Instancing prototypes are not children of the pseudo-root so
-        // we need to explicitly destroy those subtrees.
-        primsToDestroy = _instanceCache->GetAllPrototypes();
-        wd.Run([this, &primsToDestroy]() {
-            primsToDestroy.push_back(SdfPath::AbsoluteRootPath());
-            _DestroyPrimsInParallel(primsToDestroy);
-            _pseudoRoot = nullptr;
-            WorkMoveDestroyAsync(primsToDestroy);
-        });
-    }
+        // Destroy prim structure.
+        vector<SdfPath> primsToDestroy;
+        {
+            // Scope the dispatcher so that its dtor Wait()s for work to
+            // complete before primsToDestroy is destroyed, since tasks we
+            // schedule in the dispatcher access it.
+            WorkDispatcher wd;
+
+            // Stop listening for notices.
+            wd.Run([this]() {
+                for (auto &p: _layersAndNoticeKeys)
+                    TfNotice::Revoke(p.second);
+            });
+
+            if (_pseudoRoot) {
+                // Instancing prototypes are not children of the pseudo-root
+                // so we need to explicitly destroy those subtrees.
+                primsToDestroy = _instanceCache->GetAllPrototypes();
+                wd.Run([this, &primsToDestroy]() {
+                    primsToDestroy.push_back(
+                        SdfPath::AbsoluteRootPath());
+                    _DestroyPrimsInParallel(primsToDestroy);
+                    _pseudoRoot = nullptr;
+                    WorkMoveDestroyAsync(primsToDestroy);
+                });
+            }
 
-    // Clear members.
-    wd.Run([this]() { _cache.reset(); });
-    wd.Run([this]() { _clipCache.reset(); });
-    wd.Run([this]() { _instanceCache.reset(); });
-    wd.Run([this]() { _sessionLayer.Reset(); });
-    wd.Run([this]() { _rootLayer.Reset(); });
-    _editTarget = UsdEditTarget();
-
-    wd.Wait();
+            // Clear members.
+            wd.Run([this]() { _cache.reset(); });
+            wd.Run([this]() { _clipCache.reset(); });
+            wd.Run([this]() { _instanceCache.reset(); });
+            wd.Run([this]() { _sessionLayer.Reset(); });
+            wd.Run([this]() { _rootLayer.Reset(); });
+            _editTarget = UsdEditTarget();
+        }
+    });
 
     WorkSwapDestroyAsync(_primMap);
     // XXX: Do not do this async, since python might shut down concurrently with
@@ -2900,29 +2908,31 @@ UsdStage::_ComposeSubtreesInParallel(
     TRACE_FUNCTION();
 
     // Begin a subtree composition in parallel.
-    _primMapMutex = boost::in_place();
-    _dispatcher = boost::in_place();
-    // We populate the clip cache concurrently during composition, so we need to
-    // enable concurrent population here.
-    Usd_ClipCache::ConcurrentPopulationContext
-        clipConcurrentPopContext(*_clipCache);
-    try {
-        for (size_t i = 0; i != prims.size(); ++i) {
-            Usd_PrimDataPtr p = prims[i];
-            _dispatcher->Run(
-                &UsdStage::_ComposeSubtreeImpl, this, p, p->GetParent(),
-                &_populationMask,
-                primIndexPaths ? (*primIndexPaths)[i] : p->GetPath());
-        }
-    }
-    catch (...) {
-        _dispatcher = boost::none;
-        _primMapMutex = boost::none;
-        throw;
-    }
-
-    _dispatcher = boost::none;
-    _primMapMutex = boost::none;
+    WorkWithScopedParallelism([this, &prims, &primIndexPaths]() {
+        _primMapMutex = boost::in_place();
+        _dispatcher = boost::in_place();
+        // We populate the clip cache concurrently during composition, so we
+        // need to enable concurrent population here.
+        Usd_ClipCache::ConcurrentPopulationContext
+            clipConcurrentPopContext(*_clipCache);
+        try {
+            for (size_t i = 0; i != prims.size(); ++i) {
+                Usd_PrimDataPtr p = prims[i];
+                _dispatcher->Run(
+                    &UsdStage::_ComposeSubtreeImpl, this, p, p->GetParent(),
+                    &_populationMask,
+                    primIndexPaths ? (*primIndexPaths)[i] : p->GetPath());
+            }
+        }
+        catch (...) {
+            _dispatcher = boost::none;
+            _primMapMutex = boost::none;
+            throw;
+        }
+
+        _dispatcher = boost::none;
+        _primMapMutex = boost::none;
+    });
 }
 
 void
@@ -3048,21 +3058,22 @@ UsdStage::_DestroyPrimsInParallel(const vector<SdfPath>& paths)
 
     TF_AXIOM(!_dispatcher && !_primMapMutex);
 
-    _primMapMutex = boost::in_place();
-    _dispatcher = boost::in_place();
-
-    for (const auto& path : paths) {
-        Usd_PrimDataPtr prim = _GetPrimDataAtPath(path);
-        // We *expect* every prim in paths to be valid as we iterate, but at
-        // one time had issues with deactivated prototype prims, so we preserve
-        // a guard for resiliency. See testUsdBug141491.py
-        if (TF_VERIFY(prim)) {
-            _dispatcher->Run(&UsdStage::_DestroyPrim, this, prim);
-        }
-    }
-
-    _dispatcher = boost::none;
-    _primMapMutex = boost::none;
+    WorkWithScopedParallelism([&]() {
+        _primMapMutex = boost::in_place();
+        _dispatcher = boost::in_place();
+        for (const auto& path : paths) {
+            Usd_PrimDataPtr prim = _GetPrimDataAtPath(path);
+            // We *expect* every prim in paths to be valid as we iterate,
+            // but at one time had issues with deactivated prototype prims,
+            // so we preserve a guard for resiliency. See
+            // testUsdBug141491.py
+            if (TF_VERIFY(prim)) {
+                _dispatcher->Run(&UsdStage::_DestroyPrim, this, prim);
+            }
+        }
+        _dispatcher = boost::none;
+        _primMapMutex = boost::none;
+    });
 }
 
 void
diff --git a/pxr/usd/usd/stage.h b/pxr/usd/usd/stage.h
index a1ff4f4f05..82bc28e705 100644
--- a/pxr/usd/usd/stage.h
+++ b/pxr/usd/usd/stage.h
@@ -47,7 +47,7 @@
 #include "pxr/usd/sdf/types.h"
 #include "pxr/usd/pcp/cache.h"
 #include "pxr/base/vt/value.h"
-#include "pxr/base/work/arenaDispatcher.h"
+#include "pxr/base/work/dispatcher.h"
 
 #include
 
@@ -2218,7 +2218,7 @@ class UsdStage : public TfRefBase, public TfWeakBase {
     _LayerAndNoticeKeyVec _layersAndNoticeKeys;
     size_t _lastChangeSerialNumber;
 
-    boost::optional<WorkArenaDispatcher> _dispatcher;
+    boost::optional<WorkDispatcher> _dispatcher;
 
     // To provide useful aggregation of malloc stats, we bill everything
     // for this stage - from all access points - to this tag.
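Note: stage.h keeps the dispatcher in a boost::optional because
WorkDispatcher cannot be copied or moved; stage.cpp engages it with
boost::in_place() and disengages it with boost::none inside the scoped
region, and (as the comment in the _Close hunk notes) destroying the
dispatcher Wait()s for its outstanding tasks. A minimal sketch of that
lifetime pattern (Stage, ComposeAll, and ComposeOne are hypothetical
stand-ins for the UsdStage members shown above):

    #include <boost/optional.hpp>
    #include "pxr/base/work/dispatcher.h"
    #include "pxr/base/work/withScopedParallelism.h"

    PXR_NAMESPACE_USING_DIRECTIVE

    struct Stage
    {
        void ComposeAll()
        {
            WorkWithScopedParallelism([this]() {
                // Construct the dispatcher in place inside the scope.
                _dispatcher = boost::in_place();
                _dispatcher->Run([this]() { ComposeOne(); });
                // Disengaging runs the destructor, which waits on all
                // queued tasks, so the dispatcher never outlives the
                // scoped-parallelism region.
                _dispatcher = boost::none;
            });
        }
        void ComposeOne() { /* compose one subtree */ }
        boost::optional<WorkDispatcher> _dispatcher;
    };
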
diff --git a/pxr/usd/usd/testenv/testUsdStageThreading.cpp b/pxr/usd/usd/testenv/testUsdStageThreading.cpp
index e1ff744c4c..9acc00e139 100644
--- a/pxr/usd/usd/testenv/testUsdStageThreading.cpp
+++ b/pxr/usd/usd/testenv/testUsdStageThreading.cpp
@@ -35,7 +35,8 @@
 #include "pxr/base/tf/stopwatch.h"
 #include "pxr/base/tf/debug.h"
 #include "pxr/base/tf/diagnostic.h"
-#include "pxr/base/work/arenaDispatcher.h"
+#include "pxr/base/work/dispatcher.h"
+#include "pxr/base/work/withScopedParallelism.h"
 
 #include
 #include
 
@@ -243,15 +244,16 @@ int main(int argc, char const **argv)
     TfStopwatch sw;
     sw.Start();
 
-    WorkArenaDispatcher wd;
-    auto localMsecsToRun = msecsToRun;
-    auto localRunForever = runForever;
-    for (size_t i = 0; i < numThreads; ++i) {
-        wd.Run([localMsecsToRun, localRunForever]() {
-            _WorkTask(localMsecsToRun, localRunForever);
-        });
-    }
-    wd.Wait();
+    WorkWithScopedParallelism([&]() {
+        WorkDispatcher wd;
+        auto localMsecsToRun = msecsToRun;
+        auto localRunForever = runForever;
+        for (size_t i = 0; i < numThreads; ++i) {
+            wd.Run([localMsecsToRun, localRunForever]() {
+                _WorkTask(localMsecsToRun, localRunForever);
+            });
+        }
+    });
 
     sw.Stop();
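Note: the test drops its explicit wd.Wait(). As the comment in the
UsdStage::_Close hunk notes, WorkDispatcher's destructor Wait()s for its
tasks, and WorkWithScopedParallelism returns only after the lambda does, so
every _WorkTask call has completed before sw.Stop() runs and the measured
time is unchanged. A minimal sketch of why the timing still holds (a
hypothetical reduction of the test's loop, with placeholder work):

    #include "pxr/base/tf/stopwatch.h"
    #include "pxr/base/work/dispatcher.h"
    #include "pxr/base/work/withScopedParallelism.h"

    PXR_NAMESPACE_USING_DIRECTIVE

    static double TimeTasks(size_t numTasks)
    {
        TfStopwatch sw;
        sw.Start();
        WorkWithScopedParallelism([&]() {
            WorkDispatcher wd;
            for (size_t i = 0; i < numTasks; ++i) {
                wd.Run([]() { /* work */ });
            }
            // No explicit wd.Wait(): the destructor waits when wd goes
            // out of scope, before WorkWithScopedParallelism returns.
        });
        sw.Stop();  // All tasks have completed by this point.
        return sw.GetSeconds();
    }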