Skip to content

Commit

Permalink
feat!: Refactored topic structure for more granular flow and access (#19
Browse files Browse the repository at this point in the history
)

* adding some notes for refactor @EdwardLu2018

* more notes

* pull topic-v5 branch

* use new topic structure

* revert package

* update remote render public/private use

* send repsonse on private topics

* add server publish to private user

* add client id to successive messages

* just filter on /r/ type

* add message logger by flag

* cleanup logging

* move logging to main unity
  • Loading branch information
mwfarb authored Oct 14, 2024
1 parent ddda050 commit ec3806b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 68 deletions.
23 changes: 15 additions & 8 deletions ApplicationLauncher~/renderfusion-application-launcher.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import json
import subprocess
from arena import *
from sys import platform
import logging
import subprocess
import uuid
from sys import platform

from arena import *

scene = Scene(host="arena-dev1.conix.io", scene="example")
CLIENT_CONNECT = "realm/g/a/hybrid_rendering/client/connect/#"
CLIENT_DISCONNECT = "realm/g/a/hybrid_rendering/client/disconnect/#"
CLIENT_REMOTE = "realm/g/a/hybrid_rendering/client/remote/#"
SERVER_HEALTH = "realm/g/a/hybrid_rendering/server/health/#"
HAL_CONNECT = "realm/g/a/hybrid_rendering/HAL/connect/"
topicParams = { # Reusable topic param dict
'realm': scene.realm,
'nameSpace': scene.namespace,
'sceneName': scene.scene,
'idTag': '-'
}
CLIENT_CONNECT = PUBLISH_TOPICS.SCENE_RENDER_PRIVATE.substitute(topicParams)
CLIENT_DISCONNECT = PUBLISH_TOPICS.SCENE_RENDER_PRIVATE.substitute(topicParams)
CLIENT_REMOTE = PUBLISH_TOPICS.SCENE_RENDER_PRIVATE.substitute(topicParams)
SERVER_HEALTH = PUBLISH_TOPICS.SCENE_RENDER_PUBLIC.substitute(topicParams)
HAL_CONNECT = PUBLISH_TOPICS.SCENE_RENDER_PUBLIC.substitute(topicParams)

