Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web socket api test for get table meta #1431

Merged
merged 11 commits into from
Jul 17, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ public static void main( String[] args )
VuuThreadingOptions.apply()
.withTreeThreads(4)
.withViewPortThreads(4),
new scala.collection.mutable.ListBuffer<ViewServerModule>().toList(),
new scala.collection.mutable.ListBuffer<Plugin>().toList()
VuuClientConnectionOptions.apply()
.withHeartbeat(),
new scala.collection.mutable.ListBuffer<ViewServerModule>().toList(),
new scala.collection.mutable.ListBuffer<Plugin>().toList()
).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer))
.withModule(SimulationModule.apply(clock, lifecycle, tableDefContainer))
.withModule(MetricsModule.apply(clock, lifecycle, metrics, tableDefContainer))
Expand Down
4 changes: 3 additions & 1 deletion example/main/src/main/scala/org/finos/vuu/SimulMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ object SimulMain extends App with StrictLogging {
.withLoginValidator(new AlwaysHappyLoginValidator),
VuuThreadingOptions()
.withViewPortThreads(4)
.withTreeThreads(4)
.withTreeThreads(4),
VuuClientConnectionOptions()
.withHeartbeat()
).withModule(PriceModule())
.withModule(SimulationModule())
.withModule(MetricsModule())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object ClientHelperFns {

def auth(user: String, password: String)(implicit vsClient: ViewServerClient): String = {
vsClient.send(JsonViewServerMessage("", "", "", "", AuthenticateRequest(user, password)))
vsClient.awaitMsg.body.asInstanceOf[AuthenticateSuccess].token
awaitMsgBody[AuthenticateSuccess].get.token
}

def rpcCallAsync(sessionId: String, token: String, user: String, service: String, method: String, params: Array[Any], module: String)(implicit vsClient: ViewServerClient): Unit = {
Expand Down
17 changes: 11 additions & 6 deletions vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,18 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
}

override def process(msg: GetTableMetaRequest)(ctx: RequestContext): Option[ViewServerMessage] = {
if (msg.table == null)
errorMsg(s"Table ${msg.table} not found in container")(ctx)
if (msg.table.table == null || msg.table.module == null)
errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}. Table name and module should not be null")(ctx)
else {
val table = tableContainer.getTable(msg.table.table)
val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name)
val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType))
vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx)
val table = tableContainer.getTable(msg.table.table) //todo need to check module? what if modules with same table name

if(table == null)
errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}")(ctx)
else{
val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name)
val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType))
vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx)
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, Sta
import org.finos.vuu.core.table.{DataTable, TableContainer}
import org.finos.vuu.feature.inmem.{VuuInMemPlugin, VuuInMemPluginType}
import org.finos.vuu.net._
import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory, NoHeartbeatFlowController}
import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server}
import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer}
import org.finos.vuu.net.rest.RestService
Expand Down Expand Up @@ -37,6 +38,8 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,
final val authenticator: Authenticator = config.security.authenticator
final val tokenValidator: LoginTokenValidator = config.security.loginTokenValidator

final val flowControllerFactory: FlowControllerFactory = FlowControllerFactory(config.clientConnection.hasHeartbeat)

final val sessionContainer = new ClientSessionContainerImpl()

final val joinProvider: JoinTableProvider = JoinTableProviderImpl()
Expand All @@ -55,7 +58,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,

final val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer)

final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer)
final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer, flowControllerFactory)

//order of creation here is important
final val server = new WebSocketServer(config.wsOptions, factory)
Expand Down
25 changes: 22 additions & 3 deletions vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package org.finos.vuu.core

import org.finos.vuu.core.module.ViewServerModule
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, NoHeartbeatFlowController}
import org.finos.vuu.net.http.{VuuHttp2ServerOptions, VuuSecurityOptions}
import org.finos.vuu.net.{AlwaysHappyLoginValidator, Authenticator, LoginTokenValidator}
import org.finos.vuu.plugin.Plugin



