Skip to content

Commit

Permalink
Merge pull request #732 from Microsoft/macos2async
Browse files Browse the repository at this point in the history
Macos2async
  • Loading branch information
AndKram authored Aug 8, 2018
2 parents 59f5128 + 2a977a6 commit 1c4107f
Show file tree
Hide file tree
Showing 23 changed files with 206 additions and 67 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if( INCLUDE_PYTHON )
set( MACOS_USE_PYTHON_MODULE_DESC "Specifies which Python module to build Malmo on Apple MacOS" )
set( USE_PYTHON_VERSIONS 3.6 CACHE STRING ${USE_PYTHON_VERSIONS_DESC} )
# Boost has switched to using a 2 digit naming convention for python on MacOS.
set( MACOS_USE_PYTHON_MODULE "python36" CACHE STRING ${MACOS_USE_PYTHON_MODULE_DESC} )
set( MACOS_USE_PYTHON_MODULE "python37" CACHE STRING ${MACOS_USE_PYTHON_MODULE_DESC} )
endif()

set( WARNINGS_AS_ERRORS OFF )
Expand Down
37 changes: 28 additions & 9 deletions Malmo/src/AgentHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace malmo

// start the io_service on background threads
this->work = boost::in_place(boost::ref(this->io_service));
const int NUM_BACKGROUND_THREADS = 1; // can be increased if I/O becomes a bottleneck
const int NUM_BACKGROUND_THREADS = 3; // can be increased if I/O becomes a bottleneck
for( int i = 0; i < NUM_BACKGROUND_THREADS; i++ )
this->background_threads.push_back( boost::make_shared<boost::thread>( boost::bind( &boost::asio::io_service::run, &this->io_service ) ) );
}
Expand Down Expand Up @@ -113,7 +113,7 @@ namespace malmo
std::string line = "";
std::string version = "";
// Keep concatenating lines until we have a match, or we run out of schema.
while (version.empty() && getline(stream, line))
while (version.empty() && !stream.eof() && getline(stream, line))
{
boost::trim(line);
xml += line;
Expand Down Expand Up @@ -314,9 +314,10 @@ namespace malmo
{
reply = rpc.sendStringAndGetShortReply(this->io_service, item->ip_address, item->control_port, request, false);
}
catch (std::exception&)
catch (std::exception& e)
{
// This is expected quite often - client is likely not running.
LOGINFO(LT("Client could not be contacted: "), item->ip_address, LT(":"), item->control_port, LT(" "), e.what());
continue;
}
LOGINFO(LT("Reserving client, received reply from "), item->ip_address, LT(": "), reply);
Expand Down Expand Up @@ -452,7 +453,8 @@ namespace malmo
{
boost::lock_guard<boost::mutex> scope_guard(this->world_state_mutex);

return this->world_state;
WorldState current_world_state(this->world_state); // Copy while holding lock.
return current_world_state;
}

WorldState AgentHost::getWorldState()
Expand Down Expand Up @@ -498,8 +500,13 @@ namespace malmo
{
return; // can re-use existing server
}

if (this->mission_control_server != 0) {
this->mission_control_server->close();
}

this->mission_control_server = boost::make_shared<StringServer>(this->io_service, port, boost::bind(&AgentHost::onMissionControlMessage, this, _1), "mcp");
this->mission_control_server->start();
this->mission_control_server->start(mission_control_server);
}

boost::shared_ptr<VideoServer> AgentHost::listenForVideo(boost::shared_ptr<VideoServer> video_server, int port, short width, short height, short channels, TimestampedVideoFrame::FrameType frametype)
Expand Down Expand Up @@ -530,6 +537,10 @@ namespace malmo
video_server->getChannels() != channels ||
video_server->getFrameType() != frametype)
{
if (video_server != 0) {
video_server->close();
}

// Can't use the server passed in - create a new one.
ret_server = boost::make_shared<VideoServer>( this->io_service, port, width, height, channels, frametype, boost::bind(&AgentHost::onVideo, this, _1));

Expand All @@ -540,8 +551,8 @@ namespace malmo
ret_server->recordBmps(this->current_mission_record->getTemporaryDirectory());
}

