diff --git a/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java b/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java index 798ea0688..5a95b2398 100644 --- a/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java +++ b/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java @@ -73,8 +73,10 @@ public static void main( String[] args ) VuuThreadingOptions.apply() .withTreeThreads(4) .withViewPortThreads(4), - new scala.collection.mutable.ListBuffer().toList(), - new scala.collection.mutable.ListBuffer().toList() + VuuClientConnectionOptions.apply() + .withHeartbeat(), + new scala.collection.mutable.ListBuffer().toList(), + new scala.collection.mutable.ListBuffer().toList() ).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer)) .withModule(SimulationModule.apply(clock, lifecycle, tableDefContainer)) .withModule(MetricsModule.apply(clock, lifecycle, metrics, tableDefContainer)) diff --git a/example/main/src/main/scala/org/finos/vuu/SimulMain.scala b/example/main/src/main/scala/org/finos/vuu/SimulMain.scala index 7d90a9bbd..95c496669 100644 --- a/example/main/src/main/scala/org/finos/vuu/SimulMain.scala +++ b/example/main/src/main/scala/org/finos/vuu/SimulMain.scala @@ -61,7 +61,9 @@ object SimulMain extends App with StrictLogging { .withLoginValidator(new AlwaysHappyLoginValidator), VuuThreadingOptions() .withViewPortThreads(4) - .withTreeThreads(4) + .withTreeThreads(4), + VuuClientConnectionOptions() + .withHeartbeat() ).withModule(PriceModule()) .withModule(SimulationModule()) .withModule(MetricsModule()) diff --git a/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala b/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala index d5fe3719d..cba4055c0 100644 --- a/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala +++ b/vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala @@ -100,7 +100,7 @@ object ClientHelperFns { def auth(user: String, password: String)(implicit vsClient: ViewServerClient): String = { vsClient.send(JsonViewServerMessage("", "", "", "", AuthenticateRequest(user, password))) - vsClient.awaitMsg.body.asInstanceOf[AuthenticateSuccess].token + awaitMsgBody[AuthenticateSuccess].get.token } def rpcCallAsync(sessionId: String, token: String, user: String, service: String, method: String, params: Array[Any], module: String)(implicit vsClient: ViewServerClient): Unit = { diff --git a/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala b/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala index 306098e24..bdc285b25 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala @@ -220,13 +220,18 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer, } override def process(msg: GetTableMetaRequest)(ctx: RequestContext): Option[ViewServerMessage] = { - if (msg.table == null) - errorMsg(s"Table ${msg.table} not found in container")(ctx) + if (msg.table.table == null || msg.table.module == null) + errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}. Table name and module should not be null")(ctx) else { - val table = tableContainer.getTable(msg.table.table) - val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name) - val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType)) - vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx) + val table = tableContainer.getTable(msg.table.table) //todo need to check module? what if modules with same table name + + if(table == null) + errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}")(ctx) + else{ + val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name) + val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType)) + vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx) + } } } diff --git a/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala b/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala index 61c1e7ceb..c9ac2df43 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala @@ -10,6 +10,7 @@ import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, Sta import org.finos.vuu.core.table.{DataTable, TableContainer} import org.finos.vuu.feature.inmem.{VuuInMemPlugin, VuuInMemPluginType} import org.finos.vuu.net._ +import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory, NoHeartbeatFlowController} import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server} import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer} import org.finos.vuu.net.rest.RestService @@ -37,6 +38,8 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer, final val authenticator: Authenticator = config.security.authenticator final val tokenValidator: LoginTokenValidator = config.security.loginTokenValidator + final val flowControllerFactory: FlowControllerFactory = FlowControllerFactory(config.clientConnection.hasHeartbeat) + final val sessionContainer = new ClientSessionContainerImpl() final val joinProvider: JoinTableProvider = JoinTableProviderImpl() @@ -55,7 +58,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer, final val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer) - final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer) + final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer, flowControllerFactory) //order of creation here is important final val server = new WebSocketServer(config.wsOptions, factory) diff --git a/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala b/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala index ce11a23e5..3b3986338 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala @@ -2,12 +2,11 @@ package org.finos.vuu.core import org.finos.vuu.core.module.ViewServerModule import org.finos.vuu.net.auth.AlwaysHappyAuthenticator +import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, NoHeartbeatFlowController} import org.finos.vuu.net.http.{VuuHttp2ServerOptions, VuuSecurityOptions} import org.finos.vuu.net.{AlwaysHappyLoginValidator, Authenticator, LoginTokenValidator} import org.finos.vuu.plugin.Plugin - - object VuuSecurityOptions{ def apply(): VuuSecurityOptions = { VuuSecurityOptionsImpl(new AlwaysHappyAuthenticator, new AlwaysHappyLoginValidator) @@ -26,6 +25,12 @@ object VuuThreadingOptions { } } +object VuuClientConnectionOptions { + def apply(): VuuClientConnectionOptions = { + VuuClientConnectionOptionsImpl(true) + } + +} trait VuuWebSocketOptions { def wsPort: Int def uri: String @@ -48,6 +53,12 @@ trait VuuThreadingOptions{ def treeThreads: Int } +trait VuuClientConnectionOptions { + def hasHeartbeat: Boolean + def withHeartbeat(): VuuClientConnectionOptions + def withHeartbeatDisabled(): VuuClientConnectionOptions +} + case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{ override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator) override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator) @@ -75,8 +86,16 @@ case class VuuThreadingOptionsImpl(viewPortThreads: Int = 1, treeViewPortThreads override def treeThreads: Int = treeViewPortThreads } -case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), security: VuuSecurityOptions = VuuSecurityOptions(), +case class VuuClientConnectionOptionsImpl(hasHeartbeat: Boolean) extends VuuClientConnectionOptions { + override def withHeartbeat(): VuuClientConnectionOptions = this.copy(true) + override def withHeartbeatDisabled(): VuuClientConnectionOptions = this.copy(false) +} + +case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), + wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), + security: VuuSecurityOptions = VuuSecurityOptions(), threading: VuuThreadingOptions = VuuThreadingOptions(), + clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(), modules: List[ViewServerModule] = List(), plugins: List[Plugin] = List()) { def withModule(module: ViewServerModule): VuuServerConfig = { diff --git a/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala b/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala index 765612317..a9443e9e4 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala @@ -6,7 +6,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame import org.finos.toolbox.time.Clock import org.finos.vuu.client.messages.SessionId import org.finos.vuu.core.module.ModuleContainer -import org.finos.vuu.net.flowcontrol.DefaultFlowController +import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory} import org.finos.vuu.net.json.Serializer import org.finos.vuu.util.{OutboundRowPublishQueue, PublishQueue} import org.finos.vuu.viewport.ViewPortUpdate @@ -21,7 +21,8 @@ class RequestProcessor(authenticator: Authenticator, clientSessionContainer: ClientSessionContainer, serverApi: ServerApi, serializer: Serializer[String, MessageBody], - moduleContainer: ModuleContainer + moduleContainer: ModuleContainer, + flowControllerFactory: FlowControllerFactory, )(implicit timeProvider: Clock) extends StrictLogging { @volatile private var session: ClientSessionId = null @@ -63,7 +64,7 @@ class RequestProcessor(authenticator: Authenticator, protected def createMessageHandler(channel: Channel, sessionId: ClientSessionId): MessageHandler = { val queue = new OutboundRowPublishQueue() - val flowController = new DefaultFlowController + val flowController = flowControllerFactory.create() new DefaultMessageHandler(channel, queue, sessionId, serverApi, serializer, flowController, clientSessionContainer, moduleContainer) } diff --git a/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala b/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala index 1ce5fe5b2..523d12dfc 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala @@ -50,7 +50,10 @@ class WebSocketViewServerClient(ws: WebSocketClient, serializer: Serializer[Stri } else { Try(serializer.deserialize(msg)) match { - case Success(vsMsg) => vsMsg + case Success(vsMsg) => { + logger.info(s"[Received] $vsMsg") + vsMsg + } case Failure(e) => logger.error(s"could not deserialize ${msg} going to return null", e) null diff --git a/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala b/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala index 8d69e1d45..5e97d44be 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala @@ -7,6 +7,7 @@ import org.finos.vuu.core.module.ModuleContainer import org.finos.vuu.net.json.Serializer import org.finos.toolbox.json.JsonUtil import org.finos.toolbox.time.Clock +import org.finos.vuu.net.flowcontrol.{FlowController, FlowControllerFactory} trait ViewServerHandlerFactory { def create(): ViewServerHandler @@ -15,9 +16,11 @@ trait ViewServerHandlerFactory { class ViewServerHandlerFactoryImpl(authenticator: Authenticator, tokenValidator: LoginTokenValidator, sessionContainer: ClientSessionContainer, serverApi: ServerApi, jsonVsSerializer: Serializer[String, MessageBody], - moduleContainer: ModuleContainer)(implicit val timeProvider: Clock) extends ViewServerHandlerFactory { + moduleContainer: ModuleContainer, + flowControllerFactory: FlowControllerFactory, + )(implicit val timeProvider: Clock) extends ViewServerHandlerFactory { override def create(): ViewServerHandler = { - val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer) + val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer, flowControllerFactory) new ViewServerHandler(jsonVsSerializer, requestProcessor) } } diff --git a/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala b/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala index 1835db3df..368dbfe7c 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala @@ -17,6 +17,15 @@ trait FlowController { def shouldSend(): FlowControlOp } +case class FlowControllerFactory(hasHeartbeat: Boolean)(implicit timeProvider: Clock){ + def create(): FlowController = { + if (hasHeartbeat) + new DefaultFlowController() + else + new NoHeartbeatFlowController + + } +} class DefaultFlowController(implicit timeProvider: Clock) extends FlowController { @volatile private var lastMsgTime: Long = -1 @@ -57,3 +66,13 @@ class DefaultFlowController(implicit timeProvider: Clock) extends FlowController } } + +class NoHeartbeatFlowController() extends FlowController { + override def process(msg: ViewServerMessage): Unit = { + //nothing to do here + } + + override def shouldSend(): FlowControlOp = { + BatchSize(300) + } +} diff --git a/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala b/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala index a3f08a9eb..845a49627 100644 --- a/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala +++ b/vuu/src/main/scala/org/finos/vuu/net/ws/WebSocketClient.scala @@ -53,7 +53,7 @@ class WebSocketClient(url: String, port: Int)(implicit lifecycle: LifecycleConta // if (sslCtx != null) { // p.addLast(sslCtx.newHandler(ch.alloc, host, port)) // } - p.addLast("ssl-handler", sslCtx.newHandler(ch.alloc, "localhost", 8443)) + // p.addLast("ssl-handler", sslCtx.newHandler(ch.alloc, "localhost", 8443)) p.addLast(new HttpClientCodec, new HttpObjectAggregator(8192), WebSocketClientCompressionHandler.INSTANCE, handler) } }) diff --git a/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala b/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala new file mode 100644 index 000000000..1e052f2ae --- /dev/null +++ b/vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala @@ -0,0 +1,94 @@ +package org.finos.vuu.net + +import org.finos.vuu.client.messages.{RequestId, TokenId} +import org.scalatest.concurrent.TimeLimits.failAfter +import org.scalatest.time.Span +import org.scalatest.time.SpanSugar._ + +import java.util.concurrent.ConcurrentHashMap +import scala.language.postfixOps +import scala.reflect.ClassTag + +class TestVuuClient(vsClient: ViewServerClient) { + + type SessionId = String + type Token = String + + val timeout: Span = 30 seconds + + def send(sessionId: String, token: String, body: MessageBody): Unit = { + vsClient.send(createViewServerMessage(sessionId, token, body)) + } + + //todo fold this in to WebSocketViewServerClient? + //is intention that this can be used for non ws client? + def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] = + awaitForMsg.map(msg => msg.body.asInstanceOf[T]) + + + + def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = { + failAfter(timeout){ + val msg = vsClient.awaitMsg + if (msg != null) { //null indicate error or timeout + if (isExpectedBodyType(t, msg)) + Some(msg) + else + awaitForMsg + } + else + None + } + } + + val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap + + def awaitForResponse(requestId: String): Option[ViewServerMessage] = { + + lookupFromReceivedResponses(requestId) + .map(msg => return Some(msg)) + + val msg = vsClient.awaitMsg + if (msg != null) + if (msg.requestId == requestId) + Some(msg) + else { + responsesMap.put(requestId, msg) + awaitForResponse(requestId) + } + else + None + } + + def createAuthToken(): Token = TokenId.oneNew() + + def login(token: String, user: String): Option[String] = { + send("not used", "not used", LoginRequest(token, user)) + + //capture messages rather than dismissing, - how to cap size + // need to match on request id to ensure correct response? + awaitForMsg[LoginSuccess] + .map(x => x.sessionId) + //todo handle no response + //todo what to do if LoginFailure + // why does these response return token that was passed in the request? Does UI use this or match based on message request id? + } + + private def isExpectedBodyType[T <: AnyRef](t: ClassTag[T], msg: ViewServerMessage) = { + val expectedBodyType: Class[T] = t.runtimeClass.asInstanceOf[Class[T]] + expectedBodyType.isAssignableFrom(msg.body.getClass) + } + + private def lookupFromReceivedResponses(requestId: String): Option[ViewServerMessage] = { + Option(responsesMap.get(requestId)) + } + + private def createViewServerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = { + JsonViewServerMessage(RequestId.oneNew(), + sessionId, + token, + "testUser", + body, + "DoesntReallyMatter") + } +} diff --git a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala index 3d4e7989a..a217e83f8 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/WebSocketApiTest.scala @@ -1,35 +1,117 @@ package org.finos.vuu.net +import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} +import org.finos.toolbox.lifecycle.LifecycleContainer +import org.finos.toolbox.time.{Clock, DefaultClock} +import org.finos.vuu.core._ +import org.finos.vuu.core.module.{TableDefContainer, TestModule} +import org.finos.vuu.net.auth.AlwaysHappyAuthenticator +import org.finos.vuu.net.http.VuuHttp2ServerOptions +import org.finos.vuu.net.json.JsonVsSerializer +import org.finos.vuu.net.ws.WebSocketClient +import org.finos.vuu.viewport.ViewPortTable import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} -class WebSocketApiTest extends AnyFeatureSpec with Matchers { +class WebSocketApiTest extends AnyFeatureSpec with BeforeAndAfterAll with GivenWhenThen with Matchers { - def awaitMsg[TYPE](implicit client: ViewServerClient): Option[TYPE] = { - None + implicit val timeProvider: Clock = new DefaultClock + implicit val lifecycle: LifecycleContainer = new LifecycleContainer + var viewServerClient: ViewServerClient = _ + var vuuClient: TestVuuClient = _ + var tokenId: String = _ + var sessionId: String = _ + + override def beforeAll(): Unit = { + vuuClient = testStartUp() + + tokenId = vuuClient.createAuthToken() + val sessionOption = vuuClient.login(tokenId, "testUser") + assert(sessionOption.isDefined) + sessionId = sessionOption.get } - Feature("test server api shape"){ - - Scenario("test login and creation of view port"){ - -// implicit val serverApi: ServerApi = null -// implicit val client: Client = null -// -// client.send(serverApi.serialize(AuthenticateMessage("foo", "bar"))) -// -// val responseOption = awaitMsg[LoginResponse] -// -// val response = responseOption.get -// -// val vpId = UUID.randomUUID().toString -// -// client.send(serverApi.serialize(CreateViewPort(response.token, vpId, "orderPrices"))) -// -// val vpResonse = awaitMsg[ServerMessage] + override def afterAll(): Unit = { + lifecycle.stop() + } - } + def testStartUp(): TestVuuClient = { + + implicit val metrics: MetricsProvider = new MetricsProviderImpl + implicit val tableDefContainer: TableDefContainer = new TableDefContainer(Map()) + + lifecycle.autoShutdownHook() + + val http = 10011 + val ws = 10013 + val config = VuuServerConfig( + VuuHttp2ServerOptions() + .withWebRoot("vuu/src/main/resources/www") + .withSslDisabled() + .withDirectoryListings(true) + .withPort(http), + VuuWebSocketOptions() + .withBindAddress("0.0.0.0") + .withUri("websocket") + .withWsPort(ws) + .withWssDisabled(), + VuuSecurityOptions() + .withAuthenticator(new AlwaysHappyAuthenticator) + .withLoginValidator(new AlwaysHappyLoginValidator), + VuuThreadingOptions(), + VuuClientConnectionOptions() + .withHeartbeatDisabled() + ).withModule(TestModule()) + + val viewServer = new VuuServer(config) + + val client = new WebSocketClient(s"ws://localhost:$ws/websocket", ws) //todo review params - port specified twice + val viewServerClient: ViewServerClient = new WebSocketViewServerClient(client, JsonVsSerializer) + val vuuClient = new TestVuuClient(viewServerClient) + + //set up a dependency on ws server from ws client. + lifecycle(client).dependsOn(viewServer) + + //lifecycle registration is done in constructor of service classes, so sequence of create is important + lifecycle.start() + + vuuClient } + Feature("Server web socket api") { + Scenario("client requests to get table metadata for a table") { + + vuuClient.send(sessionId, tokenId, GetTableMetaRequest(ViewPortTable("instruments", "TEST"))) + + Then("return table data in response") + val response = vuuClient.awaitForMsgWithBody[GetTableMetaResponse] + assert(response.isDefined) + + val responseMessage = response.get + responseMessage.columns.length shouldEqual 5 + } + + Scenario("client requests to get table metadata for a non existent") { + + vuuClient.send(sessionId, tokenId, GetTableMetaRequest(ViewPortTable("DoesNotExist", "TEST"))) + + Then("return error response with helpful message") + val response = vuuClient.awaitForMsgWithBody[ErrorResponse] + assert(response.isDefined) + response.get.msg shouldEqual "No such table found with name DoesNotExist in module TEST" + } + + Scenario("client requests to get table metadata for null table name") { + + vuuClient.send(sessionId, tokenId, GetTableMetaRequest(ViewPortTable(null, "TEST"))) + + Then("return error response with helpful message") + val response = vuuClient.awaitForMsgWithBody[ErrorResponse] + assert(response.isDefined) + response.get.msg shouldEqual "No such table found with name null in module TEST. Table name and module should not be null" + } + } } + diff --git a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala b/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala deleted file mode 100644 index 78b64d51b..000000000 --- a/vuu/src/test/scala/org/finos/vuu/net/ws/WebSocketServerClientTest.scala +++ /dev/null @@ -1,84 +0,0 @@ -package org.finos.vuu.net.ws - -import org.finos.vuu.client.ClientHelperFns.awaitMsgBody -import org.finos.vuu.core.{CoreServerApiHandler, VuuWebSocketOptions} -import org.finos.vuu.core.module.ModuleContainer -import org.finos.vuu.core.table.TableContainer -import org.finos.vuu.net._ -import org.finos.vuu.net.auth.AlwaysHappyAuthenticator -import org.finos.vuu.net.json.JsonVsSerializer -import org.finos.vuu.provider.{JoinTableProviderImpl, ProviderContainer} -import org.finos.vuu.viewport.ViewPortContainer -import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} -import org.finos.toolbox.lifecycle.LifecycleContainer -import org.finos.toolbox.time.{Clock, DefaultClock} -import org.finos.vuu.feature.inmem.VuuInMemPlugin -import org.finos.vuu.plugin.DefaultPluginRegistry -import org.scalatest.featurespec.AnyFeatureSpec -import org.scalatest.matchers.should.Matchers - -class WebSocketServerClientTest extends AnyFeatureSpec with Matchers { - - Feature("Check that we can create a websocket server and client"){ - - ignore("create web socket server and client and send data between"){ - - implicit val timeProvider: Clock = new DefaultClock - implicit val lifecycle = new LifecycleContainer - implicit val metrics: MetricsProvider = new MetricsProviderImpl - - val serializer = JsonVsSerializer - val authenticator = new AlwaysHappyAuthenticator - val tokenValidator = new AlwaysHappyLoginValidator - - val sessionContainer = new ClientSessionContainerImpl() - - val joinProvider = JoinTableProviderImpl() - - val tableContainer = new TableContainer(joinProvider) - - val providerContainer = new ProviderContainer(joinProvider) - - val pluginRegistry = new DefaultPluginRegistry - pluginRegistry.registerPlugin(new VuuInMemPlugin) - - val viewPortContainer = new ViewPortContainer(tableContainer, providerContainer, pluginRegistry) - - val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer) - - val moduleContainer = new ModuleContainer - - val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer) - - val options = VuuWebSocketOptions.apply() - .withWsPort(18090) - .withBindAddress("0.0.0.0") - //.withWss() - - //order of creation here is important - val server = new WebSocketServer(options, factory) - - val client = new WebSocketClient("ws://localhost:8090/websocket", 18090) - implicit val vsClient = new WebSocketViewServerClient(client, JsonVsSerializer) - - //set up a dependency on ws server from ws client. - lifecycle(client).dependsOn(server) - - //lifecycle registration is done in constructor of service classes, so sequence of create is important - lifecycle.start() - - vsClient.send(JsonViewServerMessage("", "", "", "",AuthenticateRequest("chris", "chris"))) - - val authMsg = awaitMsgBody[AuthenticateSuccess].get - - authMsg.getClass should equal(classOf[AuthenticateSuccess]) - authMsg.token should not be ("") - - vsClient.send(JsonViewServerMessage("", "", authMsg.token, "chris", LoginRequest(authMsg.token, "chris"))) - - awaitMsgBody[LoginSuccess].get.token should equal(authMsg.token) - } - - } - -} diff --git a/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala b/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala index 59815fd94..3479f1df5 100644 --- a/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala +++ b/vuu/src/test/scala/org/finos/vuu/test/impl/TestVuuServerImpl.scala @@ -15,6 +15,7 @@ import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Ser import org.finos.vuu.net.rest.RestService import org.finos.vuu.net.rpc.{JsonSubTypeRegistry, RpcHandler} import org.finos.vuu.net._ +import org.finos.vuu.net.flowcontrol.FlowControllerFactory import org.finos.vuu.plugin.{DefaultPluginRegistry, Plugin} import org.finos.vuu.provider._ import org.finos.vuu.test.rpc.RpcDynamicProxy @@ -35,6 +36,7 @@ class TestVuuServerImpl(val modules: List[ViewServerModule])(implicit clock: Clo val authenticator = new AlwaysHappyAuthenticator val tokenValidator = new AlwaysHappyLoginValidator + val flowControllerFactory = FlowControllerFactory(hasHeartbeat = false) val joinProvider: JoinTableProvider = JoinTableProviderImpl() @@ -56,7 +58,7 @@ class TestVuuServerImpl(val modules: List[ViewServerModule])(implicit clock: Clo val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer) - val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer) + val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer, flowControllerFactory) val queue = new OutboundRowPublishQueue()