Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Commit

Permalink
Track external UDP endpoints seen by the peers.
Browse files Browse the repository at this point in the history
  • Loading branch information
gumb0 committed May 15, 2019
1 parent 32f9c88 commit 7d1c660
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 22 deletions.
76 changes: 76 additions & 0 deletions libp2p/EndpointTracker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#include "EndpointTracker.h"

namespace dev
{
namespace p2p
{
namespace
{
// Interval during which each endpoint statement is kept
constexpr std::chrono::minutes c_statementTimeToLiveMin{5};
} // namespace

// Register the statement about endpoint from one othe peers.
// Returns number of currently kept statements in favor of _externalEndpoint
size_t EndpointTracker::addEndpointStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint)
{
// remove previous statement by this peer
auto it = m_statementsMap.find(_sourceEndpoint);
if (it != m_statementsMap.end())
removeStatement(it);

return addStatement(_sourceEndpoint, _externalEndpoint);
}

// Find endpoint with max number of statemens
bi::udp::endpoint EndpointTracker::bestEndpoint() const
{
size_t maxCount = 0;
bi::udp::endpoint bestEndpoint;
for (auto const& endpointAndCount : m_endpointStatementCountMap)
if (endpointAndCount.second > maxCount)
std::tie(bestEndpoint, maxCount) = endpointAndCount;

return bestEndpoint;
}

// Remove old statements
void EndpointTracker::garbageCollectStatements()
{
auto const expiration = std::chrono::steady_clock::now() - c_statementTimeToLiveMin;
for (auto it = m_statementsMap.begin(); it != m_statementsMap.end();)
{
if (it->second.second > expiration)
it = removeStatement(it);
else
++it;
}
}

size_t EndpointTracker::addStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint)
{
EndpointAndTimePoint endpointAndTime{_externalEndpoint, std::chrono::steady_clock::now()};
m_statementsMap.insert({_sourceEndpoint, endpointAndTime});
return ++m_endpointStatementCountMap[_externalEndpoint];
}

EndpointTracker::SourceToStatementMap::iterator EndpointTracker::removeStatement(
SourceToStatementMap::iterator _it)
{
// first decrement statement counter
auto itCount = m_endpointStatementCountMap.find(_it->second.first);
assert(itCount != m_endpointStatementCountMap.end() && itCount->second > 0);
if (--itCount->second == 0)
m_endpointStatementCountMap.erase(itCount);

return m_statementsMap.erase(_it);
}

} // namespace p2p
} // namespace dev
47 changes: 47 additions & 0 deletions libp2p/EndpointTracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#pragma once

#include "Common.h"

namespace dev
{
namespace p2p
{
// Class for keeping track of our external endpoint as seen by our peers.
// Keeps track of what external endpoint is seen by every peer
// and finds which endpoint is reported most often
class EndpointTracker
{
public:
// Register the statement about endpoint from one othe peers.
// Returns number of currently kept statements in favor of _externalEndpoint
size_t addEndpointStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint);

// Find endpoint with max number of statemens
bi::udp::endpoint bestEndpoint() const;

// Remove old statements
void garbageCollectStatements();

private:
using EndpointAndTimePoint =
std::pair<bi::udp::endpoint, std::chrono::steady_clock::time_point>;
using SourceToStatementMap = std::map<bi::udp::endpoint, EndpointAndTimePoint>;

size_t addStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint);

SourceToStatementMap::iterator removeStatement(SourceToStatementMap::iterator _it);

// Statements about our external endpoint, maps statement source peer => endpoint, timestamp
SourceToStatementMap m_statementsMap;
// map external endpoint => how many sources reported it
std::map<bi::udp::endpoint, size_t> m_endpointStatementCountMap;
};

} // namespace p2p
} // namespace dev
55 changes: 49 additions & 6 deletions libp2p/NodeTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ BOOST_LOG_INLINE_GLOBAL_LOGGER_CTOR_ARGS(g_discoveryWarnLogger,

// Cadence at which we timeout sent pings and evict unresponsive nodes
constexpr chrono::milliseconds c_handleTimeoutsIntervalMs{5000};
// Cadence at which we remove old records from EndpointTracker
constexpr chrono::milliseconds c_removeOldEndpointStatementsIntervalMs{5000};
// Change external endpoint after this number of peers report new one
constexpr size_t c_minEndpointTrackStatements{10};

} // namespace

Expand All @@ -35,6 +39,7 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
ENR const& _enr, bool _enabled, bool _allowLocalDiscovery)
: m_hostNodeID{_alias.pub()},
m_hostNodeIDHash{sha3(m_hostNodeID)},
m_hostStaticIP{isAllowedEndpoint(_endpoint) ? _endpoint.address() : bi::address{}},
m_hostNodeEndpoint{_endpoint},
m_hostENR{_enr},
m_secret{_alias.secret()},
Expand All @@ -44,6 +49,7 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
m_allowLocalDiscovery{_allowLocalDiscovery},
m_discoveryTimer{make_shared<ba::steady_timer>(_io)},
m_timeoutsTimer{make_shared<ba::steady_timer>(_io)},
m_endpointTrackingTimer{make_shared<ba::steady_timer>(_io)},
m_io{_io}
{
for (unsigned i = 0; i < s_bins; i++)
Expand All @@ -60,6 +66,7 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
m_socket->connect();
doDiscovery();
doHandleTimeouts();
doEndpointTracking();
}
catch (exception const& _e)
{
Expand Down Expand Up @@ -484,6 +491,7 @@ void NodeTable::onPacketReceived(
}
}


