Skip to content

Commit

Permalink
Add AMX to message struct for reliable callback invoking
Browse files Browse the repository at this point in the history
Previous implementation processed the entire message queue on the first
loaded AMX which could be an unrelated script. Now it takes the AMX
pointer and stores it on the message struct and uses that to invoke the
callback.
  • Loading branch information
AGraber committed Aug 22, 2020
1 parent 7e4d1f1 commit 96218c4
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
9 changes: 6 additions & 3 deletions src/impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ int Impl::HIncrByFloat(int client_id, std::string key, std::string field, float
return 0;
}

int Impl::Subscribe(std::string host, int port, std::string auth, std::string channel, std::string callback, int& id)
int Impl::Subscribe(AMX* amx, std::string host, int port, std::string auth, std::string channel, std::string callback, int& id)
{
cpp_redis::subscriber* sub = new cpp_redis::subscriber();
sub->connect(host, port);
Expand All @@ -404,8 +404,9 @@ int Impl::Subscribe(std::string host, int port, std::string auth, std::string ch
sub->auth(auth);
}

sub->subscribe(channel, [callback](const std::string& chan, const std::string& msg) {
sub->subscribe(channel, [amx, callback](const std::string& chan, const std::string& msg) {
message m;
m.amx = amx;
m.channel = chan;
m.msg = msg;
m.callback = callback;
Expand Down Expand Up @@ -467,7 +468,7 @@ int Impl::Publish(int client_id, std::string channel, std::string data)
return 0;
}

void Impl::amx_tick(AMX* amx)
void Impl::amx_tick()
{
if (message_stack_mutex.try_lock()) {
message m;
Expand All @@ -480,6 +481,8 @@ void Impl::amx_tick(AMX* amx)
while (!message_stack.empty()) {
m = message_stack.top();

AMX* amx = m.amx;

error = amx_FindPublic(amx, m.callback.c_str(), &amx_idx);

if (error == AMX_ERR_NONE) {
Expand Down
5 changes: 3 additions & 2 deletions src/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct message {
std::string channel;
std::string msg;
std::string callback;
AMX* amx;
};

int Connect(std::string hostname, int port, std::string auth, int& id);
Expand All @@ -87,13 +88,13 @@ int HIncrBy(int client_id, std::string key, std::string field, int incr);
int HIncrByFloat(int client_id, std::string key, std::string field, float incr);
int HDel(int client_id, std::string key, std::string field);

int Subscribe(std::string host, int port, std::string auth, std::string channel, std::string callback, int& id);
int Subscribe(AMX* amx, std::string host, int port, std::string auth, std::string channel, std::string callback, int& id);
int Unsubscribe(int client_id);
int Publish(int client_id, std::string channel, std::string message);

int clientFromID(int client_id, cpp_redis::client*& client);
int clientDataFromID(int client_id, clientData& client);
void amx_tick(AMX* amx);
void amx_tick();
std::vector<std::string> split(const std::string s);

extern int context_count;
Expand Down
4 changes: 1 addition & 3 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ PLUGIN_EXPORT void PLUGIN_CALL Unload()

PLUGIN_EXPORT void PLUGIN_CALL ProcessTick()
{
for (AMX* i : amx_list) {
Impl::amx_tick(i);
}
Impl::amx_tick();
}

PLUGIN_EXPORT int PLUGIN_CALL AmxLoad(AMX* amx)
Expand Down
2 changes: 1 addition & 1 deletion src/natives.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ cell Natives::Subscribe(AMX* amx, cell* params)
cell* addr;
amx_GetAddr(amx, params[6], &addr);
try {
return Impl::Subscribe(host, port, auth, channel, callback, *addr);
return Impl::Subscribe(amx, host, port, auth, channel, callback, *addr);
}
catch (cpp_redis::redis_error e) {
logprintf("ERROR: %s", e.what());
Expand Down

0 comments on commit 96218c4

Please sign in to comment.