Skip to content

Commit

Permalink
Switch to message buffer / dispatcher for Twitch event sub messages
Browse files Browse the repository at this point in the history
  • Loading branch information
WarmUpTill committed Feb 21, 2024
1 parent 41033f2 commit a628831
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 76 deletions.
33 changes: 5 additions & 28 deletions plugins/twitch/event-sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "twitch-helpers.hpp"

#include <log-helper.hpp>
#include <plugin-state-helpers.hpp>

namespace advss {

Expand All @@ -27,6 +26,8 @@ static constexpr std::string_view registerSubscriptionPath =
#endif
static const int reconnectDelay = 15;

#undef DispatchMessage

EventSub::EventSub() : QObject(nullptr)
{
_client.get_alog().clear_channels(
Expand Down Expand Up @@ -59,20 +60,6 @@ EventSub::~EventSub()
UnregisterInstance();
}

static bool setupEventSubMessageClear()
{
AddIntervalResetStep(&EventSub::ClearAllEvents);
return true;
}

bool EventSub::_setupDone = setupEventSubMessageClear();

void EventSub::ClearEvents()
{
std::lock_guard<std::mutex> lock(_messageMtx);
_messages.clear();
}

std::mutex EventSub::_instancesMtx;
std::vector<EventSub *> EventSub::_instances;

Expand All @@ -89,14 +76,6 @@ void EventSub::UnregisterInstance()
_instances.erase(it, _instances.end());
}

void EventSub::ClearAllEvents()
{
std::lock_guard<std::mutex> lock(_instancesMtx);
for (const auto &eventSub : _instances) {
eventSub->ClearEvents();
}
}

void EventSub::ConnectThread()
{
while (!_disconnect) {
Expand Down Expand Up @@ -169,10 +148,9 @@ void EventSub::Disconnect()
ClearActiveSubscriptions();
}

std::vector<Event> EventSub::Events()
EventSubMessageBuffer EventSub::RegisterForEvents()
{
std::lock_guard<std::mutex> lock(_messageMtx);
return _messages;
return _dispatcher.RegisterClient();
}

bool EventSub::SubscriptionIsActive(const std::string &id)
Expand Down Expand Up @@ -358,8 +336,7 @@ void EventSub::HandleNotification(obs_data_t *data)
event.type = obs_data_get_string(subscription, "type");
OBSDataAutoRelease eventData = obs_data_get_obj(data, "event");
event.data = eventData;
std::lock_guard<std::mutex> lock(_messageMtx);
_messages.emplace_back(event);
_dispatcher.DispatchMessage(event);
}

void EventSub::HandleReconnect(obs_data_t *data)
Expand Down
17 changes: 9 additions & 8 deletions plugins/twitch/event-sub.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#pragma once
#include "message-dispatcher.hpp"

#include <obs.hpp>
#include <websocketpp/client.hpp>
#include <QObject>
Expand All @@ -23,9 +25,13 @@ typedef websocketpp::client<websocketpp::config::asio_tls_client>
EventSubWSClient;
#endif

using websocketpp::connection_hdl;
struct Event;
class TwitchToken;

using websocketpp::connection_hdl;
using EventSubMessageBuffer = std::shared_ptr<MessageBuffer<Event>>;
using EventSubMessageDispatcher = MessageDispatcher<Event>;

struct Event {
std::string id;
std::string type;
Expand All @@ -47,11 +53,10 @@ class EventSub : public QObject {

void Connect();
void Disconnect();
std::vector<Event> Events();
[[nodiscard]] EventSubMessageBuffer RegisterForEvents();
bool SubscriptionIsActive(const std::string &id);
static std::string AddEventSubscribtion(std::shared_ptr<TwitchToken>,
Subscription);
static void ClearAllEvents();
void ClearActiveSubscriptions();

private:
Expand All @@ -71,8 +76,6 @@ class EventSub : public QObject {
void HandleReconnect(obs_data_t *);
void HanldeRevocation(obs_data_t *);

void ClearEvents();

void RegisterInstance();
void UnregisterInstance();

Expand All @@ -87,14 +90,12 @@ class EventSub : public QObject {
std::string _url;
std::string _sessionID;

std::mutex _messageMtx;
std::vector<Event> _messages;
std::deque<std::string> _messageIDs;
std::mutex _subscriptionMtx;
std::set<Subscription> _activeSubscriptions;
static std::mutex _instancesMtx;
static std::vector<EventSub *> _instances;
static bool _setupDone;
EventSubMessageDispatcher _dispatcher;
};

} // namespace advss
92 changes: 55 additions & 37 deletions plugins/twitch/macro-condition-twitch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ void MacroConditionTwitch::SetCondition(const Condition &condition)
ResetSubscription();
}

void MacroConditionTwitch::SetToken(const std::weak_ptr<TwitchToken> &t)
{
_token = t;
}

void MacroConditionTwitch::SetChannel(const TwitchChannel &channel)
{
_channel = channel;
Expand Down Expand Up @@ -281,19 +286,21 @@ setTempVarsHelper(obs_data_t *data,

bool MacroConditionTwitch::CheckChannelGenericEvents(TwitchToken &token)
{
auto eventSub = token.GetEventSub();
if (!eventSub) {
if (!_eventBuffer) {
return false;
}

auto events = eventSub->Events();
for (const auto &event : events) {
if (_subscriptionID != event.id) {
while (!_eventBuffer->Empty()) {
auto event = _eventBuffer->ConsumeMessage();
if (!event) {
continue;
}
if (_subscriptionID != event->id) {
continue;
}
SetVariableValue(event.ToString());
SetVariableValue(event->ToString());
setTempVarsHelper(
event.data,
event->data,
std::bind(&MacroConditionTwitch::SetTempVarValue, this,
std::placeholders::_1,
std::placeholders::_2));
Expand All @@ -305,14 +312,16 @@ bool MacroConditionTwitch::CheckChannelGenericEvents(TwitchToken &token)

bool MacroConditionTwitch::CheckChannelLiveEvents(TwitchToken &token)
{
auto eventSub = token.GetEventSub();
if (!eventSub) {
if (!_eventBuffer) {
return false;
}

auto events = eventSub->Events();
for (const auto &event : events) {
if (_subscriptionID != event.id) {
while (!_eventBuffer->Empty()) {
auto event = _eventBuffer->ConsumeMessage();
if (!event) {
continue;
}
if (_subscriptionID != event->id) {
continue;
}

Expand All @@ -321,15 +330,15 @@ bool MacroConditionTwitch::CheckChannelLiveEvents(TwitchToken &token)
continue;
}

auto type = obs_data_get_string(event.data, "type");
auto type = obs_data_get_string(event->data, "type");
const auto &typeId = it->second;
if (type != typeId) {
continue;
}

SetVariableValue(event.ToString());
SetVariableValue(event->ToString());
setTempVarsHelper(
event.data,
event->data,
std::bind(&MacroConditionTwitch::SetTempVarValue, this,
std::placeholders::_1,
std::placeholders::_2));
Expand Down Expand Up @@ -422,6 +431,20 @@ void MacroConditionTwitch::SetTempVarValues(const ChannelInfo &info)
info.is_branded_content ? "true" : "false");
}

bool advss::MacroConditionTwitch::EventSubscriptionIsSetup(
const std::shared_ptr<EventSub> &eventSub)
{
if (!eventSub) {
return false;
}
SetupEventSubscription(*eventSub);
if (_subscriptionIDFuture.valid()) {
// Still waiting for the subscription to be registered
return false;
}
return true;
}

bool MacroConditionTwitch::CheckCondition()
{
SetVariableValue("");
Expand All @@ -432,15 +455,8 @@ bool MacroConditionTwitch::CheckCondition()

auto eventSub = token->GetEventSub();

if (IsUsingEventSubCondition()) {
if (!eventSub) {
return false;
}
CheckEventSubscription(*eventSub);
if (_subscriptionIDFuture.valid()) {
// Still waiting for the subscription to be registered
return false;
}
if (IsUsingEventSubCondition() && !EventSubscriptionIsSetup(eventSub)) {
return false;
}

switch (_condition) {
Expand Down Expand Up @@ -688,7 +704,7 @@ bool MacroConditionTwitch::ConditionIsSupportedByToken()
return token->AnyOptionIsEnabled(options);
}

void MacroConditionTwitch::SetupEventSubscriptions()
void MacroConditionTwitch::RegisterEventSubscription()
{
if (!IsUsingEventSubCondition()) {
return;
Expand Down Expand Up @@ -773,10 +789,11 @@ void MacroConditionTwitch::SetupEventSubscriptions()

void MacroConditionTwitch::ResetSubscription()
{
_eventBuffer.reset();
_subscriptionID = "";
}

void MacroConditionTwitch::CheckEventSubscription(EventSub &eventSub)
void MacroConditionTwitch::SetupEventSubscription(EventSub &eventSub)
{
if (_subscriptionIDFuture.valid()) {
if (_subscriptionIDFuture.wait_for(std::chrono::seconds(0)) !=
Expand All @@ -788,7 +805,8 @@ void MacroConditionTwitch::CheckEventSubscription(EventSub &eventSub)
if (eventSub.SubscriptionIsActive(_subscriptionID)) {
return;
}
SetupEventSubscriptions();
RegisterEventSubscription();
_eventBuffer = eventSub.RegisterForEvents();
}

bool MacroConditionTwitch::IsUsingEventSubCondition()
Expand Down Expand Up @@ -1298,10 +1316,10 @@ void MacroConditionTwitchEdit::TwitchTokenChanged(const QString &token)
}

auto lock = LockContext();
_entryData->_token = GetWeakTwitchTokenByQString(token);
_category->SetToken(_entryData->_token);
_channel->SetToken(_entryData->_token);
_pointsReward->SetToken(_entryData->_token);
_entryData->SetToken(GetWeakTwitchTokenByQString(token));
_category->SetToken(_entryData->GetToken());
_channel->SetToken(_entryData->GetToken());
_pointsReward->SetToken(_entryData->GetToken());
_entryData->ResetChatConnection();

SetWidgetVisibility();
Expand All @@ -1322,14 +1340,14 @@ void MacroConditionTwitchEdit::CheckToken()
if (!_entryData) {
return;
}
if (_entryData->_token.expired()) {
if (_entryData->GetToken().expired()) {
SetTokenWarning(
true,
obs_module_text(
"AdvSceneSwitcher.twitchToken.noSelection"));
return;
}
if (!TokenIsValid(_entryData->_token)) {
if (!TokenIsValid(_entryData->GetToken())) {
SetTokenWarning(
true, obs_module_text(
"AdvSceneSwitcher.twitchToken.notValid"));
Expand Down Expand Up @@ -1470,17 +1488,17 @@ void MacroConditionTwitchEdit::UpdateEntryData()

_conditions->setCurrentIndex(_conditions->findData(
static_cast<int>(_entryData->GetCondition())));
_tokens->SetToken(_entryData->_token);
_channel->SetToken(_entryData->_token);
_tokens->SetToken(_entryData->GetToken());
_channel->SetToken(_entryData->GetToken());
_channel->SetChannel(_entryData->_channel);
_pointsReward->SetToken(_entryData->_token);
_pointsReward->SetToken(_entryData->GetToken());
_pointsReward->SetChannel(_entryData->_channel);
_pointsReward->SetPointsReward(_entryData->_pointsReward);
_streamTitle->setText(_entryData->_streamTitle);
_regexTitle->SetRegexConfig(_entryData->_regexTitle);
_chatMessage->setPlainText(_entryData->_chatMessage);
_regexChat->SetRegexConfig(_entryData->_regexChat);
_category->SetToken(_entryData->_token);
_category->SetToken(_entryData->GetToken());
_category->SetCategory(_entryData->_category);

SetWidgetVisibility();
Expand Down
11 changes: 8 additions & 3 deletions plugins/twitch/macro-condition-twitch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class MacroConditionTwitch : public MacroCondition {

void SetCondition(const Condition &condition);
Condition GetCondition() const { return _condition; }
void SetToken(const std::weak_ptr<TwitchToken> &);
std::weak_ptr<TwitchToken> GetToken() const { return _token; }
void SetChannel(const TwitchChannel &channel);
TwitchChannel GetChannel() const { return _channel; }
void SetPointsReward(const TwitchPointsReward &pointsReward);
Expand All @@ -93,7 +95,6 @@ class MacroConditionTwitch : public MacroCondition {
bool Load(obs_data_t *obj);
bool ConditionIsSupportedByToken();

std::weak_ptr<TwitchToken> _token;
TwitchChannel _channel;
TwitchPointsReward _pointsReward;
StringVariable _streamTitle = obs_module_text(
Expand All @@ -108,10 +109,11 @@ class MacroConditionTwitch : public MacroCondition {
bool CheckChannelLiveEvents(TwitchToken &token);
bool CheckChatMessages(TwitchToken &token);

void SetupEventSubscriptions();
void RegisterEventSubscription();
void ResetSubscription();
void CheckEventSubscription(EventSub &);
void SetupEventSubscription(EventSub &);
bool IsUsingEventSubCondition();
bool EventSubscriptionIsSetup(const std::shared_ptr<EventSub> &);
void AddChannelGenericEventSubscription(
const char *version, bool includeModeratorId = false,
const char *mainUserIdFieldName = "broadcaster_user_id",
Expand All @@ -123,6 +125,9 @@ class MacroConditionTwitch : public MacroCondition {

Condition _condition = Condition::LIVE_POLLING;

std::weak_ptr<TwitchToken> _token;

EventSubMessageBuffer _eventBuffer;
std::future<std::string> _subscriptionIDFuture;
std::string _subscriptionID;

Expand Down

0 comments on commit a628831

Please sign in to comment.