object VuuSecurityOptions{
def apply(): VuuSecurityOptions = {
VuuSecurityOptionsImpl(new AlwaysHappyAuthenticator, new AlwaysHappyLoginValidator)
Expand All @@ -26,6 +25,12 @@ object VuuThreadingOptions {
}
}

object VuuClientConnectionOptions {
def apply(): VuuClientConnectionOptions = {
VuuClientConnectionOptionsImpl(true)
}

}
trait VuuWebSocketOptions {
def wsPort: Int
def uri: String
Expand All @@ -48,6 +53,12 @@ trait VuuThreadingOptions{
def treeThreads: Int
}

trait VuuClientConnectionOptions {
def hasHeartbeat: Boolean
def withHeartbeat(): VuuClientConnectionOptions
def withHeartbeatDisabled(): VuuClientConnectionOptions
}

case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{
override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator)
override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator)
Expand Down Expand Up @@ -75,8 +86,16 @@ case class VuuThreadingOptionsImpl(viewPortThreads: Int = 1, treeViewPortThreads
override def treeThreads: Int = treeViewPortThreads
}

case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), security: VuuSecurityOptions = VuuSecurityOptions(),
case class VuuClientConnectionOptionsImpl(hasHeartbeat: Boolean) extends VuuClientConnectionOptions {
override def withHeartbeat(): VuuClientConnectionOptions = this.copy(true)
override def withHeartbeatDisabled(): VuuClientConnectionOptions = this.copy(false)
}

