Skip to content

Commit

Permalink
Add SSE support and deprecated socket-io
Browse files Browse the repository at this point in the history
  • Loading branch information
mateuscardosodeveloper committed Mar 21, 2024
1 parent 122890e commit 93283b9
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 45 deletions.
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ requests = "2.28.2"
python-dateutil = "^2.8.2"
python-socketio = {extras = ["asyncio_client"], version = "^5.7.2"}
aiohttp = "^3.8.4"
sseclient-py = "^1.8.0"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
1 change: 1 addition & 0 deletions src/tagoio_sdk/infrastructure/api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def request() -> ResultHandlerResponse:
data=dataBody,
params=requestParams.get("params"),
timeout=config.tagoSDKconfig["requestTimeout"],
stream=True,
)
)

Expand Down
44 changes: 44 additions & 0 deletions src/tagoio_sdk/infrastructure/api_sse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Union
from urllib.parse import urlencode, urljoin
from sseclient import SSEClient
from tagoio_sdk.common.tagoio_module import GenericModuleParams
from tagoio_sdk.regions import getConnectionURI


class OpenSSEWithID(GenericModuleParams):
def __init__(self, channel: str, resource_id: str):
super().__init__()
self.channel = channel
self.resource_id = resource_id


class OpenSSEWithoutID(GenericModuleParams):
def __init__(self, channel: str):
super().__init__()
self.channel = channel


OpenSSEConfig = Union[OpenSSEWithID, OpenSSEWithoutID]

channelsWithID = ["device_inspector", "analysis_console", "ui_dashboard"]
channelsWithoutID = ["notification", "analysis_trigger", "ui"]
channels = channelsWithID + channelsWithoutID


def isChannelWithID(params: OpenSSEConfig) -> bool:
return params.channel in channelsWithID


def openSSEListening(params: OpenSSEConfig) -> SSEClient:
base_url = getConnectionURI(params.region)["sse"]
url = urljoin(base_url, "/events")

query_params = {"token": params.token}
if isChannelWithID(params):
query_params["channel"] = f"{params.channel}.{params.resource_id}"
else:
query_params["channel"] = params.channel

url += "?" + urlencode(query_params)

return SSEClient(url)
80 changes: 37 additions & 43 deletions src/tagoio_sdk/modules/Analysis/Analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,11 @@
from typing import Callable, Optional

from tagoio_sdk.common.tagoio_module import TagoIOModule
from tagoio_sdk.infrastructure.api_sse import openSSEListening
from tagoio_sdk.modules.Services import Services

T_ANALYSIS_CONTEXT = os.environ.get("T_ANALYSIS_CONTEXT") or None

if T_ANALYSIS_CONTEXT is None:
import asyncio

from tagoio_sdk.infrastructure.api_socket import APISocket

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


class Analysis(TagoIOModule):
def init(self, analysis: Callable):
Expand Down Expand Up @@ -61,46 +54,47 @@ def context():
self._analysis(context, data or [])

def __localRuntime(self):
tagoSocket = APISocket({"region": self.region, "token": self.token})
sio = tagoSocket.sio

async def connectSocket():
def ready(analysisObj: any): # TODO: Fix any
print(
"Analysis [{AnalysisName}] Started.\n".format(
AnalysisName=analysisObj["name"]
)
)

def connect():
print("Connected to TagoIO, Getting analysis information...")
analysis = self.doRequest({"path": "/info", "method": "GET"})

def disconnect():
print("\nDisconnected from TagoIO.\n\n")
if not analysis:
print("¬ Error :: Analysis not found or not active.")
return

def error(e: any):
print("Connection error", e)
if analysis.get("run_on") == "external":
print("¬ Warning :: Analysis is not set to run on external")

def analysisTrigger(scope: any):
self.__runLocal(
scope["environment"],
scope["data"],
scope["analysis_id"],
scope["token"],
)

sio.on("ready", ready)
sio.on("error", error)
sio.on("connect", connect)
sio.on("disconnect", disconnect)
sio.on("analysis::trigger", analysisTrigger)

await tagoSocket.connect()
tokenEnd = self.token[-5:]

try:
loop.run_until_complete(connectSocket())
except RuntimeError:
pass
sse = openSSEListening({
"token": self.token,
"region": self.region,
"channel": "analysis_trigger"
})
print(f"\n¬ Connected to TagoIO :: Analysis [{analysis['name']}]({tokenEnd}) is ready.")
print("¬ Waiting for analysis trigger...\n")
except Exception as e:
print("¬ Connection was closed, trying to reconnect...")
print(f"Error: {e}")
return

for event in sse.events():
try:
data = json.loads(event.data)

if not data:
continue

if data["analysis_id"] == analysis["id"]:
self.__runLocal(
data["environment"],
data["data"],
data["analysis_id"],
self.token,
)
except RuntimeError:
print("¬ Connection was closed, trying to reconnect...")
pass

@staticmethod
def use(analysis: Callable, params: Optional[str] = {"token": "unknown"}):
Expand Down
5 changes: 4 additions & 1 deletion src/tagoio_sdk/regions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
class RegionDefinition(TypedDict):
api: str
realtime: str
sse: str


# noRegionWarning = False

regionsDefinition = {
"usa-1": {"api": "https://api.tago.io", "realtime": "wss://realtime.tago.io"},
"env": None, # ? process object should be on trycatch.
"sse": "http://localhost:8080/events"
}

Regions = Literal["usa-1", "env"]
Expand All @@ -32,11 +34,12 @@ def getConnectionURI(region: Regions) -> RegionDefinition:
try:
api = os.environ.get("TAGOIO_API") or ""
realtime = os.environ.get("TAGOIO_REALTIME") or ""
sse = os.environ.get("TAGOIO_SSE") or ""

if api == "" and region != "env":
raise Exception("Invalid Env")

return {"api": api, "realtime": realtime}
return {"api": api, "realtime": realtime, "sse": sse}
except:
# global noRegionWarning
# if noRegionWarning is False:
Expand Down

0 comments on commit 93283b9

Please sign in to comment.