Skip to content

Commit

Permalink
refactor msg
Browse files Browse the repository at this point in the history
  • Loading branch information
taohexxx committed Apr 19, 2018
1 parent ec2d617 commit b5879ad
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 118 deletions.
8 changes: 4 additions & 4 deletions phxrpc/http/http_msg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ const char *HttpMessage::GetHeaderValue(const char *name) const {

//---------------------------------------------------------

HttpRequest::HttpRequest()
: HttpMessage(), BaseRequest(Protocol::HTTP_POST) {
HttpRequest::HttpRequest() {
set_protocol(Protocol::HTTP_POST);
SetVersion("HTTP/1.0");
memset(method_, 0, sizeof(method_));
memset(client_ip_, 0, sizeof(client_ip_));
Expand Down Expand Up @@ -221,8 +221,8 @@ int HttpRequest::IsKeepAlive() const {

//---------------------------------------------------------

HttpResponse::HttpResponse()
: HttpMessage(), BaseResponse(Protocol::HTTP_POST) {
HttpResponse::HttpResponse() {
set_protocol(Protocol::HTTP_POST);
SetVersion("HTTP/1.0");
status_code_ = 200;
snprintf(reason_phrase_, sizeof(reason_phrase_), "%s", "OK");
Expand Down
15 changes: 13 additions & 2 deletions phxrpc/mqtt/mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ int MqttClient::Publish(BaseTcpStream &socket, const MqttPublish &req,
return static_cast<int>(DoMethod(socket, &req, &resp));
}

int MqttClient::Puback(BaseTcpStream &socket, const MqttPuback &req,
MqttClient::MqttStat &mqtt_stat) {
MqttFakeResponse resp;
return static_cast<int>(DoMethod(socket, &req, &resp, mqtt_stat));
}

int MqttClient::Puback(BaseTcpStream &socket, const MqttPuback &req) {
MqttFakeResponse resp;
return static_cast<int>(DoMethod(socket, &req, &resp));
}

int MqttClient::Subscribe(BaseTcpStream &socket, const MqttSubscribe &req,
MqttSuback &resp) {
return static_cast<int>(DoMethod(socket, &req, &resp));
Expand All @@ -121,12 +132,12 @@ int MqttClient::Ping(BaseTcpStream &socket, const MqttPingreq &req,

int MqttClient::Disconnect(BaseTcpStream &socket, const MqttDisconnect &req,
MqttClient::MqttStat &mqtt_stat) {
MqttFakeDisconnack resp;
MqttFakeResponse resp;
return static_cast<int>(DoMethod(socket, &req, &resp, mqtt_stat));
}

int MqttClient::Disconnect(BaseTcpStream &socket, const MqttDisconnect &req) {
MqttFakeDisconnack resp;
MqttFakeResponse resp;
return static_cast<int>(DoMethod(socket, &req, &resp));
}

Expand Down
5 changes: 5 additions & 0 deletions phxrpc/mqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class MqttClient {
static int Publish(BaseTcpStream &socket, const MqttPublish &req,
MqttPuback &resp);

// @return true: socket ok; false: socket error
static int Puback(BaseTcpStream &socket, const MqttPuback &req,
MqttStat &mqtt_stat);
static int Puback(BaseTcpStream &socket, const MqttPuback &req);

// @return true: socket ok; false: socket error
static int Subscribe(BaseTcpStream &socket, const MqttSubscribe &req,
MqttSuback &resp);
Expand Down
127 changes: 81 additions & 46 deletions phxrpc/mqtt/mqtt_msg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,35 @@ ReturnCode MqttMessage::SendRemaining(ostringstream &out_stream) const {

return ret;
}
// TODO: remove
//string remaining_buffer(out_stream.str());
//printf("remaining_buffer %zu\n", remaining_buffer.size());
//for (int i{0}; remaining_buffer.size() > i; ++i) {
//printf("%d\t", remaining_buffer.at(i));
//}
//printf("\n");
//for (int i{0}; remaining_buffer.size() > i; ++i) {
//printf("%c\t", remaining_buffer.at(i));
//}
//printf("\n");

ret = SendPayload(out_stream);
if (ReturnCode::OK != ret) {
phxrpc::log(LOG_ERR, "SendPayload err %d", ret);

return ret;
}
// TODO: remove
//remaining_buffer = out_stream.str();
//printf("remaining_buffer %zu\n", remaining_buffer.size());
//for (int i{0}; remaining_buffer.size() > i; ++i) {
//printf("%d\t", remaining_buffer.at(i));
//}
//printf("\n");
//for (int i{0}; remaining_buffer.size() > i; ++i) {
//printf("%c\t", remaining_buffer.at(i));
//}
//printf("\n");

return ret;
}
Expand Down Expand Up @@ -528,16 +550,16 @@ ReturnCode MqttResponse::ModifyResp(const bool keep_alive, const string &version
}


MqttConnect::MqttConnect() : MqttRequest(Protocol::MQTT_CONNECT) {
mutable_fixed_header().control_packet_type = ControlPacketType::CONNECT;
MqttFakeResponse::MqttFakeResponse() {
set_protocol(Protocol::MQTT_FAKE_NONE);
mutable_fixed_header().control_packet_type = ControlPacketType::FAKE_NONE;
set_fake(true);
}

proto_name_.resize(6);
proto_name_[0] = 0;
proto_name_[1] = 4;
proto_name_[2] = 'M';
proto_name_[3] = 'Q';
proto_name_[4] = 'T';
proto_name_[5] = 'T';

MqttConnect::MqttConnect() {
set_protocol(Protocol::MQTT_CONNECT);
mutable_fixed_header().control_packet_type = ControlPacketType::CONNECT;
}

ReturnCode MqttConnect::ToPb(google::protobuf::Message *const message) const {
Expand Down Expand Up @@ -579,10 +601,9 @@ ReturnCode MqttConnect::FromPb(const google::protobuf::Message &message) {
BaseResponse *MqttConnect::GenResponse() const { return new MqttConnack; }

ReturnCode MqttConnect::SendVariableHeader(ostringstream &out_stream) const {
ReturnCode ret{SendChars(out_stream, proto_name_.data(),
proto_name_.size())};
ReturnCode ret{SendUnicode(out_stream, proto_name_)};
if (ReturnCode::OK != ret) {
phxrpc::log(LOG_ERR, "SendChars err %d", ret);
phxrpc::log(LOG_ERR, "SendUnicode err %d", ret);

return ret;
}
Expand Down Expand Up @@ -614,27 +635,27 @@ ReturnCode MqttConnect::SendVariableHeader(ostringstream &out_stream) const {
}

ReturnCode MqttConnect::RecvVariableHeader(istringstream &in_stream) {
char proto_name[6]{0x0};
ReturnCode ret{RecvChars(in_stream, proto_name, 6)};
ReturnCode ret{RecvUnicode(in_stream, proto_name_)};
if (ReturnCode::OK != ret) {
phxrpc::log(LOG_ERR, "RecvChars err %d", ret);
phxrpc::log(LOG_ERR, "RecvUnicode err %d", ret);

return ret;
}
proto_name_.resize(6);
proto_name_[0] = proto_name[0];
proto_name_[1] = proto_name[1];
proto_name_[2] = proto_name[2];
proto_name_[3] = proto_name[3];
proto_name_[4] = proto_name[4];
proto_name_[5] = proto_name[5];

ret = RecvChar(in_stream, proto_level_);

if ("MQTT" != proto_name_) {
phxrpc::log(LOG_ERR, "violate mqtt protocol");

return ReturnCode::ERROR_VIOLATE_PROTOCOL;
}

char proto_level{'\0'};
ret = RecvChar(in_stream, proto_level);
if (ReturnCode::OK != ret) {
phxrpc::log(LOG_ERR, "RecvChar err %d", ret);

return ret;
}
proto_level_ = proto_level;

char connect_flags{0x0};
ret = RecvChar(in_stream, connect_flags);
Expand Down Expand Up @@ -664,7 +685,8 @@ ReturnCode MqttConnect::RecvPayload(istringstream &in_stream) {
}


MqttConnack::MqttConnack() : MqttResponse(Protocol::MQTT_CONNECT) {
MqttConnack::MqttConnack() {
set_protocol(Protocol::MQTT_CONNECT);
mutable_fixed_header().control_packet_type = ControlPacketType::CONNACK;
}

Expand Down Expand Up @@ -737,16 +759,21 @@ ReturnCode MqttConnack::RecvVariableHeader(istringstream &in_stream) {
}


MqttPublish::MqttPublish() : MqttRequest(Protocol::MQTT_PUBLISH) {
MqttPublish::MqttPublish() {
set_protocol(Protocol::MQTT_PUBLISH);
mutable_fixed_header().control_packet_type = ControlPacketType::PUBLISH;
}

ReturnCode MqttPublish::ToPb(google::protobuf::Message *const message) const {
phxrpc::MqttPublishPb publish;

publish.set_dup(fixed_header().dup);
publish.set_qos(fixed_header().qos);
publish.set_retain(fixed_header().retain);

publish.set_topic_name(topic_name_);
publish.set_content(GetContent());
publish.set_package_identifier(packet_identifier());
publish.set_packet_identifier(packet_identifier());

try {
message->CopyFrom(publish);
Expand All @@ -766,9 +793,14 @@ ReturnCode MqttPublish::FromPb(const google::protobuf::Message &message) {
return ReturnCode::ERROR;
}

FixedHeader &fixed_header(mutable_fixed_header());
fixed_header.dup = publish.dup();
fixed_header.qos = publish.qos();
fixed_header.retain = publish.retain();

topic_name_ = publish.topic_name();
SetContent(publish.content().data(), publish.content().length());
set_packet_identifier(publish.package_identifier());
set_packet_identifier(publish.packet_identifier());

return ReturnCode::OK;
}
Expand Down Expand Up @@ -836,14 +868,15 @@ ReturnCode MqttPublish::RecvPayload(istringstream &in_stream) {
}


MqttPuback::MqttPuback() : MqttResponse(Protocol::MQTT_PUBLISH) {
MqttPuback::MqttPuback() {
set_protocol(Protocol::MQTT_PUBACK);
mutable_fixed_header().control_packet_type = ControlPacketType::PUBACK;
}

ReturnCode MqttPuback::ToPb(google::protobuf::Message *const message) const {
phxrpc::MqttPubackPb puback;

puback.set_package_identifier(packet_identifier());
puback.set_packet_identifier(packet_identifier());

try {
message->CopyFrom(puback);
Expand All @@ -863,11 +896,13 @@ ReturnCode MqttPuback::FromPb(const google::protobuf::Message &message) {
return ReturnCode::ERROR;
}

set_packet_identifier(puback.package_identifier());
set_packet_identifier(puback.packet_identifier());

return ReturnCode::OK;
}

BaseResponse *MqttPuback::GenResponse() const { return new MqttFakeResponse; }

ReturnCode MqttPuback::SendVariableHeader(ostringstream &out_stream) const {
return SendPacketIdentifier(out_stream);
}
Expand All @@ -877,7 +912,8 @@ ReturnCode MqttPuback::RecvVariableHeader(istringstream &in_stream) {
}


MqttSubscribe::MqttSubscribe() : MqttRequest(Protocol::MQTT_SUBSCRIBE) {
MqttSubscribe::MqttSubscribe() {
set_protocol(Protocol::MQTT_SUBSCRIBE);
mutable_fixed_header().control_packet_type = ControlPacketType::SUBSCRIBE;
}

Expand Down Expand Up @@ -950,7 +986,8 @@ ReturnCode MqttSubscribe::RecvPayload(istringstream &in_stream) {
}


MqttSuback::MqttSuback() : MqttResponse(Protocol::MQTT_SUBSCRIBE) {
MqttSuback::MqttSuback() {
set_protocol(Protocol::MQTT_SUBSCRIBE);
mutable_fixed_header().control_packet_type = ControlPacketType::SUBACK;
}

Expand Down Expand Up @@ -997,7 +1034,8 @@ ReturnCode MqttSuback::RecvPayload(istringstream &in_stream) {
}


MqttUnsubscribe::MqttUnsubscribe() : MqttRequest(Protocol::MQTT_UNSUBSCRIBE) {
MqttUnsubscribe::MqttUnsubscribe() {
set_protocol(Protocol::MQTT_UNSUBSCRIBE);
mutable_fixed_header().control_packet_type = ControlPacketType::UNSUBSCRIBE;
}

Expand Down Expand Up @@ -1059,7 +1097,8 @@ MqttUnsubscribe::RecvPayload(istringstream &in_stream) {
}


MqttUnsuback::MqttUnsuback() : MqttResponse(Protocol::MQTT_UNSUBSCRIBE) {
MqttUnsuback::MqttUnsuback() {
set_protocol(Protocol::MQTT_UNSUBSCRIBE);
mutable_fixed_header().control_packet_type = ControlPacketType::UNSUBACK;
}

Expand All @@ -1072,19 +1111,22 @@ ReturnCode MqttUnsuback::RecvVariableHeader(istringstream &in_stream) {
}


MqttPingreq::MqttPingreq() : MqttRequest(Protocol::MQTT_PING) {
MqttPingreq::MqttPingreq() {
set_protocol(Protocol::MQTT_PING);
mutable_fixed_header().control_packet_type = ControlPacketType::PINGREQ;
}

BaseResponse *MqttPingreq::GenResponse() const { return new MqttPingresp; }


MqttPingresp::MqttPingresp() : MqttResponse(Protocol::MQTT_PING) {
MqttPingresp::MqttPingresp() {
set_protocol(Protocol::MQTT_PING);
mutable_fixed_header().control_packet_type = ControlPacketType::PINGRESP;
}


MqttDisconnect::MqttDisconnect() : MqttRequest(Protocol::MQTT_DISCONNECT) {
MqttDisconnect::MqttDisconnect() {
set_protocol(Protocol::MQTT_DISCONNECT);
mutable_fixed_header().control_packet_type = ControlPacketType::DISCONNECT;
}

Expand Down Expand Up @@ -1112,14 +1154,7 @@ ReturnCode MqttDisconnect::FromPb(const google::protobuf::Message &message) {
return ReturnCode::OK;
}

BaseResponse *MqttDisconnect::GenResponse() const { return new MqttFakeDisconnack; }


MqttFakeDisconnack::MqttFakeDisconnack()
: MqttResponse(Protocol::MQTT_FAKE_DISCONNACK) {
mutable_fixed_header().control_packet_type = ControlPacketType::FAKE_DISCONNACK;
set_fake(true);
}
BaseResponse *MqttDisconnect::GenResponse() const { return new MqttFakeResponse; }


} // namespace phxrpc
Expand Down
Loading

0 comments on commit b5879ad

Please sign in to comment.