-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmock-device-for-mqtt-async.c
171 lines (149 loc) · 5.44 KB
/
mock-device-for-mqtt-async.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
 * Dependencies: cJSON and Eclipse Paho MQTT C. Install both before compiling.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <cjson/cJSON.h>
#include <MQTTAsync.h>
/* Broker URI handed to MQTTAsync_create() in main(). */
#define ADDRESS "tcp://localhost:1883"
/* Client identifier handed to MQTTAsync_create() in main(). */
#define CLIENTID "Mock-Device-Client"
/* Topic on which send_data_actively_server() publishes PAYLOAD. */
#define DATA_TOPIC "DataTopic"
/* Topic subscribed to in on_connect(); messages arrive in on_message(). */
#define CMDTOPIC "CommandTopic"
/* Topic on which response_cmd() publishes command replies. */
#define RESPONSE_TOPIC "ResponseTopic"
/* Credentials placed in the connect options in main(). */
#define USERNAME "huaqiao"
#define PWD "1234"
/* Fixed JSON document published while active reporting is switched on. */
#define PAYLOAD "{\"name\":\"mqtt-device-01\",\"randnum\":\"520.1314\"}"
/* QoS level used both for publishing and for the command subscription. */
#define QOS 0
/* Milliseconds; only used to build the log line printed by publish(). */
#define TIMEOUT 10000L
/*
 * Only referenced by the commented-out lock/unlock calls in publish(); it is
 * never initialized in the code shown.
 */
static pthread_mutex_t mutex;
/*
 * Active-reporting switch: written by on_message() when handling "collect"
 * commands and compared against "true" by send_data_actively_server().
 * NOTE(review): read and written from different threads without any
 * synchronization.
 */
char *active = "false";
/*
 * Success callback that publish() installs in its response options.
 * Logs the token of the message whose delivery was confirmed.
 *
 * context:  callback context from the response options (not used here).
 * response: success data supplied by the library; only its token is read.
 */
void on_send_success(void* context, MQTTAsync_successData* response) {
    const int confirmed_token = response->token;

    (void)context;
    printf("Message with token value %d delivery confirmed\n", confirmed_token);
}
/*
 * Queue `payload` for asynchronous publication on `topic` at QoS `QOS`.
 *
 * client:  pointer to the MQTTAsync handle to publish with; also passed to
 *          on_send_success() as its callback context.
 * payload: NUL-terminated message body. The caller keeps ownership.
 * topic:   NUL-terminated topic name.
 *
 * A NULL argument is logged and ignored instead of being dereferenced.
 * If the client is currently disconnected the message is dropped with a log
 * line, so that the automatic reconnect configured in main() gets a chance to
 * restore the session. Any other failure to queue the message terminates the
 * process, as before.
 */
void publish(MQTTAsync *client, char *payload, char *topic) {
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    int rc;

    if (client == NULL || payload == NULL || topic == NULL) {
        printf("publish: missing client, payload or topic, nothing sent\n");
        return;
    }

    opts.onSuccess = on_send_success;
    opts.context = client;

    pubmsg.payload = payload;
    pubmsg.payloadlen = (int)strlen(payload);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    rc = MQTTAsync_sendMessage(*client, topic, &pubmsg, &opts);
    if (rc == MQTTASYNC_DISCONNECTED) {
        /* Transient: do not kill the device while the link is down. */
        printf("Client is disconnected, dropping message for topic %s\n", topic);
        return;
    }
    if (rc != MQTTASYNC_SUCCESS) {
        printf("Failed to start sendMessage, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Waiting for up to %d seconds for publication of %s\n",(int)(TIMEOUT/1000), payload);
}
void response_cmd(void *context, char *resp_data) {
publish(context,resp_data,RESPONSE_TOPIC);
}
int on_message(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
char *cmd;
char *method;
char *param;
cJSON *item = NULL;
cJSON *json = NULL;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %s\n",message->payload);
json = cJSON_Parse(message->payload);
item = cJSON_GetObjectItem(json,"cmd");
cmd = item->valuestring;
method = (cJSON_GetObjectItem(json,"method"))->valuestring;
//处理ping命令
if (strcmp(cmd,"ping") == 0) {
cJSON_AddItemToObject(json, "ping", cJSON_CreateString("pong"));
}
//处理message命令
if (strcmp(cmd,"message") == 0) {
if (strcmp(method,"get") == 0) {
cJSON_AddItemToObject(json, "message", cJSON_CreateString("Are you ok?"));
} else {
//param = cJSON_Print(cJSON_GetObjectItem(json,"param"));
cJSON_AddItemToObject(json, "result", cJSON_CreateString("set success."));
}
}
//处理randnum命令
if (strcmp(cmd,"randnum") == 0) {
cJSON_AddItemToObject(json, "randnum", cJSON_CreateString("520.1314"));
}
//处理collect命令
if (strcmp(cmd,"collect") == 0) {
if (strcmp(method,"get") == 0) {
cJSON_AddItemToObject(json, "collect", cJSON_CreateString(active));
} else {
active = (cJSON_GetObjectItem(json,"param"))->valuestring;
}
}
response_cmd(context,cJSON_PrintUnformatted(json));
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
// cJSON_Delete(json);
// cJSON_Delete(item);
return 1;
}
/*
 * Connect-success callback: subscribes the client to CMDTOPIC at QoS `QOS`.
 *
 * context:  the MQTTAsync handle stored in the connect options.
 * response: success data from the library (not used here).
 *
 * Terminates the process if the subscribe request cannot be started.
 */
void on_connect(void* context, MQTTAsync_successData* response) {
    MQTTAsync handle = (MQTTAsync)context;
    MQTTAsync_responseOptions sub_opts = MQTTAsync_responseOptions_initializer;
    int status;

    (void)response;

    printf("Successful connection\n");
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", CMDTOPIC, CLIENTID, QOS);

    sub_opts.context = handle;
    status = MQTTAsync_subscribe(handle, CMDTOPIC, QOS, &sub_opts);
    if (status == MQTTASYNC_SUCCESS) {
        return;
    }

    printf("Failed to start subscribe, return code %d\n", status);
    exit(EXIT_FAILURE);
}
void *send_data_actively_server(void *context) {
for (;;) {
if (strcmp(active,"true") == 0) {
sleep(1);
printf("send data actively from mock device.\n");
publish((MQTTAsync *)context,PAYLOAD,DATA_TOPIC);
}
}
}
int main(int argc, char* argv[]) {
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
//MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
int rc;
pthread_t thread_id;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, &client, NULL, on_message, NULL);
conn_opts.MQTTVersion = MQTTVERSION_DEFAULT;
conn_opts.username = USERNAME;
conn_opts.password = PWD;
conn_opts.keepAliveInterval = 2000;
conn_opts.cleansession = 0;
conn_opts.onSuccess = on_connect;
conn_opts.context = client;
conn_opts.automaticReconnect = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// disc_opts.onSuccess = onDisconnect;
// if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
// printf("Failed to start disconnect, return code %d\n", rc);
// exit(EXIT_FAILURE);
// }
pthread_create(&thread_id, NULL, send_data_actively_server, &client);
//pthread_join(thread_id, NULL);
for(;;) {
sleep(3);
}
MQTTClient_disconnect(client, 10000);
MQTTAsync_destroy(&client);
return rc;
}