diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 7f317b1700..d59bb63b75 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -19,6 +19,7 @@ package main import ( "encoding/json" "fmt" + "mosn.io/layotto/pkg/grpc/default_api" "os" "strconv" "time" @@ -116,6 +117,7 @@ import ( "google.golang.org/grpc" _ "mosn.io/layotto/pkg/filter/network/tcpcopy" "mosn.io/layotto/pkg/runtime" + _ "mosn.io/layotto/pkg/wasm" "mosn.io/mosn/pkg/featuregate" _ "mosn.io/mosn/pkg/filter/network/grpc" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" @@ -153,6 +155,10 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp // 3. run server, err := rt.Run( runtime.WithGrpcOptions(opts...), + // register your grpc API here + runtime.WithGrpcAPI( + default_api.NewGrpcAPI, + ), // Hello runtime.WithHelloFactory( hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), diff --git a/cmd/layotto_multiple_api/client/main.go b/cmd/layotto_multiple_api/client/main.go new file mode 100644 index 0000000000..1182a71859 --- /dev/null +++ b/cmd/layotto_multiple_api/client/main.go @@ -0,0 +1,58 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package main implements a client for Greeter service. +package main + +import ( + "context" + "log" + "os" + "time" + + "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" +) + +const ( + address = "localhost:34904" + defaultName = "world" +) + +func main() { + // Set up a connection to the server. + conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + c := pb.NewGreeterClient(conn) + + // Contact the server and print out its response. + name := defaultName + if len(os.Args) > 1 { + name = os.Args[1] + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}) + if err != nil { + log.Fatalf("could not greet: %v", err) + } + log.Printf("Greeting: %s", r.GetMessage()) +} diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go new file mode 100644 index 0000000000..60a5c38f3d --- /dev/null +++ b/cmd/layotto_multiple_api/main.go @@ -0,0 +1,413 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/json" + "fmt" + "mosn.io/layotto/pkg/grpc/default_api" + helloworld_api "mosn.io/layotto/pkg/integrate/api/helloworld" + "os" + "strconv" + "time" + + mock_state "mosn.io/layotto/pkg/mock/components/state" + _ "mosn.io/layotto/pkg/wasm" + + "mosn.io/layotto/components/file/local" + + "mosn.io/layotto/components/file/s3/alicloud" + "mosn.io/layotto/components/file/s3/aws" + "mosn.io/layotto/components/file/s3/minio" + + dbindings "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/bindings/http" + "mosn.io/layotto/components/configstores/etcdv3" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/sequencer" + "mosn.io/layotto/pkg/runtime/bindings" + runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer" + "mosn.io/pkg/log" + + // Hello + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/hello/helloworld" + + // Configuration + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/configstores/apollo" + + // Pub/Sub + dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" + pubsub_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs" + pubsub_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" + "github.com/dapr/components-contrib/pubsub/azure/servicebus" + pubsub_gcp "github.com/dapr/components-contrib/pubsub/gcp/pubsub" + pubsub_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast" + pubsub_inmemory "github.com/dapr/components-contrib/pubsub/in-memory" + pubsub_kafka "github.com/dapr/components-contrib/pubsub/kafka" + pubsub_mqtt "github.com/dapr/components-contrib/pubsub/mqtt" + "github.com/dapr/components-contrib/pubsub/natsstreaming" + pubsub_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" + "github.com/dapr/components-contrib/pubsub/rabbitmq" + pubsub_redis "github.com/dapr/components-contrib/pubsub/redis" + "github.com/dapr/kit/logger" + "mosn.io/layotto/pkg/runtime/pubsub" + + // RPC + "mosn.io/layotto/components/rpc" + mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" + + // State Stores + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/aerospike" + state_dynamodb "github.com/dapr/components-contrib/state/aws/dynamodb" + state_azure_blobstorage "github.com/dapr/components-contrib/state/azure/blobstorage" + state_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" + state_azure_tablestorage "github.com/dapr/components-contrib/state/azure/tablestorage" + "github.com/dapr/components-contrib/state/cassandra" + "github.com/dapr/components-contrib/state/cloudstate" + "github.com/dapr/components-contrib/state/couchbase" + "github.com/dapr/components-contrib/state/gcp/firestore" + "github.com/dapr/components-contrib/state/hashicorp/consul" + "github.com/dapr/components-contrib/state/hazelcast" + "github.com/dapr/components-contrib/state/memcached" + "github.com/dapr/components-contrib/state/mongodb" + state_mysql "github.com/dapr/components-contrib/state/mysql" + "github.com/dapr/components-contrib/state/postgresql" + state_redis "github.com/dapr/components-contrib/state/redis" + "github.com/dapr/components-contrib/state/rethinkdb" + "github.com/dapr/components-contrib/state/sqlserver" + "github.com/dapr/components-contrib/state/zookeeper" + runtime_state "mosn.io/layotto/pkg/runtime/state" + + // Lock + "mosn.io/layotto/components/lock" + lock_consul "mosn.io/layotto/components/lock/consul" + lock_etcd "mosn.io/layotto/components/lock/etcd" + lock_redis "mosn.io/layotto/components/lock/redis" + lock_zookeeper "mosn.io/layotto/components/lock/zookeeper" + runtime_lock "mosn.io/layotto/pkg/runtime/lock" + + // Sequencer + sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" + sequencer_redis "mosn.io/layotto/components/sequencer/redis" + sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" + + // Actuator + _ "mosn.io/layotto/pkg/actuator" + "mosn.io/layotto/pkg/actuator/health" + actuatorInfo "mosn.io/layotto/pkg/actuator/info" + _ "mosn.io/layotto/pkg/filter/stream/actuator/http" + "mosn.io/layotto/pkg/integrate/actuator" + + "github.com/urfave/cli" + "google.golang.org/grpc" + _ "mosn.io/layotto/pkg/filter/network/tcpcopy" + "mosn.io/layotto/pkg/runtime" + "mosn.io/mosn/pkg/featuregate" + _ "mosn.io/mosn/pkg/filter/network/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" + _ "mosn.io/mosn/pkg/filter/network/proxy" + _ "mosn.io/mosn/pkg/filter/stream/flowcontrol" + _ "mosn.io/mosn/pkg/metrics/sink" + _ "mosn.io/mosn/pkg/metrics/sink/prometheus" + "mosn.io/mosn/pkg/mosn" + _ "mosn.io/mosn/pkg/network" + _ "mosn.io/mosn/pkg/stream/http" + _ "mosn.io/mosn/pkg/wasm/runtime/wasmer" + _ "mosn.io/pkg/buffer" +) + +// loggerForDaprComp is constructed for reusing dapr's components. +var loggerForDaprComp = logger.NewLogger("reuse.dapr.component") + +func init() { + mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer) + // Register default actuator implementations + actuatorInfo.AddInfoContributor("app", actuator.GetAppContributor()) + health.AddReadinessIndicator("runtime_startup", actuator.GetRuntimeReadinessIndicator()) + health.AddLivenessIndicator("runtime_startup", actuator.GetRuntimeLivenessIndicator()) +} + +func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + // 1. parse config + cfg, err := runtime.ParseRuntimeConfig(data) + if err != nil { + actuator.GetRuntimeReadinessIndicator().SetUnhealthy(fmt.Sprintf("parse config error.%v", err)) + return nil, err + } + // 2. new instance + rt := runtime.NewMosnRuntime(cfg) + // 3. run + server, err := rt.Run( + runtime.WithGrpcOptions(opts...), + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), + // Hello + runtime.WithHelloFactory( + hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), + ), + // Configuration + runtime.WithConfigStoresFactory( + configstores.NewStoreFactory("apollo", apollo.NewStore), + configstores.NewStoreFactory("etcd", etcdv3.NewStore), + ), + + // RPC + runtime.WithRpcFactory( + rpc.NewRpcFactory("mosn", mosninvoker.NewMosnInvoker), + ), + + // File + runtime.WithFileFactory( + file.NewFileFactory("aliOSS", alicloud.NewAliCloudOSS), + file.NewFileFactory("minioOSS", minio.NewMinioOss), + file.NewFileFactory("awsOSS", aws.NewAwsOss), + file.NewFileFactory("local", local.NewLocalStore), + ), + + // PubSub + runtime.WithPubSubFactory( + pubsub.NewFactory("redis", func() dapr_comp_pubsub.PubSub { + return pubsub_redis.NewRedisStreams(loggerForDaprComp) + }), + pubsub.NewFactory("natsstreaming", func() dapr_comp_pubsub.PubSub { + return natsstreaming.NewNATSStreamingPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("azure.eventhubs", func() dapr_comp_pubsub.PubSub { + return pubsub_eventhubs.NewAzureEventHubs(loggerForDaprComp) + }), + pubsub.NewFactory("azure.servicebus", func() dapr_comp_pubsub.PubSub { + return servicebus.NewAzureServiceBus(loggerForDaprComp) + }), + pubsub.NewFactory("rabbitmq", func() dapr_comp_pubsub.PubSub { + return rabbitmq.NewRabbitMQ(loggerForDaprComp) + }), + pubsub.NewFactory("hazelcast", func() dapr_comp_pubsub.PubSub { + return pubsub_hazelcast.NewHazelcastPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("gcp.pubsub", func() dapr_comp_pubsub.PubSub { + return pubsub_gcp.NewGCPPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("kafka", func() dapr_comp_pubsub.PubSub { + return pubsub_kafka.NewKafka(loggerForDaprComp) + }), + pubsub.NewFactory("snssqs", func() dapr_comp_pubsub.PubSub { + return pubsub_snssqs.NewSnsSqs(loggerForDaprComp) + }), + pubsub.NewFactory("mqtt", func() dapr_comp_pubsub.PubSub { + return pubsub_mqtt.NewMQTTPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("pulsar", func() dapr_comp_pubsub.PubSub { + return pubsub_pulsar.NewPulsar(loggerForDaprComp) + }), + pubsub.NewFactory("in-memory", func() dapr_comp_pubsub.PubSub { + return pubsub_inmemory.New(loggerForDaprComp) + }), + ), + // State + runtime.WithStateFactory( + runtime_state.NewFactory("in-memory", func() state.Store { + return mock_state.NewInMemoryStateStore() + }), + runtime_state.NewFactory("redis", func() state.Store { + return state_redis.NewRedisStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("consul", func() state.Store { + return consul.NewConsulStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.blobstorage", func() state.Store { + return state_azure_blobstorage.NewAzureBlobStorageStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.cosmosdb", func() state.Store { + return state_cosmosdb.NewCosmosDBStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.tablestorage", func() state.Store { + return state_azure_tablestorage.NewAzureTablesStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("cassandra", func() state.Store { + return cassandra.NewCassandraStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("memcached", func() state.Store { + return memcached.NewMemCacheStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("mongodb", func() state.Store { + return mongodb.NewMongoDB(loggerForDaprComp) + }), + runtime_state.NewFactory("zookeeper", func() state.Store { + return zookeeper.NewZookeeperStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("gcp.firestore", func() state.Store { + return firestore.NewFirestoreStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("postgresql", func() state.Store { + return postgresql.NewPostgreSQLStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("sqlserver", func() state.Store { + return sqlserver.NewSQLServerStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("hazelcast", func() state.Store { + return hazelcast.NewHazelcastStore(loggerForDaprComp) + }), + runtime_state.NewFactory("cloudstate.crdt", func() state.Store { + return cloudstate.NewCRDT(loggerForDaprComp) + }), + runtime_state.NewFactory("couchbase", func() state.Store { + return couchbase.NewCouchbaseStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("aerospike", func() state.Store { + return aerospike.NewAerospikeStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("rethinkdb", func() state.Store { + return rethinkdb.NewRethinkDBStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("aws.dynamodb", state_dynamodb.NewDynamoDBStateStore), + runtime_state.NewFactory("mysql", func() state.Store { + return state_mysql.NewMySQLStateStore(loggerForDaprComp) + }), + ), + // Lock + runtime.WithLockFactory( + runtime_lock.NewFactory("redis_cluster", func() lock.LockStore { + return lock_redis.NewClusterRedisLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("redis", func() lock.LockStore { + return lock_redis.NewStandaloneRedisLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("zookeeper", func() lock.LockStore { + return lock_zookeeper.NewZookeeperLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("etcd", func() lock.LockStore { + return lock_etcd.NewEtcdLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("consul", func() lock.LockStore { + return lock_consul.NewConsulLock(log.DefaultLogger) + }), + ), + + // bindings + runtime.WithOutputBindings( + bindings.NewOutputBindingFactory("http", func() dbindings.OutputBinding { + return http.NewHTTP(loggerForDaprComp) + }), + ), + + // Sequencer + runtime.WithSequencerFactory( + runtime_sequencer.NewFactory("etcd", func() sequencer.Store { + return sequencer_etcd.NewEtcdSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("redis", func() sequencer.Store { + return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { + return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) + }), + )) + // 4. check if unhealthy + if err != nil { + actuator.GetRuntimeReadinessIndicator().SetUnhealthy(err.Error()) + actuator.GetRuntimeLivenessIndicator().SetUnhealthy(err.Error()) + } + return server, err +} + +var cmdStart = cli.Command{ + Name: "start", + Usage: "start runtime", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config, c", + Usage: "Load configuration from `FILE`", + EnvVar: "RUNTIME_CONFIG", + Value: "configs/config.json", + }, cli.StringFlag{ + Name: "feature-gates, f", + Usage: "config feature gates", + EnvVar: "FEATURE_GATES", + }, + }, + Action: func(c *cli.Context) error { + stm := mosn.NewStageManager(c, c.String("config")) + + stm.AppendParamsParsedStage(func(c *cli.Context) { + err := featuregate.Set(c.String("feature-gates")) + if err != nil { + os.Exit(1) + } + }) + + stm.AppendInitStage(mosn.DefaultInitStage) + + stm.AppendPreStartStage(mosn.DefaultPreStartStage) // called finally stage by default + + stm.AppendStartStage(mosn.DefaultStartStage) + + stm.Run() + + actuator.GetRuntimeReadinessIndicator().SetStarted() + actuator.GetRuntimeLivenessIndicator().SetStarted() + // wait mosn finished + stm.WaitFinish() + return nil + }, +} + +func main() { + app := newRuntimeApp(&cmdStart) + registerAppInfo(app) + _ = app.Run(os.Args) +} + +func registerAppInfo(app *cli.App) { + appInfo := actuator.NewAppInfo() + appInfo.Name = app.Name + appInfo.Version = app.Version + appInfo.Compiled = app.Compiled + actuator.SetAppInfoSingleton(appInfo) +} + +func newRuntimeApp(startCmd *cli.Command) *cli.App { + app := cli.NewApp() + app.Name = "Layotto" + app.Version = "0.1.0" + app.Compiled = time.Now() + app.Copyright = "(c) " + strconv.Itoa(time.Now().Year()) + " Ant Group" + app.Usage = "A fast and efficient cloud native application runtime based on MOSN." + app.Flags = cmdStart.Flags + + // commands + app.Commands = []cli.Command{ + cmdStart, + } + // action + app.Action = func(c *cli.Context) error { + if c.NumFlags() == 0 { + return cli.ShowAppHelp(c) + } + + return startCmd.Action.(func(c *cli.Context) error)(c) + } + + return app +} diff --git a/docs/_sidebar.md b/docs/_sidebar.md index a97ad799c0..07f27fad69 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -10,6 +10,7 @@ - Service Invocation - [Hello World](en/start/rpc/helloworld.md) - [Dubbo JSON RPC](en/start/rpc/dubbo_json_rpc.md) + - [API plugin: register your own API](en/start/api_plugin/helloworld.md) - Integrate with istio - [As a data plane in istio](en/start/istio/start.md) - Traffic intervention on the 4th layer network @@ -18,7 +19,7 @@ - [Method Level Flow Control](en/start/stream_filter/flow_control.md) - [Trace management](en/start/trace/trace.md) - [Health check and metadata query](en/start/actuator/start.md) - - [Multilingual programming based on WASM](en/start/wasm/start.md) + - [Run business logic in Layotto using WASM](en/start/wasm/start.md) - [FaaS model based on WASM and Runtime](en/start/faas/start.md) - Developer guide - Building blocks diff --git a/docs/en/start/api_plugin/helloworld.md b/docs/en/start/api_plugin/helloworld.md new file mode 100644 index 0000000000..560887503a --- /dev/null +++ b/docs/en/start/api_plugin/helloworld.md @@ -0,0 +1,49 @@ +# API plugin: register your own API +This is a demo to show you how to register your own API. + +Layotto has the api-plugin feature to let you add your own API based on your need. + +## step 0. change directory +```shell +cd ${projectpath}/cmd/layotto_multiple_api +``` + +## step 1. start Layotto with a new helloworld API +Build and run Layotto : + +```shell +go build -o layotto +# run it +./layotto start -c ../../configs/config_in_memory.json +``` + +Q: What happened? + +Check the code in `main.go` and you will find a new API was registered during startup: + +```go + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), +``` + +## step 2. invoke the helloworld API +```shell +go run client/main.go +``` +The result will be: + +```shell +Greeting: Hello world +``` + +This message is the response of the helloworld API you just registered in step 1. + +## Next +You can refer to the demo code to implement your own API. Have a try ! + +For more details,you can refer to the [design doc](zh/design/api_plugin/design.md) \ No newline at end of file diff --git a/docs/img/api_plugin/img.png b/docs/img/api_plugin/img.png new file mode 100644 index 0000000000..20291b3107 Binary files /dev/null and b/docs/img/api_plugin/img.png differ diff --git a/docs/zh/_sidebar.md b/docs/zh/_sidebar.md index be9eefeb61..007dbb349f 100644 --- a/docs/zh/_sidebar.md +++ b/docs/zh/_sidebar.md @@ -10,6 +10,9 @@ - 进行RPC调用 - [Hello World](zh/start/rpc/helloworld.md) - [Dubbo JSON RPC](zh/start/rpc/dubbo_json_rpc.md) + - 使用File API + - [基于阿里云OSS](zh/start/file/start.md) + - [API插件:注册您自己的API](zh/start/api_plugin/helloworld.md) - 集成 Istio - [作为 Istio 的数据面](zh/start/istio/start.md) - 在四层网络进行流量干预 @@ -18,7 +21,6 @@ - [方法级别限流](zh/start/stream_filter/flow_control.md) - [健康检查、查询运行时元数据](zh/start/actuator/start.md) - [trace管理](zh/start/trace/trace.md) - - [OSS访问文件](zh/start/file/start.md) - [将业务逻辑通过 WASM 下沉进sidecar](zh/start/wasm/start.md) - [基于 WASM 跟 Runtime 实现的 Faas 模型](zh/start/faas/start.md) - 用户手册 @@ -70,6 +72,7 @@ - [Sequencer API设计文档](zh/design/sequencer/design.md) - [file API设计文档](zh/design/file/file-design.md) - [FaaS 设计文档](zh/design/faas/faas-poc-design.md) + - [API插件](zh/design/api_plugin/design.md) - 贡献指南 - [新手攻略:从零开始成为 Layotto 贡献者](zh/development/start-from-zero.md) - [文档贡献指南](zh/development/contributing-doc.md) diff --git a/docs/zh/design/api_plugin/design.md b/docs/zh/design/api_plugin/design.md new file mode 100644 index 0000000000..7d6c11c1ac --- /dev/null +++ b/docs/zh/design/api_plugin/design.md @@ -0,0 +1,120 @@ +# Hierarchical API design +## 1. 需求分析 +### 1.1. 解决什么问题 +解决扩展性问题。不管是Dapr还是开源Layotto的API,目前都无法完全满足生产需求。 + +回看操作系统领域POSIX API和system call的发展历史,我们可以学到很多,借此预测Runtime的未来。我们可以说,Runtime API将来也不可能完全满足用户需求。想想OS领域,即使有POSIX API了,一些场景还是需要绕开标准API、用特殊指令操作特殊硬件。 + + +Dapr的扩展性是通过Binding API解决,但是这种非结构化的API有很多问题(比如破坏可移植性、不支持stream等语义) + +### 1.2. 用户场景和需求 +举例来说,有以下用户场景: +1. 公司有自己的定制API需求,因为是非通用需求、不适合做到开源Layotto/Dapr上,于是公司的中间件团队想自己开发到sidecar里。如果公司的项目import 开源Layotto或者Dapr,按目前的架构是没法扩展开发API的,只能Fork出来做扩展 + +![image](https://user-images.githubusercontent.com/26001097/131614836-60d797c8-b80b-4018-ad43-c2b874d35660.png) + +这种情况下的用户需求: +- sdk下沉; +- 支持多语言; +- 多云部署(只不过需要中间件团队自己为多云环境开发组件,没有社区现成的组件拿来用了) + +2. 公司有新API需求,适合做到开源项目里,于是提需求给社区,但是社区很难达成共识、争了几个月还没落地(例如https://github.com/dapr/dapr/issues/2988 )。这种情况公司可能有业务压力,没法等那么久,希望自己先实现、落地,等社区实现新功能后再迁移过来。 + +这种情况下的用户需求: +- 用户对该功能自主可控,不需要(同时用中文和英文)说服社区、说服世界才能做这个功能 +- 快速扩展、服务业务 + +3. 用户想给Dapr API加字段,先在自己的Fork版本里添加了字段、满足线上需求,然后将PR提给社区。社区拒绝添加该字段,PR被关闭。用户很尴尬:这字段已经在线上使用了,怎么处理? + +## 2. High level design +### 2.1. Hierarchical API +参考OS领域当年是怎么定API的,我们可以把Runtime API设计成多层: + + +![img.png](../../../img/api_plugin/img.png) + +分别对应OS领域的: +- POSIX API +- 各种Unix-like系统自己的System Call (有可移植性,通过不同的硬件驱动实现相同的接口) +- 特殊硬件提供的特殊功能 (没有可移植性) + +基于这种思想,我们可以设计API插件,支持用户扩展自己的私有API + +![image](https://user-images.githubusercontent.com/26001097/131614802-c6f6a556-4e8b-4fee-b899-275a80e00eb6.png) + + +### 2.2. 设计目标 +1. 让有定制开发需求的开源用户通过import Layotto的方式使用Layotto,而不是Fork + +2. 开发api plugin足够简单 + +3. 配置文件公用同一个json,新增api plugin无需新增配置文件 + +### 2.3. 用户扩展开发时的编程界面 + +![image](https://user-images.githubusercontent.com/26001097/131614952-ccfc7d11-d376-41b0-b16c-2f17bfd2c9fc.png) + +#### step 1. 实现自己的私有API + +如果需要自己的私有API,用户需要实现GrpcAPI interface,以及相应的构造函数。 + +这个GrpcAPI就是您自己的API,它需要实现一些生命周期管理方法。目前只定义了Init和Register。 + +```go +// GrpcAPI is the interface of API plugin. It has lifecycle related methods +type GrpcAPI interface { + // init this API before binding it to the grpc server. For example,you can call app to query their subscriptions. + Init(conn *grpc.ClientConn) error + // Bind this API to the grpc server + Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer +} + +// NewGrpcAPI is the constructor of GrpcAPI +type NewGrpcAPI func( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) GrpcAPI + +``` + +#### step 2. 将自己的API注册进Layotto + +用户可以把Layotto的main复制粘贴出来,按需修改,去掉用不到的东西(比如用不到etcd的分布式锁组件,可以在自己的main里删掉它) + +如果用户写了自己的API,可以在main里将它注册进Layotto + +```go + +func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + // ........... + + // 3. run + server, err := rt.Run( + runtime.WithGrpcOptions(opts...), + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), + // Hello + runtime.WithHelloFactory( + hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), + ), + // ........... +``` + +#### step 3. 编译运行Layotto +Layotto启动过程中,会回调每个注册进来的API的生命周期方法(Init,Register) + +启动完成后,您的API就会对外提供 grpc 服务 \ No newline at end of file diff --git a/docs/zh/start/api_plugin/helloworld.md b/docs/zh/start/api_plugin/helloworld.md new file mode 100644 index 0000000000..b71ebc4fb6 --- /dev/null +++ b/docs/zh/start/api_plugin/helloworld.md @@ -0,0 +1,50 @@ +# API plugin: register your own API +This is a demo to show you how to register your own API. + +Layotto has the api-plugin feature to let you add your own API based on your need. + +## step 0. change directory +```shell +cd ${projectpath}/cmd/layotto_multiple_api +``` + +## step 1. start Layotto with a new helloworld API +Build and run Layotto : + +```shell +go build -o layotto +# run it +./layotto start -c ../../configs/config_in_memory.json +``` + +Q: What happened? + +Check the code in `main.go` and you will find a new API was registered during startup: + +```go + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), +``` + +## step 2. invoke the helloworld API +```shell +go run client/main.go +``` +The result will be: + +```shell +Greeting: Hello world +``` + +This message is the response of the helloworld API you just registered in step 1. + +## Next +You can refer to the demo code to implement your own API. +Have a try ! + +For more details,you can refer to the [design doc](zh/design/api_plugin/design.md) \ No newline at end of file diff --git a/go.mod b/go.mod index 45e93eb9c3..ec1239f224 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/grpc v1.39.0 + google.golang.org/grpc/examples v0.0.0-20210818220435-8ab16ef276a3 google.golang.org/protobuf v1.27.1 mosn.io/api v0.0.0-20210714065837-5b4c2d66e70c mosn.io/layotto/components v0.0.0-20211020084508-6f5ee3cfeba0 diff --git a/pkg/grpc/api.go b/pkg/grpc/default_api/api.go similarity index 96% rename from pkg/grpc/api.go rename to pkg/grpc/default_api/api.go index c4f77f38fe..8512a4b069 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/default_api/api.go @@ -14,13 +14,15 @@ * limitations under the License. */ -package grpc +package default_api import ( "context" "errors" "fmt" "io" + grpc_api "mosn.io/layotto/pkg/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" "strings" "sync" @@ -66,6 +68,10 @@ import ( "mosn.io/pkg/log" ) +const ( + Metadata_key_pubsubName = "pubsubName" +) + var ( ErrNoInstance = errors.New("no instance found") bytesPool = sync.Pool{ @@ -74,6 +80,9 @@ var ( return new([]byte) }, } + // FIXME I put it here for compatibility.Don't write singleton like this ! + // It should be refactored and deleted. + LayottoAPISingleton API ) type API interface { @@ -114,6 +123,8 @@ type API interface { GetNextId(context.Context, *runtimev1pb.GetNextIdRequest) (*runtimev1pb.GetNextIdResponse, error) // InvokeBinding Binding API InvokeBinding(context.Context, *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) + // GrpcAPI related + grpc_api.GrpcAPI } // api is a default implementation for MosnRuntimeServer. @@ -129,6 +140,38 @@ type api struct { lockStores map[string]lock.LockStore sequencers map[string]sequencer.Store sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) + // app callback + AppCallbackConn *grpc.ClientConn + topicPerComponent map[string]TopicSubscriptions + // json + json jsoniter.API +} + +func (a *api) Init(conn *grpc.ClientConn) error { + // 1. set connection + a.AppCallbackConn = conn + return a.startSubscribing() +} + +func (a *api) Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer { + LayottoAPISingleton = a + runtimev1pb.RegisterRuntimeServer(s, a) + return registeredServer +} + +func NewGrpcAPI( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) grpc_api.GrpcAPI { + return NewAPI(appId, hellos, configStores, rpcs, pubSubs, stateStores, files, lockStores, sequencers, sendToOutputBindingFn) } func NewAPI( @@ -163,6 +206,7 @@ func NewAPI( lockStores: lockStores, sequencers: sequencers, sendToOutputBindingFn: sendToOutputBindingFn, + json: jsoniter.ConfigFastest, } } diff --git a/pkg/grpc/default_api/api_callback.go b/pkg/grpc/default_api/api_callback.go new file mode 100644 index 0000000000..254a4d46e7 --- /dev/null +++ b/pkg/grpc/default_api/api_callback.go @@ -0,0 +1,220 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package default_api + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "github.com/dapr/components-contrib/contenttype" + "github.com/dapr/components-contrib/pubsub" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + runtime_pubsub "mosn.io/layotto/pkg/runtime/pubsub" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" + _ "net/http/pprof" + + "mosn.io/pkg/log" +) + +func (a *api) startSubscribing() error { + // 1. check if there is no need to do it + if len(a.pubSubs) == 0 { + return nil + } + // 2. list topics + topicRoutes, err := a.getInterestedTopics() + if err != nil { + return err + } + // return if no need to dosubscription + if len(topicRoutes) == 0 { + return nil + } + // 3. loop subscribe + for name, pubsub := range a.pubSubs { + if err := a.beginPubSub(name, pubsub, topicRoutes); err != nil { + return err + } + } + return nil +} + +func (a *api) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRoutes map[string]TopicSubscriptions) error { + // 1. call app to find topic topic2Details. + v, ok := topicRoutes[pubsubName] + if !ok { + return nil + } + // 2. loop subscribing every + for topic, route := range v.topic2Details { + // TODO limit topic scope + log.DefaultLogger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) + // ask component to subscribe + if err := ps.Subscribe(pubsub.SubscribeRequest{ + Topic: topic, + Metadata: route.metadata, + }, func(ctx context.Context, msg *pubsub.NewMessage) error { + if msg.Metadata == nil { + msg.Metadata = make(map[string]string, 1) + } + msg.Metadata[Metadata_key_pubsubName] = pubsubName + return a.publishMessageGRPC(ctx, msg) + }); err != nil { + log.DefaultLogger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) + return err + } + } + + return nil +} + +type Details struct { + metadata map[string]string +} + +type TopicSubscriptions struct { + topic2Details map[string]Details +} + +func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) { + // 1. check + if a.topicPerComponent != nil { + return a.topicPerComponent, nil + } + if a.AppCallbackConn == nil { + return make(map[string]TopicSubscriptions), nil + } + comp2Topic := make(map[string]TopicSubscriptions) + var subscriptions []*runtimev1pb.TopicSubscription + + // 2. handle app subscriptions + client := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn) + subscriptions = runtime_pubsub.ListTopicSubscriptions(client, log.DefaultLogger) + // TODO handle declarative subscriptions + + // 3. prepare result + for _, s := range subscriptions { + if s == nil { + continue + } + if _, ok := comp2Topic[s.PubsubName]; !ok { + comp2Topic[s.PubsubName] = TopicSubscriptions{topic2Details: make(map[string]Details)} + } + comp2Topic[s.PubsubName].topic2Details[s.Topic] = Details{metadata: s.Metadata} + } + + // 4. log + if len(comp2Topic) > 0 { + for pubsubName, v := range comp2Topic { + topics := []string{} + for topic := range v.topic2Details { + topics = append(topics, topic) + } + log.DefaultLogger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) + } + } + // 5. cache the result + a.topicPerComponent = comp2Topic + return comp2Topic, nil +} + +func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error { + // 1. Unmarshal to cloudEvent model + var cloudEvent map[string]interface{} + err := a.json.Unmarshal(msg.Data, &cloudEvent) + if err != nil { + log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err) + return err + } + + // 2. Drop msg if the current cloud event has expired + if pubsub.HasExpired(cloudEvent) { + log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) + return nil + } + + // 3. Convert to proto domain struct + envelope := &runtimev1pb.TopicEventRequest{ + Id: cloudEvent[pubsub.IDField].(string), + Source: cloudEvent[pubsub.SourceField].(string), + DataContentType: cloudEvent[pubsub.DataContentTypeField].(string), + Type: cloudEvent[pubsub.TypeField].(string), + SpecVersion: cloudEvent[pubsub.SpecVersionField].(string), + Topic: msg.Topic, + PubsubName: msg.Metadata[Metadata_key_pubsubName], + } + + // set data field + if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil { + decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string)) + if decodeErr != nil { + log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) + return err + } + + envelope.Data = decoded + } else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil { + envelope.Data = nil + + if contenttype.IsStringContentType(envelope.DataContentType) { + envelope.Data = []byte(data.(string)) + } else if contenttype.IsJSONContentType(envelope.DataContentType) { + envelope.Data, _ = a.json.Marshal(data) + } + } + // TODO tracing + + // 4. Call appcallback + clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn) + res, err := clientV1.OnTopicEvent(ctx, envelope) + + // 5. Check result + return retryStrategy(err, res, cloudEvent) +} + +// retryStrategy returns error when the message should be redelivered +func retryStrategy(err error, res *runtimev1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { + if err != nil { + errStatus, hasErrStatus := status.FromError(err) + if hasErrStatus && (errStatus.Code() == codes.Unimplemented) { + // DROP + log.DefaultLogger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) + return nil + } + + err = errors.New(fmt.Sprintf("error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err)) + log.DefaultLogger.Debugf("%s", err) + // on error from application, return error for redelivery of event + return err + } + + switch res.GetStatus() { + case runtimev1pb.TopicEventResponse_SUCCESS: + // on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is + // success from protobuf definition + return nil + case runtimev1pb.TopicEventResponse_RETRY: + return errors.New(fmt.Sprintf("RETRY status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string))) + case runtimev1pb.TopicEventResponse_DROP: + log.DefaultLogger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) + return nil + } + // Consider unknown status field as error and retry + return errors.New(fmt.Sprintf("unknown status returned from app while processing pub/sub event %v: %v", cloudEvent[pubsub.IDField].(string), res.GetStatus())) +} diff --git a/pkg/grpc/api_errors.go b/pkg/grpc/default_api/api_errors.go similarity index 97% rename from pkg/grpc/api_errors.go rename to pkg/grpc/default_api/api_errors.go index 301fc8695d..0c63cba6e6 100644 --- a/pkg/grpc/api_errors.go +++ b/pkg/grpc/default_api/api_errors.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package grpc +package default_api import ( "google.golang.org/grpc/codes" diff --git a/pkg/grpc/api_test.go b/pkg/grpc/default_api/api_test.go similarity index 94% rename from pkg/grpc/api_test.go rename to pkg/grpc/default_api/api_test.go index 93201abd0b..c55c848180 100644 --- a/pkg/grpc/api_test.go +++ b/pkg/grpc/default_api/api_test.go @@ -14,36 +14,31 @@ * limitations under the License. */ -package grpc +package default_api import ( "context" - "errors" + "encoding/json" "fmt" - "io" - "net" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "testing" - "time" - - "mosn.io/layotto/components/file" - "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" "github.com/golang/mock/gomock" - "github.com/golang/protobuf/ptypes/any" "github.com/stretchr/testify/assert" - tmock "github.com/stretchr/testify/mock" - "google.golang.org/grpc" + rawGRPC "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + l8grpc "mosn.io/layotto/pkg/grpc" + "net" + "testing" + "errors" + jsoniter "github.com/json-iterator/go" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" "mosn.io/layotto/components/rpc" - mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/mock" mock_invoker "mosn.io/layotto/pkg/mock/components/invoker" @@ -51,7 +46,18 @@ import ( mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" mock_sequencer "mosn.io/layotto/pkg/mock/components/sequencer" mock_state "mosn.io/layotto/pkg/mock/components/state" + mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" + + "time" + + "mosn.io/layotto/components/file" + + "github.com/golang/protobuf/ptypes/any" + tmock "github.com/stretchr/testify/mock" + "google.golang.org/grpc" + + mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" ) const ( @@ -133,6 +139,58 @@ func TestSayHello(t *testing.T) { }) } +func TestMosnRuntime_publishMessageGRPC(t *testing.T) { + t.Run("publish success", func(t *testing.T) { + subResp := &runtimev1pb.TopicEventResponse{ + Status: runtimev1pb.TopicEventResponse_SUCCESS, + } + // init grpc server + mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(subResp, nil) + + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) + go func() { + s.Serve(lis) + }() + + // init callback client + callbackClient, err := grpc.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + })) + assert.Nil(t, err) + + cloudEvent := map[string]interface{}{ + pubsub.IDField: "id", + pubsub.SourceField: "source", + pubsub.DataContentTypeField: "content-type", + pubsub.TypeField: "type", + pubsub.SpecVersionField: "v1.0.0", + pubsub.DataBase64Field: "bGF5b3R0bw==", + } + + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + + msg := &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: make(map[string]string), + } + a := NewAPI("", nil, nil, nil, nil, nil, nil, nil, nil, nil) + + var apiForTest = a.(*api) + //apiForTest.errInt = func(err error, format string, args ...interface{}) { + // log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + //} + apiForTest.AppCallbackConn = callbackClient + apiForTest.json = jsoniter.ConfigFastest + err = apiForTest.publishMessageGRPC(context.Background(), msg) + assert.Nil(t, err) + }) +} + func startTestRuntimeAPIServer(port int, testAPIServer API) *grpc.Server { lis, _ := net.Listen("tcp", fmt.Sprintf(":%d", port)) opts := []grpc.ServerOption{grpc.WriteBufferSize(1)} @@ -1087,3 +1145,8 @@ func TestGetFileMeta(t *testing.T) { assert.Equal(t, resp.LastModified, "123") assert.Equal(t, int(resp.Size), 10) } + +func TestNewGrpcServer(t *testing.T) { + apiInterface := &api{} + l8grpc.NewGrpcServer(l8grpc.WithGrpcAPIs([]l8grpc.GrpcAPI{apiInterface}), l8grpc.WithNewServer(l8grpc.NewDefaultServer), l8grpc.WithGrpcOptions()) +} diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index 8c6879837d..b089a89554 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -19,7 +19,6 @@ package grpc import ( "google.golang.org/grpc" "mosn.io/layotto/diagnostics" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" ) @@ -34,11 +33,16 @@ func NewGrpcServer(opts ...Option) mgrpc.RegisteredServer { if o.maker != nil { srvMaker = o.maker } - return srvMaker(o.api, o.options...) + return srvMaker(o.apis, o.options...) } -func NewDefaultServer(api API, opts ...grpc.ServerOption) mgrpc.RegisteredServer { +func NewDefaultServer(apis []GrpcAPI, opts ...grpc.ServerOption) mgrpc.RegisteredServer { s := grpc.NewServer(opts...) - runtimev1pb.RegisterRuntimeServer(s, api) - return s + // create registeredServer to manage lifecycle of the grpc server + var registeredServer mgrpc.RegisteredServer = s + // loop registering grpc api + for _, grpcAPI := range apis { + registeredServer = grpcAPI.Register(s, registeredServer) + } + return registeredServer } diff --git a/pkg/grpc/grpc_api.go b/pkg/grpc/grpc_api.go new file mode 100644 index 0000000000..598385f60c --- /dev/null +++ b/pkg/grpc/grpc_api.go @@ -0,0 +1,53 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/components-contrib/state" + "google.golang.org/grpc" + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/rpc" + "mosn.io/layotto/components/sequencer" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" +) + +// GrpcAPI is the interface of API plugin. It has lifecycle related methods +type GrpcAPI interface { + // init this API before binding it to the grpc server. For example,you can call app to query their subscriptions. + Init(conn *grpc.ClientConn) error + // Bind this API to the grpc server + Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer +} + +// NewGrpcAPI is the constructor of GrpcAPI +type NewGrpcAPI func( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) GrpcAPI diff --git a/pkg/grpc/grpc_test.go b/pkg/grpc/grpc_test.go deleted file mode 100644 index f33bf25c2e..0000000000 --- a/pkg/grpc/grpc_test.go +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2021 Layotto Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package grpc - -import ( - "testing" -) - -func TestNewGrpcServer(t *testing.T) { - apiInterface := &api{} - NewGrpcServer(WithAPI(apiInterface), WithNewServer(NewDefaultServer), WithGrpcOptions()) -} diff --git a/pkg/grpc/options.go b/pkg/grpc/options.go index 839937bc35..943d4100ba 100644 --- a/pkg/grpc/options.go +++ b/pkg/grpc/options.go @@ -22,22 +22,20 @@ import ( ) type grpcOptions struct { - api API + apis []GrpcAPI maker NewServer options []grpc.ServerOption } type Option func(o *grpcOptions) -// add an api for grpc server. -// api CANNOT be nil. -func WithAPI(api API) Option { +func WithGrpcAPIs(apis []GrpcAPI) Option { return func(o *grpcOptions) { - o.api = api + o.apis = apis } } -type NewServer func(api API, opts ...grpc.ServerOption) mgrpc.RegisteredServer +type NewServer func(apis []GrpcAPI, opts ...grpc.ServerOption) mgrpc.RegisteredServer func WithNewServer(f NewServer) Option { return func(o *grpcOptions) { diff --git a/pkg/integrate/api/helloworld/grpc_api.go b/pkg/integrate/api/helloworld/grpc_api.go new file mode 100644 index 0000000000..751d275fb0 --- /dev/null +++ b/pkg/integrate/api/helloworld/grpc_api.go @@ -0,0 +1,68 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package helloworld + +import ( + "context" + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/components-contrib/state" + rawGRPC "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/rpc" + "mosn.io/layotto/components/sequencer" + "mosn.io/layotto/pkg/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" +) + +func NewHelloWorldAPI( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) grpc.GrpcAPI { + return &server{} +} + +// server is used to implement helloworld.GreeterServer. +type server struct { + pb.UnimplementedGreeterServer +} + +func (s *server) Init(conn *rawGRPC.ClientConn) error { + return nil +} + +func (s *server) Register(grpcServer *rawGRPC.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer { + pb.RegisterGreeterServer(grpcServer, s) + return registeredServer +} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil +} diff --git a/pkg/runtime/const.go b/pkg/runtime/const.go index 151646ef12..c11504b496 100644 --- a/pkg/runtime/const.go +++ b/pkg/runtime/const.go @@ -19,6 +19,5 @@ package runtime import "time" const ( - Metadata_key_pubsubName = "pubsubName" - dialTimeout = time.Second * 30 + dialTimeout = time.Second * 30 ) diff --git a/pkg/runtime/options.go b/pkg/runtime/options.go index b5ce6e8452..50eaed4168 100644 --- a/pkg/runtime/options.go +++ b/pkg/runtime/options.go @@ -52,6 +52,8 @@ type runtimeOptions struct { srvMaker rgrpc.NewServer errInt ErrInterceptor options []grpc.ServerOption + // new grpc api + apiFactorys []rgrpc.NewGrpcAPI } type Option func(o *runtimeOptions) @@ -68,6 +70,12 @@ func WithGrpcOptions(options ...grpc.ServerOption) Option { } } +func WithGrpcAPI(apiFuncs ...rgrpc.NewGrpcAPI) Option { + return func(o *runtimeOptions) { + o.apiFactorys = append(o.apiFactorys, apiFuncs...) + } +} + type ErrInterceptor func(err error, format string, args ...interface{}) func WithErrInterceptor(i ErrInterceptor) Option { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index c4d9fb3c36..71cda8051d 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -18,7 +18,6 @@ package runtime import ( "context" - "encoding/base64" "errors" "fmt" "strings" @@ -29,13 +28,9 @@ import ( "mosn.io/layotto/components/file" - "github.com/dapr/components-contrib/contenttype" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" - jsoniter "github.com/json-iterator/go" rawGRPC "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" @@ -50,8 +45,6 @@ import ( runtime_pubsub "mosn.io/layotto/pkg/runtime/pubsub" runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer" runtime_state "mosn.io/layotto/pkg/runtime/state" - "mosn.io/layotto/pkg/wasm" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" "mosn.io/pkg/log" ) @@ -72,29 +65,19 @@ type MosnRuntime struct { sequencerRegistry runtime_sequencer.Registry bindingsRegistry mbindings.Registry // component pool - hellos map[string]hello.HelloService - configStores map[string]configstores.Store - rpcs map[string]rpc.Invoker - pubSubs map[string]pubsub.PubSub - topicPerComponent map[string]TopicSubscriptions - states map[string]state.Store - files map[string]file.File - locks map[string]lock.LockStore - sequencers map[string]sequencer.Store - outputBindings map[string]bindings.OutputBinding + hellos map[string]hello.HelloService + configStores map[string]configstores.Store + rpcs map[string]rpc.Invoker + pubSubs map[string]pubsub.PubSub + states map[string]state.Store + files map[string]file.File + locks map[string]lock.LockStore + sequencers map[string]sequencer.Store + outputBindings map[string]bindings.OutputBinding // app callback AppCallbackConn *rawGRPC.ClientConn // extends errInt ErrInterceptor - json jsoniter.API -} - -type Details struct { - metadata map[string]string -} - -type TopicSubscriptions struct { - topic2Details map[string]Details } func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { @@ -120,7 +103,6 @@ func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { locks: make(map[string]lock.LockStore), sequencers: make(map[string]sequencer.Store), outputBindings: make(map[string]bindings.OutputBinding), - json: jsoniter.ConfigFastest, } } @@ -150,10 +132,12 @@ func (m *MosnRuntime) sendToOutputBinding(name string, req *bindings.InvokeReque } func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { + // prepare runtimeOptions var o runtimeOptions for _, opt := range opts { opt(&o) } + // set ErrInterceptor if o.errInt != nil { m.errInt = o.errInt } else { @@ -161,30 +145,42 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } } - + // init runtime with runtimeOptions if err := m.initRuntime(&o); err != nil { return nil, err } + // prepare grpcOpts var grpcOpts []grpc.Option if o.srvMaker != nil { grpcOpts = append(grpcOpts, grpc.WithNewServer(o.srvMaker)) } - wasm.Layotto = grpc.NewAPI( - m.runtimeConfig.AppManagement.AppId, - m.hellos, - m.configStores, - m.rpcs, - m.pubSubs, - m.states, - m.files, - m.locks, - m.sequencers, - m.sendToOutputBinding, - ) + // create GrpcAPIs + var apis []grpc.GrpcAPI + for _, apiFactory := range o.apiFactorys { + api := apiFactory( + m.runtimeConfig.AppManagement.AppId, + m.hellos, + m.configStores, + m.rpcs, + m.pubSubs, + m.states, + m.files, + m.locks, + m.sequencers, + m.sendToOutputBinding, + ) + // init the GrpcAPI + if err := api.Init(m.AppCallbackConn); err != nil { + return nil, err + } + apis = append(apis, api) + } + // put them into grpc options grpcOpts = append(grpcOpts, grpc.WithGrpcOptions(o.options...), - grpc.WithAPI(wasm.Layotto), + grpc.WithGrpcAPIs(apis), ) + // create grpc server m.srv = grpc.NewGrpcServer(grpcOpts...) return m.srv, nil } @@ -305,27 +301,29 @@ func (m *MosnRuntime) initRpcs(rpcs ...*rpc.Factory) error { func (m *MosnRuntime) initPubSubs(factorys ...*runtime_pubsub.Factory) error { // 1. init components log.DefaultLogger.Infof("[runtime] start initializing pubsub components") - // register all config store services implementation + // register all components implementation m.pubSubRegistry.Register(factorys...) for name, config := range m.runtimeConfig.PubSubManagement { + // create component comp, err := m.pubSubRegistry.Create(name) if err != nil { m.errInt(err, "create pubsub component %s failed", name) return err } + // check config consumerID := strings.TrimSpace(config.Metadata["consumerID"]) if consumerID == "" { config.Metadata["consumerID"] = m.runtimeConfig.AppManagement.AppId } - + // init this component with the config if err := comp.Init(pubsub.Metadata{Properties: config.Metadata}); err != nil { m.errInt(err, "init pubsub component %s failed", name) return err } + // register this component m.pubSubs[name] = comp } - // 2. start subscribing - return m.startSubscribing() + return nil } func (m *MosnRuntime) initStates(factorys ...*runtime_state.Factory) error { @@ -440,184 +438,6 @@ func (m *MosnRuntime) initSequencers(factorys ...*runtime_sequencer.Factory) err return nil } -func (m *MosnRuntime) startSubscribing() error { - // 1. check if there is no need to do it - if len(m.pubSubs) == 0 { - return nil - } - topicRoutes, err := m.getInterestedTopics() - if err != nil { - return err - } - if len(topicRoutes) == 0 { - // no need - return nil - } - // 2. loop subscribe - for name, pubsub := range m.pubSubs { - if err := m.beginPubSub(name, pubsub, topicRoutes); err != nil { - return err - } - } - return nil -} - -func (m *MosnRuntime) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRoutes map[string]TopicSubscriptions) error { - // 1. call app to find topic topic2Details. - v, ok := topicRoutes[pubsubName] - if !ok { - return nil - } - // 2. loop subscribing every - for topic, route := range v.topic2Details { - // TODO limit topic scope - log.DefaultLogger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) - // ask component to subscribe - if err := ps.Subscribe(pubsub.SubscribeRequest{ - Topic: topic, - Metadata: route.metadata, - }, func(ctx context.Context, msg *pubsub.NewMessage) error { - if msg.Metadata == nil { - msg.Metadata = make(map[string]string, 1) - } - msg.Metadata[Metadata_key_pubsubName] = pubsubName - return m.publishMessageGRPC(ctx, msg) - }); err != nil { - log.DefaultLogger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) - return err - } - } - - return nil -} - -func (m *MosnRuntime) getInterestedTopics() (map[string]TopicSubscriptions, error) { - // 1. check - if m.topicPerComponent != nil { - return m.topicPerComponent, nil - } - if m.AppCallbackConn == nil { - return make(map[string]TopicSubscriptions), nil - } - comp2Topic := make(map[string]TopicSubscriptions) - var subscriptions []*runtimev1pb.TopicSubscription - - // 2. handle app subscriptions - client := runtimev1pb.NewAppCallbackClient(m.AppCallbackConn) - subscriptions = runtime_pubsub.ListTopicSubscriptions(client, log.DefaultLogger) - // TODO handle declarative subscriptions - - // 3. prepare result - for _, s := range subscriptions { - if s == nil { - continue - } - if _, ok := comp2Topic[s.PubsubName]; !ok { - comp2Topic[s.PubsubName] = TopicSubscriptions{topic2Details: make(map[string]Details)} - } - comp2Topic[s.PubsubName].topic2Details[s.Topic] = Details{metadata: s.Metadata} - } - - // 4. log - if len(comp2Topic) > 0 { - for pubsubName, v := range comp2Topic { - topics := []string{} - for topic := range v.topic2Details { - topics = append(topics, topic) - } - log.DefaultLogger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) - } - } - // 5. cache the result - m.topicPerComponent = comp2Topic - return comp2Topic, nil -} - -func (m *MosnRuntime) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error { - // 1. Unmarshal to cloudEvent model - var cloudEvent map[string]interface{} - err := m.json.Unmarshal(msg.Data, &cloudEvent) - if err != nil { - log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err) - return err - } - - // 2. Drop msg if the current cloud event has expired - if pubsub.HasExpired(cloudEvent) { - log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) - return nil - } - - // 3. Convert to proto domain struct - envelope := &runtimev1pb.TopicEventRequest{ - Id: cloudEvent[pubsub.IDField].(string), - Source: cloudEvent[pubsub.SourceField].(string), - DataContentType: cloudEvent[pubsub.DataContentTypeField].(string), - Type: cloudEvent[pubsub.TypeField].(string), - SpecVersion: cloudEvent[pubsub.SpecVersionField].(string), - Topic: msg.Topic, - PubsubName: msg.Metadata[Metadata_key_pubsubName], - } - - // set data field - if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil { - decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string)) - if decodeErr != nil { - log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) - return err - } - - envelope.Data = decoded - } else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil { - envelope.Data = nil - - if contenttype.IsStringContentType(envelope.DataContentType) { - envelope.Data = []byte(data.(string)) - } else if contenttype.IsJSONContentType(envelope.DataContentType) { - envelope.Data, _ = m.json.Marshal(data) - } - } - // TODO tracing - - // 4. Call appcallback - clientV1 := runtimev1pb.NewAppCallbackClient(m.AppCallbackConn) - res, err := clientV1.OnTopicEvent(ctx, envelope) - - // 5. Check result - return retryStrategy(err, res, cloudEvent) -} - -// retryStrategy returns error when the message should be redelivered -func retryStrategy(err error, res *runtimev1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { - if err != nil { - errStatus, hasErrStatus := status.FromError(err) - if hasErrStatus && (errStatus.Code() == codes.Unimplemented) { - // DROP - log.DefaultLogger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) - return nil - } - - err = errors.New(fmt.Sprintf("error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err)) - log.DefaultLogger.Debugf("%s", err) - // on error from application, return error for redelivery of event - return err - } - - switch res.GetStatus() { - case runtimev1pb.TopicEventResponse_SUCCESS: - // on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is - // success from protobuf definition - return nil - case runtimev1pb.TopicEventResponse_RETRY: - return errors.New(fmt.Sprintf("RETRY status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string))) - case runtimev1pb.TopicEventResponse_DROP: - log.DefaultLogger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) - return nil - } - // Consider unknown status field as error and retry - return errors.New(fmt.Sprintf("unknown status returned from app while processing pub/sub event %v: %v", cloudEvent[pubsub.IDField].(string), res.GetStatus())) -} - func (m *MosnRuntime) initAppCallbackConnection() error { // init the client connection for calling app if m.runtimeConfig == nil || m.runtimeConfig.AppManagement.GrpcCallbackPort == 0 { diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index c05ffc0a2d..4a0d315d65 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -20,6 +20,10 @@ import ( "context" "encoding/json" "fmt" + "google.golang.org/grpc/test/bufconn" + "mosn.io/layotto/pkg/grpc/default_api" + mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" "net" "testing" @@ -29,11 +33,8 @@ import ( "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" "github.com/golang/mock/gomock" - jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" rawGRPC "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" - "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" @@ -47,18 +48,17 @@ import ( mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" mock_sequencer "mosn.io/layotto/pkg/mock/components/sequencer" mock_state "mosn.io/layotto/pkg/mock/components/state" - mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" mlock "mosn.io/layotto/pkg/runtime/lock" mpubsub "mosn.io/layotto/pkg/runtime/pubsub" msequencer "mosn.io/layotto/pkg/runtime/sequencer" mstate "mosn.io/layotto/pkg/runtime/state" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" ) func TestNewMosnRuntime(t *testing.T) { runtimeConfig := &MosnRuntimeConfig{} rt := NewMosnRuntime(runtimeConfig) assert.NotNil(t, rt) + rt.Stop() } func TestMosnRuntime_GetInfo(t *testing.T) { @@ -66,22 +66,35 @@ func TestMosnRuntime_GetInfo(t *testing.T) { rt := NewMosnRuntime(runtimeConfig) runtimeInfo := rt.GetInfo() assert.NotNil(t, runtimeInfo) + rt.Stop() } func TestMosnRuntime_Run(t *testing.T) { t.Run("run succ", func(t *testing.T) { runtimeConfig := &MosnRuntimeConfig{} rt := NewMosnRuntime(runtimeConfig) - server, err := rt.Run() + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + ) assert.Nil(t, err) assert.NotNil(t, server) + rt.Stop() }) t.Run("no runtime config", func(t *testing.T) { rt := NewMosnRuntime(nil) - _, err := rt.Run() + _, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + ) assert.NotNil(t, err) assert.Equal(t, "[runtime] init error:no runtimeConfig", err.Error()) + rt.Stop() }) } @@ -113,37 +126,9 @@ func TestMosnRuntime_initAppCallbackConnection(t *testing.T) { func TestMosnRuntime_initPubSubs(t *testing.T) { t.Run("normal", func(t *testing.T) { - // mock callback response - subResp := &runtimev1pb.ListTopicSubscriptionsResponse{ - Subscriptions: []*runtimev1pb.TopicSubscription{ - { - PubsubName: "mock", - Topic: "layotto", - Metadata: nil, - }, - }, - } - // init grpc server - mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) - mockAppCallbackServer.EXPECT().ListTopicSubscriptions(gomock.Any(), gomock.Any()).Return(subResp, nil) - - lis := bufconn.Listen(1024 * 1024) - s := rawGRPC.NewServer() - runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) - go func() { - s.Serve(lis) - }() - - // init callback client - callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { - return lis.Dial() - })) - assert.Nil(t, err) - // mock pubsub component mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) - mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Return(nil) f := func() pubsub.PubSub { return mockPubSub } @@ -159,12 +144,11 @@ func TestMosnRuntime_initPubSubs(t *testing.T) { } // construct MosnRuntime m := NewMosnRuntime(cfg) - m.AppCallbackConn = callbackClient m.errInt = func(err error, format string, args ...interface{}) { log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } // test initPubSubs - err = m.initPubSubs(mpubsub.NewFactory("mock", f)) + err := m.initPubSubs(mpubsub.NewFactory("mock", f)) // assert result assert.Nil(t, err) }) @@ -314,58 +298,6 @@ func TestMosnRuntime_initLocks(t *testing.T) { }) } -func TestMosnRuntime_publishMessageGRPC(t *testing.T) { - t.Run("publish success", func(t *testing.T) { - subResp := &runtimev1pb.TopicEventResponse{ - Status: runtimev1pb.TopicEventResponse_SUCCESS, - } - // init grpc server - mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) - mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(subResp, nil) - - lis := bufconn.Listen(1024 * 1024) - s := rawGRPC.NewServer() - runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) - go func() { - s.Serve(lis) - }() - - // init callback client - callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { - return lis.Dial() - })) - assert.Nil(t, err) - - cloudEvent := map[string]interface{}{ - pubsub.IDField: "id", - pubsub.SourceField: "source", - pubsub.DataContentTypeField: "content-type", - pubsub.TypeField: "type", - pubsub.SpecVersionField: "v1.0.0", - pubsub.DataBase64Field: "bGF5b3R0bw==", - } - - data, err := json.Marshal(cloudEvent) - assert.Nil(t, err) - - msg := &pubsub.NewMessage{ - Data: data, - Topic: "layotto", - Metadata: make(map[string]string), - } - - cfg := &MosnRuntimeConfig{} - m := NewMosnRuntime(cfg) - m.errInt = func(err error, format string, args ...interface{}) { - log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) - } - m.AppCallbackConn = callbackClient - m.json = jsoniter.ConfigFastest - err = m.publishMessageGRPC(context.Background(), msg) - assert.Nil(t, err) - }) -} - type MockBindings struct { } @@ -398,3 +330,233 @@ func TestMosnRuntime_initOutputBinding(t *testing.T) { m.initOutputBinding(registry) assert.NotNil(t, m.outputBindings["mockOutbindings"]) } + +func TestMosnRuntime_runWithPubsub(t *testing.T) { + t.Run("normal", func(t *testing.T) { + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Return(nil) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, _ := runtimeWithCallbackConnection(t) + + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) + + t.Run("init_with_callback", func(t *testing.T) { + cloudEvent := constructCloudEvent() + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + if req.Topic == "layotto" { + return handler(context.Background(), &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: nil, + }) + } else { + return nil + } + }) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, mockAppCallbackServer := runtimeWithCallbackConnection(t) + + topicResp := &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS} + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(topicResp, nil) + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) + + t.Run("callback_fail_then_retry", func(t *testing.T) { + cloudEvent := constructCloudEvent() + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + if req.Topic == "layotto" { + err := handler(context.Background(), &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: nil, + }) + assert.NotNil(t, err) + return nil + } else { + return nil + } + }) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, mockAppCallbackServer := runtimeWithCallbackConnection(t) + + topicResp := &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_RETRY} + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(topicResp, nil) + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) + + t.Run("callback_drop", func(t *testing.T) { + cloudEvent := constructCloudEvent() + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + if req.Topic == "layotto" { + err := handler(context.Background(), &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: nil, + }) + assert.Nil(t, err) + return nil + } else { + return nil + } + }) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, mockAppCallbackServer := runtimeWithCallbackConnection(t) + + topicResp := &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_DROP} + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(topicResp, nil) + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) +} + +func constructCloudEvent() map[string]interface{} { + cloudEvent := make(map[string]interface{}) + cloudEvent[pubsub.IDField] = "1" + cloudEvent[pubsub.SpecVersionField] = "1" + cloudEvent[pubsub.SourceField] = "adsdafdas" + cloudEvent[pubsub.DataContentTypeField] = "application/json" + cloudEvent[pubsub.TypeField] = "adsdafdas" + return cloudEvent +} + +func runtimeWithCallbackConnection(t *testing.T) (*MosnRuntime, *mock_appcallback.MockAppCallbackServer) { + // 1. prepare callback + // mock callback response + subResp := &runtimev1pb.ListTopicSubscriptionsResponse{ + Subscriptions: []*runtimev1pb.TopicSubscription{ + { + PubsubName: "mock", + Topic: "layotto", + Metadata: nil, + }, + }, + } + // init grpc server for callback + mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) + mockAppCallbackServer.EXPECT().ListTopicSubscriptions(gomock.Any(), gomock.Any()).Return(subResp, nil) + + lis := bufconn.Listen(1024 * 1024) + s := rawGRPC.NewServer() + runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) + go func() { + s.Serve(lis) + }() + + // 2. construct those necessary fields for mosn runtime + // init callback client + callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + })) + assert.Nil(t, err) + + // 3. construct mosn runtime + cfg := &MosnRuntimeConfig{ + PubSubManagement: map[string]mpubsub.Config{ + "mock": { + Metadata: map[string]string{ + "target": "layotto", + }, + }, + }, + } + rt := NewMosnRuntime(cfg) + rt.AppCallbackConn = callbackClient + return rt, mockAppCallbackServer +} diff --git a/pkg/wasm/imports.go b/pkg/wasm/imports.go index 6903c7aaca..736a70294a 100644 --- a/pkg/wasm/imports.go +++ b/pkg/wasm/imports.go @@ -19,7 +19,7 @@ package wasm import ( "context" anypb "github.com/golang/protobuf/ptypes/any" - "mosn.io/layotto/pkg/grpc" + "mosn.io/layotto/pkg/grpc/default_api" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" "mosn.io/mosn/pkg/wasm/abi/proxywasm010" "mosn.io/proxy-wasm-go-host/proxywasm/common" @@ -35,14 +35,12 @@ type LayottoHandler struct { var _ proxywasm.ImportsHandler = &LayottoHandler{} -var Layotto grpc.API - func (d *LayottoHandler) GetState(storeName string, key string) (string, proxywasm.WasmResult) { req := &runtimev1pb.GetStateRequest{ StoreName: storeName, Key: key, } - resp, err := Layotto.GetState(context.Background(), req) + resp, err := default_api.LayottoAPISingleton.GetState(context.Background(), req) if err != nil { return "", proxywasm.WasmResultInternalFailure } @@ -57,7 +55,7 @@ func (d *LayottoHandler) InvokeService(id string, method string, param string) ( Data: &anypb.Any{Value: []byte(param)}, }, } - resp, err := Layotto.InvokeService(context.Background(), req) + resp, err := default_api.LayottoAPISingleton.InvokeService(context.Background(), req) if err != nil { return "", proxywasm.WasmResultInternalFailure } diff --git a/pkg/wasm/imports_test.go b/pkg/wasm/imports_test.go index 07dae2ee5f..e32f2f3145 100644 --- a/pkg/wasm/imports_test.go +++ b/pkg/wasm/imports_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" "mosn.io/layotto/components/rpc" mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" - "mosn.io/layotto/pkg/grpc" + "mosn.io/layotto/pkg/grpc/default_api" mock_invoker "mosn.io/layotto/pkg/mock/components/invoker" mock_state "mosn.io/layotto/pkg/mock/components/state" proxywasm "mosn.io/proxy-wasm-go-host/proxywasm/v1" @@ -52,7 +52,7 @@ func TestGetState(t *testing.T) { Metadata: nil, } mockStore.EXPECT().Get(gomock.Any()).Return(compResp, nil) - Layotto = grpc.NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil, nil, nil) + default_api.LayottoAPISingleton = default_api.NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil, nil, nil) value, ok := d.GetState("mock", "mykey") assert.Equal(t, proxywasm.WasmResultOk, ok) @@ -74,7 +74,7 @@ func TestInvokeService(t *testing.T) { assert.Equal(t, "id_2", req.Id) return resp, nil }) - Layotto = grpc.NewAPI("", nil, nil, map[string]rpc.Invoker{mosninvoker.Name: mockInvoker}, nil, nil, nil, nil, nil, nil) + default_api.LayottoAPISingleton = default_api.NewAPI("", nil, nil, map[string]rpc.Invoker{mosninvoker.Name: mockInvoker}, nil, nil, nil, nil, nil, nil) result, ok := d.InvokeService("id_2", "", "book1") assert.Equal(t, proxywasm.WasmResultOk, ok)