diff --git a/plugins/twitch/event-sub.cpp b/plugins/twitch/event-sub.cpp index dd801a727..68b3f7fa5 100644 --- a/plugins/twitch/event-sub.cpp +++ b/plugins/twitch/event-sub.cpp @@ -3,7 +3,6 @@ #include "twitch-helpers.hpp" #include -#include namespace advss { @@ -27,6 +26,8 @@ static constexpr std::string_view registerSubscriptionPath = #endif static const int reconnectDelay = 15; +#undef DispatchMessage + EventSub::EventSub() : QObject(nullptr) { _client.get_alog().clear_channels( @@ -59,20 +60,6 @@ EventSub::~EventSub() UnregisterInstance(); } -static bool setupEventSubMessageClear() -{ - AddIntervalResetStep(&EventSub::ClearAllEvents); - return true; -} - -bool EventSub::_setupDone = setupEventSubMessageClear(); - -void EventSub::ClearEvents() -{ - std::lock_guard lock(_messageMtx); - _messages.clear(); -} - std::mutex EventSub::_instancesMtx; std::vector EventSub::_instances; @@ -89,14 +76,6 @@ void EventSub::UnregisterInstance() _instances.erase(it, _instances.end()); } -void EventSub::ClearAllEvents() -{ - std::lock_guard lock(_instancesMtx); - for (const auto &eventSub : _instances) { - eventSub->ClearEvents(); - } -} - void EventSub::ConnectThread() { while (!_disconnect) { @@ -169,10 +148,9 @@ void EventSub::Disconnect() ClearActiveSubscriptions(); } -std::vector EventSub::Events() +EventSubMessageBuffer EventSub::RegisterForEvents() { - std::lock_guard lock(_messageMtx); - return _messages; + return _dispatcher.RegisterClient(); } bool EventSub::SubscriptionIsActive(const std::string &id) @@ -358,8 +336,7 @@ void EventSub::HandleNotification(obs_data_t *data) event.type = obs_data_get_string(subscription, "type"); OBSDataAutoRelease eventData = obs_data_get_obj(data, "event"); event.data = eventData; - std::lock_guard lock(_messageMtx); - _messages.emplace_back(event); + _dispatcher.DispatchMessage(event); } void EventSub::HandleReconnect(obs_data_t *data) diff --git a/plugins/twitch/event-sub.hpp b/plugins/twitch/event-sub.hpp index 6ba8d6024..93dbe0045 100644 --- a/plugins/twitch/event-sub.hpp +++ b/plugins/twitch/event-sub.hpp @@ -1,4 +1,6 @@ #pragma once +#include "message-dispatcher.hpp" + #include #include #include @@ -23,9 +25,13 @@ typedef websocketpp::client EventSubWSClient; #endif -using websocketpp::connection_hdl; +struct Event; class TwitchToken; +using websocketpp::connection_hdl; +using EventSubMessageBuffer = std::shared_ptr>; +using EventSubMessageDispatcher = MessageDispatcher; + struct Event { std::string id; std::string type; @@ -47,11 +53,10 @@ class EventSub : public QObject { void Connect(); void Disconnect(); - std::vector Events(); + [[nodiscard]] EventSubMessageBuffer RegisterForEvents(); bool SubscriptionIsActive(const std::string &id); static std::string AddEventSubscribtion(std::shared_ptr, Subscription); - static void ClearAllEvents(); void ClearActiveSubscriptions(); private: @@ -71,8 +76,6 @@ class EventSub : public QObject { void HandleReconnect(obs_data_t *); void HanldeRevocation(obs_data_t *); - void ClearEvents(); - void RegisterInstance(); void UnregisterInstance(); @@ -87,14 +90,12 @@ class EventSub : public QObject { std::string _url; std::string _sessionID; - std::mutex _messageMtx; - std::vector _messages; std::deque _messageIDs; std::mutex _subscriptionMtx; std::set _activeSubscriptions; static std::mutex _instancesMtx; static std::vector _instances; - static bool _setupDone; + EventSubMessageDispatcher _dispatcher; }; } // namespace advss diff --git a/plugins/twitch/macro-condition-twitch.cpp b/plugins/twitch/macro-condition-twitch.cpp index 708b157d5..7ced72768 100644 --- a/plugins/twitch/macro-condition-twitch.cpp +++ b/plugins/twitch/macro-condition-twitch.cpp @@ -229,6 +229,11 @@ void MacroConditionTwitch::SetCondition(const Condition &condition) ResetSubscription(); } +void MacroConditionTwitch::SetToken(const std::weak_ptr &t) +{ + _token = t; +} + void MacroConditionTwitch::SetChannel(const TwitchChannel &channel) { _channel = channel; @@ -281,19 +286,21 @@ setTempVarsHelper(obs_data_t *data, bool MacroConditionTwitch::CheckChannelGenericEvents(TwitchToken &token) { - auto eventSub = token.GetEventSub(); - if (!eventSub) { + if (!_eventBuffer) { return false; } - auto events = eventSub->Events(); - for (const auto &event : events) { - if (_subscriptionID != event.id) { + while (!_eventBuffer->Empty()) { + auto event = _eventBuffer->ConsumeMessage(); + if (!event) { + continue; + } + if (_subscriptionID != event->id) { continue; } - SetVariableValue(event.ToString()); + SetVariableValue(event->ToString()); setTempVarsHelper( - event.data, + event->data, std::bind(&MacroConditionTwitch::SetTempVarValue, this, std::placeholders::_1, std::placeholders::_2)); @@ -305,14 +312,16 @@ bool MacroConditionTwitch::CheckChannelGenericEvents(TwitchToken &token) bool MacroConditionTwitch::CheckChannelLiveEvents(TwitchToken &token) { - auto eventSub = token.GetEventSub(); - if (!eventSub) { + if (!_eventBuffer) { return false; } - auto events = eventSub->Events(); - for (const auto &event : events) { - if (_subscriptionID != event.id) { + while (!_eventBuffer->Empty()) { + auto event = _eventBuffer->ConsumeMessage(); + if (!event) { + continue; + } + if (_subscriptionID != event->id) { continue; } @@ -321,15 +330,15 @@ bool MacroConditionTwitch::CheckChannelLiveEvents(TwitchToken &token) continue; } - auto type = obs_data_get_string(event.data, "type"); + auto type = obs_data_get_string(event->data, "type"); const auto &typeId = it->second; if (type != typeId) { continue; } - SetVariableValue(event.ToString()); + SetVariableValue(event->ToString()); setTempVarsHelper( - event.data, + event->data, std::bind(&MacroConditionTwitch::SetTempVarValue, this, std::placeholders::_1, std::placeholders::_2)); @@ -422,6 +431,20 @@ void MacroConditionTwitch::SetTempVarValues(const ChannelInfo &info) info.is_branded_content ? "true" : "false"); } +bool advss::MacroConditionTwitch::EventSubscriptionIsSetup( + const std::shared_ptr &eventSub) +{ + if (!eventSub) { + return false; + } + SetupEventSubscription(*eventSub); + if (_subscriptionIDFuture.valid()) { + // Still waiting for the subscription to be registered + return false; + } + return true; +} + bool MacroConditionTwitch::CheckCondition() { SetVariableValue(""); @@ -432,15 +455,8 @@ bool MacroConditionTwitch::CheckCondition() auto eventSub = token->GetEventSub(); - if (IsUsingEventSubCondition()) { - if (!eventSub) { - return false; - } - CheckEventSubscription(*eventSub); - if (_subscriptionIDFuture.valid()) { - // Still waiting for the subscription to be registered - return false; - } + if (IsUsingEventSubCondition() && !EventSubscriptionIsSetup(eventSub)) { + return false; } switch (_condition) { @@ -688,7 +704,7 @@ bool MacroConditionTwitch::ConditionIsSupportedByToken() return token->AnyOptionIsEnabled(options); } -void MacroConditionTwitch::SetupEventSubscriptions() +void MacroConditionTwitch::RegisterEventSubscription() { if (!IsUsingEventSubCondition()) { return; @@ -773,10 +789,11 @@ void MacroConditionTwitch::SetupEventSubscriptions() void MacroConditionTwitch::ResetSubscription() { + _eventBuffer.reset(); _subscriptionID = ""; } -void MacroConditionTwitch::CheckEventSubscription(EventSub &eventSub) +void MacroConditionTwitch::SetupEventSubscription(EventSub &eventSub) { if (_subscriptionIDFuture.valid()) { if (_subscriptionIDFuture.wait_for(std::chrono::seconds(0)) != @@ -788,7 +805,8 @@ void MacroConditionTwitch::CheckEventSubscription(EventSub &eventSub) if (eventSub.SubscriptionIsActive(_subscriptionID)) { return; } - SetupEventSubscriptions(); + RegisterEventSubscription(); + _eventBuffer = eventSub.RegisterForEvents(); } bool MacroConditionTwitch::IsUsingEventSubCondition() @@ -1298,10 +1316,10 @@ void MacroConditionTwitchEdit::TwitchTokenChanged(const QString &token) } auto lock = LockContext(); - _entryData->_token = GetWeakTwitchTokenByQString(token); - _category->SetToken(_entryData->_token); - _channel->SetToken(_entryData->_token); - _pointsReward->SetToken(_entryData->_token); + _entryData->SetToken(GetWeakTwitchTokenByQString(token)); + _category->SetToken(_entryData->GetToken()); + _channel->SetToken(_entryData->GetToken()); + _pointsReward->SetToken(_entryData->GetToken()); _entryData->ResetChatConnection(); SetWidgetVisibility(); @@ -1322,14 +1340,14 @@ void MacroConditionTwitchEdit::CheckToken() if (!_entryData) { return; } - if (_entryData->_token.expired()) { + if (_entryData->GetToken().expired()) { SetTokenWarning( true, obs_module_text( "AdvSceneSwitcher.twitchToken.noSelection")); return; } - if (!TokenIsValid(_entryData->_token)) { + if (!TokenIsValid(_entryData->GetToken())) { SetTokenWarning( true, obs_module_text( "AdvSceneSwitcher.twitchToken.notValid")); @@ -1470,17 +1488,17 @@ void MacroConditionTwitchEdit::UpdateEntryData() _conditions->setCurrentIndex(_conditions->findData( static_cast(_entryData->GetCondition()))); - _tokens->SetToken(_entryData->_token); - _channel->SetToken(_entryData->_token); + _tokens->SetToken(_entryData->GetToken()); + _channel->SetToken(_entryData->GetToken()); _channel->SetChannel(_entryData->_channel); - _pointsReward->SetToken(_entryData->_token); + _pointsReward->SetToken(_entryData->GetToken()); _pointsReward->SetChannel(_entryData->_channel); _pointsReward->SetPointsReward(_entryData->_pointsReward); _streamTitle->setText(_entryData->_streamTitle); _regexTitle->SetRegexConfig(_entryData->_regexTitle); _chatMessage->setPlainText(_entryData->_chatMessage); _regexChat->SetRegexConfig(_entryData->_regexChat); - _category->SetToken(_entryData->_token); + _category->SetToken(_entryData->GetToken()); _category->SetCategory(_entryData->_category); SetWidgetVisibility(); diff --git a/plugins/twitch/macro-condition-twitch.hpp b/plugins/twitch/macro-condition-twitch.hpp index d141130ff..e51cc097e 100644 --- a/plugins/twitch/macro-condition-twitch.hpp +++ b/plugins/twitch/macro-condition-twitch.hpp @@ -82,6 +82,8 @@ class MacroConditionTwitch : public MacroCondition { void SetCondition(const Condition &condition); Condition GetCondition() const { return _condition; } + void SetToken(const std::weak_ptr &); + std::weak_ptr GetToken() const { return _token; } void SetChannel(const TwitchChannel &channel); TwitchChannel GetChannel() const { return _channel; } void SetPointsReward(const TwitchPointsReward &pointsReward); @@ -93,7 +95,6 @@ class MacroConditionTwitch : public MacroCondition { bool Load(obs_data_t *obj); bool ConditionIsSupportedByToken(); - std::weak_ptr _token; TwitchChannel _channel; TwitchPointsReward _pointsReward; StringVariable _streamTitle = obs_module_text( @@ -108,10 +109,11 @@ class MacroConditionTwitch : public MacroCondition { bool CheckChannelLiveEvents(TwitchToken &token); bool CheckChatMessages(TwitchToken &token); - void SetupEventSubscriptions(); + void RegisterEventSubscription(); void ResetSubscription(); - void CheckEventSubscription(EventSub &); + void SetupEventSubscription(EventSub &); bool IsUsingEventSubCondition(); + bool EventSubscriptionIsSetup(const std::shared_ptr &); void AddChannelGenericEventSubscription( const char *version, bool includeModeratorId = false, const char *mainUserIdFieldName = "broadcaster_user_id", @@ -123,6 +125,9 @@ class MacroConditionTwitch : public MacroCondition { Condition _condition = Condition::LIVE_POLLING; + std::weak_ptr _token; + + EventSubMessageBuffer _eventBuffer; std::future _subscriptionIDFuture; std::string _subscriptionID;