ret_server->start();
}
ret_server->start(ret_server);
}
else {
// re-use the existing video_server
// but now we need to re-create the file writers with the new file names
Expand All @@ -562,8 +573,12 @@ namespace malmo
{
if( !this->rewards_server || ( port != 0 && this->rewards_server->getPort() != port ) )
{
if (rewards_server != nullptr) {
rewards_server->close();
}

this->rewards_server = boost::make_shared<StringServer>(this->io_service, port, boost::bind(&AgentHost::onReward, this, _1), "rew");
this->rewards_server->start();
this->rewards_server->start(rewards_server);
}

if (this->current_mission_record->isRecordingRewards()){
Expand All @@ -575,8 +590,12 @@ namespace malmo
{
if( !this->observations_server || ( port != 0 && this->observations_server->getPort() != port ) )
{
if (observations_server != nullptr) {
observations_server->close();
}

this->observations_server = boost::make_shared<StringServer>(this->io_service, port, boost::bind(&AgentHost::onObservation, this, _1), "obs");
this->observations_server->start();
this->observations_server->start(observations_server);
}

if (this->current_mission_record->isRecordingObservations()){
Expand Down
3 changes: 2 additions & 1 deletion Malmo/src/ClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ namespace malmo
if (ec)
LOGERROR(LT("Error resolving remote endpoint: "), ec.message());
boost::lock_guard<boost::mutex> scope_guard(this->outbox_mutex);
this->outbox.pop_front();
if (!this->outbox.empty())
this->outbox.pop_front();
}
if (!this->outbox.empty())
this->write();
Expand Down
15 changes: 13 additions & 2 deletions Malmo/src/StringServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,27 @@
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <iostream>

namespace malmo {
StringServer::StringServer(boost::asio::io_service& io_service, int port, const boost::function<void(const TimestampedString string_message)> handle_string, const std::string& log_name)
: handle_string(handle_string)
, server(io_service, port, boost::bind(&StringServer::handleMessage, this, _1), log_name)
{
}

void StringServer::start()
void StringServer::start(boost::shared_ptr<StringServer>& scope)
{
this->server.start();
this->scope = scope;
this->server.start(scope.get());
}

void StringServer::close() {
this->server.close();
}

void StringServer::release() {
this->scope = 0;
}

StringServer& StringServer::record(std::string path)
Expand Down
12 changes: 10 additions & 2 deletions Malmo/src/StringServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
// Boost:
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>

// STL:
#include <fstream>
Expand All @@ -36,7 +37,7 @@
namespace malmo
{
//! A TCP server that receives strings and can optionally persist to file.
class StringServer
class StringServer : ServerScope
{
public:

Expand All @@ -57,7 +58,12 @@ namespace malmo
void recordMessage(const TimestampedString message);

//! Starts the string server.
void start();

void start(boost::shared_ptr<StringServer>& scope);

virtual void release();

void close();

private:

Expand All @@ -67,6 +73,8 @@ namespace malmo
TCPServer server;
std::ofstream writer;
boost::mutex write_mutex;

boost::shared_ptr<StringServer> scope = nullptr;
};
}

Expand Down
61 changes: 44 additions & 17 deletions Malmo/src/TCPConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,32 +141,59 @@ namespace malmo
else
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::handle_read_line("), safe_local_endpoint(), LT("/"), safe_remote_endpoint(), LT(") - bytes_transferred: "), bytes_transferred, LT(" - ERROR: "), error.message());
}

void TCPConnection::processMessage()
{
LOGFINE(LT("TCPConnection("), this->log_name, LT(")::processMessage("), safe_local_endpoint(), LT("/"), safe_remote_endpoint(), LT(") - bytes received: "), this->body_buffer.size());

if( this->confirm_with_fixed_reply )
sendReply();
this->onMessageReceived( TimestampedUnsignedCharVector( boost::posix_time::microsec_clock::universal_time(),
this->body_buffer ) );
this->read();
if (this->confirm_with_fixed_reply)
{
reply();
}
else
{
deliverMessage();
}
}

void TCPConnection::sendReply()
void TCPConnection::reply()
{
const int REPLY_SIZE_HEADER_LENGTH = 4;
boost::system::error_code ec;
u_long reply_size_header = htonl((u_long)this->fixed_reply.size());
size_t bytes_written = boost::asio::write(this->socket, boost::asio::buffer(&reply_size_header, REPLY_SIZE_HEADER_LENGTH), ec);
if (bytes_written != REPLY_SIZE_HEADER_LENGTH || ec)
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::sendReply - ONLY SENT "), bytes_written, LT(" BYTES: "), ec.message());

bytes_written = boost::asio::write( this->socket, boost::asio::buffer(this->fixed_reply), boost::asio::transfer_all(), ec );
if (ec)
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::sendReply - failed to send body of message: "), ec.message());
this->reply_size_header = htonl((u_long)this->fixed_reply.size());

// Send header and continue after with response body.
boost::asio::async_write(this->socket, boost::asio::buffer(&this->reply_size_header, REPLY_SIZE_HEADER_LENGTH), boost::bind(&TCPConnection::transferredHeader, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void TCPConnection::transferredHeader(const boost::system::error_code& error, std::size_t bytes_transferred) {
if (!error)
{
// Send body and continue after with message delivery.
boost::asio::async_write(this->socket, boost::asio::buffer(this->fixed_reply), boost::bind(&TCPConnection::transferredBody, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
LOGFINE(LT("TCPConnection("), this->log_name, LT(")::sendReply sent "), bytes_written, LT(" bytes"));
{
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::transferredHeader - failed to send header of message: "), error.message());
}
}

void TCPConnection::transferredBody(const boost::system::error_code& error, std::size_t bytes_transferred) {
if (!error)
{
LOGFINE(LT("TCPConnection("), this->log_name, LT(")::transferredBody sent "), bytes_transferred, LT(" bytes"));

this->deliverMessage();
}
else
{
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::transferredBody - failed to send body of message: "), error.message());
}
}

void TCPConnection::deliverMessage()
{
this->onMessageReceived(TimestampedUnsignedCharVector(boost::posix_time::microsec_clock::universal_time(), this->body_buffer));
this->read(); // Continue on with reading of next request message.
}

TCPConnection::TCPConnection(boost::asio::io_service& io_service, boost::function<void(const TimestampedUnsignedCharVector) > callback, bool expect_size_header, const std::string& log_name)
Expand Down
8 changes: 7 additions & 1 deletion Malmo/src/TCPConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ namespace malmo
void handle_read_line( const boost::system::error_code& error, size_t bytes_transferred );

size_t getSizeFromHeader();

void processMessage();
void sendReply();
void reply();
void deliverMessage();

void transferredHeader(const boost::system::error_code& ec, std::size_t transferred);
void transferredBody(const boost::system::error_code& ec, std::size_t transferred);

private:

Expand All @@ -84,6 +89,7 @@ namespace malmo
std::string fixed_reply;
bool expect_size_header;
std::string log_name;
u_long reply_size_header;
};
}

Expand Down
46 changes: 40 additions & 6 deletions Malmo/src/TCPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ namespace malmo
}
}

void TCPServer::start()
void TCPServer::start(ServerScope* scope)
{
this->scope = scope;
this->startAccept();
}


void TCPServer::close() {
this->closing = true;
this->acceptor->close();
}

void TCPServer::confirmWithFixedReply(std::string reply)
{
this->confirm_with_fixed_reply = true;
Expand All @@ -72,9 +78,14 @@ namespace malmo

void TCPServer::startAccept()
{
boost::function<void(const TimestampedUnsignedCharVector) > deliverMsgIfNotClosed = [this](const TimestampedUnsignedCharVector msg) {
if (!this->closing)
this->onMessageReceived(msg);
};

boost::shared_ptr<TCPConnection> new_connection = TCPConnection::create(
this->acceptor->get_io_service(),
this->onMessageReceived,
deliverMsgIfNotClosed,
this->expect_size_header,
this->log_name
);
Expand All @@ -89,17 +100,40 @@ namespace malmo
boost::asio::placeholders::error));
}

void TCPServer::handleAccept(
void TCPServer::handleAccept(
boost::shared_ptr<TCPConnection> new_connection,
const boost::system::error_code& error)
{
// On closing or on error release scope of async io processing which can be us.

if (!error)
{
new_connection->read();
this->startAccept();
if (this->closing)
{
new_connection.get()->getSocket().close();
if (this->scope != nullptr)
this->scope->release();
}
else {
new_connection->read();
if (!this->closing)
{
this->startAccept();
}
else
{
if (this->scope != nullptr)
this->scope->release();
}
}
}
else
{
LOGERROR(LT("TCPServer::handleAccept("), this->log_name, LT(") - "), error.message());
if (this->scope != nullptr) {
this->scope->release();
}
}
}

int TCPServer::getPort() const
Expand Down
Loading

0 comments on commit 1c4107f

Please sign in to comment.