From f9369b33e0f07c1dde097d37debdb8c5e376983a Mon Sep 17 00:00:00 2001 From: naleeha Date: Tue, 9 Jul 2024 15:40:49 +0100 Subject: [PATCH 1/9] #1301 fixing the web socket server to client connection test without ssl --- .../finos/vuu/net/ws/WebSocketClient.scala | 2 +- .../net/ws/WebSocketServerClientTest.scala | 77 ++++++++----------- 2 files changed, 34 insertions(+), 45 deletions(-) diff --git a/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala b/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala index a3f08a9eb..845a49627 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala @@ -53,7 +53,7 @@ class WebSocketClient(url: String, port: Int)(implicit lifecycle: LifecycleConta // if (sslCtx != null) { // p.addLast(sslCtx.newHandler(ch.alloc, host, port)) // } - p.addLast("ssl-handler", sslCtx.newHandler(ch.alloc, "localhost", 8443)) + // p.addLast("ssl-handler", sslCtx.newHandler(ch.alloc, "localhost", 8443)) p.addLast(new HttpClientCodec, new HttpObjectAggregator(8192), WebSocketClientCompressionHandler.INSTANCE, handler) } }) diff --git a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala b/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala index 78b64d51b..4adf5a788 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala @@ -1,19 +1,14 @@ package org.finos.vuu.net.ws +import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} +import org.finos.toolbox.lifecycle.LifecycleContainer +import org.finos.toolbox.time.{Clock, DefaultClock} import org.finos.vuu.client.ClientHelperFns.awaitMsgBody -import org.finos.vuu.core.{CoreServerApiHandler, VuuWebSocketOptions} -import org.finos.vuu.core.module.ModuleContainer -import org.finos.vuu.core.table.TableContainer +import org.finos.vuu.core.{VuuSecurityOptions, VuuServer, VuuServerConfig, VuuWebSocketOptions} import org.finos.vuu.net._ import org.finos.vuu.net.auth.AlwaysHappyAuthenticator +import org.finos.vuu.net.http.VuuHttp2ServerOptions import org.finos.vuu.net.json.JsonVsSerializer -import org.finos.vuu.provider.{JoinTableProviderImpl, ProviderContainer} -import org.finos.vuu.viewport.ViewPortContainer -import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} -import org.finos.toolbox.lifecycle.LifecycleContainer -import org.finos.toolbox.time.{Clock, DefaultClock} -import org.finos.vuu.feature.inmem.VuuInMemPlugin -import org.finos.vuu.plugin.DefaultPluginRegistry import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers @@ -21,62 +16,56 @@ class WebSocketServerClientTest extends AnyFeatureSpec with Matchers { Feature("Check that we can create a websocket server and client"){ - ignore("create web socket server and client and send data between"){ + Scenario("create connection without ssl between web socket server and client and send data between"){ implicit val timeProvider: Clock = new DefaultClock - implicit val lifecycle = new LifecycleContainer + implicit val lifecycle: LifecycleContainer = new LifecycleContainer implicit val metrics: MetricsProvider = new MetricsProviderImpl - val serializer = JsonVsSerializer - val authenticator = new AlwaysHappyAuthenticator - val tokenValidator = new AlwaysHappyLoginValidator - - val sessionContainer = new ClientSessionContainerImpl() + lifecycle.autoShutdownHook() - val joinProvider = JoinTableProviderImpl() + val http = 10011 + val ws = 10013 - val tableContainer = new TableContainer(joinProvider) + val config = VuuServerConfig( + VuuHttp2ServerOptions() + .withWebRoot("vuu/src/main/resources/www") + .withSslDisabled() + .withDirectoryListings(true) + .withPort(http), + VuuWebSocketOptions() + .withBindAddress("0.0.0.0") + .withUri("websocket") + .withWsPort(ws) + .withWssDisabled(), + VuuSecurityOptions() + .withAuthenticator(new AlwaysHappyAuthenticator) + .withLoginValidator(new AlwaysHappyLoginValidator) + ) - val providerContainer = new ProviderContainer(joinProvider) + val viewServer = new VuuServer(config) - val pluginRegistry = new DefaultPluginRegistry - pluginRegistry.registerPlugin(new VuuInMemPlugin) - - val viewPortContainer = new ViewPortContainer(tableContainer, providerContainer, pluginRegistry) - - val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer) - - val moduleContainer = new ModuleContainer - - val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer) - - val options = VuuWebSocketOptions.apply() - .withWsPort(18090) - .withBindAddress("0.0.0.0") - //.withWss() - - //order of creation here is important - val server = new WebSocketServer(options, factory) - - val client = new WebSocketClient("ws://localhost:8090/websocket", 18090) - implicit val vsClient = new WebSocketViewServerClient(client, JsonVsSerializer) + val client = new WebSocketClient(s"ws://localhost:$ws/websocket", ws) //todo review params - port specified twice + implicit val vsClient: WebSocketViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) //set up a dependency on ws server from ws client. - lifecycle(client).dependsOn(server) + lifecycle(client).dependsOn(viewServer) //lifecycle registration is done in constructor of service classes, so sequence of create is important lifecycle.start() - vsClient.send(JsonViewServerMessage("", "", "", "",AuthenticateRequest("chris", "chris"))) + vsClient.send(JsonViewServerMessage("", "", "", "", AuthenticateRequest("chris", "chris"))) val authMsg = awaitMsgBody[AuthenticateSuccess].get authMsg.getClass should equal(classOf[AuthenticateSuccess]) - authMsg.token should not be ("") + authMsg.token should not be "" vsClient.send(JsonViewServerMessage("", "", authMsg.token, "chris", LoginRequest(authMsg.token, "chris"))) awaitMsgBody[LoginSuccess].get.token should equal(authMsg.token) + + lifecycle.stop() } } From cd640b8f1884cee1b384dd3723d12be2010a193b Mon Sep 17 00:00:00 2001 From: naleeha Date: Wed, 10 Jul 2024 12:34:51 +0100 Subject: [PATCH 2/9] #1301 added config option to disable heartbeat --- .../java/org/finos/vuu/VuuExampleMain.java | 6 +++-- .../main/scala/org/finos/vuu/SimulMain.scala | 4 ++- .../scala/org/finos/vuu/core/VuuServer.scala | 5 +++- .../org/finos/vuu/core/VuuServerOptions.scala | 25 ++++++++++++++++--- .../org/finos/vuu/net/RequestProcessor.scala | 7 +++--- .../org/finos/vuu/net/ViewServerHandler.scala | 7 ++++-- .../vuu/net/flowcontrol/FlowController.scala | 19 ++++++++++++++ .../vuu/test/impl/TestVuuServerImpl.scala | 4 ++- 8 files changed, 64 insertions(+), 13 deletions(-) diff --git a/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java b/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java index ddb8fca2c..6f9e18043 100644 --- a/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java +++ b/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java @@ -73,8 +73,10 @@ public static void main( String[] args ) VuuThreadingOptions.apply() .withTreeThreads(4) .withViewPortThreads(4), - new scala.collection.mutable.ListBuffer().toList(), - new scala.collection.mutable.ListBuffer().toList() + VuuClientConnectionOptions.apply() + .withHeartbeat(), + new scala.collection.mutable.ListBuffer().toList(), + new scala.collection.mutable.ListBuffer().toList() ).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer)) .withModule(SimulationModule.apply(clock, lifecycle, tableDefContainer)) .withModule(MetricsModule.apply(clock, lifecycle, metrics, tableDefContainer)) diff --git a/example/main/src/main/scala/org/finos/vuu/SimulMain.scala b/example/main/src/main/scala/org/finos/vuu/SimulMain.scala index 7d90a9bbd..95c496669 100644 --- a/example/main/src/main/scala/org/finos/vuu/SimulMain.scala +++ b/example/main/src/main/scala/org/finos/vuu/SimulMain.scala @@ -61,7 +61,9 @@ object SimulMain extends App with StrictLogging { .withLoginValidator(new AlwaysHappyLoginValidator), VuuThreadingOptions() .withViewPortThreads(4) - .withTreeThreads(4) + .withTreeThreads(4), + VuuClientConnectionOptions() + .withHeartbeat() ).withModule(PriceModule()) .withModule(SimulationModule()) .withModule(MetricsModule()) diff --git a/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala b/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala index 61c1e7ceb..c9ac2df43 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala @@ -10,6 +10,7 @@ import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, Sta import org.finos.vuu.core.table.{DataTable, TableContainer} import org.finos.vuu.feature.inmem.{VuuInMemPlugin, VuuInMemPluginType} import org.finos.vuu.net._ +import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory, NoHeartbeatFlowController} import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server} import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer} import org.finos.vuu.net.rest.RestService @@ -37,6 +38,8 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer, final val authenticator: Authenticator = config.security.authenticator final val tokenValidator: LoginTokenValidator = config.security.loginTokenValidator + final val flowControllerFactory: FlowControllerFactory = FlowControllerFactory(config.clientConnection.hasHeartbeat) + final val sessionContainer = new ClientSessionContainerImpl() final val joinProvider: JoinTableProvider = JoinTableProviderImpl() @@ -55,7 +58,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer, final val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer) - final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer) + final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer, flowControllerFactory) //order of creation here is important final val server = new WebSocketServer(config.wsOptions, factory) diff --git a/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala b/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala index ce11a23e5..3b3986338 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala @@ -2,12 +2,11 @@ package org.finos.vuu.core import org.finos.vuu.core.module.ViewServerModule import org.finos.vuu.net.auth.AlwaysHappyAuthenticator +import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, NoHeartbeatFlowController} import org.finos.vuu.net.http.{VuuHttp2ServerOptions, VuuSecurityOptions} import org.finos.vuu.net.{AlwaysHappyLoginValidator, Authenticator, LoginTokenValidator} import org.finos.vuu.plugin.Plugin - - object VuuSecurityOptions{ def apply(): VuuSecurityOptions = { VuuSecurityOptionsImpl(new AlwaysHappyAuthenticator, new AlwaysHappyLoginValidator) @@ -26,6 +25,12 @@ object VuuThreadingOptions { } } +object VuuClientConnectionOptions { + def apply(): VuuClientConnectionOptions = { + VuuClientConnectionOptionsImpl(true) + } + +} trait VuuWebSocketOptions { def wsPort: Int def uri: String @@ -48,6 +53,12 @@ trait VuuThreadingOptions{ def treeThreads: Int } +trait VuuClientConnectionOptions { + def hasHeartbeat: Boolean + def withHeartbeat(): VuuClientConnectionOptions + def withHeartbeatDisabled(): VuuClientConnectionOptions +} + case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{ override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator) override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator) @@ -75,8 +86,16 @@ case class VuuThreadingOptionsImpl(viewPortThreads: Int = 1, treeViewPortThreads override def treeThreads: Int = treeViewPortThreads } -case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), security: VuuSecurityOptions = VuuSecurityOptions(), +case class VuuClientConnectionOptionsImpl(hasHeartbeat: Boolean) extends VuuClientConnectionOptions { + override def withHeartbeat(): VuuClientConnectionOptions = this.copy(true) + override def withHeartbeatDisabled(): VuuClientConnectionOptions = this.copy(false) +} + +case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), + wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), + security: VuuSecurityOptions = VuuSecurityOptions(), threading: VuuThreadingOptions = VuuThreadingOptions(), + clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(), modules: List[ViewServerModule] = List(), plugins: List[Plugin] = List()) { def withModule(module: ViewServerModule): VuuServerConfig = { diff --git a/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala b/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala index 765612317..a9443e9e4 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala @@ -6,7 +6,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame import org.finos.toolbox.time.Clock import org.finos.vuu.client.messages.SessionId import org.finos.vuu.core.module.ModuleContainer -import org.finos.vuu.net.flowcontrol.DefaultFlowController +import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory} import org.finos.vuu.net.json.Serializer import org.finos.vuu.util.{OutboundRowPublishQueue, PublishQueue} import org.finos.vuu.viewport.ViewPortUpdate @@ -21,7 +21,8 @@ class RequestProcessor(authenticator: Authenticator, clientSessionContainer: ClientSessionContainer, serverApi: ServerApi, serializer: Serializer[String, MessageBody], - moduleContainer: ModuleContainer + moduleContainer: ModuleContainer, + flowControllerFactory: FlowControllerFactory, )(implicit timeProvider: Clock) extends StrictLogging { @volatile private var session: ClientSessionId = null @@ -63,7 +64,7 @@ class RequestProcessor(authenticator: Authenticator, protected def createMessageHandler(channel: Channel, sessionId: ClientSessionId): MessageHandler = { val queue = new OutboundRowPublishQueue() - val flowController = new DefaultFlowController + val flowController = flowControllerFactory.create() new DefaultMessageHandler(channel, queue, sessionId, serverApi, serializer, flowController, clientSessionContainer, moduleContainer) } diff --git a/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala b/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala index 8d69e1d45..5e97d44be 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala @@ -7,6 +7,7 @@ import org.finos.vuu.core.module.ModuleContainer import org.finos.vuu.net.json.Serializer import org.finos.toolbox.json.JsonUtil import org.finos.toolbox.time.Clock +import org.finos.vuu.net.flowcontrol.{FlowController, FlowControllerFactory} trait ViewServerHandlerFactory { def create(): ViewServerHandler @@ -15,9 +16,11 @@ trait ViewServerHandlerFactory { class ViewServerHandlerFactoryImpl(authenticator: Authenticator, tokenValidator: LoginTokenValidator, sessionContainer: ClientSessionContainer, serverApi: ServerApi, jsonVsSerializer: Serializer[String, MessageBody], - moduleContainer: ModuleContainer)(implicit val timeProvider: Clock) extends ViewServerHandlerFactory { + moduleContainer: ModuleContainer, + flowControllerFactory: FlowControllerFactory, + )(implicit val timeProvider: Clock) extends ViewServerHandlerFactory { override def create(): ViewServerHandler = { - val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer) + val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer, flowControllerFactory) new ViewServerHandler(jsonVsSerializer, requestProcessor) } } diff --git a/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala b/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala index 1835db3df..368dbfe7c 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala @@ -17,6 +17,15 @@ trait FlowController { def shouldSend(): FlowControlOp } +case class FlowControllerFactory(hasHeartbeat: Boolean)(implicit timeProvider: Clock){ + def create(): FlowController = { + if (hasHeartbeat) + new DefaultFlowController() + else + new NoHeartbeatFlowController + + } +} class DefaultFlowController(implicit timeProvider: Clock) extends FlowController { @volatile private var lastMsgTime: Long = -1 @@ -57,3 +66,13 @@ class DefaultFlowController(implicit timeProvider: Clock) extends FlowController } } + +class NoHeartbeatFlowController() extends FlowController { + override def process(msg: ViewServerMessage): Unit = { + //nothing to do here + } + + override def shouldSend(): FlowControlOp = { + BatchSize(300) + } +} diff --git a/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala b/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala index 59815fd94..3479f1df5 100644 --- a/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala +++ b/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala @@ -15,6 +15,7 @@ import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Ser import org.finos.vuu.net.rest.RestService import org.finos.vuu.net.rpc.{JsonSubTypeRegistry, RpcHandler} import org.finos.vuu.net._ +import org.finos.vuu.net.flowcontrol.FlowControllerFactory import org.finos.vuu.plugin.{DefaultPluginRegistry, Plugin} import org.finos.vuu.provider._ import org.finos.vuu.test.rpc.RpcDynamicProxy @@ -35,6 +36,7 @@ class TestVuuServerImpl(val modules: List[ViewServerModule])(implicit clock: Clo val authenticator = new AlwaysHappyAuthenticator val tokenValidator = new AlwaysHappyLoginValidator + val flowControllerFactory = FlowControllerFactory(hasHeartbeat = false) val joinProvider: JoinTableProvider = JoinTableProviderImpl() @@ -56,7 +58,7 @@ class TestVuuServerImpl(val modules: List[ViewServerModule])(implicit clock: Clo val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer) - val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer) + val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer, flowControllerFactory) val queue = new OutboundRowPublishQueue() From aee4d7777d7a32b06d04408d8376c6172c8670c0 Mon Sep 17 00:00:00 2001 From: naleeha Date: Wed, 10 Jul 2024 12:35:40 +0100 Subject: [PATCH 3/9] #1301 added websocket api test for getting table meta data --- .../finos/vuu/client/ClientHelperFns.scala | 2 +- .../org/finos/vuu/net/WebSocketApiTest.scala | 112 ++++++++++++++---- .../net/ws/WebSocketServerClientTest.scala | 6 +- 3 files changed, 94 insertions(+), 26 deletions(-) diff --git a/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala b/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala index d5fe3719d..cba4055c0 100644 --- a/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala +++ b/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala @@ -100,7 +100,7 @@ object ClientHelperFns { def auth(user: String, password: String)(implicit vsClient: ViewServerClient): String = { vsClient.send(JsonViewServerMessage("", "", "", "", AuthenticateRequest(user, password))) - vsClient.awaitMsg.body.asInstanceOf[AuthenticateSuccess].token + awaitMsgBody[AuthenticateSuccess].get.token } def rpcCallAsync(sessionId: String, token: String, user: String, service: String, method: String, params: Array[Any], module: String)(implicit vsClient: ViewServerClient): Unit = { diff --git a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala index 3d4e7989a..e25836c43 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala @@ -1,35 +1,103 @@ package org.finos.vuu.net +import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} +import org.finos.toolbox.lifecycle.LifecycleContainer +import org.finos.toolbox.time.{Clock, DefaultClock} +import org.finos.vuu.client.ClientHelperFns +import org.finos.vuu.client.messages.RequestId +import org.finos.vuu.core.module.{TableDefContainer, TestModule} +import org.finos.vuu.core.{VuuClientConnectionOptions, VuuSecurityOptions, VuuServer, VuuServerConfig, VuuThreadingOptions, VuuWebSocketOptions} +import org.finos.vuu.net.auth.AlwaysHappyAuthenticator +import org.finos.vuu.net.http.VuuHttp2ServerOptions +import org.finos.vuu.net.json.JsonVsSerializer +import org.finos.vuu.net.ws.WebSocketClient +import org.finos.vuu.viewport.ViewPortTable import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterEach, GivenWhenThen} -class WebSocketApiTest extends AnyFeatureSpec with Matchers { - def awaitMsg[TYPE](implicit client: ViewServerClient): Option[TYPE] = { - None +class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterEach with GivenWhenThen with Matchers { + implicit var viewServerClient: ViewServerClient = _ + override def beforeEach(): Unit = { + viewServerClient = testStartUp() } + def testStartUp(): ViewServerClient = { - Feature("test server api shape"){ - - Scenario("test login and creation of view port"){ - -// implicit val serverApi: ServerApi = null -// implicit val client: Client = null -// -// client.send(serverApi.serialize(AuthenticateMessage("foo", "bar"))) -// -// val responseOption = awaitMsg[LoginResponse] -// -// val response = responseOption.get -// -// val vpId = UUID.randomUUID().toString -// -// client.send(serverApi.serialize(CreateViewPort(response.token, vpId, "orderPrices"))) -// -// val vpResonse = awaitMsg[ServerMessage] + implicit val timeProvider: Clock = new DefaultClock + implicit val lifecycle: LifecycleContainer = new LifecycleContainer + implicit val metrics: MetricsProvider = new MetricsProviderImpl + implicit val tableDefContainer: TableDefContainer = new TableDefContainer(Map()) + + lifecycle.autoShutdownHook() + + val http = 10011 + val ws = 10013 + + val config = VuuServerConfig( + VuuHttp2ServerOptions() + .withWebRoot("vuu/src/main/resources/www") + .withSslDisabled() + .withDirectoryListings(true) + .withPort(http), + VuuWebSocketOptions() + .withBindAddress("0.0.0.0") + .withUri("websocket") + .withWsPort(ws) + .withWssDisabled(), + VuuSecurityOptions() + .withAuthenticator(new AlwaysHappyAuthenticator) + .withLoginValidator(new AlwaysHappyLoginValidator), + VuuThreadingOptions(), + VuuClientConnectionOptions() + .withHeartbeatDisabled() + ).withModule(TestModule()) + + val viewServer = new VuuServer(config) + + val client = new WebSocketClient(s"ws://localhost:$ws/websocket", ws) //todo review params - port specified twice + val viewServerClient: ViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) + + //set up a dependency on ws server from ws client. + lifecycle(client).dependsOn(viewServer) + + //lifecycle registration is done in constructor of service classes, so sequence of create is important + lifecycle.start() + + viewServerClient + } + + Feature("Server api") { + Scenario("should get table metadata") { + Given("a table name") + + val token = ClientHelperFns.auth("testUser", "testUserPassword") + val session = ClientHelperFns.login(token, "chris") + + //example user helper function + //ClientHelperFns.tableMetaAsync("someSessionId", "someToken", "testUser", ViewPortTable("GetMeTable", "TestModule"), "requestId1") + + // example without helper + val getTableMetaRequestMessage = createViewSerMessage(session, token, GetTableMetaRequest(ViewPortTable("instruments", "TEST"))) + viewServerClient.send(getTableMetaRequestMessage) + + + val response = ClientHelperFns.awaitMsgBody[GetTableMetaResponse] + response.isDefined shouldBe true + + val responseMessage = response.get + responseMessage.columns.length shouldEqual 5 } + } + def createViewSerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = { + JsonViewServerMessage(RequestId.oneNew(), + sessionId, + token, + "testUser", + body, + "DoesntReallyMatter") } -} +} \ No newline at end of file diff --git a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala b/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala index 4adf5a788..61969666c 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala @@ -24,8 +24,8 @@ class WebSocketServerClientTest extends AnyFeatureSpec with Matchers { lifecycle.autoShutdownHook() - val http = 10011 - val ws = 10013 + val http = 10021 + val ws = 10023 val config = VuuServerConfig( VuuHttp2ServerOptions() @@ -46,7 +46,7 @@ class WebSocketServerClientTest extends AnyFeatureSpec with Matchers { val viewServer = new VuuServer(config) val client = new WebSocketClient(s"ws://localhost:$ws/websocket", ws) //todo review params - port specified twice - implicit val vsClient: WebSocketViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) + implicit val vsClient: ViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) //set up a dependency on ws server from ws client. lifecycle(client).dependsOn(viewServer) From 646450bf46b11ff29550c542c44a8fea4f4cf5bb Mon Sep 17 00:00:00 2001 From: naleeha Date: Fri, 12 Jul 2024 15:05:46 +0100 Subject: [PATCH 4/9] #1301 refactoring web socket api tests and adding error case test for getting table metadata --- .../org/finos/vuu/net/WebSocketApiTest.scala | 116 ++++++++++++++++-- 1 file changed, 106 insertions(+), 10 deletions(-) diff --git a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala index e25836c43..b406a676b 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala @@ -14,14 +14,21 @@ import org.finos.vuu.net.ws.WebSocketClient import org.finos.vuu.viewport.ViewPortTable import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers -import org.scalatest.{BeforeAndAfterEach, GivenWhenThen} +import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} +import scala.reflect.ClassTag -class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterEach with GivenWhenThen with Matchers { + +class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenWhenThen with Matchers { implicit var viewServerClient: ViewServerClient = _ - override def beforeEach(): Unit = { + + override def beforeAll(): Unit = { viewServerClient = testStartUp() } + + override def afterAll(): Unit = { + //todo cleanup + } def testStartUp(): ViewServerClient = { implicit val timeProvider: Clock = new DefaultClock @@ -67,12 +74,10 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterEach with Given viewServerClient } - Feature("Server api") { - Scenario("should get table metadata") { - Given("a table name") - + Feature("Server web socket api") { + Scenario("client requests to get table metadata for a table") { val token = ClientHelperFns.auth("testUser", "testUserPassword") - val session = ClientHelperFns.login(token, "chris") + val session = ClientHelperFns.login(token, "testUser") //example user helper function //ClientHelperFns.tableMetaAsync("someSessionId", "someToken", "testUser", ViewPortTable("GetMeTable", "TestModule"), "requestId1") @@ -81,11 +86,11 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterEach with Given val getTableMetaRequestMessage = createViewSerMessage(session, token, GetTableMetaRequest(ViewPortTable("instruments", "TEST"))) viewServerClient.send(getTableMetaRequestMessage) - + Then("return table data in response") val response = ClientHelperFns.awaitMsgBody[GetTableMetaResponse] response.isDefined shouldBe true - val responseMessage = response.get + val responseMessage = response.get responseMessage.columns.length shouldEqual 5 } @@ -100,4 +105,95 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterEach with Given "DoesntReallyMatter") } + Scenario("client requests to get table metadata for a non existent") { + + val client = new MyWebSocketClient(viewServerClient) + + val (token, sessionId) = client.authenticateAndLogin("testUser", "testUserPassword") + + client.send(sessionId, token, GetTableMetaRequest(ViewPortTable("DoesNotExist", "TEST"))) + + Then("return error response with helpful message") + val response = client.awaitForMsgWithBody[ErrorResponse] + response.isDefined shouldBe true + + val responseMessage = response.get + responseMessage.msg shouldEqual "No such table found with name DoesNotExist in module TEST" + } + + Scenario("client requests to get table metadata for null table name") { + + val client = new MyWebSocketClient(viewServerClient) + + val (token, sessionId) = client.authenticateAndLogin("testUser", "testUserPassword") + + client.send(sessionId, token, GetTableMetaRequest(ViewPortTable(null, "TEST"))) + + + Then("return error response with helpful message") + val response = client.awaitForMsgWithBody[ErrorResponse] + response.isDefined shouldBe true + + val responseMessage = response.get + responseMessage.msg shouldEqual "No such table found with name DoesNotExist in module TEST" + + } + +} + +class MyWebSocketClient(vsClient: ViewServerClient) { + + type SessionId = String + type Token = String + + def send(sessionId: String, token: String, body: MessageBody): Unit = { + vsClient.send(createViewServerMessage(sessionId, token, body)) + } + + //todo fold this in to WebSocketViewServerClient? + //is intention that this can be used for non ws client? + def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] = { + val msg = vsClient.awaitMsg + if (msg != null) { //null indicate error or timeout + if (isExpectedBodyType(t, msg)) + Some(msg.body.asInstanceOf[T]) + else + awaitForMsgWithBody + } + else + None + } + + def authenticateAndLogin(user: String, password: String): (Token, SessionId) = { + val token = authenticate(user, password) + val session = login(token, user) + (token, session) + } + + private def isExpectedBodyType[T <: AnyRef](t: ClassTag[T], msg: ViewServerMessage) = { + val expectedBodyType: Class[T] = t.runtimeClass.asInstanceOf[Class[T]] + expectedBodyType.isAssignableFrom(msg.body.getClass) + } + + private def createViewServerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = { + JsonViewServerMessage(RequestId.oneNew(), + sessionId, + token, + "testUser", + body, + "DoesntReallyMatter") + } + + //todo not used + def authenticate(user: String, password: String): String = { + send("not used", "not used", AuthenticateRequest(user, password)) + awaitForMsgWithBody[AuthenticateSuccess].get.token //todo handle no response + } + + def login(token: String, user: String): String = { + send("not used", "not used", LoginRequest(token, user)) + vsClient.awaitMsg.sessionId //todo handle no response + //todo what to do if LoginFailure - is this expected to return token when failed? + //Should token be called session? + } } \ No newline at end of file From 3e18a989e3a09247e9080ef6fc75a108953a79cc Mon Sep 17 00:00:00 2001 From: naleeha Date: Mon, 15 Jul 2024 23:48:14 +0100 Subject: [PATCH 5/9] #1301 handling when get table request does not have valid table name --- .../finos/vuu/core/CoreServerApiHandler.scala | 17 +-- .../org/finos/vuu/net/WebSocketApiTest.scala | 101 +++++++++++++----- 2 files changed, 83 insertions(+), 35 deletions(-) diff --git a/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala b/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala index 306098e24..bdc285b25 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala @@ -220,13 +220,18 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer, } override def process(msg: GetTableMetaRequest)(ctx: RequestContext): Option[ViewServerMessage] = { - if (msg.table == null) - errorMsg(s"Table ${msg.table} not found in container")(ctx) + if (msg.table.table == null || msg.table.module == null) + errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}. Table name and module should not be null")(ctx) else { - val table = tableContainer.getTable(msg.table.table) - val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name) - val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType)) - vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx) + val table = tableContainer.getTable(msg.table.table) //todo need to check module? what if modules with same table name + + if(table == null) + errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}")(ctx) + else{ + val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name) + val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType)) + vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx) + } } } diff --git a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala index b406a676b..aed7637a9 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala @@ -4,7 +4,7 @@ import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.{Clock, DefaultClock} import org.finos.vuu.client.ClientHelperFns -import org.finos.vuu.client.messages.RequestId +import org.finos.vuu.client.messages.{RequestId, TokenId} import org.finos.vuu.core.module.{TableDefContainer, TestModule} import org.finos.vuu.core.{VuuClientConnectionOptions, VuuSecurityOptions, VuuServer, VuuServerConfig, VuuThreadingOptions, VuuWebSocketOptions} import org.finos.vuu.net.auth.AlwaysHappyAuthenticator @@ -16,14 +16,16 @@ import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} +import java.util.concurrent.ConcurrentHashMap import scala.reflect.ClassTag class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenWhenThen with Matchers { implicit var viewServerClient: ViewServerClient = _ - + var vuuClient: TestVuuClient = _ override def beforeAll(): Unit = { viewServerClient = testStartUp() + vuuClient = new TestVuuClient(viewServerClient) } override def afterAll(): Unit = { @@ -107,41 +109,33 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenW Scenario("client requests to get table metadata for a non existent") { - val client = new MyWebSocketClient(viewServerClient) - - val (token, sessionId) = client.authenticateAndLogin("testUser", "testUserPassword") + val (tokenId, sessionId) = vuuClient.createTokenAndLogin("testUser") + assert(sessionId.isDefined) - client.send(sessionId, token, GetTableMetaRequest(ViewPortTable("DoesNotExist", "TEST"))) + vuuClient.send(sessionId.get, tokenId, GetTableMetaRequest(ViewPortTable("DoesNotExist", "TEST"))) Then("return error response with helpful message") - val response = client.awaitForMsgWithBody[ErrorResponse] - response.isDefined shouldBe true - - val responseMessage = response.get - responseMessage.msg shouldEqual "No such table found with name DoesNotExist in module TEST" + val response = vuuClient.awaitForMsgWithBody[ErrorResponse] + assert(response.isDefined) + response.get.msg shouldEqual "No such table found with name DoesNotExist in module TEST" } Scenario("client requests to get table metadata for null table name") { - val client = new MyWebSocketClient(viewServerClient) - - val (token, sessionId) = client.authenticateAndLogin("testUser", "testUserPassword") - - client.send(sessionId, token, GetTableMetaRequest(ViewPortTable(null, "TEST"))) + val (tokenId, sessionId) = vuuClient.createTokenAndLogin("testUser") + assert(sessionId.isDefined) + // sessionId.isDefined shouldBe true + vuuClient.send(sessionId.get, tokenId, GetTableMetaRequest(ViewPortTable(null, "TEST"))) Then("return error response with helpful message") - val response = client.awaitForMsgWithBody[ErrorResponse] - response.isDefined shouldBe true - - val responseMessage = response.get - responseMessage.msg shouldEqual "No such table found with name DoesNotExist in module TEST" - + val response = vuuClient.awaitForMsgWithBody[ErrorResponse] + assert(response.isDefined) + response.get.msg shouldEqual "No such table found with name null in module TEST. Table name and module should not be null" } - } -class MyWebSocketClient(vsClient: ViewServerClient) { +class TestVuuClient(vsClient: ViewServerClient) { type SessionId = String type Token = String @@ -164,12 +158,56 @@ class MyWebSocketClient(vsClient: ViewServerClient) { None } - def authenticateAndLogin(user: String, password: String): (Token, SessionId) = { + def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = { + val msg = vsClient.awaitMsg + if (msg != null) { //null indicate error or timeout + if (isExpectedBodyType(t, msg)) + Some(msg) + else + awaitForMsg + } + else + None + } + + + val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap + def awaitForResponse(requestId:String): Option[ViewServerMessage] = { + + lookupFromReceivedResponses(requestId) + .map(msg => return Some(msg)) + + val msg = vsClient.awaitMsg + if (msg!=null) + if(msg.requestId == requestId) + Some(msg) + else { + responsesMap.put(requestId, msg) + awaitForResponse(requestId) + } + else + None + } + + private def lookupFromReceivedResponses(requestId:String): Option[ViewServerMessage] = { + Option(responsesMap.get(requestId)) + } + + private def awaitNextMessage(): Option[ViewServerMessage] = { + Option(vsClient.awaitMsg) + } + def authenticateAndLogin(user: String, password: String): (Token, Option[SessionId]) = { val token = authenticate(user, password) val session = login(token, user) (token, session) } + def createTokenAndLogin(user: String): (Token, Option[SessionId]) = { + val tokenId = TokenId.oneNew() + val sessionId = login(tokenId, user) + (tokenId, sessionId) + } + private def isExpectedBodyType[T <: AnyRef](t: ClassTag[T], msg: ViewServerMessage) = { val expectedBodyType: Class[T] = t.runtimeClass.asInstanceOf[Class[T]] expectedBodyType.isAssignableFrom(msg.body.getClass) @@ -190,10 +228,15 @@ class MyWebSocketClient(vsClient: ViewServerClient) { awaitForMsgWithBody[AuthenticateSuccess].get.token //todo handle no response } - def login(token: String, user: String): String = { + def login(token: String, user: String): Option[String] = { send("not used", "not used", LoginRequest(token, user)) - vsClient.awaitMsg.sessionId //todo handle no response - //todo what to do if LoginFailure - is this expected to return token when failed? - //Should token be called session? + + //capture messages rather than dismissing, - how to cap size + // need to match on request id to ensure correct response? + awaitForMsg[LoginSuccess] + .map( x => x.sessionId) + //todo handle no response + //todo what to do if LoginFailure + // why does these response return token that was passed in the request? Does UI use this or match based on message request id? } } \ No newline at end of file From d407bdb6c4d624c72b11b7802ced287aad881b87 Mon Sep 17 00:00:00 2001 From: naleeha Date: Tue, 16 Jul 2024 13:49:57 +0100 Subject: [PATCH 6/9] #1301 clean up and logging in once at start up --- .../org/finos/vuu/net/TestVuuClient.scala | 93 +++++++++ .../org/finos/vuu/net/WebSocketApiTest.scala | 188 +++--------------- 2 files changed, 124 insertions(+), 157 deletions(-) create mode 100644 vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala diff --git a/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala b/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala new file mode 100644 index 000000000..67eacc33d --- /dev/null +++ b/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala @@ -0,0 +1,93 @@ +package org.finos.vuu.net + +import org.finos.vuu.client.messages.{RequestId, TokenId} + +import java.util.concurrent.ConcurrentHashMap +import scala.reflect.ClassTag + +class TestVuuClient(vsClient: ViewServerClient) { + + type SessionId = String + type Token = String + + def send(sessionId: String, token: String, body: MessageBody): Unit = { + vsClient.send(createViewServerMessage(sessionId, token, body)) + } + + //todo fold this in to WebSocketViewServerClient? + //is intention that this can be used for non ws client? + def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] = { + val msg = vsClient.awaitMsg + if (msg != null) { //null indicate error or timeout + if (isExpectedBodyType(t, msg)) + Some(msg.body.asInstanceOf[T]) + else + awaitForMsgWithBody + } + else + None + } + + def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = { + val msg = vsClient.awaitMsg + if (msg != null) { //null indicate error or timeout + if (isExpectedBodyType(t, msg)) + Some(msg) + else + awaitForMsg + } + else + None + } + + val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap + + def awaitForResponse(requestId: String): Option[ViewServerMessage] = { + + lookupFromReceivedResponses(requestId) + .map(msg => return Some(msg)) + + val msg = vsClient.awaitMsg + if (msg != null) + if (msg.requestId == requestId) + Some(msg) + else { + responsesMap.put(requestId, msg) + awaitForResponse(requestId) + } + else + None + } + + def createAuthToken(): Token = TokenId.oneNew() + + def login(token: String, user: String): Option[String] = { + send("not used", "not used", LoginRequest(token, user)) + + //capture messages rather than dismissing, - how to cap size + // need to match on request id to ensure correct response? + awaitForMsg[LoginSuccess] + .map(x => x.sessionId) + //todo handle no response + //todo what to do if LoginFailure + // why does these response return token that was passed in the request? Does UI use this or match based on message request id? + } + + private def isExpectedBodyType[T <: AnyRef](t: ClassTag[T], msg: ViewServerMessage) = { + val expectedBodyType: Class[T] = t.runtimeClass.asInstanceOf[Class[T]] + expectedBodyType.isAssignableFrom(msg.body.getClass) + } + + private def lookupFromReceivedResponses(requestId: String): Option[ViewServerMessage] = { + Option(responsesMap.get(requestId)) + } + + private def createViewServerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = { + JsonViewServerMessage(RequestId.oneNew(), + sessionId, + token, + "testUser", + body, + "DoesntReallyMatter") + } +} diff --git a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala index aed7637a9..e291b26e1 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala @@ -3,10 +3,8 @@ package org.finos.vuu.net import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.{Clock, DefaultClock} -import org.finos.vuu.client.ClientHelperFns -import org.finos.vuu.client.messages.{RequestId, TokenId} +import org.finos.vuu.core._ import org.finos.vuu.core.module.{TableDefContainer, TestModule} -import org.finos.vuu.core.{VuuClientConnectionOptions, VuuSecurityOptions, VuuServer, VuuServerConfig, VuuThreadingOptions, VuuWebSocketOptions} import org.finos.vuu.net.auth.AlwaysHappyAuthenticator import org.finos.vuu.net.http.VuuHttp2ServerOptions import org.finos.vuu.net.json.JsonVsSerializer @@ -16,22 +14,26 @@ import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} -import java.util.concurrent.ConcurrentHashMap -import scala.reflect.ClassTag - - class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenWhenThen with Matchers { implicit var viewServerClient: ViewServerClient = _ var vuuClient: TestVuuClient = _ + var tokenId: String = _ + var sessionId: String = _ + override def beforeAll(): Unit = { - viewServerClient = testStartUp() - vuuClient = new TestVuuClient(viewServerClient) + vuuClient = testStartUp() + + tokenId = vuuClient.createAuthToken() + val sessionOption = vuuClient.login(tokenId, "testUser") + assert(sessionOption.isDefined) + sessionId = sessionOption.get } override def afterAll(): Unit = { //todo cleanup } - def testStartUp(): ViewServerClient = { + + def testStartUp(): TestVuuClient = { implicit val timeProvider: Clock = new DefaultClock implicit val lifecycle: LifecycleContainer = new LifecycleContainer @@ -66,6 +68,7 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenW val client = new WebSocketClient(s"ws://localhost:$ws/websocket", ws) //todo review params - port specified twice val viewServerClient: ViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) + val vuuClient = new TestVuuClient(viewServerClient) //set up a dependency on ws server from ws client. lifecycle(client).dependsOn(viewServer) @@ -73,170 +76,41 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenW //lifecycle registration is done in constructor of service classes, so sequence of create is important lifecycle.start() - viewServerClient + vuuClient } Feature("Server web socket api") { Scenario("client requests to get table metadata for a table") { - val token = ClientHelperFns.auth("testUser", "testUserPassword") - val session = ClientHelperFns.login(token, "testUser") - - //example user helper function - //ClientHelperFns.tableMetaAsync("someSessionId", "someToken", "testUser", ViewPortTable("GetMeTable", "TestModule"), "requestId1") - // example without helper - val getTableMetaRequestMessage = createViewSerMessage(session, token, GetTableMetaRequest(ViewPortTable("instruments", "TEST"))) - viewServerClient.send(getTableMetaRequestMessage) + vuuClient.send(sessionId, tokenId, GetTableMetaRequest(ViewPortTable("instruments", "TEST"))) Then("return table data in response") - val response = ClientHelperFns.awaitMsgBody[GetTableMetaResponse] - response.isDefined shouldBe true + val response = vuuClient.awaitForMsgWithBody[GetTableMetaResponse] + assert(response.isDefined) val responseMessage = response.get responseMessage.columns.length shouldEqual 5 - } - } - - def createViewSerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = { - JsonViewServerMessage(RequestId.oneNew(), - sessionId, - token, - "testUser", - body, - "DoesntReallyMatter") - } - - Scenario("client requests to get table metadata for a non existent") { - - val (tokenId, sessionId) = vuuClient.createTokenAndLogin("testUser") - assert(sessionId.isDefined) - vuuClient.send(sessionId.get, tokenId, GetTableMetaRequest(ViewPortTable("DoesNotExist", "TEST"))) + Scenario("client requests to get table metadata for a non existent") { - Then("return error response with helpful message") - val response = vuuClient.awaitForMsgWithBody[ErrorResponse] - assert(response.isDefined) - response.get.msg shouldEqual "No such table found with name DoesNotExist in module TEST" - } - - Scenario("client requests to get table metadata for null table name") { - - val (tokenId, sessionId) = vuuClient.createTokenAndLogin("testUser") - assert(sessionId.isDefined) - // sessionId.isDefined shouldBe true - - vuuClient.send(sessionId.get, tokenId, GetTableMetaRequest(ViewPortTable(null, "TEST"))) - - Then("return error response with helpful message") - val response = vuuClient.awaitForMsgWithBody[ErrorResponse] - assert(response.isDefined) - response.get.msg shouldEqual "No such table found with name null in module TEST. Table name and module should not be null" - } -} - -class TestVuuClient(vsClient: ViewServerClient) { - - type SessionId = String - type Token = String - - def send(sessionId: String, token: String, body: MessageBody): Unit = { - vsClient.send(createViewServerMessage(sessionId, token, body)) - } + vuuClient.send(sessionId, tokenId, GetTableMetaRequest(ViewPortTable("DoesNotExist", "TEST"))) - //todo fold this in to WebSocketViewServerClient? - //is intention that this can be used for non ws client? - def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] = { - val msg = vsClient.awaitMsg - if (msg != null) { //null indicate error or timeout - if (isExpectedBodyType(t, msg)) - Some(msg.body.asInstanceOf[T]) - else - awaitForMsgWithBody + Then("return error response with helpful message") + val response = vuuClient.awaitForMsgWithBody[ErrorResponse] + assert(response.isDefined) + response.get.msg shouldEqual "No such table found with name DoesNotExist in module TEST" } - else - None - } - - def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = { - val msg = vsClient.awaitMsg - if (msg != null) { //null indicate error or timeout - if (isExpectedBodyType(t, msg)) - Some(msg) - else - awaitForMsg - } - else - None - } - - - val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap - def awaitForResponse(requestId:String): Option[ViewServerMessage] = { - - lookupFromReceivedResponses(requestId) - .map(msg => return Some(msg)) - - val msg = vsClient.awaitMsg - if (msg!=null) - if(msg.requestId == requestId) - Some(msg) - else { - responsesMap.put(requestId, msg) - awaitForResponse(requestId) - } - else - None - } - - private def lookupFromReceivedResponses(requestId:String): Option[ViewServerMessage] = { - Option(responsesMap.get(requestId)) - } - private def awaitNextMessage(): Option[ViewServerMessage] = { - Option(vsClient.awaitMsg) - } - def authenticateAndLogin(user: String, password: String): (Token, Option[SessionId]) = { - val token = authenticate(user, password) - val session = login(token, user) - (token, session) - } - - def createTokenAndLogin(user: String): (Token, Option[SessionId]) = { - val tokenId = TokenId.oneNew() - val sessionId = login(tokenId, user) - (tokenId, sessionId) - } + Scenario("client requests to get table metadata for null table name") { - private def isExpectedBodyType[T <: AnyRef](t: ClassTag[T], msg: ViewServerMessage) = { - val expectedBodyType: Class[T] = t.runtimeClass.asInstanceOf[Class[T]] - expectedBodyType.isAssignableFrom(msg.body.getClass) - } + vuuClient.send(sessionId, tokenId, GetTableMetaRequest(ViewPortTable(null, "TEST"))) - private def createViewServerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = { - JsonViewServerMessage(RequestId.oneNew(), - sessionId, - token, - "testUser", - body, - "DoesntReallyMatter") - } - - //todo not used - def authenticate(user: String, password: String): String = { - send("not used", "not used", AuthenticateRequest(user, password)) - awaitForMsgWithBody[AuthenticateSuccess].get.token //todo handle no response + Then("return error response with helpful message") + val response = vuuClient.awaitForMsgWithBody[ErrorResponse] + assert(response.isDefined) + response.get.msg shouldEqual "No such table found with name null in module TEST. Table name and module should not be null" + } } +} - def login(token: String, user: String): Option[String] = { - send("not used", "not used", LoginRequest(token, user)) - - //capture messages rather than dismissing, - how to cap size - // need to match on request id to ensure correct response? - awaitForMsg[LoginSuccess] - .map( x => x.sessionId) - //todo handle no response - //todo what to do if LoginFailure - // why does these response return token that was passed in the request? Does UI use this or match based on message request id? - } -} \ No newline at end of file From 898329a5ea55102a78681407219e934f5d15f94a Mon Sep 17 00:00:00 2001 From: naleeha Date: Tue, 16 Jul 2024 14:51:14 +0100 Subject: [PATCH 7/9] #1301 logging message received on the client --- vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala b/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala index 1ce5fe5b2..523d12dfc 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala @@ -50,7 +50,10 @@ class WebSocketViewServerClient(ws: WebSocketClient, serializer: Serializer[Stri } else { Try(serializer.deserialize(msg)) match { - case Success(vsMsg) => vsMsg + case Success(vsMsg) => { + logger.info(s"[Received] $vsMsg") + vsMsg + } case Failure(e) => logger.error(s"could not deserialize ${msg} going to return null", e) null From 5167b00a7b5f9fd0059f48bd2f4bdc770d3bdc88 Mon Sep 17 00:00:00 2001 From: naleeha Date: Tue, 16 Jul 2024 15:45:25 +0100 Subject: [PATCH 8/9] #1301 adding timeout to await message function on test client --- .../org/finos/vuu/net/TestVuuClient.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala b/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala index 67eacc33d..1e052f2ae 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala @@ -1,8 +1,12 @@ package org.finos.vuu.net import org.finos.vuu.client.messages.{RequestId, TokenId} +import org.scalatest.concurrent.TimeLimits.failAfter +import org.scalatest.time.Span +import org.scalatest.time.SpanSugar._ import java.util.concurrent.ConcurrentHashMap +import scala.language.postfixOps import scala.reflect.ClassTag class TestVuuClient(vsClient: ViewServerClient) { @@ -10,34 +14,31 @@ class TestVuuClient(vsClient: ViewServerClient) { type SessionId = String type Token = String + val timeout: Span = 30 seconds + def send(sessionId: String, token: String, body: MessageBody): Unit = { vsClient.send(createViewServerMessage(sessionId, token, body)) } //todo fold this in to WebSocketViewServerClient? //is intention that this can be used for non ws client? - def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] = { - val msg = vsClient.awaitMsg - if (msg != null) { //null indicate error or timeout - if (isExpectedBodyType(t, msg)) - Some(msg.body.asInstanceOf[T]) - else - awaitForMsgWithBody - } - else - None - } + def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] = + awaitForMsg.map(msg => msg.body.asInstanceOf[T]) + + def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = { - val msg = vsClient.awaitMsg - if (msg != null) { //null indicate error or timeout - if (isExpectedBodyType(t, msg)) - Some(msg) + failAfter(timeout){ + val msg = vsClient.awaitMsg + if (msg != null) { //null indicate error or timeout + if (isExpectedBodyType(t, msg)) + Some(msg) + else + awaitForMsg + } else - awaitForMsg + None } - else - None } val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap From f6ded4cf297c830da14a09b9670bb07720c4c650 Mon Sep 17 00:00:00 2001 From: naleeha Date: Tue, 16 Jul 2024 16:27:55 +0100 Subject: [PATCH 9/9] #1301 removing websocket server client test as covered by the api test --- .../org/finos/vuu/net/WebSocketApiTest.scala | 9 ++- .../net/ws/WebSocketServerClientTest.scala | 73 ------------------- 2 files changed, 5 insertions(+), 77 deletions(-) delete mode 100644 vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala diff --git a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala index e291b26e1..a217e83f8 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala @@ -15,7 +15,10 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenWhenThen with Matchers { - implicit var viewServerClient: ViewServerClient = _ + + implicit val timeProvider: Clock = new DefaultClock + implicit val lifecycle: LifecycleContainer = new LifecycleContainer + var viewServerClient: ViewServerClient = _ var vuuClient: TestVuuClient = _ var tokenId: String = _ var sessionId: String = _ @@ -30,13 +33,11 @@ class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenW } override def afterAll(): Unit = { - //todo cleanup + lifecycle.stop() } def testStartUp(): TestVuuClient = { - implicit val timeProvider: Clock = new DefaultClock - implicit val lifecycle: LifecycleContainer = new LifecycleContainer implicit val metrics: MetricsProvider = new MetricsProviderImpl implicit val tableDefContainer: TableDefContainer = new TableDefContainer(Map()) diff --git a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala b/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala deleted file mode 100644 index 61969666c..000000000 --- a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala +++ /dev/null @@ -1,73 +0,0 @@ -package org.finos.vuu.net.ws - -import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} -import org.finos.toolbox.lifecycle.LifecycleContainer -import org.finos.toolbox.time.{Clock, DefaultClock} -import org.finos.vuu.client.ClientHelperFns.awaitMsgBody -import org.finos.vuu.core.{VuuSecurityOptions, VuuServer, VuuServerConfig, VuuWebSocketOptions} -import org.finos.vuu.net._ -import org.finos.vuu.net.auth.AlwaysHappyAuthenticator -import org.finos.vuu.net.http.VuuHttp2ServerOptions -import org.finos.vuu.net.json.JsonVsSerializer -import org.scalatest.featurespec.AnyFeatureSpec -import org.scalatest.matchers.should.Matchers - -class WebSocketServerClientTest extends AnyFeatureSpec with Matchers { - - Feature("Check that we can create a websocket server and client"){ - - Scenario("create connection without ssl between web socket server and client and send data between"){ - - implicit val timeProvider: Clock = new DefaultClock - implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val metrics: MetricsProvider = new MetricsProviderImpl - - lifecycle.autoShutdownHook() - - val http = 10021 - val ws = 10023 - - val config = VuuServerConfig( - VuuHttp2ServerOptions() - .withWebRoot("vuu/src/main/resources/www") - .withSslDisabled() - .withDirectoryListings(true) - .withPort(http), - VuuWebSocketOptions() - .withBindAddress("0.0.0.0") - .withUri("websocket") - .withWsPort(ws) - .withWssDisabled(), - VuuSecurityOptions() - .withAuthenticator(new AlwaysHappyAuthenticator) - .withLoginValidator(new AlwaysHappyLoginValidator) - ) - - val viewServer = new VuuServer(config) - - val client = new WebSocketClient(s"ws://localhost:$ws/websocket", ws) //todo review params - port specified twice - implicit val vsClient: ViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) - - //set up a dependency on ws server from ws client. - lifecycle(client).dependsOn(viewServer) - - //lifecycle registration is done in constructor of service classes, so sequence of create is important - lifecycle.start() - - vsClient.send(JsonViewServerMessage("", "", "", "", AuthenticateRequest("chris", "chris"))) - - val authMsg = awaitMsgBody[AuthenticateSuccess].get - - authMsg.getClass should equal(classOf[AuthenticateSuccess]) - authMsg.token should not be "" - - vsClient.send(JsonViewServerMessage("", "", authMsg.token, "chris", LoginRequest(authMsg.token, "chris"))) - - awaitMsgBody[LoginSuccess].get.token should equal(authMsg.token) - - lifecycle.stop() - } - - } - -}