Skip to content

Commit

Permalink
Migrate TCP::mPendingPackets to Pool interface (#11834)
Browse files Browse the repository at this point in the history
  • Loading branch information
kghost authored and pull[bot] committed Dec 16, 2021
1 parent b0b0138 commit 1735532
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/channel/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ChannelState ChannelHandle::GetState() const

ExchangeContext * ChannelHandle::NewExchange(ExchangeDelegate * delegate)
{
assert(mAssociation != nullptr);
VerifyOrDie(mAssociation != nullptr);
return mAssociation->mChannelContext->NewExchange(delegate);
}

Expand Down
2 changes: 1 addition & 1 deletion src/channel/ChannelContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void ChannelContext::Start(const ChannelBuilder & builder)

ExchangeContext * ChannelContext::NewExchange(ExchangeDelegate * delegate)
{
assert(GetState() == ChannelState::kReady);
VerifyOrDie(GetState() == ChannelState::kReady);
return mExchangeManager->NewContext(GetReadyVars().mSession, delegate);
}

Expand Down
13 changes: 6 additions & 7 deletions src/lib/support/Pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
* limitations under the License.
*/

#include <lib/support/CodeUtils.h>
#include <lib/support/Pool.h>

#include <nlassert.h>

namespace chip {

namespace internal {
Expand Down Expand Up @@ -68,20 +67,20 @@ void StaticAllocatorBitmap::Deallocate(void * element)
size_t offset = index - (word * kBitChunkSize);

// ensure the element is in the pool
assert(index < Capacity());
VerifyOrDie(index < Capacity());

auto value = mUsage[word].fetch_and(~(kBit1 << offset));
nlASSERT((value & (kBit1 << offset)) != 0); // assert fail when free an unused slot
VerifyOrDie((value & (kBit1 << offset)) != 0); // assert fail when free an unused slot
DecreaseUsage();
}

size_t StaticAllocatorBitmap::IndexOf(void * element)
{
std::ptrdiff_t diff = static_cast<uint8_t *>(element) - static_cast<uint8_t *>(mElements);
assert(diff >= 0);
assert(static_cast<size_t>(diff) % mElementSize == 0);
VerifyOrDie(diff >= 0);
VerifyOrDie(static_cast<size_t>(diff) % mElementSize == 0);
auto index = static_cast<size_t>(diff) / mElementSize;
assert(index < Capacity());
VerifyOrDie(index < Capacity());
return index;
}

Expand Down
3 changes: 1 addition & 2 deletions src/lib/support/Pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@

#pragma once

#include <lib/support/CodeUtils.h>
#include <system/SystemConfig.h>

#include <assert.h>
#include <atomic>
#include <limits>
#include <new>
Expand Down
112 changes: 112 additions & 0 deletions src/lib/support/PoolWrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2021 Project CHIP Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <tuple>

#include <lib/support/Pool.h>

namespace chip {

/// Provides an interface over a pool implementation which doesn't expose the size and the actual type of the pool.
template <typename U, typename... ConstructorArguments>
class PoolInterface
{
public:
// For convenient use in PoolImpl
using Interface = std::tuple<U, ConstructorArguments...>;

virtual ~PoolInterface() {}

virtual U * CreateObject(ConstructorArguments &&... args) = 0;
virtual void ReleaseObject(U * element) = 0;
virtual void ResetObject(U * element, ConstructorArguments &&... args) = 0;

template <typename Function>
bool ForEachActiveObject(Function && function)
{
auto proxy = [&](U * target) -> bool { return function(target); };
return ForEachActiveObjectInner(
&proxy, [](void * context, U * target) -> bool { return (*static_cast<decltype(proxy) *>(context))(target); });
}

protected:
using Lambda = bool (*)(void *, U *);
virtual bool ForEachActiveObjectInner(void * context, Lambda lambda) = 0;
};

template <class T, size_t N, typename Interface>
class PoolProxy;

template <class T, size_t N, typename U, typename... ConstructorArguments>
class PoolProxy<T, N, std::tuple<U, ConstructorArguments...>> : public PoolInterface<U, ConstructorArguments...>
{
public:
static_assert(std::is_base_of<U, T>::value, "Interface type is not derived from Pool type");

PoolProxy() {}
virtual ~PoolProxy() override {}

virtual U * CreateObject(ConstructorArguments &&... args) override
{
return Impl().CreateObject(std::forward<ConstructorArguments>(args)...);
}

virtual void ReleaseObject(U * element) override { Impl().ReleaseObject(static_cast<T *>(element)); }

virtual void ResetObject(U * element, ConstructorArguments &&... args) override
{
return Impl().ResetObject(static_cast<T *>(element), std::forward<ConstructorArguments>(args)...);
}

protected:
virtual bool ForEachActiveObjectInner(void * context,
typename PoolInterface<U, ConstructorArguments...>::Lambda lambda) override
{
return Impl().ForEachActiveObject([&](T * target) { return lambda(context, static_cast<U *>(target)); });
}

virtual BitMapObjectPool<T, N> & Impl() = 0;
};

/*
* @brief
* Define a implementation of a pool which derive and expose PoolInterface's.
*
* @tparam T a subclass of element to be allocated.
* @tparam N a positive integer max number of elements the pool provides.
* @tparam Interfaces a list of parameters which defines PoolInterface's. each interface is defined by a
* std::tuple<U, ConstructorArguments...>. The PoolImpl is derived from every
* PoolInterface<U, ConstructorArguments...>, the PoolImpl can be converted to the interface type
* and passed around
*/
template <class T, size_t N, typename... Interfaces>
class PoolImpl : public PoolProxy<T, N, Interfaces>...
{
public:
PoolImpl() {}
virtual ~PoolImpl() override {}

protected:
virtual BitMapObjectPool<T, N> & Impl() override { return mImpl; }

private:
BitMapObjectPool<T, N> mImpl;
};

} // namespace chip
1 change: 1 addition & 0 deletions src/lib/support/tests/TestPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <lib/support/Pool.h>
#include <lib/support/UnitTestRegistration.h>
#include <system/SystemConfig.h>

#include <nlunit-test.h>

Expand Down
2 changes: 1 addition & 1 deletion src/messaging/ExchangeMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ CHIP_ERROR ExchangeManager::Shutdown()

mContextPool.ForEachActiveObject([](auto * ec) {
// There should be no active object in the pool
assert(false);
VerifyOrDie(false);
return true;
});

Expand Down
97 changes: 35 additions & 62 deletions src/transport/raw/TCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ TCPBase::~TCPBase()
}

CloseActiveConnections();

for (size_t i = 0; i < mPendingPacketsSize; i++)
{
mPendingPackets[i].packetBuffer = nullptr;
}
}