clientDict = dict()
SceneDict = dict()
Expand Down
8 changes: 4 additions & 4 deletions Runtime/Scripts/PeerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public PeerConnection(RTCPeerConnection peer, ConnectData data, ISignaling signa

_peer = peer;
_peer.OnNegotiationNeeded = () => StartCoroutine(OnNegotiationNeeded());
_peer.OnIceCandidate = candidate => m_signaler.SendCandidate(m_id, candidate);
_peer.OnIceCandidate = candidate => m_signaler.SendCandidate(m_id, m_clientId, candidate);
_peer.OnDataChannel = OnDataChannel;

sourceStream = new MediaStream();
Expand Down Expand Up @@ -185,7 +185,7 @@ public IEnumerator OnNegotiationNeeded()
if (!op1.IsError)
{
// Debug.Log($"[{m_clientId}] sent offer.");
m_signaler.SendOffer(m_id, _peer.LocalDescription);
m_signaler.SendOffer(m_id, m_clientId, _peer.LocalDescription);
}
}
else
Expand Down Expand Up @@ -223,7 +223,7 @@ public IEnumerator CreateAndSendAnswerCoroutine(SDPData offer)
if (!op1.IsError)
{
// Debug.Log($"[{m_clientId}] sent answer.");
m_signaler.SendAnswer(m_id, _peer.LocalDescription);
m_signaler.SendAnswer(m_id, m_clientId, _peer.LocalDescription);
}
}
else
Expand Down Expand Up @@ -311,7 +311,7 @@ public IEnumerator GetStats(float interval = 1.0f)
;
}
}
m_signaler.SendStats(text);
m_signaler.SendStats(text, m_clientId);
// Debug.Log(statsOperation);
}
}
Expand Down
82 changes: 30 additions & 52 deletions Runtime/Scripts/Signaling/ARENAMQTTSignaling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,7 @@ namespace ArenaUnity.RenderFusion.Signaling
{
public class ARENAMQTTSignaling : ISignaling
{
private static readonly string TOPIC_PREFIX = "realm/g/a";

private readonly string SERVER_OFFER_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/server/offer";
private readonly string SERVER_ANSWER_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/server/answer";
private readonly string SERVER_CANDIDATE_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/server/candidate";
private readonly string SERVER_HEALTH_CHECK_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/server/health";
private readonly string SERVER_STATS_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/server/stats";
private readonly string SERVER_CONNECT_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/server/connect";

private readonly string CLIENT_CONNECT_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/connect";
private readonly string CLIENT_DISCONNECT_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/disconnect";
private readonly string CLIENT_OFFER_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/offer";
private readonly string CLIENT_ANSWER_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/answer";
private readonly string CLIENT_CANDIDATE_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/candidate";
private readonly string CLIENT_HEALTH_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/health";
// private readonly string CLIENT_STATS_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/client/stats";
private readonly string HAL_CONNECT_TOPIC_PREFIX = $"{TOPIC_PREFIX}/hybrid_rendering/HAL/connect";

private string SERVER_OFFER_TOPIC;
private string SERVER_ANSWER_TOPIC;
private string SERVER_CANDIDATE_TOPIC;
private string SERVER_HEALTH_CHECK_TOPIC;
private string SERVER_STATS_TOPIC;
private string SERVER_CONNECT_TOPIC;
private ArenaTopics subRenderServerTopic;

private string[] m_subbedTopics;

Expand All @@ -51,23 +28,14 @@ public ARENAMQTTSignaling(SynchronizationContext mainThreadContext) {
var scene = ArenaClientScene.Instance;

m_clientId = "cloud-" + Guid.NewGuid().ToString();

SERVER_OFFER_TOPIC = $"{SERVER_OFFER_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}";
SERVER_ANSWER_TOPIC = $"{SERVER_ANSWER_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}";
SERVER_CANDIDATE_TOPIC = $"{SERVER_CANDIDATE_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}";
SERVER_HEALTH_CHECK_TOPIC = $"{SERVER_HEALTH_CHECK_PREFIX}/{scene.namespaceName}/{scene.sceneName}";
SERVER_STATS_TOPIC = $"{SERVER_STATS_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}";
SERVER_CONNECT_TOPIC = $"{SERVER_CONNECT_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}";

subRenderServerTopic = new ArenaTopics(
realm: scene.realm,
name_space: scene.namespaceName,
scenename: scene.sceneName,
idtag: "-"
);
m_subbedTopics = new string[] {
$"{CLIENT_CONNECT_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
$"{CLIENT_DISCONNECT_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
$"{CLIENT_OFFER_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
$"{CLIENT_ANSWER_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
$"{CLIENT_CANDIDATE_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
$"{CLIENT_HEALTH_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
// $"{CLIENT_STATS_TOPIC_PREFIX}/{scene.namespaceName}/{scene.sceneName}/#",
$"{HAL_CONNECT_TOPIC_PREFIX}/{m_halID}/#",
$"{subRenderServerTopic.SUB_SCENE_RENDER_PRIVATE}",
};

for (int i = 0; i < m_subbedTopics.Length; i++) {
Expand Down Expand Up @@ -97,10 +65,18 @@ public void CloseConnection()
{
}

private void Publish(string topic, string msg)
private void Publish(string toUid, string msg)
{
var scene = ArenaClientScene.Instance;
byte[] payload = System.Text.Encoding.UTF8.GetBytes(msg);
var pubRenderServerTopic = new ArenaTopics(
realm: scene.realm,
name_space: scene.namespaceName,
scenename: scene.sceneName,
idtag: "-",
touid: toUid
);
var topic = (toUid == null) ? pubRenderServerTopic.PUB_SCENE_RENDER : pubRenderServerTopic.PUB_SCENE_RENDER_PRI_SERV;
scene.Publish(topic, payload);
}

Expand All @@ -113,10 +89,10 @@ public void SendConnect()
id = m_clientId,
data = ""
};
Publish(SERVER_CONNECT_TOPIC, JsonUtility.ToJson(routedMessage));
Publish(null, JsonUtility.ToJson(routedMessage));
}

public void SendOffer(string id, RTCSessionDescription offer)
public void SendOffer(string id, string toUid, RTCSessionDescription offer)
{
RoutedMessage<SDPData> routedMessage = new RoutedMessage<SDPData>{
type = "offer",
Expand All @@ -128,10 +104,10 @@ public void SendOffer(string id, RTCSessionDescription offer)
}
};

Publish(SERVER_OFFER_TOPIC, JsonUtility.ToJson(routedMessage));
Publish(toUid, JsonUtility.ToJson(routedMessage));
}

public void SendAnswer(string id, RTCSessionDescription answer)
public void SendAnswer(string id, string toUid, RTCSessionDescription answer)
{
RoutedMessage<SDPData> routedMessage = new RoutedMessage<SDPData>
{
Expand All @@ -144,7 +120,7 @@ public void SendAnswer(string id, RTCSessionDescription answer)
}
};

Publish(SERVER_ANSWER_TOPIC, JsonUtility.ToJson(routedMessage));
Publish(toUid, JsonUtility.ToJson(routedMessage));
}

