Skip to content

Commit

Permalink
* Fixed premature close of TCP connection
Browse files Browse the repository at this point in the history
 * Fixed unsubscribe functionality of topics
  • Loading branch information
Hemofektik committed Mar 9, 2018
1 parent f5c4790 commit 9f7468e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Source/ROSIntegration/Classes/RI/Topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ROSINTEGRATION_API UTopic: public UObject

bool Subscribe(std::function<void(TSharedPtr<FROSBaseMsg>)> func);

bool Unsubscribe(std::function<void(TSharedPtr<FROSBaseMsg>)> func);
bool Unsubscribe();

bool Advertise();

Expand Down
35 changes: 26 additions & 9 deletions Source/ROSIntegration/Private/Topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,21 @@
class UTopic::Impl {
// hidden implementation details
public:
Impl(){

Impl()
: _Ric(nullptr)
, _ROSTopic(nullptr)
{
}

~Impl() {

if (_Callback) {
Unsubscribe();
}

delete _ROSTopic;
}

//ROSBridgeHandler _Handler;
UROSIntegrationCore* _Ric;
FString _Topic;
Expand All @@ -26,7 +38,7 @@ class UTopic::Impl {
// TODO do this on advertise/call?
UBaseMessageConverter** Converter = _ConverterMap.Find(_MessageType);
if (!Converter) {
UE_LOG(LogTemp, Error, TEXT("MessageType is unknown. Can't find Converter to encode message"));
UE_LOG(LogTemp, Error, TEXT("MessageType %s is unknown. Can't find Converter to encode message"), *_MessageType);
return false;
}

Expand All @@ -39,7 +51,7 @@ class UTopic::Impl {
// TODO do this on advertise/call?
UBaseMessageConverter** Converter = _ConverterMap.Find(_MessageType);
if (!Converter) {
UE_LOG(LogTemp, Error, TEXT("MessageType is unknown. Can't find Converter to decode message"));
UE_LOG(LogTemp, Error, TEXT("MessageType %s is unknown. Can't find Converter to decode message"), *_MessageType);
return false;
}

Expand All @@ -52,18 +64,25 @@ class UTopic::Impl {
UE_LOG(LogTemp, Error, TEXT("Rostopic hasn't been initialized before Subscribe() call"));
return false;
}
if (_Callback) {
UE_LOG(LogTemp, Warning, TEXT("Rostopic was already subscribed"));
Unsubscribe();
}

bool result = _ROSTopic->Subscribe(std::bind(&UTopic::Impl::MessageCallback, this, std::placeholders::_1));
_Callback = func;
return result;
}
bool Unsubscribe(std::function<void(TSharedPtr<FROSBaseMsg>)> func) {
bool Unsubscribe() {
if (!_ROSTopic) {
UE_LOG(LogTemp, Error, TEXT("Rostopic hasn't been initialized before Unsubscribe() call"));
return false;
}

bool result = _ROSTopic->Unsubscribe(std::bind(&UTopic::Impl::MessageCallback, this, std::placeholders::_1));
if (result) {
_Callback = nullptr;
}
return result;
}

Expand Down Expand Up @@ -109,8 +128,6 @@ class UTopic::Impl {
UBaseMessageConverter* ConcreteConverter = ClassItr->GetDefaultObject<UBaseMessageConverter>();
UE_LOG(LogTemp, Log, TEXT("Added %s with type %s to TopicConverterMap"), *(It->GetDefaultObjectName().ToString()), *(ConcreteConverter->_MessageType));
_ConverterMap.Add(*(ConcreteConverter->_MessageType), ConcreteConverter);


}
}

Expand Down Expand Up @@ -145,8 +162,8 @@ void UTopic::BeginDestroy() {
bool UTopic::Subscribe(std::function<void(TSharedPtr<FROSBaseMsg>)> func) {
return _Implementation->Subscribe(func);
}
bool UTopic::Unsubscribe(std::function<void(TSharedPtr<FROSBaseMsg>)> func) {
return _Implementation->Unsubscribe(func);
bool UTopic::Unsubscribe() {
return _Implementation->Unsubscribe();
}

bool UTopic::Advertise() {
Expand Down
14 changes: 9 additions & 5 deletions Source/ROSIntegration/Private/rosbridge2cpp/TCPConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,15 @@ int TCPConnection::ReceiverThreadFunction(){
// Checking the connection state alone doesn't catch simple interruptions
// like a crashed server etc.
//

if(_sock->GetConnectionState()!=ESocketConnectionState::SCS_Connected){
std::cout << "Error on connection" << std::endl;
ReportError(rosbridge2cpp::TransportError::R2C_SOCKET_ERROR);
return 2; // error while receiving from socket
ESocketConnectionState ConnectionState = _sock->GetConnectionState();
if( ConnectionState != ESocketConnectionState::SCS_Connected ){
if (ConnectionState == SCS_NotConnected) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} else {
std::cout << "Error on connection" << std::endl;
ReportError(rosbridge2cpp::TransportError::R2C_SOCKET_ERROR);
return 2; // error while receiving from socket
}
}else{
// std::cout << "c";
// std::cout.flush();
Expand Down
29 changes: 14 additions & 15 deletions Source/ROSIntegration/Private/rosbridge2cpp/ros_bridge.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "ros_bridge.h"
#include "ros_topic.h"
#include "bson.h"

#include "Misc/ScopeLock.h"

namespace rosbridge2cpp{
bool ROSBridge::SendMessage(std::string data){
Expand Down Expand Up @@ -76,14 +78,12 @@ namespace rosbridge2cpp{
}

void ROSBridge::HandleIncomingPublishMessage(ROSBridgePublishMsg &data){
FScopeLock Lock(&ChangeTopicsMutex);

//Incoming topic message - dispatch to correct callback
std::string &incoming_topic_name = data.topic_;
if ( registered_topic_callbacks_.find(incoming_topic_name) == registered_topic_callbacks_.end()) {
std::cerr << "[ROSBridge] Received message for topic " << incoming_topic_name << "where no callback has been registered before" <<std::endl;
return;
}
if ( registered_topic_callbacks_.empty()) {
std::cerr << "[ROSBridge] Received message for topic " << incoming_topic_name << "where no callback is currently registered" <<std::endl;
std::cerr << "[ROSBridge] Received message for topic " << incoming_topic_name << " where no callback has been registered before" <<std::endl;
return;
}

Expand Down Expand Up @@ -242,8 +242,6 @@ namespace rosbridge2cpp{
m.FromJSON(data);
HandleIncomingServiceRequestMessage(m.id_, m);
}


}

bool ROSBridge::Init(std::string ip_addr, int port){
Expand All @@ -264,6 +262,7 @@ namespace rosbridge2cpp{
}

void ROSBridge::RegisterTopicCallback(std::string topic_name, FunVrROSPublishMsg fun){
FScopeLock Lock(&ChangeTopicsMutex);
registered_topic_callbacks_[topic_name].push_back(fun);
}

Expand All @@ -279,14 +278,12 @@ namespace rosbridge2cpp{
registered_service_request_callbacks_bson_[service_name] = fun;
}


bool ROSBridge::UnregisterTopicCallback(std::string topic_name, FunVrROSPublishMsg fun){

FScopeLock Lock(&ChangeTopicsMutex);

if ( registered_topic_callbacks_.find(topic_name) == registered_topic_callbacks_.end()) {
std::cerr << "[ROSBridge] UnregisterTopicCallback called but given topic name" << topic_name << " not in map." <<std::endl;
return false;
}
if ( registered_topic_callbacks_.empty()) {
std::cerr << "[ROSBridge] UnregisterTopicCallback called but given topic name" << topic_name << " is empty in map." <<std::endl;
std::cerr << "[ROSBridge] UnregisterTopicCallback called but given topic name '" << topic_name << "' not in map." <<std::endl;
return false;
}

Expand All @@ -295,8 +292,10 @@ namespace rosbridge2cpp{
for(std::list<FunVrROSPublishMsg>::iterator topic_callback_it = r_list_of_callbacks.begin();
topic_callback_it!= r_list_of_callbacks.end();
++topic_callback_it){
// if(get_address(*topic_callback_it) == get_address(fun)){
if(1 == 0){ // TODO refactor unsubscribe

// the following compare only works with the UE4 ROSIntegration because the topic class has only one callback method
typedef void(fnType)(ROSBridgePublishMsg&);
if(topic_callback_it->target<fnType>() == fun.target<fnType>()) {
std::cout << "[ROSBridge] Found CB in UnregisterTopicCallback. Deleting it ... " << std::endl;
r_list_of_callbacks.erase(topic_callback_it);
return true;
Expand Down
2 changes: 2 additions & 0 deletions Source/ROSIntegration/Private/rosbridge2cpp/ros_bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ namespace rosbridge2cpp{
std::unordered_map<std::string, FunVrROSCallServiceMsgrROSServiceResponseMsg> registered_service_request_callbacks_bson_;
bool bson_only_mode_ = false;

FCriticalSection ChangeTopicsMutex;

// template<typename T>
// size_t get_address(std::function<void (T &)> f) {
// typedef void (fnType)(T &);
Expand Down

0 comments on commit 9f7468e

Please sign in to comment.