void TCPBase::CloseActiveConnections()
Expand Down Expand Up @@ -207,53 +202,39 @@ CHIP_ERROR TCPBase::SendMessage(const Transport::PeerAddress & address, System::
CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBufferHandle && msg)
{
// This will initiate a connection to the specified peer
CHIP_ERROR err = CHIP_NO_ERROR;
PendingPacket * packet = nullptr;
bool alreadyConnecting = false;
Inet::TCPEndPoint * endPoint = nullptr;
bool alreadyConnecting = false;

// Iterate through the ENTIRE array. If a pending packet for
// the address already exists, this means a connection is pending and
// does NOT need to be re-established.
for (size_t i = 0; i < mPendingPacketsSize; i++)
{
if (mPendingPackets[i].packetBuffer.IsNull())
{
if (packet == nullptr)
{
// found a slot to store the packet into
packet = mPendingPackets + i;
}
}
else if (mPendingPackets[i].peerAddress == addr)
mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) {
if (pending->mPeerAddress == addr)
{
// same destination exists.
alreadyConnecting = true;

// ensure packets are ORDERED
if (packet != nullptr)
{
packet->peerAddress = addr;
packet->packetBuffer = std::move(mPendingPackets[i].packetBuffer);
packet = mPendingPackets + i;
}
pending->mPacketBuffer->AddToEnd(std::move(msg));
return false;
}
}

VerifyOrExit(packet != nullptr, err = CHIP_ERROR_NO_MEMORY);
return true;
});