shared_ptr<NodeEntry> NodeTable::handlePong(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
{
Expand Down Expand Up @@ -536,13 +544,20 @@ shared_ptr<NodeEntry> NodeTable::handlePong(

m_sentPings.erase(_from);

// update our endpoint address and UDP port
DEV_GUARDED(x_nodes)
// update our external endpoint address and UDP port
if (m_endpointTracker.addEndpointStatement(_from, pong.destination) >=
c_minEndpointTrackStatements)
{
if ((!m_hostNodeEndpoint || !isAllowedEndpoint(m_hostNodeEndpoint)) &&
isPublicAddress(pong.destination.address()))
m_hostNodeEndpoint.setAddress(pong.destination.address());
m_hostNodeEndpoint.setUdpPort(pong.destination.udpPort());
auto newUdpEndpoint = m_endpointTracker.bestEndpoint();
if (!m_hostStaticIP.is_unspecified())
newUdpEndpoint.address(m_hostStaticIP);

if (newUdpEndpoint != m_hostNodeEndpoint)
{
m_hostNodeEndpoint = NodeIPEndpoint{
newUdpEndpoint.address(), newUdpEndpoint.port(), m_hostNodeEndpoint.tcpPort()};
LOG(m_logger) << "New external endpoint found: " << m_hostNodeEndpoint;
}
}

return sourceNodeEntry;
Expand Down Expand Up @@ -801,6 +816,34 @@ void NodeTable::doHandleTimeouts()
});
}

void NodeTable::doEndpointTracking()
{
m_endpointTrackingTimer->expires_from_now(c_removeOldEndpointStatementsIntervalMs);
auto edpointTrackingTimer{m_endpointTrackingTimer};
m_endpointTrackingTimer->async_wait(
[this, edpointTrackingTimer](boost::system::error_code const& _ec) {
// We can't use m_logger if an error occurred because captured this might be already
// destroyed
if (_ec.value() == boost::asio::error::operation_aborted ||
edpointTrackingTimer->expires_at() == c_steadyClockMin)
{
clog(VerbosityDebug, "discov") << "endpoint tracking timer was probably cancelled";
return;
}
else if (_ec)
{
clog(VerbosityDebug, "discov")
<< "endpoint tracking timer encountered an error: " << _ec.value() << " "
<< _ec.message();
return;
}

m_endpointTracker.garbageCollectStatements();

doEndpointTracking();
});
}

unique_ptr<DiscoveryDatagram> DiscoveryDatagram::interpretUDP(bi::udp::endpoint const& _from, bytesConstRef _packet)
{
unique_ptr<DiscoveryDatagram> decoded;
Expand Down
43 changes: 27 additions & 16 deletions libp2p/NodeTable.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2018 Aleth Authors.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#pragma once

#include <algorithm>

#include <boost/integer/static_log2.hpp>

#include "Common.h"
#include "ENR.h"
#include "EndpointTracker.h"
#include <libp2p/UDP.h>
#include <boost/integer/static_log2.hpp>
#include <algorithm>

namespace dev
{
Expand Down Expand Up @@ -127,20 +126,23 @@ class NodeTable : UDPSocketEvents
{
if (m_socket->isOpen())
{
// We "cancel" the timers by setting c_steadyClockMin rather than calling cancel()
// because cancel won't set the boost error code if the timers have already expired and
// the handlers are in the ready queue.
//
// Note that we "cancel" via io_service::post to ensure thread safety when accessing the
// timers
auto discoveryTimer{m_discoveryTimer};
m_io.post([discoveryTimer] { discoveryTimer->expires_at(c_steadyClockMin); });
auto timeoutsTimer{m_timeoutsTimer};
m_io.post([timeoutsTimer] { timeoutsTimer->expires_at(c_steadyClockMin); });
m_socket->disconnect();
cancelTimer(m_discoveryTimer);
cancelTimer(m_timeoutsTimer);
cancelTimer(m_endpointTrackingTimer);
}
}

void cancelTimer(std::shared_ptr<ba::steady_timer> _timer)
{
// We "cancel" the timers by setting c_steadyClockMin rather than calling cancel()
// because cancel won't set the boost error code if the timers have already expired and
// the handlers are in the ready queue.
//
// Note that we "cancel" via io_service::post to ensure thread safety when accessing the
// timers
m_io.post([_timer] { _timer->expires_at(c_steadyClockMin); });
}

/// Set event handler for NodeEntryAdded and NodeEntryDropped events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }

Expand Down Expand Up @@ -312,6 +314,9 @@ class NodeTable : UDPSocketEvents
/// bring in their replacements
void doHandleTimeouts();

// Remove old records in m_endpointTracker.
void doEndpointTracking();

// Useful only for tests.
void setRequestTimeToLive(std::chrono::seconds const& _time) { m_requestTimeToLive = _time; }
uint32_t nextRequestExpirationTime() const
Expand All @@ -329,6 +334,9 @@ class NodeTable : UDPSocketEvents

NodeID const m_hostNodeID;
h256 const m_hostNodeIDHash;
// Host IP address given to constructor
bi::address const m_hostStaticIP;
// Dynamically updated host endpoint
NodeIPEndpoint m_hostNodeEndpoint;
ENR const m_hostENR;
Secret m_secret; ///< This nodes secret key.
Expand Down Expand Up @@ -358,8 +366,11 @@ class NodeTable : UDPSocketEvents

bool m_allowLocalDiscovery; ///< Allow nodes with local addresses to be included in the discovery process

EndpointTracker m_endpointTracker;

std::shared_ptr<ba::steady_timer> m_discoveryTimer;
std::shared_ptr<ba::steady_timer> m_timeoutsTimer;
std::shared_ptr<ba::steady_timer> m_endpointTrackingTimer;

ba::io_service& m_io;
};
Expand Down

0 comments on commit 7d1c660

Please sign in to comment.