-
Notifications
You must be signed in to change notification settings - Fork 398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
iox-#325 introduce DualAccessTransactionTray #748
iox-#325 introduce DualAccessTransactionTray #748
Conversation
Signed-off-by: Mathias Kraus <mathias.kraus@apex.ai>
static constexpr AccessToken LEFT{AccessToken::LEFT}; | ||
static constexpr AccessToken RIGHT{AccessToken::RIGHT}; | ||
|
||
class AccessGuard |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I modeled this after the std::lock_guard
but I'm not really convinced about this. Currently I'm leaning more towards something like
DualAccessTransactionTray transactionTray;
transactionTray.acquireExclusiveAccess(DualAccessTransactionTray::LEFT, [&] {
// do stuff
});
posix::Semaphore m_waitingLineLeft{posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U).value()}; | ||
posix::Semaphore m_waitingLineRight{ | ||
posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U).value()}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semaphores are tricky. Are we sure it's allowed to move them?
The mutex in the ChunkDistributor
, which this is supposed to replace does just a std::terminate
if creation fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have to rephrase the question.
Named semaphores? Yes of course! sem_t *sem_open(const char *name, int oflag);
is the signature to create one and the posix semaphore class then just manages the pointer which is of course moveable.
Unnamed semaphores? I think so. int sem_init(sem_t *sem, int pshared, unsigned int value);
Here the semaphore owns the sem_t
and accesses the semaphore methods via a pointer to that owned value.
I read the posix documentation again and there was nowhere stated that it is not allowed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we internally already discussed moving when created by sem_open is ok. When created by sem_init we cannot do this, since our move
leads to a copy of a sem_t
struct, which is not safe (especially when someone is waiting). Besides the behavior will be weird for the user. The underlying sem_t is most likely not allowed to be copied and the new STL semaphores are als also not movable, surely for good reason.
So no, in general the semaphore is not movable as it is implemented.
auto existingToken = m_accessToken.exchange(tokenToAcquireAccess, std::memory_order_acquire); | ||
if (existingToken == tokenToAcquireAccess) | ||
{ | ||
// TODO return expected DOUBLE_ACQUIRE_BROKEN_INVARIANT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with the AccessGuard
approach, this will also be cumbersome. Another reason to use a lambda instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Lambda approach is superior. The access request will be blocking either way until access is obtained.
auto finish = std::chrono::system_clock::now(); | ||
auto averageTimeInNs = | ||
std::chrono::duration_cast<std::chrono::nanoseconds>(finish - now).count() / (NUMBER_OF_LOOPS * 2); | ||
std::cout << "Average locking time: " << averageTimeInNs / 1000. << "µs" << std::endl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from my measurements
Mutex | DualAccessTransactionTray | |
---|---|---|
no contention | 16ns | 25ns |
contention | 85ns | 1980ns |
With contention the DualAccessTransactionTray
is much slower, but there is essentially no contention in the ChunkDistributor
since this can only happen when a subscriber queue is added or removed. So I would argue that the pros outweighs the cons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hard to point towards the reason. There are probably possible small improvements wrt. semaphore and CAS usage. Also the expected may cause slowdown compared to native semaphores.
In any case, do we want this kind of output in a test? These are also not moduletests, but stresstests, i.e. belong into another target.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is just temporarily and will either be removed or moved to a stresstest
Codecov Report
@@ Coverage Diff @@
## master #748 +/- ##
==========================================
+ Coverage 74.03% 74.25% +0.21%
==========================================
Files 319 321 +2
Lines 11426 11445 +19
Branches 1972 1979 +7
==========================================
+ Hits 8459 8498 +39
+ Misses 2192 2167 -25
- Partials 775 780 +5
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4 of 6
@@ -31,7 +31,7 @@ class ThreadSafePolicy | |||
bool tryLock() const noexcept; | |||
|
|||
private: | |||
mutable posix::mutex m_mutex{true}; // recursive lock | |||
mutable posix::mutex m_mutex{false}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know its not part of the PR. But if you are bored could you please remove the bool from the mutex ctor and replace it with an enum - you really convinced me and now you have to endure the suffering of your own success ;)
iceoryx_utils/include/iceoryx_utils/internal/concurrent/dual_access_transaction_tray.hpp
Show resolved
Hide resolved
posix::Semaphore m_waitingLineLeft{posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U).value()}; | ||
posix::Semaphore m_waitingLineRight{ | ||
posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U).value()}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have to rephrase the question.
Named semaphores? Yes of course! sem_t *sem_open(const char *name, int oflag);
is the signature to create one and the posix semaphore class then just manages the pointer which is of course moveable.
Unnamed semaphores? I think so. int sem_init(sem_t *sem, int pshared, unsigned int value);
Here the semaphore owns the sem_t
and accesses the semaphore methods via a pointer to that owned value.
I read the posix documentation again and there was nowhere stated that it is not allowed.
void releaseExclusiveAccess(AccessToken tokenToBeReleased); | ||
|
||
std::atomic<AccessToken> m_accessToken{AccessToken::NONE}; | ||
posix::Semaphore m_waitingLineLeft{posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U).value()}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you have to move the initialization into an immediately invoked lambda so that you can handle the error case when the semaphore could not be created. Something like:
posix::Semaphore m_waitingLineLeft{[]{
std::move(posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U)\
.or_else([](auto){std::cerr << "skynet arises, prepare for termination\n"; std::terminate(); }).value());
}()};
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, here or in the Ctor. But all these terminates are really bad. No stack unwinding. However, without a semaphore the object is not functional...
ẁaitingLine
sounds strange. I think the semaphores are to signal access requests. Maybe this leads to a better name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not fully understand the use case yet. It looks somewhat like implementing two mutexes in a class using two semaphores. Now implementing mutexes with semaphores (+ atomics) is fairly standard, and likely what is happening if a mutex would be used directly (but we cannot do this, they would not be exclusive to each other).
What we have here is something like a tristate mutex, would you agree? (with exclusive states LEFT_LOCKED, RIGHT_LOCKED, UNLOCKED) I think it is possible to get rid of one semaphore, but I need to think about this. This will probably not help much in terms of performance but remedy one potential problem and simplify the code a little.
getMembers()->unlock(); | ||
} | ||
else | ||
if (!getMembers()->tryLock()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on the underlying implementation, but e.g. for std::mutex try_lock
is allowed to spuriously fail (and I suspect that is the case for many other implementations as well). Therefore termination is not a good response, even if the contract is only to call this method if it is unlocked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is currently used to detect if the mutex is locked. It will be obsolete once the semaphore based locking will be used
@@ -271,6 +266,10 @@ inline void ChunkDistributor<ChunkDistributorDataType>::cleanup() noexcept | |||
nullptr, | |||
ErrorLevel::FATAL); | |||
} | |||
|
|||
getMembers()->unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect it is a check whether no one else has the mutex? Hence the immediate unlocking. This is somewhat of a code smell, because afterwards during clearHistory
the mutex could be locked again by someone. Is this ok? The whole logic looks suspicious. Probably not focus of this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The currently used mutex has the recursive flag set and this call was the only reason. I just wanted to check if it could be removed and it is possible since this code is only executed once the application died and therefore it cannot get the lock again. The method called after getMembers()->unlock();
will lock the mutex before the data is accessed but the locking in this method was just to detect if the application held the lock when it died.
{ | ||
namespace concurrent | ||
{ | ||
class DualAccessTransactionTray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on what it does exactly, we may need a better name.
iceoryx_utils/include/iceoryx_utils/internal/concurrent/dual_access_transaction_tray.hpp
Outdated
Show resolved
Hide resolved
void releaseExclusiveAccess(AccessToken tokenToBeReleased); | ||
|
||
std::atomic<AccessToken> m_accessToken{AccessToken::NONE}; | ||
posix::Semaphore m_waitingLineLeft{posix::Semaphore::create(posix::CreateUnnamedSharedMemorySemaphore, 0U).value()}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, here or in the Ctor. But all these terminates are really bad. No stack unwinding. However, without a semaphore the object is not functional...
ẁaitingLine
sounds strange. I think the semaphores are to signal access requests. Maybe this leads to a better name.
auto existingToken = m_accessToken.exchange(tokenToAcquireAccess, std::memory_order_acquire); | ||
if (existingToken == tokenToAcquireAccess) | ||
{ | ||
// TODO return expected DOUBLE_ACQUIRE_BROKEN_INVARIANT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only one thread is supposed to have access, either via the LEFT or RIGHT token, correct? In this case it makes sense, it is like calling lock of a mutex twice without unlocking. Otherwise we could argue that e.g. LEFT access was already granted and we would return immediately without error. (I do not fully know the use case.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's right
{ | ||
if (m_waitingLineLeft.wait().has_error()) | ||
{ | ||
// TODO return expected ERROR_WHILE_WAITING_FOR_SEMAPHORE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What can we do if the wait causes an error? I think the access request fails in this case and the user has to handle it. In any case the logic to wait looks ok.
auto casSuccessful = m_accessToken.compare_exchange_strong(expected, AccessToken::NONE, std::memory_order_release); | ||
if (!casSuccessful) | ||
{ | ||
if (expected == AccessToken::NONE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The user cannot pass NONE, due to private enum. So this can only happen due to double release or releasing something not acquired before. Similar to unlocking a non-locked mutex, which is undefined.
else if (tokenToBeReleased == AccessToken::LEFT) | ||
{ | ||
// post can result in either EINVAL or EOVERFLOW; neither of those can be triggered by this code | ||
IOX_DISCARD_RESULT(m_waitingLineRight.post()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We post even though no one might be waiting? This is dangerous. Consider a Sequence of acq LEFT, rel LEFT, acq RIGHT, rel RIGHT without someone waiting. Afterwards a LEFT and RIGHT request could arrive concurrently and both would gain access (both semaphore counters are 1) or some other problematic state I think.
The compare exchanges may protect against this(by requiring the state to be none, which it would not be once the first obtains access), I need to fully think this throughl. In any case, there might be a problem but I would like to find a convincing argument why there is none (or a fix if there is).
Edit: I think it is ok, but we only need one semaphore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the current implementation two are needed else we experience kind of an ABA problem. Let's assume we have only one semaphore, LEFT has the lock and RIGHT is waiting on the semaphore. When LEFT does a sem_post and immediately tries to get the lock, the exchange
will tell LEFT that the lock is held by RIGHT and must therefore do a sem_wait. It can happen that RIGHT was not yet scheduled and the semaphore count is still 1, which means that LEFT will continue its execution and won't do a sem_post at the end, leaving RIGHT in a deadlock.
With an alternative implementation this problem might be solved differently.
auto finish = std::chrono::system_clock::now(); | ||
auto averageTimeInNs = | ||
std::chrono::duration_cast<std::chrono::nanoseconds>(finish - now).count() / (NUMBER_OF_LOOPS * 2); | ||
std::cout << "Average locking time: " << averageTimeInNs / 1000. << "µs" << std::endl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hard to point towards the reason. There are probably possible small improvements wrt. semaphore and CAS usage. Also the expected may cause slowdown compared to native semaphores.
In any case, do we want this kind of output in a test? These are also not moduletests, but stresstests, i.e. belong into another target.
Signed-off-by: Mathias Kraus <mathias.kraus@apex.ai>
Signed-off-by: Mathias Kraus <mathias.kraus@apex.ai>
After some thinking, there is additional work necessary before this class could be used in the On the one hand, the vector with the history needs to be replaced by a ring buffer which can safely be accessed by RouDi even if the application terminated while mutating this buffer. It could be done in a similar fashion to the On the other hand, there is still a deadlock in RouDi since the @budrus @MatthiasKillat @elfenpiff what do you think? |
My spider-sense tells me that this sounds like a refactoring where we will discover additional unpleasant surprises.
|
@elfenpiff I have the same feeling. The mutex in the |
@elBoberido I think we need a lock-free caching structure for this. Maybe we can discuss the exact requirements again, but I think I have some idea I could sketch. I will do so in a few days. From my point of view the requirements are.
A well-defined caching structure could be useful in other ways as well I think. It should satisfy the requirements but otherwise be as general as reasonably possible. |
@MatthiasKillat I think I already figured this out and wrote something. Here is the code template <typename T>
struct Cache {
static_assert(std::is_trivially_copyable<T>::value,
"T needs to be trivially copyable");
void write(const T& value) {
memcpy(&cache[(abaCounter.load() + 1) % 2], &value);
abaCounter.fetch_add(1, std::memory_order_release);
}
T read() {
T data;
uint64_t currentAba = abaCounter.load(std::memory_order_acquire);
do {
memcpy(&data, cache[currentAba % 2]);
} while (abaCounter.compare_exchange_strong(currentAba, currentAba,
std::memory_order_relaxed,
std::memory_order_acquire));
return data;
}
T cache[2];
std::atomic<uint64_t> abaCounter{0};
}; I think this fulfills the requirements:
@MatthiasKillat @elBoberido what do you think? Is this something to work with or do you see some improvements - if so feel free to use or discard this. |
@elfenpiff This will not work in itself, due to ring-buffer semantics (we do not want this ... Edit, see below). I also question how this will deal with iteration and also have some more optimizations in mind. I am not sure whether I have the requirements right, but the proposal does not satisfy all of them. Will post details later. But yes, the easy way is to restrict T to be trivially copyable, and this is sufficient for our case. More precisely (and general), the copy must only be non-disruptive in a sense that it does cause no harm in reading while it is modified (but the modification is detected afterwards). A copy ctor operating on memory that we always own may also satisfy this, depending on what it does. I believe it is solvable for general T, but less efficient. An educated guess is also that we may not even need My other optimizations are wrt. access efficiency. Edit: it really depends on whether a ringbuffer is enough, I need to see the use case. If we need something with general removal this is not sufficient. Even a ringbuffer alone is probably not enough, we need to read the last k elements in order send them to late joiners I think. Hence the read must at least take some index parameter and there it gets a little tricky to not read invalid data. But in theroy, from one point onward, the cache should always be full with valid data (the last CacheSize of them). |
After closer inspection of the use case, we need to distinguish the following.
All of this will in itself not be sufficient to eliminate the lock in the |
@elfenpiff I think we should not copy since there is almost no contention and this would be a performance hit for each sample to deliver on each subscriber. We can have an asymmetric lock were only RouDi can lock and the application will block. RouDi would also detect in an application is in the critical section while locking and potentially let the application do some work for RouDi. @MatthiasKillat there is already the |
I close this PR since it's more or less a dead end and would requite further changes in iceoryx |
Pre-Review Checklist for the PR Author
iox-#123-this-is-a-branch
)iox-#123 commit text
)git commit -s
)task-list-completed
)Notes for Reviewer
This is just a draft to discuss the API of the
DualAccessTransactionTray
which might replace the mutex in theChunkDistributor
. A better name is also welcome.Checklist for the PR Reviewer
Post-review Checklist for the PR Author
References