// If already connecting, buffer was just enqueued for more sending
VerifyOrExit(!alreadyConnecting, err = CHIP_NO_ERROR);
if (alreadyConnecting)
{
return CHIP_NO_ERROR;
}

// Ensures sufficient active connections size exist
VerifyOrExit(mUsedEndPointCount < mActiveConnectionsSize, err = CHIP_ERROR_NO_MEMORY);
VerifyOrReturnError(mUsedEndPointCount < mActiveConnectionsSize, CHIP_ERROR_NO_MEMORY);

Inet::TCPEndPoint * endPoint = nullptr;
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
err = mListenSocket->GetEndPointManager().NewEndPoint(&endPoint);
ReturnErrorOnFailure(mListenSocket->GetEndPointManager().NewEndPoint(&endPoint));
auto EndPointDeletor = [](Inet::TCPEndPoint * e) { e->Free(); };
std::unique_ptr<Inet::TCPEndPoint, decltype(EndPointDeletor)> endPointHolder(endPoint, EndPointDeletor);
#else
err = CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE;
return CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE;
#endif
SuccessOrExit(err);

endPoint->mAppState = reinterpret_cast<void *>(this);
endPoint->OnDataReceived = OnTcpReceive;
Expand All @@ -263,23 +244,17 @@ CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBuf
endPoint->OnAcceptError = OnAcceptError;
endPoint->OnPeerClose = OnPeerClosed;

err = endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface());
SuccessOrExit(err);
ReturnErrorOnFailure(endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface()));

// enqueue the packet once the connection succeeds
packet->peerAddress = addr;
packet->packetBuffer = std::move(msg);
VerifyOrReturnError(mPendingPackets.CreateObject(addr, std::move(msg)) != nullptr, CHIP_ERROR_NO_MEMORY);
mUsedEndPointCount++;

exit:
if (err != CHIP_NO_ERROR)
{
if (endPoint != nullptr)
{
endPoint->Free();
}
}
return err;
#if INET_CONFIG_ENABLE_TCP_ENDPOINT
endPointHolder.release();
#endif

return CHIP_NO_ERROR;
}

CHIP_ERROR TCPBase::ProcessReceivedBuffer(Inet::TCPEndPoint * endPoint, const PeerAddress & peerAddress,
Expand Down Expand Up @@ -391,22 +366,20 @@ void TCPBase::OnConnectionComplete(Inet::TCPEndPoint * endPoint, CHIP_ERROR inet
PeerAddress addr = PeerAddress::TCP(ipAddress, port, interfaceId);

// Send any pending packets
for (size_t i = 0; i < tcp->mPendingPacketsSize; i++)
{
if ((tcp->mPendingPackets[i].peerAddress != addr) || (tcp->mPendingPackets[i].packetBuffer.IsNull()))
tcp->mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) {
if (pending->mPeerAddress == addr)
{
continue;
}
foundPendingPacket = true;
foundPendingPacket = true;
System::PacketBufferHandle buffer = std::move(pending->mPacketBuffer);
tcp->mPendingPackets.ReleaseObject(pending);

System::PacketBufferHandle buffer = std::move(tcp->mPendingPackets[i].packetBuffer);
tcp->mPendingPackets[i].peerAddress = PeerAddress::Uninitialized();

if ((inetErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR))
{
err = endPoint->Send(std::move(buffer));
if ((inetErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR))
{
err = endPoint->Send(std::move(buffer));
}
}
}
return true;
});

if (err == CHIP_NO_ERROR)
{
Expand Down
Loading

0 comments on commit 1735532

Please sign in to comment.