Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Player Events] Add QS processing, mutex tweaks #2984

Merged
merged 3 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions common/dbcore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,17 @@ DBcore::~DBcore()
// Sends the MySQL server a keepalive
void DBcore::ping()
{
if (!m_query_lock.try_lock()) {
if (!MDatabase.trylock()) {
// well, if's it's locked, someone's using it. If someone's using it, it doesnt need a keepalive
return;
}
mysql_ping(&mysql);
m_query_lock.unlock();
MDatabase.unlock();
}

MySQLRequestResult DBcore::QueryDatabase(std::string query, bool retryOnFailureOnce)
{
m_query_lock.lock();
auto r = QueryDatabase(query.c_str(), query.length(), retryOnFailureOnce);
m_query_lock.unlock();
return r;
}

Expand All @@ -98,6 +96,8 @@ MySQLRequestResult DBcore::QueryDatabase(const char *query, uint32 querylen, boo
BenchTimer timer;
timer.reset();

LockMutex lock(&MDatabase);

// Reconnect if we are not connected before hand.
if (pStatus != Connected) {
Open();
Expand Down
8 changes: 2 additions & 6 deletions common/events/player_event_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ void PlayerEventLogs::AddToQueue(const PlayerEventLogsRepository::PlayerEventLog
m_batch_queue_lock.lock();
m_record_batch_queue.emplace_back(log);
m_batch_queue_lock.unlock();

if (m_record_batch_queue.size() >= RuleI(Logging, BatchPlayerEventProcessChunkSize)) {
ProcessBatchQueue();
}
}

// fills common event data in the SendEvent function
Expand Down Expand Up @@ -607,10 +603,10 @@ std::string PlayerEventLogs::GetDiscordPayloadFromEvent(const PlayerEvent::Playe
return payload;
}

// general process function, used in world or UCS depending on rule Logging:PlayerEventsQSProcess
// general process function, used in world or QS depending on rule Logging:PlayerEventsQSProcess
void PlayerEventLogs::Process()
{
if (m_process_batch_events_timer.Check()) {
if (m_process_batch_events_timer.Check() || m_record_batch_queue.size() >= RuleI(Logging, BatchPlayerEventProcessChunkSize)) {
ProcessBatchQueue();
}

Expand Down
9 changes: 9 additions & 0 deletions queryserv/queryserv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "worldserver.h"
#include "../common/path_manager.h"
#include "../common/zone_store.h"
#include "../common/events/player_event_logs.h"
#include <list>
#include <signal.h>
#include <thread>
Expand All @@ -47,6 +48,7 @@ WorldServer *worldserver = 0;
EQEmuLogSys LogSys;
PathManager path;
ZoneStore zone_store;
PlayerEventLogs player_event_logs;

void CatchSignal(int sig_num)
{
Expand Down Expand Up @@ -106,6 +108,9 @@ int main()
/* Load Looking For Guild Manager */
lfguildmanager.LoadDatabase();

Timer player_event_process_timer(1000);
player_event_logs.SetDatabase(&database)->Init();

auto loop_fn = [&](EQ::Timer* t) {
Timer::SetCurrentTime();

Expand All @@ -117,6 +122,10 @@ int main()
if (LFGuildExpireTimer.Check()) {
lfguildmanager.ExpireEntries();
}

if (player_event_process_timer.Check()) {
player_event_logs.Process();
}
};

EQ::Timer process_timer(loop_fn);
Expand Down
13 changes: 13 additions & 0 deletions queryserv/worldserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "lfguild.h"
#include "queryservconfig.h"
#include "worldserver.h"
#include "../common/events/player_events.h"
#include "../common/events/player_event_logs.h"
#include <iomanip>
#include <iostream>
#include <stdarg.h>
Expand Down Expand Up @@ -89,6 +91,17 @@ void WorldServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p)
case 0: {
break;
}
case ServerOP_PlayerEvent: {
auto n = PlayerEvent::PlayerEventContainer{};
auto s = (ServerSendPlayerEvent_Struct *) p.Data();
EQ::Util::MemoryStreamReader ss(s->cereal_data, s->cereal_size);
cereal::BinaryInputArchive archive(ss);
archive(n);

player_event_logs.AddToQueue(n.player_event_log);

break;
}
case ServerOP_KeepAlive: {
break;
}
Expand Down
16 changes: 5 additions & 11 deletions world/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,6 @@ inline void UpdateWindowTitle(std::string new_title)
#endif
}

void PlayerEventQueueListener() {
while (RunLoops) {
player_event_logs.Process();
Sleep(1000);
}
}

/**
* World process entrypoint
Expand Down Expand Up @@ -381,13 +375,9 @@ int main(int argc, char **argv)
}
);

Timer player_event_process_timer(1000);
player_event_logs.SetDatabase(&database)->Init();

if (!RuleB(Logging, PlayerEventsQSProcess)) {
LogInfo("[PlayerEventQueueListener] Booting queue processor");
std::thread(PlayerEventQueueListener).detach();
}

auto loop_fn = [&](EQ::Timer* t) {
Timer::SetCurrentTime();

Expand Down Expand Up @@ -435,6 +425,10 @@ int main(int argc, char **argv)

client_list.Process();

if (player_event_process_timer.Check()) {
player_event_logs.Process();
}

if (PurgeInstanceTimer.Check()) {
database.PurgeExpiredInstances();
database.PurgeAllDeletedDataBuckets();
Expand Down