case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(),
wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(),
security: VuuSecurityOptions = VuuSecurityOptions(),
threading: VuuThreadingOptions = VuuThreadingOptions(),
clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(),
modules: List[ViewServerModule] = List(),
plugins: List[Plugin] = List()) {
def withModule(module: ViewServerModule): VuuServerConfig = {
Expand Down
7 changes: 4 additions & 3 deletions vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import org.finos.toolbox.time.Clock
import org.finos.vuu.client.messages.SessionId
import org.finos.vuu.core.module.ModuleContainer
import org.finos.vuu.net.flowcontrol.DefaultFlowController
import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory}
import org.finos.vuu.net.json.Serializer
import org.finos.vuu.util.{OutboundRowPublishQueue, PublishQueue}
import org.finos.vuu.viewport.ViewPortUpdate
Expand All @@ -21,7 +21,8 @@ class RequestProcessor(authenticator: Authenticator,
clientSessionContainer: ClientSessionContainer,
serverApi: ServerApi,
serializer: Serializer[String, MessageBody],
moduleContainer: ModuleContainer
moduleContainer: ModuleContainer,
flowControllerFactory: FlowControllerFactory,
)(implicit timeProvider: Clock) extends StrictLogging {

@volatile private var session: ClientSessionId = null
Expand Down Expand Up @@ -63,7 +64,7 @@ class RequestProcessor(authenticator: Authenticator,

protected def createMessageHandler(channel: Channel, sessionId: ClientSessionId): MessageHandler = {
val queue = new OutboundRowPublishQueue()
val flowController = new DefaultFlowController
val flowController = flowControllerFactory.create()
new DefaultMessageHandler(channel, queue, sessionId, serverApi, serializer, flowController, clientSessionContainer, moduleContainer)
}

Expand Down
5 changes: 4 additions & 1 deletion vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ class WebSocketViewServerClient(ws: WebSocketClient, serializer: Serializer[Stri
}
else {
Try(serializer.deserialize(msg)) match {
case Success(vsMsg) => vsMsg
case Success(vsMsg) => {
logger.info(s"[Received] $vsMsg")
vsMsg
}
case Failure(e) =>
logger.error(s"could not deserialize ${msg} going to return null", e)
null
Expand Down
7 changes: 5 additions & 2 deletions vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.finos.vuu.core.module.ModuleContainer
import org.finos.vuu.net.json.Serializer
import org.finos.toolbox.json.JsonUtil
import org.finos.toolbox.time.Clock
import org.finos.vuu.net.flowcontrol.{FlowController, FlowControllerFactory}

trait ViewServerHandlerFactory {
def create(): ViewServerHandler
Expand All @@ -15,9 +16,11 @@ trait ViewServerHandlerFactory {
class ViewServerHandlerFactoryImpl(authenticator: Authenticator,
tokenValidator: LoginTokenValidator, sessionContainer: ClientSessionContainer,
serverApi: ServerApi, jsonVsSerializer: Serializer[String, MessageBody],
moduleContainer: ModuleContainer)(implicit val timeProvider: Clock) extends ViewServerHandlerFactory {
moduleContainer: ModuleContainer,
flowControllerFactory: FlowControllerFactory,
)(implicit val timeProvider: Clock) extends ViewServerHandlerFactory {
override def create(): ViewServerHandler = {
val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer)
val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer, flowControllerFactory)
new ViewServerHandler(jsonVsSerializer, requestProcessor)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ trait FlowController {
def shouldSend(): FlowControlOp
}

case class FlowControllerFactory(hasHeartbeat: Boolean)(implicit timeProvider: Clock){
def create(): FlowController = {
if (hasHeartbeat)
new DefaultFlowController()
else
new NoHeartbeatFlowController

}
}
class DefaultFlowController(implicit timeProvider: Clock) extends FlowController {

@volatile private var lastMsgTime: Long = -1
Expand Down Expand Up @@ -57,3 +66,13 @@ class DefaultFlowController(implicit timeProvider: Clock) extends FlowController
}

}

class NoHeartbeatFlowController() extends FlowController {
override def process(msg: ViewServerMessage): Unit = {
//nothing to do here
}

override def shouldSend(): FlowControlOp = {
BatchSize(300)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class WebSocketClient(url: String, port: Int)(implicit lifecycle: LifecycleConta
// if (sslCtx != null) {
// p.addLast(sslCtx.newHandler(ch.alloc, host, port))
// }
p.addLast("ssl-handler", sslCtx.newHandler(ch.alloc, "localhost", 8443))
// p.addLast("ssl-handler", sslCtx.newHandler(ch.alloc, "localhost", 8443))
p.addLast(new HttpClientCodec, new HttpObjectAggregator(8192), WebSocketClientCompressionHandler.INSTANCE, handler)
}
})
Expand Down
94 changes: 94 additions & 0 deletions vuu/src/test/scala/org/finos/vuu/net/TestVuuClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.finos.vuu.net

import org.finos.vuu.client.messages.{RequestId, TokenId}
import org.scalatest.concurrent.TimeLimits.failAfter
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import java.util.concurrent.ConcurrentHashMap
import scala.language.postfixOps
import scala.reflect.ClassTag

class TestVuuClient(vsClient: ViewServerClient) {

type SessionId = String
type Token = String

val timeout: Span = 30 seconds

def send(sessionId: String, token: String, body: MessageBody): Unit = {
vsClient.send(createViewServerMessage(sessionId, token, body))
}

//todo fold this in to WebSocketViewServerClient?
//is intention that this can be used for non ws client?
def awaitForMsgWithBody[T <: AnyRef](implicit t: ClassTag[T]): Option[T] =
awaitForMsg.map(msg => msg.body.asInstanceOf[T])



def awaitForMsg[T <: AnyRef](implicit t: ClassTag[T]): Option[ViewServerMessage] = {
failAfter(timeout){
val msg = vsClient.awaitMsg
if (msg != null) { //null indicate error or timeout
if (isExpectedBodyType(t, msg))
Some(msg)
else
awaitForMsg
}
else
None
}
}

val responsesMap: ConcurrentHashMap[String, ViewServerMessage] = new ConcurrentHashMap

def awaitForResponse(requestId: String): Option[ViewServerMessage] = {

lookupFromReceivedResponses(requestId)
.map(msg => return Some(msg))

val msg = vsClient.awaitMsg
if (msg != null)
if (msg.requestId == requestId)
Some(msg)
else {
responsesMap.put(requestId, msg)
awaitForResponse(requestId)
}
else
None
}

def createAuthToken(): Token = TokenId.oneNew()

def login(token: String, user: String): Option[String] = {
send("not used", "not used", LoginRequest(token, user))

//capture messages rather than dismissing, - how to cap size
// need to match on request id to ensure correct response?
awaitForMsg[LoginSuccess]
.map(x => x.sessionId)
//todo handle no response
//todo what to do if LoginFailure
// why does these response return token that was passed in the request? Does UI use this or match based on message request id?
}

private def isExpectedBodyType[T <: AnyRef](t: ClassTag[T], msg: ViewServerMessage) = {
val expectedBodyType: Class[T] = t.runtimeClass.asInstanceOf[Class[T]]
expectedBodyType.isAssignableFrom(msg.body.getClass)
}

private def lookupFromReceivedResponses(requestId: String): Option[ViewServerMessage] = {
Option(responsesMap.get(requestId))
}

private def createViewServerMessage(sessionId: String, token: String, body: MessageBody): ViewServerMessage = {
JsonViewServerMessage(RequestId.oneNew(),
sessionId,
token,
"testUser",
body,
"DoesntReallyMatter")
}
}
Loading
Loading