public void SendHealthCheck(string id){
Expand All @@ -157,7 +133,7 @@ public void SendHealthCheck(string id){
id = id,
data = $"{scene.namespaceName}/{scene.sceneName}"
};
Publish(SERVER_HEALTH_CHECK_TOPIC, JsonUtility.ToJson(healthCheck));
Publish(null, JsonUtility.ToJson(healthCheck));
}

public void UpdateHALInfo(string id, bool halStatus)
Expand All @@ -166,7 +142,7 @@ public void UpdateHALInfo(string id, bool halStatus)
m_halStatus = halStatus;
}

public void SendCandidate(string id, RTCIceCandidate candidate)
public void SendCandidate(string id, string toUid, RTCIceCandidate candidate)
{
RoutedMessage<CandidateData> routedMessage = new RoutedMessage<CandidateData>
{
Expand All @@ -180,17 +156,19 @@ public void SendCandidate(string id, RTCIceCandidate candidate)
}
};

Publish(SERVER_CANDIDATE_TOPIC, JsonUtility.ToJson(routedMessage));
Publish(toUid, JsonUtility.ToJson(routedMessage));
}

public void SendStats(string stats)
public void SendStats(string stats, string toUid)
{
Publish(SERVER_STATS_TOPIC, stats);
Publish(toUid, stats);
}

protected void ProcessMessage(string topic, string content)
{
if ( !m_subbedTopics.Any(s => topic.Contains( s.Substring(0,s.Length-2) )) ) return;
// filter messages based on expected payload format, only read "r" messages
var topicSplit = topic.Split("/");
if (topicSplit.Length <= 4 || topicSplit[4] != "r") return;

try
{
Expand Down
8 changes: 4 additions & 4 deletions Runtime/Scripts/Signaling/ISignaling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ public interface ISignaling
void OpenConnection();
void CloseConnection();
void SendConnect();
void SendOffer(string id, RTCSessionDescription offer);
void SendAnswer(string id, RTCSessionDescription answer);
void SendCandidate(string id, RTCIceCandidate candidate);
void SendOffer(string id, string toUid, RTCSessionDescription offer);
void SendAnswer(string id, string toUid, RTCSessionDescription answer);
void SendCandidate(string id, string toUid, RTCIceCandidate candidate);
void SendHealthCheck(string id);
void SendStats(string stats);
void SendStats(string stats, string toUid);
void UpdateHALInfo(string id, bool halStatus);
}
}

0 comments on commit ec3806b

Please sign in to comment.