From ed0d806cd61cdc64138cf7463b327dffc2dd626a Mon Sep 17 00:00:00 2001 From: seeflood <349895584@qq.com> Date: Fri, 10 Dec 2021 15:49:54 +0800 Subject: [PATCH] feat: api-plugin (#358) --- cmd/layotto/main.go | 6 + cmd/layotto_multiple_api/client/main.go | 58 ++++ cmd/layotto_multiple_api/main.go | 413 +++++++++++++++++++++++ docs/_sidebar.md | 3 +- docs/en/start/api_plugin/helloworld.md | 49 +++ docs/img/api_plugin/img.png | Bin 0 -> 13548 bytes docs/zh/_sidebar.md | 5 +- docs/zh/design/api_plugin/design.md | 120 +++++++ docs/zh/start/api_plugin/helloworld.md | 50 +++ go.mod | 1 + pkg/grpc/{ => default_api}/api.go | 46 ++- pkg/grpc/default_api/api_callback.go | 220 ++++++++++++ pkg/grpc/{ => default_api}/api_errors.go | 2 +- pkg/grpc/{ => default_api}/api_test.go | 97 +++++- pkg/grpc/grpc.go | 14 +- pkg/grpc/grpc_api.go | 53 +++ pkg/grpc/grpc_test.go | 26 -- pkg/grpc/options.go | 10 +- pkg/integrate/api/helloworld/grpc_api.go | 68 ++++ pkg/runtime/const.go | 3 +- pkg/runtime/options.go | 8 + pkg/runtime/runtime.go | 266 +++------------ pkg/runtime/runtime_test.go | 340 ++++++++++++++----- pkg/wasm/imports.go | 8 +- pkg/wasm/imports_test.go | 6 +- 25 files changed, 1492 insertions(+), 380 deletions(-) create mode 100644 cmd/layotto_multiple_api/client/main.go create mode 100644 cmd/layotto_multiple_api/main.go create mode 100644 docs/en/start/api_plugin/helloworld.md create mode 100644 docs/img/api_plugin/img.png create mode 100644 docs/zh/design/api_plugin/design.md create mode 100644 docs/zh/start/api_plugin/helloworld.md rename pkg/grpc/{ => default_api}/api.go (96%) create mode 100644 pkg/grpc/default_api/api_callback.go rename pkg/grpc/{ => default_api}/api_errors.go (97%) rename pkg/grpc/{ => default_api}/api_test.go (94%) create mode 100644 pkg/grpc/grpc_api.go delete mode 100644 pkg/grpc/grpc_test.go create mode 100644 pkg/integrate/api/helloworld/grpc_api.go diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 7f317b1700..d59bb63b75 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -19,6 +19,7 @@ package main import ( "encoding/json" "fmt" + "mosn.io/layotto/pkg/grpc/default_api" "os" "strconv" "time" @@ -116,6 +117,7 @@ import ( "google.golang.org/grpc" _ "mosn.io/layotto/pkg/filter/network/tcpcopy" "mosn.io/layotto/pkg/runtime" + _ "mosn.io/layotto/pkg/wasm" "mosn.io/mosn/pkg/featuregate" _ "mosn.io/mosn/pkg/filter/network/grpc" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" @@ -153,6 +155,10 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp // 3. run server, err := rt.Run( runtime.WithGrpcOptions(opts...), + // register your grpc API here + runtime.WithGrpcAPI( + default_api.NewGrpcAPI, + ), // Hello runtime.WithHelloFactory( hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), diff --git a/cmd/layotto_multiple_api/client/main.go b/cmd/layotto_multiple_api/client/main.go new file mode 100644 index 0000000000..1182a71859 --- /dev/null +++ b/cmd/layotto_multiple_api/client/main.go @@ -0,0 +1,58 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package main implements a client for Greeter service. +package main + +import ( + "context" + "log" + "os" + "time" + + "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" +) + +const ( + address = "localhost:34904" + defaultName = "world" +) + +func main() { + // Set up a connection to the server. + conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + c := pb.NewGreeterClient(conn) + + // Contact the server and print out its response. + name := defaultName + if len(os.Args) > 1 { + name = os.Args[1] + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}) + if err != nil { + log.Fatalf("could not greet: %v", err) + } + log.Printf("Greeting: %s", r.GetMessage()) +} diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go new file mode 100644 index 0000000000..60a5c38f3d --- /dev/null +++ b/cmd/layotto_multiple_api/main.go @@ -0,0 +1,413 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/json" + "fmt" + "mosn.io/layotto/pkg/grpc/default_api" + helloworld_api "mosn.io/layotto/pkg/integrate/api/helloworld" + "os" + "strconv" + "time" + + mock_state "mosn.io/layotto/pkg/mock/components/state" + _ "mosn.io/layotto/pkg/wasm" + + "mosn.io/layotto/components/file/local" + + "mosn.io/layotto/components/file/s3/alicloud" + "mosn.io/layotto/components/file/s3/aws" + "mosn.io/layotto/components/file/s3/minio" + + dbindings "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/bindings/http" + "mosn.io/layotto/components/configstores/etcdv3" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/sequencer" + "mosn.io/layotto/pkg/runtime/bindings" + runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer" + "mosn.io/pkg/log" + + // Hello + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/hello/helloworld" + + // Configuration + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/configstores/apollo" + + // Pub/Sub + dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" + pubsub_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs" + pubsub_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" + "github.com/dapr/components-contrib/pubsub/azure/servicebus" + pubsub_gcp "github.com/dapr/components-contrib/pubsub/gcp/pubsub" + pubsub_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast" + pubsub_inmemory "github.com/dapr/components-contrib/pubsub/in-memory" + pubsub_kafka "github.com/dapr/components-contrib/pubsub/kafka" + pubsub_mqtt "github.com/dapr/components-contrib/pubsub/mqtt" + "github.com/dapr/components-contrib/pubsub/natsstreaming" + pubsub_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" + "github.com/dapr/components-contrib/pubsub/rabbitmq" + pubsub_redis "github.com/dapr/components-contrib/pubsub/redis" + "github.com/dapr/kit/logger" + "mosn.io/layotto/pkg/runtime/pubsub" + + // RPC + "mosn.io/layotto/components/rpc" + mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" + + // State Stores + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/aerospike" + state_dynamodb "github.com/dapr/components-contrib/state/aws/dynamodb" + state_azure_blobstorage "github.com/dapr/components-contrib/state/azure/blobstorage" + state_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" + state_azure_tablestorage "github.com/dapr/components-contrib/state/azure/tablestorage" + "github.com/dapr/components-contrib/state/cassandra" + "github.com/dapr/components-contrib/state/cloudstate" + "github.com/dapr/components-contrib/state/couchbase" + "github.com/dapr/components-contrib/state/gcp/firestore" + "github.com/dapr/components-contrib/state/hashicorp/consul" + "github.com/dapr/components-contrib/state/hazelcast" + "github.com/dapr/components-contrib/state/memcached" + "github.com/dapr/components-contrib/state/mongodb" + state_mysql "github.com/dapr/components-contrib/state/mysql" + "github.com/dapr/components-contrib/state/postgresql" + state_redis "github.com/dapr/components-contrib/state/redis" + "github.com/dapr/components-contrib/state/rethinkdb" + "github.com/dapr/components-contrib/state/sqlserver" + "github.com/dapr/components-contrib/state/zookeeper" + runtime_state "mosn.io/layotto/pkg/runtime/state" + + // Lock + "mosn.io/layotto/components/lock" + lock_consul "mosn.io/layotto/components/lock/consul" + lock_etcd "mosn.io/layotto/components/lock/etcd" + lock_redis "mosn.io/layotto/components/lock/redis" + lock_zookeeper "mosn.io/layotto/components/lock/zookeeper" + runtime_lock "mosn.io/layotto/pkg/runtime/lock" + + // Sequencer + sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" + sequencer_redis "mosn.io/layotto/components/sequencer/redis" + sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" + + // Actuator + _ "mosn.io/layotto/pkg/actuator" + "mosn.io/layotto/pkg/actuator/health" + actuatorInfo "mosn.io/layotto/pkg/actuator/info" + _ "mosn.io/layotto/pkg/filter/stream/actuator/http" + "mosn.io/layotto/pkg/integrate/actuator" + + "github.com/urfave/cli" + "google.golang.org/grpc" + _ "mosn.io/layotto/pkg/filter/network/tcpcopy" + "mosn.io/layotto/pkg/runtime" + "mosn.io/mosn/pkg/featuregate" + _ "mosn.io/mosn/pkg/filter/network/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" + _ "mosn.io/mosn/pkg/filter/network/proxy" + _ "mosn.io/mosn/pkg/filter/stream/flowcontrol" + _ "mosn.io/mosn/pkg/metrics/sink" + _ "mosn.io/mosn/pkg/metrics/sink/prometheus" + "mosn.io/mosn/pkg/mosn" + _ "mosn.io/mosn/pkg/network" + _ "mosn.io/mosn/pkg/stream/http" + _ "mosn.io/mosn/pkg/wasm/runtime/wasmer" + _ "mosn.io/pkg/buffer" +) + +// loggerForDaprComp is constructed for reusing dapr's components. +var loggerForDaprComp = logger.NewLogger("reuse.dapr.component") + +func init() { + mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer) + // Register default actuator implementations + actuatorInfo.AddInfoContributor("app", actuator.GetAppContributor()) + health.AddReadinessIndicator("runtime_startup", actuator.GetRuntimeReadinessIndicator()) + health.AddLivenessIndicator("runtime_startup", actuator.GetRuntimeLivenessIndicator()) +} + +func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + // 1. parse config + cfg, err := runtime.ParseRuntimeConfig(data) + if err != nil { + actuator.GetRuntimeReadinessIndicator().SetUnhealthy(fmt.Sprintf("parse config error.%v", err)) + return nil, err + } + // 2. new instance + rt := runtime.NewMosnRuntime(cfg) + // 3. run + server, err := rt.Run( + runtime.WithGrpcOptions(opts...), + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), + // Hello + runtime.WithHelloFactory( + hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), + ), + // Configuration + runtime.WithConfigStoresFactory( + configstores.NewStoreFactory("apollo", apollo.NewStore), + configstores.NewStoreFactory("etcd", etcdv3.NewStore), + ), + + // RPC + runtime.WithRpcFactory( + rpc.NewRpcFactory("mosn", mosninvoker.NewMosnInvoker), + ), + + // File + runtime.WithFileFactory( + file.NewFileFactory("aliOSS", alicloud.NewAliCloudOSS), + file.NewFileFactory("minioOSS", minio.NewMinioOss), + file.NewFileFactory("awsOSS", aws.NewAwsOss), + file.NewFileFactory("local", local.NewLocalStore), + ), + + // PubSub + runtime.WithPubSubFactory( + pubsub.NewFactory("redis", func() dapr_comp_pubsub.PubSub { + return pubsub_redis.NewRedisStreams(loggerForDaprComp) + }), + pubsub.NewFactory("natsstreaming", func() dapr_comp_pubsub.PubSub { + return natsstreaming.NewNATSStreamingPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("azure.eventhubs", func() dapr_comp_pubsub.PubSub { + return pubsub_eventhubs.NewAzureEventHubs(loggerForDaprComp) + }), + pubsub.NewFactory("azure.servicebus", func() dapr_comp_pubsub.PubSub { + return servicebus.NewAzureServiceBus(loggerForDaprComp) + }), + pubsub.NewFactory("rabbitmq", func() dapr_comp_pubsub.PubSub { + return rabbitmq.NewRabbitMQ(loggerForDaprComp) + }), + pubsub.NewFactory("hazelcast", func() dapr_comp_pubsub.PubSub { + return pubsub_hazelcast.NewHazelcastPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("gcp.pubsub", func() dapr_comp_pubsub.PubSub { + return pubsub_gcp.NewGCPPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("kafka", func() dapr_comp_pubsub.PubSub { + return pubsub_kafka.NewKafka(loggerForDaprComp) + }), + pubsub.NewFactory("snssqs", func() dapr_comp_pubsub.PubSub { + return pubsub_snssqs.NewSnsSqs(loggerForDaprComp) + }), + pubsub.NewFactory("mqtt", func() dapr_comp_pubsub.PubSub { + return pubsub_mqtt.NewMQTTPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("pulsar", func() dapr_comp_pubsub.PubSub { + return pubsub_pulsar.NewPulsar(loggerForDaprComp) + }), + pubsub.NewFactory("in-memory", func() dapr_comp_pubsub.PubSub { + return pubsub_inmemory.New(loggerForDaprComp) + }), + ), + // State + runtime.WithStateFactory( + runtime_state.NewFactory("in-memory", func() state.Store { + return mock_state.NewInMemoryStateStore() + }), + runtime_state.NewFactory("redis", func() state.Store { + return state_redis.NewRedisStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("consul", func() state.Store { + return consul.NewConsulStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.blobstorage", func() state.Store { + return state_azure_blobstorage.NewAzureBlobStorageStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.cosmosdb", func() state.Store { + return state_cosmosdb.NewCosmosDBStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.tablestorage", func() state.Store { + return state_azure_tablestorage.NewAzureTablesStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("cassandra", func() state.Store { + return cassandra.NewCassandraStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("memcached", func() state.Store { + return memcached.NewMemCacheStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("mongodb", func() state.Store { + return mongodb.NewMongoDB(loggerForDaprComp) + }), + runtime_state.NewFactory("zookeeper", func() state.Store { + return zookeeper.NewZookeeperStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("gcp.firestore", func() state.Store { + return firestore.NewFirestoreStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("postgresql", func() state.Store { + return postgresql.NewPostgreSQLStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("sqlserver", func() state.Store { + return sqlserver.NewSQLServerStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("hazelcast", func() state.Store { + return hazelcast.NewHazelcastStore(loggerForDaprComp) + }), + runtime_state.NewFactory("cloudstate.crdt", func() state.Store { + return cloudstate.NewCRDT(loggerForDaprComp) + }), + runtime_state.NewFactory("couchbase", func() state.Store { + return couchbase.NewCouchbaseStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("aerospike", func() state.Store { + return aerospike.NewAerospikeStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("rethinkdb", func() state.Store { + return rethinkdb.NewRethinkDBStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("aws.dynamodb", state_dynamodb.NewDynamoDBStateStore), + runtime_state.NewFactory("mysql", func() state.Store { + return state_mysql.NewMySQLStateStore(loggerForDaprComp) + }), + ), + // Lock + runtime.WithLockFactory( + runtime_lock.NewFactory("redis_cluster", func() lock.LockStore { + return lock_redis.NewClusterRedisLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("redis", func() lock.LockStore { + return lock_redis.NewStandaloneRedisLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("zookeeper", func() lock.LockStore { + return lock_zookeeper.NewZookeeperLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("etcd", func() lock.LockStore { + return lock_etcd.NewEtcdLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("consul", func() lock.LockStore { + return lock_consul.NewConsulLock(log.DefaultLogger) + }), + ), + + // bindings + runtime.WithOutputBindings( + bindings.NewOutputBindingFactory("http", func() dbindings.OutputBinding { + return http.NewHTTP(loggerForDaprComp) + }), + ), + + // Sequencer + runtime.WithSequencerFactory( + runtime_sequencer.NewFactory("etcd", func() sequencer.Store { + return sequencer_etcd.NewEtcdSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("redis", func() sequencer.Store { + return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { + return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) + }), + )) + // 4. check if unhealthy + if err != nil { + actuator.GetRuntimeReadinessIndicator().SetUnhealthy(err.Error()) + actuator.GetRuntimeLivenessIndicator().SetUnhealthy(err.Error()) + } + return server, err +} + +var cmdStart = cli.Command{ + Name: "start", + Usage: "start runtime", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config, c", + Usage: "Load configuration from `FILE`", + EnvVar: "RUNTIME_CONFIG", + Value: "configs/config.json", + }, cli.StringFlag{ + Name: "feature-gates, f", + Usage: "config feature gates", + EnvVar: "FEATURE_GATES", + }, + }, + Action: func(c *cli.Context) error { + stm := mosn.NewStageManager(c, c.String("config")) + + stm.AppendParamsParsedStage(func(c *cli.Context) { + err := featuregate.Set(c.String("feature-gates")) + if err != nil { + os.Exit(1) + } + }) + + stm.AppendInitStage(mosn.DefaultInitStage) + + stm.AppendPreStartStage(mosn.DefaultPreStartStage) // called finally stage by default + + stm.AppendStartStage(mosn.DefaultStartStage) + + stm.Run() + + actuator.GetRuntimeReadinessIndicator().SetStarted() + actuator.GetRuntimeLivenessIndicator().SetStarted() + // wait mosn finished + stm.WaitFinish() + return nil + }, +} + +func main() { + app := newRuntimeApp(&cmdStart) + registerAppInfo(app) + _ = app.Run(os.Args) +} + +func registerAppInfo(app *cli.App) { + appInfo := actuator.NewAppInfo() + appInfo.Name = app.Name + appInfo.Version = app.Version + appInfo.Compiled = app.Compiled + actuator.SetAppInfoSingleton(appInfo) +} + +func newRuntimeApp(startCmd *cli.Command) *cli.App { + app := cli.NewApp() + app.Name = "Layotto" + app.Version = "0.1.0" + app.Compiled = time.Now() + app.Copyright = "(c) " + strconv.Itoa(time.Now().Year()) + " Ant Group" + app.Usage = "A fast and efficient cloud native application runtime based on MOSN." + app.Flags = cmdStart.Flags + + // commands + app.Commands = []cli.Command{ + cmdStart, + } + // action + app.Action = func(c *cli.Context) error { + if c.NumFlags() == 0 { + return cli.ShowAppHelp(c) + } + + return startCmd.Action.(func(c *cli.Context) error)(c) + } + + return app +} diff --git a/docs/_sidebar.md b/docs/_sidebar.md index a97ad799c0..07f27fad69 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -10,6 +10,7 @@ - Service Invocation - [Hello World](en/start/rpc/helloworld.md) - [Dubbo JSON RPC](en/start/rpc/dubbo_json_rpc.md) + - [API plugin: register your own API](en/start/api_plugin/helloworld.md) - Integrate with istio - [As a data plane in istio](en/start/istio/start.md) - Traffic intervention on the 4th layer network @@ -18,7 +19,7 @@ - [Method Level Flow Control](en/start/stream_filter/flow_control.md) - [Trace management](en/start/trace/trace.md) - [Health check and metadata query](en/start/actuator/start.md) - - [Multilingual programming based on WASM](en/start/wasm/start.md) + - [Run business logic in Layotto using WASM](en/start/wasm/start.md) - [FaaS model based on WASM and Runtime](en/start/faas/start.md) - Developer guide - Building blocks diff --git a/docs/en/start/api_plugin/helloworld.md b/docs/en/start/api_plugin/helloworld.md new file mode 100644 index 0000000000..560887503a --- /dev/null +++ b/docs/en/start/api_plugin/helloworld.md @@ -0,0 +1,49 @@ +# API plugin: register your own API +This is a demo to show you how to register your own API. + +Layotto has the api-plugin feature to let you add your own API based on your need. + +## step 0. change directory +```shell +cd ${projectpath}/cmd/layotto_multiple_api +``` + +## step 1. start Layotto with a new helloworld API +Build and run Layotto : + +```shell +go build -o layotto +# run it +./layotto start -c ../../configs/config_in_memory.json +``` + +Q: What happened? + +Check the code in `main.go` and you will find a new API was registered during startup: + +```go + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), +``` + +## step 2. invoke the helloworld API +```shell +go run client/main.go +``` +The result will be: + +```shell +Greeting: Hello world +``` + +This message is the response of the helloworld API you just registered in step 1. + +## Next +You can refer to the demo code to implement your own API. Have a try ! + +For more details,you can refer to the [design doc](zh/design/api_plugin/design.md) \ No newline at end of file diff --git a/docs/img/api_plugin/img.png b/docs/img/api_plugin/img.png new file mode 100644 index 0000000000000000000000000000000000000000..20291b310716a0eae7a90ab25c076cb1b522dc77 GIT binary patch literal 13548 zcma*O1yI{x6fTGa8lbp)@ZyD{L5jN+XmKZ4(c%;jR@~ja6o=xP0>z3`yb#>oee>UW zyR-7%&i-a1leu&6xpvO?o%1E)gPJ@xCIu!E5)!tef{Z5Ovjy?$0iq+`o2AO_kdXKq z6lEkodKn-6!|3qSx_f`>6sAy0@2c>TU>I*Goz;~nV>q5pON!o-O;NV^kF=HMU~)2@ zC5JQw2SY=tXxr@N<>c>4YgPN9-|>_Gm+qVF*_|u>8lQ3Tox_G3{k$vDyd6Ee8vZqD z(#qChW76)1g{#L;pUW*xG1WG%LmbA9tx_1eo9!-=%1$avQiE89VwMP30`Auh6A_J!=eyHM~x6W;vK z!=PbQV2Q8$wx4^rnZW-%DUUCTyUWY94_`jnsnh26kG1%;efv?a{7GJ-R(UfvXap8%6Efu)dmN=o4;M#i{qpA<*_u&k>dL4;kdo#?!e>)d^qnE z@UUbRE4-~RciO)CR4sftA)SRm$jn&yvW6Q#cD)d}wrdCNb)Peu`@=V_GFm^v^*~i| zhgg=548QAztcvQk&+}(gWqJNz-yQv&7C`ju4TZwmju zm8xfD+qC$(Vp~1netP1}B2KI+!~cFyD;%53doy9L%c<*byTHO>ViGa=?wO#aXryd} zU8CTG%IS`aoD&txi%~)KtJCScXSeyCSn>N^t8uX#Di8aPq7w|9O8A!n>KiH}Fawf>d%4{CdFpINkzED(@?GdqCikczr{x|Kb zIh9vV{ufzRL1-j4#&^Ryr8uRB9T#Jxj_pUY91VYXM{z_B_Zhx-syk0CAr+cPcz5V^- zUxz!92I#(=QqF4bJaQOqJE&f=uZ5wPf0;80c-q#=yPv5YSA7lB3+VIiXE#2}9&doM zSGSnITlKx*#}&JrsJ($lvRB)PKcB_1R5T;{5p?=Gos}qfyDR^G(RR;g4Y8xX^z(56 zj~i|{n}(1CSpxhNT>sPHQDx)vM97|ZD#9Fg{HO5uk7nLKAf4_i=i8%B{;n4{-7lSw z3isGIsT)HqdG@J>E<;4UA+t9=W0UH;%Fu?7gWo=GqGALBzSZy);^%$cFPAyBt8TU0 zZLi0FL7dcZV+J9ViF%ZlzuE>>-U}nf0xIC!u8Ta^F=6Qemu%njp-~FE%E`*+m9Ikt z^ieNHI^}iByP_TP7^zF%WtpE5wj{8hKCHfoD`xm0*0bBh21fgGJFv}Po5vLtfcQgk=l-1Ya5&_?tqVxKdNv6_}H zulh+%vX5V84V+Y|hn}MFmE7xH!5Jh2;#HX({ z-O2TFbX~M9;~$`MRjH2+N`&VZ3?a>?6O;U*8&Wv+sg^C!QGBia0>$F8;cDA0Byv`J zH_Er?YAo`cQscWNyz`7W!x_BN^@6-?DD8WtSzHMRoKD`#^laztjx~thkKIg!-8{(i zn!B#5q^XlaHn6kb%V)`~p`m5h_#N4WLvNH$dC=KOL!{96eo}QmU)5-Rts3QgwFsn{ zha{;^wc0)<^dxcA^${piGEm$89A>Lv%)s-42ffPFv(9PsKCBz4cWMfj#mQi-WenOi zmwgUEoP0;F?#J^>eUFk3&w;@xEp;*X-SpMJ8fwB@$%lwe&#Dg)i>ci2Sq%0rs`0<7 zkj|?B&`SI6DVr$qq6F<^yN)HFFf-(Otw$7$M+*$vGYI93|F{#c@L7*w!SU$^D052i zlS!k;V~=om`?k>lz-u9diGAq~O$(avJu+T*NrBMD7`To$QxEDSe64X2G_IBl+C?1k zr-J%<^QaQLXi+hsjJry%8Nx5C1?au-T71M1M+ebDuytpRJpb{dWmMF3eI15OM9C4L zCMh|~m`>W9Je+iQFMK_y(y|7>JYF<7J(G6d|E;wk^C`k@ey+i$*n*T+Q=iqX}s%{y9NpAt@~PMf06qdV+b6BqN=Yw0r5&+Y)pz$!P0?O z^6P=f<;Db|mTAO$^?kIrJ+Cyw&wM6QWLY&Z#K|Y3;6)g`wjVQ-W+Yc!YTvw)q7TRj z9h3~nij(S1BIm`Iq#_Y#5vu*YiCpD?iS+A$Oc*b_7-QGB(pkKALl7daYnvNZeGI)H zytrq~6S882wriuRJ%t2;T7;?ViJ1kAT!xs2GTAs-s6T(?iS;&NL&+6IE z(qXK-@)B8oS&r{~u@rL4C*ZhEHE_%b8;k*OrwVOKsyQQ<9msJ_{qh{1{Qb{!RSa$N zd?#{Q7(?p5kH`3%y{m(qmU}5zi{}P9`mn^}lwoqh`D565$@>QkKXvCA$UKe4>sC8A zZRT;H%$MZ{bX}@shzA zjPLu6AsP>Xid8KHChm`DGQB67v#XI`%A7~;A>-^AE~|Hcf$Fc)Z1j%CNTEJxtzON}eY558zuqncOe zX?=)A0gtgi+;=jL1@C{~F5&jGU1aX?RbTV-tr0cpIWt~*^ZP_@53K$;d(j`I=A2cy z=<)UGYR(CPVcWJvR`1mA5;WEb^KNAtW;B&cUL5?Y))oifQ9K@6zqhl>uuR?lQBlpd z`V}YMaU(`ht+;`_!YJCEON4YOXx{j;dM7^^M~T|GEXPAB=j>C>*H4Sa2H_lL*4qg9 zSSGq`h?rdY#+J`S^IELPv5oje#yB6Tk()*X44sniKh#^n;-adh!Xvu~?eKk>(Nb3p zvkoQJCS$9ptXFz>v+P=St?$^nl}W5)_+|b9Q{8O9x}7ZzB>i{#e1v;d^!avUYYC&; zr>>uba#46u;Bfyy{{8)A?f61!c92(>mj=cd1t?fQkGptS0|=zJXu^$^c|+F$8?z%X_{o3m-%ET=nyGJs)BzTYX;M5l`zt>iiE* zmEPn;@o(#my?=S!Bpwsq$^IGcM|wSP5odMvSBD$7>|QRn=>|R~a$=9*5b4`Lr`7D# z-alPlW+8&!%LkwTJm#LRo`UqM52EW4tWD4l$!$f52PHZGB^yKPasp=$?;s7^;XWXb_KJxy>?$0s$*2|Q3~?73g$uQAsYajS z!MFndfw23^n;cvXd8`}{1nk&daGi5jy2l z=GVV3U|4FW2z~_q2Pg!6_(XV0k@fPZ9t@W3_+vi=p-@_x@B{JM!t8cM6$%}#vmBa| zQ^>J(KhEm-B_thGHjI7H^EqnN+7UZ$*%+?DmNXhijJx-4Ung60Rs@b7Wwic{Ay(mt zddkWW-!S<7#h-UWE|WRmK(&%HfpRRQJ=U_1;$=at<^wJ{xBpK31v_65B!{QdkwHjs zi1^^{(%Y5?3NM-6rtnTfoK*Muvx#j<=Q#$f0@2eR)KLV%&1URHBQ=btnAqX_cGIfw z(&I#DlohMZ0`|iJ=p*hkLF-aI8&Gl%T~{P`sPo~8wJe+WFQ?O*e`nds9(>YDD%(1Y z+)80iwbXYF%S0hW>v7!qyVKk%8uhU7OhIgU(OwHv8}L}L?-M>_96Pi@0N=OH9;lEs zl@I09b!hyXh&J+NN|A+WWW$HUAeVhkp>i=%nJeS@JvaonXA@&rQW=$}x) zzFDtqliqa9{=(wU?vNqt|03&*_tW=o#6-4Cn>j$DXUG3SRD|F^iY;A(x-TKG#RAS*U#^y?nDkCqJOO?l z;RwA?#W?rbUq`HzqRgD5_jTV(2Qv9brJkNq!PRV0eZ6tbbd-52b83fmnlo9$i_EzW z=EgMy;^n<;{UU!#fKnu?0Kxe8_TR46GNb0@ucw>UJT{|O63}dTC5u!%Mc&h)b(GWd zD+hoM%s;RHBaA{w(_`5}(Rm`_QM{)3RxC8!g|nv(4TfCD2WF=*{j2Mlhec4H)Z$O= zkfbW0xZ3`BJ~GV{){bgk16HyE0^HqfoFbw6>sF zLA8+!Mx+GLyHNHYxxl^Y3+wN?Q7(eV@0Ii>>;`f7;YLD2vO zYv4(Q*>C5Uj|`KgbX36_&(AfJyEQCQ@CDlR(4x zqj1#l)yR1JD+QvHfgTOg`&c-m(h~f)f`pZ}ErLxbheTMg#q!%={Xqd%5Y}GuvC%7s zZe>u`%UlPc&z$kOC^{4LpAG7~+WeD)d(LMTxHY;ZYY=Z6UpmtSEU<_-Liyth@Ip9g zHIlb38;HEu5*{oevt53dD)s>3YDiyCD(hh)p&feTzmh%f0$|WjFnLE`Zp*%!(zv+% ztP`(S$c@o;v*Kf&moAj7M}(}BwuQ3^(9x5lk-cS#uFjmMqp?YPBk3IAI`JY5vkcMw zR2OD2LMmmgYTFdOen&jg4xr$SXezubi0ak_ma zzeUeZlA&9gsWBCd{DQ2Pt68WZ|3l|!ilpzCwt+)RhyCMm4_-Fcp9W?`a$>tSSF}z7 zw1D~c3=_tXe**MALx{w5Z{hB-OsdS*qkJ`)SpTcy)x{hD7s@LGSsla2JVnY6WoHg-fJBeldd&3-hJMLc4yMCIy;q$8SvG#aW?S~BYd^`R4iDigiH8JRqZ||k;)UY>qH3oV<6vci#K{O) z%%sB<@N(VUox?^q=TH*M-k3w5FO`|Tls|AlrjShv%U(GN$UqB9i7Z)zmW_o)`VP!( zl=PCv4Pk6~>6l!UFs@~j+gL;gu^yL31EM zFpbIOy=Iuvhm-y8Y$|@^Nl$>FO(0RLs3c~Q1xMjy9lfAE5pIMEbxL^L&AB<_Br2Y% zsRG5gg$V<3FPx)*{EP8}aq*=WNmBjhp6UdzLswhLu5y_&N%1^=;F=XwOJ#}zdzq+YjnHB4%yIo=Dt^( zMUR`_1N0Q`69GdnnG=G9=<>G6Ov^2vJ2JkO)_*;^_hzX>8&U2`r!bQk&=xtgQI z-q}YNQn;Zcm3V=6m^8eYaTQKK8?~*vp%vvmnf5evr;B;V%7(#GQt>08F* zRUl~4eH%!y_cWO17TZ-v*HM$Hxd|D@TA?D0fX&#j8lP~_L#RWe39v%^(%?Q^89d20+hYv=1#68sXdT{=6ry8yZ; zlPW$r%e$0NhV(LBn-MPEDOKsujILR>{Q|YRz`t0~x^rY~k~9F&4PO*Ac=3kR8@~cI zo$!QWh@f6RoPQ|d%CgQj!-5>8f*%|0bBf{rywJ1#3uJWEu{^ z8i%to*ec3me)?db?>z0e(faX5wy0sT+Qbi>2j;8N#XIp<%b-KH6K>#@XtRm)N(|^I zbOKhxcCwthv?~s6?Nd}qe>hX5T=S1-^2NJu6q)|_yL8BFUaTqUq4OF4rR>i#$pVm+goNDd&NtiE z6=hr_k?^}a%rGO?Oro;7<`{2^;E*+iaB(@al>fmthf`rhdDEVg&0-h zeZ3b09dHb@pCa&Xdga@sa-j_$??5kUf`RsCcmfEq4LQ=F5dth;u_RVhLJ65o=1-5@ zAKs}`3772B51ddGqYQ=VQcn6_NdPiR875{ixuvtx2$Jn*$qHz8pXuhawE5slctivhBkyZk+*p1aQ_>g|Z z&wiEPzqH-E`QZg+)tBkVStP>RX4TT4oR+Dv;MO9bsBJ({xnZ-4@&9C_bV*>f2Bp4 z*(5Qf z*&>%s6UxvW_~*R8y_q??b33c9s=%sTZ3qthT@yX4$R9))$r?*L!JEXoCdhniCUszb z*|Z$1TE&bmP^r0^s?((8_?{!+(U)f%U@gWm)V^SqRpPMAfpqdxue97kMuKiD9=(mO3mA) zq8`(h%5SN$ZsX8?XTuGvjlwt?>;otswn0U*3q^f}XHCd5ZytW^EWKJmoh+O)2Ym>^syf6_wPA*_e~xlS>~&iZx74i(7{o%f<2XS8r=CcvAFipIXH z+L5yuHvv@5gbh_6XLKmt*zB^}aVZy@V9!ObLp|ywx_#a6EWZ7P7Xb}hsjOJ&v?t`7qzqcc1%xLB!%M`V~q@H zjlXFT;)a6JDz4VeLO_|?NAvH-c4@v+*V;@Y zk$yT>_MxQtgB5;I^jtM}e#05M1F}uvE0-EV)LfQTe6IYBgy`#gNT-p70mr1Av(RpS z@YJ^MCi$Z}{_-pp3JsS){zXFvFAgv6H$r64 zzp=JmMgdD%`pu@Bm{qRn^y{rPz4#HQv;|sT7TnAu6uL1`o&qIxP_fYd_oAtXvAn$C z6wf1GevZZi=l9iV{-km9R(&*JM4&9j=^W5%$y2CGR5=b!u^t@)g+^OQJjPh~9Jg9W zszbc&xs9b%B)$rj9c*QslQB%lI)62I62FB9?_IsThS>^`sNsE2EV98PC`gmR=jgBQ z0HYS}Q&C9B$>8mCQ%5CpEdE!StNZ{EoANg7HpQxQBMO0w*{DIz7T(lzC=3%c4pA{q z-W~`20pD#VO<#dl2$VSzy6ko==C<|mX9R)+`@q4!UlCxz(DGMY|C!?l5X%L!fX;M{ zPB0FKjksq@T-!W-j^}&jcTNRk%XbW8QzpD^u1}rK--OYloFhRm2ALGN=E!rrO)7N8u86vZ;@lc{Mb*X@vJ=ki2WfiBPNfQZHd|?fh7~?AM#*HW z!!aeER*no#PNSP@2efS$>bm#A=uKxILQs%4mGT^mF3NC_by9!0I9TRF$azzhbRNQ( z<*}t`v9Z+CLqJv6>e*?cPWr@99u`{(eT`gk+!C2$dFCAwVKl9q_3g1$ACKzJ#M<spudC`;)!yXy6PP%AyJ3P)S|O(|qmim@m?T^;$f8#mFYdA^io7r-@GS(guV0H9EqTa4%)WQ} zOhNg^^#&A}>#=B?G-;Mk1n{8c^TFBEhs0ji;QSDC$W0mP3}O;uXA0l9LNlbmGJ{BW z3>bZOCLZ&v=c`-TMF_D(7_oe{-?9Qz90B*4TrvvPVsa9$&<037wIz;r^{o56^gH;Y z#}kuDkNW}O(vEUDE>NI}1yYb;grppqz$BptfMFvtE68&L!L6Rb>Xn^p$p2NZ_gf$s ziaWW3AjnlaN6DcQMo?ocn;oiXvMZLMB2TW;B)(!c)qd7oy(PUxp9t&g_`{So$QtTe z>oFqAd;~0&K%hZPUV$Ya$iJnZ=vSW8k5{_$xU1@tb%K8nL{Hr)O<`zLo zt_rV%s^-!g3;0kY8DJWD8Fys&0{A!e4hi=5uB=E#KThV$dxr~4rFg>*-K;Oja-lpC z@CNoMO>@M3%Ab;8lj~7F=y{o})JFgqX-z`n!{?`pdt{8o8 zTqL{-1g2uH&{tk)K_$_+aWhfy{iw!SZ}+b`c#{cTRAg*xkBh+tdr)23e=-xJ?Fu8Y z00+7IXtdwSaKmnUg$xrMAeDsl%#Q%r)k8< zQFBc0=kG#kze~3KaLH|~L|7V*2m4WWM5qsdU^VeT2BFy37}(WXsK2&h*}tvKl-U<# zoxx)0et66LP56dlZ7c+XjBlw&YVQyVuXGeGSF6pEe&DfASB_Pxk=Y$NCaZRCG1jpG zrnXV=S2~y+%AKrn?0-fgzuSZWcAbmA6|R568zCIViR*AUj85n zlVbT~IP+%^v8)tz5u#=pJ{IxK7fP>HN6T1euQ4VMExZ5yqS$N{dOoNQ8& z9;_yN8lfY>BVu6pPQnG><$aVsZu*&b@Yhy>ac2_4(gtu|twGLy9)re=1!I>Bdvw#u z#BPh#c`g5cku|J5vV35(M%VsnQJnAEVYZBKh^aDM<2yeesF1;1q&1p=_m+I>M(1ml zPN1$-Q%`q)=Yy2-d^Wxos(Go&W<9och7_Ln zzVDcd#Lma}4skOC#Kd>vg|d=v&;f zE_H&qAPMCqysZRNLaGD@CbJFIynzNAy1y?*bm{dlfLAjj@u4j z&&|w|St@xYSCg;Asec0FVs`?jJuJ>s@nv1r47(@gHf20mRc=nq&>(X`*rlMd$Hwum zGF1R9M@*U5$wMy<&*#@u30B6LADD?LRVk?R_W~&^G-r=$>|=?b70Z zl&)TvV@M_lkpIv-c6ZoDP#N`DZ}<)rfgEX=NJex%ql}R^DXD_|tsJ@ZCq*Dn@~FA+ zthO8UvHUqA=)z-k2N8DOf>om3pbt?4f1}eWrUdC!W>ycy9`$ z%;c@jd+bk?#-tw;@HKqZO=r8@XlbH3n2XluZ=ghRO?QMi=*~OxpE>E}} zANU8SVcZbptj#XABX;V*wy3VBg-!@-`WjS$p1hSWkIgWWoqn;#&#q-<&@o$U_--W0 znh9mOR89u?H3`5&&aR}=`&YtfqTH}SE){l0mNqawSm`wkYy+xoP_)Ubv{BsY%G10N z#XKl$$W)Bz7C`bKYRx?3j zh~>5|rw$K6AFBmFXA{;26u!)0p6<4aUMx#gop2z_1(<|ewMU$myds8Z-5FivSR(7v zLh@bPDI))vVYkCG6~ATwfnz*R>YgG+6g6a)yum~7o1KJwf+X9RX{Q+u>A)}t4Qp$U zhdj+a=nm|xN?g+)2xP;}hAvp!_HzYY#rRt8x3;{P5gwL*a_Fxvl3XN9Ji}(~FIh4%_MP06XIW~>j9|gJxysm}2&U|*HP&m&g zB148KejY>&H>&v+c~D!Y8~^IiHMqiZ6&X`*=PwRLq8cwnxkj8Xkh-|Vh*$~ z-WZ82&G3EW)bKM28}-Mx%!~aq`rTV%@3?ZNshr&>uJJr2Omp9>z88MW1w8g1NC48n zrCRlEc^cMr=@UZ^J}nGx@igl~m`y1-6K^}pzoUF$1-GqAV_!NG)!-u)wBZ5bgja(i z>fpuFq6-^7G@R4O-n8z&mTL3shDl;*$~C8c^Y6CQOLW9MHXfqUw+E19qs6?UCfX73 zYhm=l0mQL(h@uxp>HIN{w%m9uUA@Q@wL^s-{4}y7M?^FxRzBtMwsln678-I64~ z0bjy`Tvr0_1!Z~V`G=st7iF0nnk49uAMV2`+V?pDjY3~Pk%SbKsBw34L+`aYRC0iq z{P(II%toz7IDL25f|U4xja>FMDx6>U;~awyb$qzhf;)XT;ho!!ky0|1FbpQ@bmv1( zslGE}=ok`Cm}KnQ8Gg|@72jBOn4TDgjo{ECCtPlVPnncY(>*6&&SX?&eF0&_0UCwC} zmGCDU&KUVRCORR-rKoT5T~Q-kc<@((94{TCVU0C-0A(W#e%Y)NwCsd|&3RsiJknUY zrwfRXb=mBfZ}_aGK=ic(8{I&6Vh2mV)esuHm7*D)Pma;1naEfB;Nb)v;nHlv&z4EZ zITOsCHR~K=1)NB6LzM~Z;YMvDAhN%?M(XM@#J{m&3F(%smYB+T>Kg+SJfQChD)&>s z+$#lLamg4C=p6%!GAfM&#arM?BV5GjpK`D#?21B6RwamreOq|k*oL9SEjv&{0F8Vk z1xX12^A_r6&u4VEF_n|PS5G)erIXZT$iz?D9K<;G`x1E=OCQQ=K**|}GxM;8^0=6A zd@{Tg_n#4xJy5s1;HGbdK{2Iu#`Uzz;)050^k$BAwlcOn*cDo}xem4**re)dJ9xL` zXeDvp!4YWCB%yk#>Pdb#z1~1;b?-M|{jlCc$)w6Y622!Jb4&oK%+%g7-RzsR0u55e zv|pgND6!LPRy^B@;XJh-%9X)mBKxm3Q^pv+3n*;X%xN|pPv6HPAIU%W_tg(YAVeUN zal{3D&Rnv^<>m=Kc^Z40(i|?U=wO}Nxu#ftkdX2xuI=6!J0W4`Ab7EaT<;YfSIyM* zsS{4fCJ()hj1=;rf@`xYS&sX^&2KmbU633%f7@RjLe<96z_criiEman4?UE1kFVv4 z^m1;F)4 z)}B}%bgcK9n7B9IhL^w$e&$Ol0l`AgXz#J`x36lK+9s-;Y!{~L)w BXG;J8 literal 0 HcmV?d00001 diff --git a/docs/zh/_sidebar.md b/docs/zh/_sidebar.md index be9eefeb61..007dbb349f 100644 --- a/docs/zh/_sidebar.md +++ b/docs/zh/_sidebar.md @@ -10,6 +10,9 @@ - 进行RPC调用 - [Hello World](zh/start/rpc/helloworld.md) - [Dubbo JSON RPC](zh/start/rpc/dubbo_json_rpc.md) + - 使用File API + - [基于阿里云OSS](zh/start/file/start.md) + - [API插件:注册您自己的API](zh/start/api_plugin/helloworld.md) - 集成 Istio - [作为 Istio 的数据面](zh/start/istio/start.md) - 在四层网络进行流量干预 @@ -18,7 +21,6 @@ - [方法级别限流](zh/start/stream_filter/flow_control.md) - [健康检查、查询运行时元数据](zh/start/actuator/start.md) - [trace管理](zh/start/trace/trace.md) - - [OSS访问文件](zh/start/file/start.md) - [将业务逻辑通过 WASM 下沉进sidecar](zh/start/wasm/start.md) - [基于 WASM 跟 Runtime 实现的 Faas 模型](zh/start/faas/start.md) - 用户手册 @@ -70,6 +72,7 @@ - [Sequencer API设计文档](zh/design/sequencer/design.md) - [file API设计文档](zh/design/file/file-design.md) - [FaaS 设计文档](zh/design/faas/faas-poc-design.md) + - [API插件](zh/design/api_plugin/design.md) - 贡献指南 - [新手攻略:从零开始成为 Layotto 贡献者](zh/development/start-from-zero.md) - [文档贡献指南](zh/development/contributing-doc.md) diff --git a/docs/zh/design/api_plugin/design.md b/docs/zh/design/api_plugin/design.md new file mode 100644 index 0000000000..7d6c11c1ac --- /dev/null +++ b/docs/zh/design/api_plugin/design.md @@ -0,0 +1,120 @@ +# Hierarchical API design +## 1. 需求分析 +### 1.1. 解决什么问题 +解决扩展性问题。不管是Dapr还是开源Layotto的API,目前都无法完全满足生产需求。 + +回看操作系统领域POSIX API和system call的发展历史,我们可以学到很多,借此预测Runtime的未来。我们可以说,Runtime API将来也不可能完全满足用户需求。想想OS领域,即使有POSIX API了,一些场景还是需要绕开标准API、用特殊指令操作特殊硬件。 + + +Dapr的扩展性是通过Binding API解决,但是这种非结构化的API有很多问题(比如破坏可移植性、不支持stream等语义) + +### 1.2. 用户场景和需求 +举例来说,有以下用户场景: +1. 公司有自己的定制API需求,因为是非通用需求、不适合做到开源Layotto/Dapr上,于是公司的中间件团队想自己开发到sidecar里。如果公司的项目import 开源Layotto或者Dapr,按目前的架构是没法扩展开发API的,只能Fork出来做扩展 + +![image](https://user-images.githubusercontent.com/26001097/131614836-60d797c8-b80b-4018-ad43-c2b874d35660.png) + +这种情况下的用户需求: +- sdk下沉; +- 支持多语言; +- 多云部署(只不过需要中间件团队自己为多云环境开发组件,没有社区现成的组件拿来用了) + +2. 公司有新API需求,适合做到开源项目里,于是提需求给社区,但是社区很难达成共识、争了几个月还没落地(例如https://github.com/dapr/dapr/issues/2988 )。这种情况公司可能有业务压力,没法等那么久,希望自己先实现、落地,等社区实现新功能后再迁移过来。 + +这种情况下的用户需求: +- 用户对该功能自主可控,不需要(同时用中文和英文)说服社区、说服世界才能做这个功能 +- 快速扩展、服务业务 + +3. 用户想给Dapr API加字段,先在自己的Fork版本里添加了字段、满足线上需求,然后将PR提给社区。社区拒绝添加该字段,PR被关闭。用户很尴尬:这字段已经在线上使用了,怎么处理? + +## 2. High level design +### 2.1. Hierarchical API +参考OS领域当年是怎么定API的,我们可以把Runtime API设计成多层: + + +![img.png](../../../img/api_plugin/img.png) + +分别对应OS领域的: +- POSIX API +- 各种Unix-like系统自己的System Call (有可移植性,通过不同的硬件驱动实现相同的接口) +- 特殊硬件提供的特殊功能 (没有可移植性) + +基于这种思想,我们可以设计API插件,支持用户扩展自己的私有API + +![image](https://user-images.githubusercontent.com/26001097/131614802-c6f6a556-4e8b-4fee-b899-275a80e00eb6.png) + + +### 2.2. 设计目标 +1. 让有定制开发需求的开源用户通过import Layotto的方式使用Layotto,而不是Fork + +2. 开发api plugin足够简单 + +3. 配置文件公用同一个json,新增api plugin无需新增配置文件 + +### 2.3. 用户扩展开发时的编程界面 + +![image](https://user-images.githubusercontent.com/26001097/131614952-ccfc7d11-d376-41b0-b16c-2f17bfd2c9fc.png) + +#### step 1. 实现自己的私有API + +如果需要自己的私有API,用户需要实现GrpcAPI interface,以及相应的构造函数。 + +这个GrpcAPI就是您自己的API,它需要实现一些生命周期管理方法。目前只定义了Init和Register。 + +```go +// GrpcAPI is the interface of API plugin. It has lifecycle related methods +type GrpcAPI interface { + // init this API before binding it to the grpc server. For example,you can call app to query their subscriptions. + Init(conn *grpc.ClientConn) error + // Bind this API to the grpc server + Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer +} + +// NewGrpcAPI is the constructor of GrpcAPI +type NewGrpcAPI func( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) GrpcAPI + +``` + +#### step 2. 将自己的API注册进Layotto + +用户可以把Layotto的main复制粘贴出来,按需修改,去掉用不到的东西(比如用不到etcd的分布式锁组件,可以在自己的main里删掉它) + +如果用户写了自己的API,可以在main里将它注册进Layotto + +```go + +func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + // ........... + + // 3. run + server, err := rt.Run( + runtime.WithGrpcOptions(opts...), + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), + // Hello + runtime.WithHelloFactory( + hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), + ), + // ........... +``` + +#### step 3. 编译运行Layotto +Layotto启动过程中,会回调每个注册进来的API的生命周期方法(Init,Register) + +启动完成后,您的API就会对外提供 grpc 服务 \ No newline at end of file diff --git a/docs/zh/start/api_plugin/helloworld.md b/docs/zh/start/api_plugin/helloworld.md new file mode 100644 index 0000000000..b71ebc4fb6 --- /dev/null +++ b/docs/zh/start/api_plugin/helloworld.md @@ -0,0 +1,50 @@ +# API plugin: register your own API +This is a demo to show you how to register your own API. + +Layotto has the api-plugin feature to let you add your own API based on your need. + +## step 0. change directory +```shell +cd ${projectpath}/cmd/layotto_multiple_api +``` + +## step 1. start Layotto with a new helloworld API +Build and run Layotto : + +```shell +go build -o layotto +# run it +./layotto start -c ../../configs/config_in_memory.json +``` + +Q: What happened? + +Check the code in `main.go` and you will find a new API was registered during startup: + +```go + // register your grpc API here + runtime.WithGrpcAPI( + // default grpc API + default_api.NewGrpcAPI, + // a demo to show how to register your own API + helloworld_api.NewHelloWorldAPI, + ), +``` + +## step 2. invoke the helloworld API +```shell +go run client/main.go +``` +The result will be: + +```shell +Greeting: Hello world +``` + +This message is the response of the helloworld API you just registered in step 1. + +## Next +You can refer to the demo code to implement your own API. +Have a try ! + +For more details,you can refer to the [design doc](zh/design/api_plugin/design.md) \ No newline at end of file diff --git a/go.mod b/go.mod index 45e93eb9c3..ec1239f224 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/grpc v1.39.0 + google.golang.org/grpc/examples v0.0.0-20210818220435-8ab16ef276a3 google.golang.org/protobuf v1.27.1 mosn.io/api v0.0.0-20210714065837-5b4c2d66e70c mosn.io/layotto/components v0.0.0-20211020084508-6f5ee3cfeba0 diff --git a/pkg/grpc/api.go b/pkg/grpc/default_api/api.go similarity index 96% rename from pkg/grpc/api.go rename to pkg/grpc/default_api/api.go index c4f77f38fe..8512a4b069 100644 --- a/pkg/grpc/api.go +++ b/pkg/grpc/default_api/api.go @@ -14,13 +14,15 @@ * limitations under the License. */ -package grpc +package default_api import ( "context" "errors" "fmt" "io" + grpc_api "mosn.io/layotto/pkg/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" "strings" "sync" @@ -66,6 +68,10 @@ import ( "mosn.io/pkg/log" ) +const ( + Metadata_key_pubsubName = "pubsubName" +) + var ( ErrNoInstance = errors.New("no instance found") bytesPool = sync.Pool{ @@ -74,6 +80,9 @@ var ( return new([]byte) }, } + // FIXME I put it here for compatibility.Don't write singleton like this ! + // It should be refactored and deleted. + LayottoAPISingleton API ) type API interface { @@ -114,6 +123,8 @@ type API interface { GetNextId(context.Context, *runtimev1pb.GetNextIdRequest) (*runtimev1pb.GetNextIdResponse, error) // InvokeBinding Binding API InvokeBinding(context.Context, *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) + // GrpcAPI related + grpc_api.GrpcAPI } // api is a default implementation for MosnRuntimeServer. @@ -129,6 +140,38 @@ type api struct { lockStores map[string]lock.LockStore sequencers map[string]sequencer.Store sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) + // app callback + AppCallbackConn *grpc.ClientConn + topicPerComponent map[string]TopicSubscriptions + // json + json jsoniter.API +} + +func (a *api) Init(conn *grpc.ClientConn) error { + // 1. set connection + a.AppCallbackConn = conn + return a.startSubscribing() +} + +func (a *api) Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer { + LayottoAPISingleton = a + runtimev1pb.RegisterRuntimeServer(s, a) + return registeredServer +} + +func NewGrpcAPI( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) grpc_api.GrpcAPI { + return NewAPI(appId, hellos, configStores, rpcs, pubSubs, stateStores, files, lockStores, sequencers, sendToOutputBindingFn) } func NewAPI( @@ -163,6 +206,7 @@ func NewAPI( lockStores: lockStores, sequencers: sequencers, sendToOutputBindingFn: sendToOutputBindingFn, + json: jsoniter.ConfigFastest, } } diff --git a/pkg/grpc/default_api/api_callback.go b/pkg/grpc/default_api/api_callback.go new file mode 100644 index 0000000000..254a4d46e7 --- /dev/null +++ b/pkg/grpc/default_api/api_callback.go @@ -0,0 +1,220 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package default_api + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "github.com/dapr/components-contrib/contenttype" + "github.com/dapr/components-contrib/pubsub" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + runtime_pubsub "mosn.io/layotto/pkg/runtime/pubsub" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" + _ "net/http/pprof" + + "mosn.io/pkg/log" +) + +func (a *api) startSubscribing() error { + // 1. check if there is no need to do it + if len(a.pubSubs) == 0 { + return nil + } + // 2. list topics + topicRoutes, err := a.getInterestedTopics() + if err != nil { + return err + } + // return if no need to dosubscription + if len(topicRoutes) == 0 { + return nil + } + // 3. loop subscribe + for name, pubsub := range a.pubSubs { + if err := a.beginPubSub(name, pubsub, topicRoutes); err != nil { + return err + } + } + return nil +} + +func (a *api) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRoutes map[string]TopicSubscriptions) error { + // 1. call app to find topic topic2Details. + v, ok := topicRoutes[pubsubName] + if !ok { + return nil + } + // 2. loop subscribing every + for topic, route := range v.topic2Details { + // TODO limit topic scope + log.DefaultLogger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) + // ask component to subscribe + if err := ps.Subscribe(pubsub.SubscribeRequest{ + Topic: topic, + Metadata: route.metadata, + }, func(ctx context.Context, msg *pubsub.NewMessage) error { + if msg.Metadata == nil { + msg.Metadata = make(map[string]string, 1) + } + msg.Metadata[Metadata_key_pubsubName] = pubsubName + return a.publishMessageGRPC(ctx, msg) + }); err != nil { + log.DefaultLogger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) + return err + } + } + + return nil +} + +type Details struct { + metadata map[string]string +} + +type TopicSubscriptions struct { + topic2Details map[string]Details +} + +func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) { + // 1. check + if a.topicPerComponent != nil { + return a.topicPerComponent, nil + } + if a.AppCallbackConn == nil { + return make(map[string]TopicSubscriptions), nil + } + comp2Topic := make(map[string]TopicSubscriptions) + var subscriptions []*runtimev1pb.TopicSubscription + + // 2. handle app subscriptions + client := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn) + subscriptions = runtime_pubsub.ListTopicSubscriptions(client, log.DefaultLogger) + // TODO handle declarative subscriptions + + // 3. prepare result + for _, s := range subscriptions { + if s == nil { + continue + } + if _, ok := comp2Topic[s.PubsubName]; !ok { + comp2Topic[s.PubsubName] = TopicSubscriptions{topic2Details: make(map[string]Details)} + } + comp2Topic[s.PubsubName].topic2Details[s.Topic] = Details{metadata: s.Metadata} + } + + // 4. log + if len(comp2Topic) > 0 { + for pubsubName, v := range comp2Topic { + topics := []string{} + for topic := range v.topic2Details { + topics = append(topics, topic) + } + log.DefaultLogger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) + } + } + // 5. cache the result + a.topicPerComponent = comp2Topic + return comp2Topic, nil +} + +func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error { + // 1. Unmarshal to cloudEvent model + var cloudEvent map[string]interface{} + err := a.json.Unmarshal(msg.Data, &cloudEvent) + if err != nil { + log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err) + return err + } + + // 2. Drop msg if the current cloud event has expired + if pubsub.HasExpired(cloudEvent) { + log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) + return nil + } + + // 3. Convert to proto domain struct + envelope := &runtimev1pb.TopicEventRequest{ + Id: cloudEvent[pubsub.IDField].(string), + Source: cloudEvent[pubsub.SourceField].(string), + DataContentType: cloudEvent[pubsub.DataContentTypeField].(string), + Type: cloudEvent[pubsub.TypeField].(string), + SpecVersion: cloudEvent[pubsub.SpecVersionField].(string), + Topic: msg.Topic, + PubsubName: msg.Metadata[Metadata_key_pubsubName], + } + + // set data field + if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil { + decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string)) + if decodeErr != nil { + log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) + return err + } + + envelope.Data = decoded + } else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil { + envelope.Data = nil + + if contenttype.IsStringContentType(envelope.DataContentType) { + envelope.Data = []byte(data.(string)) + } else if contenttype.IsJSONContentType(envelope.DataContentType) { + envelope.Data, _ = a.json.Marshal(data) + } + } + // TODO tracing + + // 4. Call appcallback + clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn) + res, err := clientV1.OnTopicEvent(ctx, envelope) + + // 5. Check result + return retryStrategy(err, res, cloudEvent) +} + +// retryStrategy returns error when the message should be redelivered +func retryStrategy(err error, res *runtimev1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { + if err != nil { + errStatus, hasErrStatus := status.FromError(err) + if hasErrStatus && (errStatus.Code() == codes.Unimplemented) { + // DROP + log.DefaultLogger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) + return nil + } + + err = errors.New(fmt.Sprintf("error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err)) + log.DefaultLogger.Debugf("%s", err) + // on error from application, return error for redelivery of event + return err + } + + switch res.GetStatus() { + case runtimev1pb.TopicEventResponse_SUCCESS: + // on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is + // success from protobuf definition + return nil + case runtimev1pb.TopicEventResponse_RETRY: + return errors.New(fmt.Sprintf("RETRY status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string))) + case runtimev1pb.TopicEventResponse_DROP: + log.DefaultLogger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) + return nil + } + // Consider unknown status field as error and retry + return errors.New(fmt.Sprintf("unknown status returned from app while processing pub/sub event %v: %v", cloudEvent[pubsub.IDField].(string), res.GetStatus())) +} diff --git a/pkg/grpc/api_errors.go b/pkg/grpc/default_api/api_errors.go similarity index 97% rename from pkg/grpc/api_errors.go rename to pkg/grpc/default_api/api_errors.go index 301fc8695d..0c63cba6e6 100644 --- a/pkg/grpc/api_errors.go +++ b/pkg/grpc/default_api/api_errors.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package grpc +package default_api import ( "google.golang.org/grpc/codes" diff --git a/pkg/grpc/api_test.go b/pkg/grpc/default_api/api_test.go similarity index 94% rename from pkg/grpc/api_test.go rename to pkg/grpc/default_api/api_test.go index 93201abd0b..c55c848180 100644 --- a/pkg/grpc/api_test.go +++ b/pkg/grpc/default_api/api_test.go @@ -14,36 +14,31 @@ * limitations under the License. */ -package grpc +package default_api import ( "context" - "errors" + "encoding/json" "fmt" - "io" - "net" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "testing" - "time" - - "mosn.io/layotto/components/file" - "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" "github.com/golang/mock/gomock" - "github.com/golang/protobuf/ptypes/any" "github.com/stretchr/testify/assert" - tmock "github.com/stretchr/testify/mock" - "google.golang.org/grpc" + rawGRPC "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + l8grpc "mosn.io/layotto/pkg/grpc" + "net" + "testing" + "errors" + jsoniter "github.com/json-iterator/go" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" "mosn.io/layotto/components/rpc" - mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/mock" mock_invoker "mosn.io/layotto/pkg/mock/components/invoker" @@ -51,7 +46,18 @@ import ( mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" mock_sequencer "mosn.io/layotto/pkg/mock/components/sequencer" mock_state "mosn.io/layotto/pkg/mock/components/state" + mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" + + "time" + + "mosn.io/layotto/components/file" + + "github.com/golang/protobuf/ptypes/any" + tmock "github.com/stretchr/testify/mock" + "google.golang.org/grpc" + + mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" ) const ( @@ -133,6 +139,58 @@ func TestSayHello(t *testing.T) { }) } +func TestMosnRuntime_publishMessageGRPC(t *testing.T) { + t.Run("publish success", func(t *testing.T) { + subResp := &runtimev1pb.TopicEventResponse{ + Status: runtimev1pb.TopicEventResponse_SUCCESS, + } + // init grpc server + mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(subResp, nil) + + lis := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) + go func() { + s.Serve(lis) + }() + + // init callback client + callbackClient, err := grpc.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + })) + assert.Nil(t, err) + + cloudEvent := map[string]interface{}{ + pubsub.IDField: "id", + pubsub.SourceField: "source", + pubsub.DataContentTypeField: "content-type", + pubsub.TypeField: "type", + pubsub.SpecVersionField: "v1.0.0", + pubsub.DataBase64Field: "bGF5b3R0bw==", + } + + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + + msg := &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: make(map[string]string), + } + a := NewAPI("", nil, nil, nil, nil, nil, nil, nil, nil, nil) + + var apiForTest = a.(*api) + //apiForTest.errInt = func(err error, format string, args ...interface{}) { + // log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) + //} + apiForTest.AppCallbackConn = callbackClient + apiForTest.json = jsoniter.ConfigFastest + err = apiForTest.publishMessageGRPC(context.Background(), msg) + assert.Nil(t, err) + }) +} + func startTestRuntimeAPIServer(port int, testAPIServer API) *grpc.Server { lis, _ := net.Listen("tcp", fmt.Sprintf(":%d", port)) opts := []grpc.ServerOption{grpc.WriteBufferSize(1)} @@ -1087,3 +1145,8 @@ func TestGetFileMeta(t *testing.T) { assert.Equal(t, resp.LastModified, "123") assert.Equal(t, int(resp.Size), 10) } + +func TestNewGrpcServer(t *testing.T) { + apiInterface := &api{} + l8grpc.NewGrpcServer(l8grpc.WithGrpcAPIs([]l8grpc.GrpcAPI{apiInterface}), l8grpc.WithNewServer(l8grpc.NewDefaultServer), l8grpc.WithGrpcOptions()) +} diff --git a/pkg/grpc/grpc.go b/pkg/grpc/grpc.go index 8c6879837d..b089a89554 100644 --- a/pkg/grpc/grpc.go +++ b/pkg/grpc/grpc.go @@ -19,7 +19,6 @@ package grpc import ( "google.golang.org/grpc" "mosn.io/layotto/diagnostics" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" ) @@ -34,11 +33,16 @@ func NewGrpcServer(opts ...Option) mgrpc.RegisteredServer { if o.maker != nil { srvMaker = o.maker } - return srvMaker(o.api, o.options...) + return srvMaker(o.apis, o.options...) } -func NewDefaultServer(api API, opts ...grpc.ServerOption) mgrpc.RegisteredServer { +func NewDefaultServer(apis []GrpcAPI, opts ...grpc.ServerOption) mgrpc.RegisteredServer { s := grpc.NewServer(opts...) - runtimev1pb.RegisterRuntimeServer(s, api) - return s + // create registeredServer to manage lifecycle of the grpc server + var registeredServer mgrpc.RegisteredServer = s + // loop registering grpc api + for _, grpcAPI := range apis { + registeredServer = grpcAPI.Register(s, registeredServer) + } + return registeredServer } diff --git a/pkg/grpc/grpc_api.go b/pkg/grpc/grpc_api.go new file mode 100644 index 0000000000..598385f60c --- /dev/null +++ b/pkg/grpc/grpc_api.go @@ -0,0 +1,53 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/components-contrib/state" + "google.golang.org/grpc" + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/rpc" + "mosn.io/layotto/components/sequencer" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" +) + +// GrpcAPI is the interface of API plugin. It has lifecycle related methods +type GrpcAPI interface { + // init this API before binding it to the grpc server. For example,you can call app to query their subscriptions. + Init(conn *grpc.ClientConn) error + // Bind this API to the grpc server + Register(s *grpc.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer +} + +// NewGrpcAPI is the constructor of GrpcAPI +type NewGrpcAPI func( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) GrpcAPI diff --git a/pkg/grpc/grpc_test.go b/pkg/grpc/grpc_test.go deleted file mode 100644 index f33bf25c2e..0000000000 --- a/pkg/grpc/grpc_test.go +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2021 Layotto Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package grpc - -import ( - "testing" -) - -func TestNewGrpcServer(t *testing.T) { - apiInterface := &api{} - NewGrpcServer(WithAPI(apiInterface), WithNewServer(NewDefaultServer), WithGrpcOptions()) -} diff --git a/pkg/grpc/options.go b/pkg/grpc/options.go index 839937bc35..943d4100ba 100644 --- a/pkg/grpc/options.go +++ b/pkg/grpc/options.go @@ -22,22 +22,20 @@ import ( ) type grpcOptions struct { - api API + apis []GrpcAPI maker NewServer options []grpc.ServerOption } type Option func(o *grpcOptions) -// add an api for grpc server. -// api CANNOT be nil. -func WithAPI(api API) Option { +func WithGrpcAPIs(apis []GrpcAPI) Option { return func(o *grpcOptions) { - o.api = api + o.apis = apis } } -type NewServer func(api API, opts ...grpc.ServerOption) mgrpc.RegisteredServer +type NewServer func(apis []GrpcAPI, opts ...grpc.ServerOption) mgrpc.RegisteredServer func WithNewServer(f NewServer) Option { return func(o *grpcOptions) { diff --git a/pkg/integrate/api/helloworld/grpc_api.go b/pkg/integrate/api/helloworld/grpc_api.go new file mode 100644 index 0000000000..751d275fb0 --- /dev/null +++ b/pkg/integrate/api/helloworld/grpc_api.go @@ -0,0 +1,68 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package helloworld + +import ( + "context" + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/components-contrib/state" + rawGRPC "google.golang.org/grpc" + pb "google.golang.org/grpc/examples/helloworld/helloworld" + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/lock" + "mosn.io/layotto/components/rpc" + "mosn.io/layotto/components/sequencer" + "mosn.io/layotto/pkg/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" +) + +func NewHelloWorldAPI( + appId string, + hellos map[string]hello.HelloService, + configStores map[string]configstores.Store, + rpcs map[string]rpc.Invoker, + pubSubs map[string]pubsub.PubSub, + stateStores map[string]state.Store, + files map[string]file.File, + lockStores map[string]lock.LockStore, + sequencers map[string]sequencer.Store, + sendToOutputBindingFn func(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error), +) grpc.GrpcAPI { + return &server{} +} + +// server is used to implement helloworld.GreeterServer. +type server struct { + pb.UnimplementedGreeterServer +} + +func (s *server) Init(conn *rawGRPC.ClientConn) error { + return nil +} + +func (s *server) Register(grpcServer *rawGRPC.Server, registeredServer mgrpc.RegisteredServer) mgrpc.RegisteredServer { + pb.RegisterGreeterServer(grpcServer, s) + return registeredServer +} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { + return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil +} diff --git a/pkg/runtime/const.go b/pkg/runtime/const.go index 151646ef12..c11504b496 100644 --- a/pkg/runtime/const.go +++ b/pkg/runtime/const.go @@ -19,6 +19,5 @@ package runtime import "time" const ( - Metadata_key_pubsubName = "pubsubName" - dialTimeout = time.Second * 30 + dialTimeout = time.Second * 30 ) diff --git a/pkg/runtime/options.go b/pkg/runtime/options.go index b5ce6e8452..50eaed4168 100644 --- a/pkg/runtime/options.go +++ b/pkg/runtime/options.go @@ -52,6 +52,8 @@ type runtimeOptions struct { srvMaker rgrpc.NewServer errInt ErrInterceptor options []grpc.ServerOption + // new grpc api + apiFactorys []rgrpc.NewGrpcAPI } type Option func(o *runtimeOptions) @@ -68,6 +70,12 @@ func WithGrpcOptions(options ...grpc.ServerOption) Option { } } +func WithGrpcAPI(apiFuncs ...rgrpc.NewGrpcAPI) Option { + return func(o *runtimeOptions) { + o.apiFactorys = append(o.apiFactorys, apiFuncs...) + } +} + type ErrInterceptor func(err error, format string, args ...interface{}) func WithErrInterceptor(i ErrInterceptor) Option { diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index c4d9fb3c36..71cda8051d 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -18,7 +18,6 @@ package runtime import ( "context" - "encoding/base64" "errors" "fmt" "strings" @@ -29,13 +28,9 @@ import ( "mosn.io/layotto/components/file" - "github.com/dapr/components-contrib/contenttype" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" - jsoniter "github.com/json-iterator/go" rawGRPC "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/hello" "mosn.io/layotto/components/lock" @@ -50,8 +45,6 @@ import ( runtime_pubsub "mosn.io/layotto/pkg/runtime/pubsub" runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer" runtime_state "mosn.io/layotto/pkg/runtime/state" - "mosn.io/layotto/pkg/wasm" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" "mosn.io/pkg/log" ) @@ -72,29 +65,19 @@ type MosnRuntime struct { sequencerRegistry runtime_sequencer.Registry bindingsRegistry mbindings.Registry // component pool - hellos map[string]hello.HelloService - configStores map[string]configstores.Store - rpcs map[string]rpc.Invoker - pubSubs map[string]pubsub.PubSub - topicPerComponent map[string]TopicSubscriptions - states map[string]state.Store - files map[string]file.File - locks map[string]lock.LockStore - sequencers map[string]sequencer.Store - outputBindings map[string]bindings.OutputBinding + hellos map[string]hello.HelloService + configStores map[string]configstores.Store + rpcs map[string]rpc.Invoker + pubSubs map[string]pubsub.PubSub + states map[string]state.Store + files map[string]file.File + locks map[string]lock.LockStore + sequencers map[string]sequencer.Store + outputBindings map[string]bindings.OutputBinding // app callback AppCallbackConn *rawGRPC.ClientConn // extends errInt ErrInterceptor - json jsoniter.API -} - -type Details struct { - metadata map[string]string -} - -type TopicSubscriptions struct { - topic2Details map[string]Details } func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { @@ -120,7 +103,6 @@ func NewMosnRuntime(runtimeConfig *MosnRuntimeConfig) *MosnRuntime { locks: make(map[string]lock.LockStore), sequencers: make(map[string]sequencer.Store), outputBindings: make(map[string]bindings.OutputBinding), - json: jsoniter.ConfigFastest, } } @@ -150,10 +132,12 @@ func (m *MosnRuntime) sendToOutputBinding(name string, req *bindings.InvokeReque } func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { + // prepare runtimeOptions var o runtimeOptions for _, opt := range opts { opt(&o) } + // set ErrInterceptor if o.errInt != nil { m.errInt = o.errInt } else { @@ -161,30 +145,42 @@ func (m *MosnRuntime) Run(opts ...Option) (mgrpc.RegisteredServer, error) { log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } } - + // init runtime with runtimeOptions if err := m.initRuntime(&o); err != nil { return nil, err } + // prepare grpcOpts var grpcOpts []grpc.Option if o.srvMaker != nil { grpcOpts = append(grpcOpts, grpc.WithNewServer(o.srvMaker)) } - wasm.Layotto = grpc.NewAPI( - m.runtimeConfig.AppManagement.AppId, - m.hellos, - m.configStores, - m.rpcs, - m.pubSubs, - m.states, - m.files, - m.locks, - m.sequencers, - m.sendToOutputBinding, - ) + // create GrpcAPIs + var apis []grpc.GrpcAPI + for _, apiFactory := range o.apiFactorys { + api := apiFactory( + m.runtimeConfig.AppManagement.AppId, + m.hellos, + m.configStores, + m.rpcs, + m.pubSubs, + m.states, + m.files, + m.locks, + m.sequencers, + m.sendToOutputBinding, + ) + // init the GrpcAPI + if err := api.Init(m.AppCallbackConn); err != nil { + return nil, err + } + apis = append(apis, api) + } + // put them into grpc options grpcOpts = append(grpcOpts, grpc.WithGrpcOptions(o.options...), - grpc.WithAPI(wasm.Layotto), + grpc.WithGrpcAPIs(apis), ) + // create grpc server m.srv = grpc.NewGrpcServer(grpcOpts...) return m.srv, nil } @@ -305,27 +301,29 @@ func (m *MosnRuntime) initRpcs(rpcs ...*rpc.Factory) error { func (m *MosnRuntime) initPubSubs(factorys ...*runtime_pubsub.Factory) error { // 1. init components log.DefaultLogger.Infof("[runtime] start initializing pubsub components") - // register all config store services implementation + // register all components implementation m.pubSubRegistry.Register(factorys...) for name, config := range m.runtimeConfig.PubSubManagement { + // create component comp, err := m.pubSubRegistry.Create(name) if err != nil { m.errInt(err, "create pubsub component %s failed", name) return err } + // check config consumerID := strings.TrimSpace(config.Metadata["consumerID"]) if consumerID == "" { config.Metadata["consumerID"] = m.runtimeConfig.AppManagement.AppId } - + // init this component with the config if err := comp.Init(pubsub.Metadata{Properties: config.Metadata}); err != nil { m.errInt(err, "init pubsub component %s failed", name) return err } + // register this component m.pubSubs[name] = comp } - // 2. start subscribing - return m.startSubscribing() + return nil } func (m *MosnRuntime) initStates(factorys ...*runtime_state.Factory) error { @@ -440,184 +438,6 @@ func (m *MosnRuntime) initSequencers(factorys ...*runtime_sequencer.Factory) err return nil } -func (m *MosnRuntime) startSubscribing() error { - // 1. check if there is no need to do it - if len(m.pubSubs) == 0 { - return nil - } - topicRoutes, err := m.getInterestedTopics() - if err != nil { - return err - } - if len(topicRoutes) == 0 { - // no need - return nil - } - // 2. loop subscribe - for name, pubsub := range m.pubSubs { - if err := m.beginPubSub(name, pubsub, topicRoutes); err != nil { - return err - } - } - return nil -} - -func (m *MosnRuntime) beginPubSub(pubsubName string, ps pubsub.PubSub, topicRoutes map[string]TopicSubscriptions) error { - // 1. call app to find topic topic2Details. - v, ok := topicRoutes[pubsubName] - if !ok { - return nil - } - // 2. loop subscribing every - for topic, route := range v.topic2Details { - // TODO limit topic scope - log.DefaultLogger.Debugf("[runtime][beginPubSub]subscribing to topic=%s on pubsub=%s", topic, pubsubName) - // ask component to subscribe - if err := ps.Subscribe(pubsub.SubscribeRequest{ - Topic: topic, - Metadata: route.metadata, - }, func(ctx context.Context, msg *pubsub.NewMessage) error { - if msg.Metadata == nil { - msg.Metadata = make(map[string]string, 1) - } - msg.Metadata[Metadata_key_pubsubName] = pubsubName - return m.publishMessageGRPC(ctx, msg) - }); err != nil { - log.DefaultLogger.Warnf("[runtime][beginPubSub]failed to subscribe to topic %s: %s", topic, err) - return err - } - } - - return nil -} - -func (m *MosnRuntime) getInterestedTopics() (map[string]TopicSubscriptions, error) { - // 1. check - if m.topicPerComponent != nil { - return m.topicPerComponent, nil - } - if m.AppCallbackConn == nil { - return make(map[string]TopicSubscriptions), nil - } - comp2Topic := make(map[string]TopicSubscriptions) - var subscriptions []*runtimev1pb.TopicSubscription - - // 2. handle app subscriptions - client := runtimev1pb.NewAppCallbackClient(m.AppCallbackConn) - subscriptions = runtime_pubsub.ListTopicSubscriptions(client, log.DefaultLogger) - // TODO handle declarative subscriptions - - // 3. prepare result - for _, s := range subscriptions { - if s == nil { - continue - } - if _, ok := comp2Topic[s.PubsubName]; !ok { - comp2Topic[s.PubsubName] = TopicSubscriptions{topic2Details: make(map[string]Details)} - } - comp2Topic[s.PubsubName].topic2Details[s.Topic] = Details{metadata: s.Metadata} - } - - // 4. log - if len(comp2Topic) > 0 { - for pubsubName, v := range comp2Topic { - topics := []string{} - for topic := range v.topic2Details { - topics = append(topics, topic) - } - log.DefaultLogger.Infof("[runtime][getInterestedTopics]app is subscribed to the following topics: %v through pubsub=%s", topics, pubsubName) - } - } - // 5. cache the result - m.topicPerComponent = comp2Topic - return comp2Topic, nil -} - -func (m *MosnRuntime) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error { - // 1. Unmarshal to cloudEvent model - var cloudEvent map[string]interface{} - err := m.json.Unmarshal(msg.Data, &cloudEvent) - if err != nil { - log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err) - return err - } - - // 2. Drop msg if the current cloud event has expired - if pubsub.HasExpired(cloudEvent) { - log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string)) - return nil - } - - // 3. Convert to proto domain struct - envelope := &runtimev1pb.TopicEventRequest{ - Id: cloudEvent[pubsub.IDField].(string), - Source: cloudEvent[pubsub.SourceField].(string), - DataContentType: cloudEvent[pubsub.DataContentTypeField].(string), - Type: cloudEvent[pubsub.TypeField].(string), - SpecVersion: cloudEvent[pubsub.SpecVersionField].(string), - Topic: msg.Topic, - PubsubName: msg.Metadata[Metadata_key_pubsubName], - } - - // set data field - if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil { - decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string)) - if decodeErr != nil { - log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr) - return err - } - - envelope.Data = decoded - } else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil { - envelope.Data = nil - - if contenttype.IsStringContentType(envelope.DataContentType) { - envelope.Data = []byte(data.(string)) - } else if contenttype.IsJSONContentType(envelope.DataContentType) { - envelope.Data, _ = m.json.Marshal(data) - } - } - // TODO tracing - - // 4. Call appcallback - clientV1 := runtimev1pb.NewAppCallbackClient(m.AppCallbackConn) - res, err := clientV1.OnTopicEvent(ctx, envelope) - - // 5. Check result - return retryStrategy(err, res, cloudEvent) -} - -// retryStrategy returns error when the message should be redelivered -func retryStrategy(err error, res *runtimev1pb.TopicEventResponse, cloudEvent map[string]interface{}) error { - if err != nil { - errStatus, hasErrStatus := status.FromError(err) - if hasErrStatus && (errStatus.Code() == codes.Unimplemented) { - // DROP - log.DefaultLogger.Warnf("[runtime]non-retriable error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err) - return nil - } - - err = errors.New(fmt.Sprintf("error returned from app while processing pub/sub event %v: %s", cloudEvent[pubsub.IDField].(string), err)) - log.DefaultLogger.Debugf("%s", err) - // on error from application, return error for redelivery of event - return err - } - - switch res.GetStatus() { - case runtimev1pb.TopicEventResponse_SUCCESS: - // on uninitialized status, this is the case it defaults to as an uninitialized status defaults to 0 which is - // success from protobuf definition - return nil - case runtimev1pb.TopicEventResponse_RETRY: - return errors.New(fmt.Sprintf("RETRY status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string))) - case runtimev1pb.TopicEventResponse_DROP: - log.DefaultLogger.Warnf("[runtime]DROP status returned from app while processing pub/sub event %v", cloudEvent[pubsub.IDField].(string)) - return nil - } - // Consider unknown status field as error and retry - return errors.New(fmt.Sprintf("unknown status returned from app while processing pub/sub event %v: %v", cloudEvent[pubsub.IDField].(string), res.GetStatus())) -} - func (m *MosnRuntime) initAppCallbackConnection() error { // init the client connection for calling app if m.runtimeConfig == nil || m.runtimeConfig.AppManagement.GrpcCallbackPort == 0 { diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index c05ffc0a2d..4a0d315d65 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -20,6 +20,10 @@ import ( "context" "encoding/json" "fmt" + "google.golang.org/grpc/test/bufconn" + "mosn.io/layotto/pkg/grpc/default_api" + mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" "net" "testing" @@ -29,11 +33,8 @@ import ( "github.com/dapr/components-contrib/pubsub" "github.com/dapr/components-contrib/state" "github.com/golang/mock/gomock" - jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" rawGRPC "google.golang.org/grpc" - "google.golang.org/grpc/test/bufconn" - "mosn.io/pkg/log" "mosn.io/layotto/components/configstores" @@ -47,18 +48,17 @@ import ( mock_pubsub "mosn.io/layotto/pkg/mock/components/pubsub" mock_sequencer "mosn.io/layotto/pkg/mock/components/sequencer" mock_state "mosn.io/layotto/pkg/mock/components/state" - mock_appcallback "mosn.io/layotto/pkg/mock/runtime/appcallback" mlock "mosn.io/layotto/pkg/runtime/lock" mpubsub "mosn.io/layotto/pkg/runtime/pubsub" msequencer "mosn.io/layotto/pkg/runtime/sequencer" mstate "mosn.io/layotto/pkg/runtime/state" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" ) func TestNewMosnRuntime(t *testing.T) { runtimeConfig := &MosnRuntimeConfig{} rt := NewMosnRuntime(runtimeConfig) assert.NotNil(t, rt) + rt.Stop() } func TestMosnRuntime_GetInfo(t *testing.T) { @@ -66,22 +66,35 @@ func TestMosnRuntime_GetInfo(t *testing.T) { rt := NewMosnRuntime(runtimeConfig) runtimeInfo := rt.GetInfo() assert.NotNil(t, runtimeInfo) + rt.Stop() } func TestMosnRuntime_Run(t *testing.T) { t.Run("run succ", func(t *testing.T) { runtimeConfig := &MosnRuntimeConfig{} rt := NewMosnRuntime(runtimeConfig) - server, err := rt.Run() + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + ) assert.Nil(t, err) assert.NotNil(t, server) + rt.Stop() }) t.Run("no runtime config", func(t *testing.T) { rt := NewMosnRuntime(nil) - _, err := rt.Run() + _, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + ) assert.NotNil(t, err) assert.Equal(t, "[runtime] init error:no runtimeConfig", err.Error()) + rt.Stop() }) } @@ -113,37 +126,9 @@ func TestMosnRuntime_initAppCallbackConnection(t *testing.T) { func TestMosnRuntime_initPubSubs(t *testing.T) { t.Run("normal", func(t *testing.T) { - // mock callback response - subResp := &runtimev1pb.ListTopicSubscriptionsResponse{ - Subscriptions: []*runtimev1pb.TopicSubscription{ - { - PubsubName: "mock", - Topic: "layotto", - Metadata: nil, - }, - }, - } - // init grpc server - mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) - mockAppCallbackServer.EXPECT().ListTopicSubscriptions(gomock.Any(), gomock.Any()).Return(subResp, nil) - - lis := bufconn.Listen(1024 * 1024) - s := rawGRPC.NewServer() - runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) - go func() { - s.Serve(lis) - }() - - // init callback client - callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { - return lis.Dial() - })) - assert.Nil(t, err) - // mock pubsub component mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) - mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Return(nil) f := func() pubsub.PubSub { return mockPubSub } @@ -159,12 +144,11 @@ func TestMosnRuntime_initPubSubs(t *testing.T) { } // construct MosnRuntime m := NewMosnRuntime(cfg) - m.AppCallbackConn = callbackClient m.errInt = func(err error, format string, args ...interface{}) { log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) } // test initPubSubs - err = m.initPubSubs(mpubsub.NewFactory("mock", f)) + err := m.initPubSubs(mpubsub.NewFactory("mock", f)) // assert result assert.Nil(t, err) }) @@ -314,58 +298,6 @@ func TestMosnRuntime_initLocks(t *testing.T) { }) } -func TestMosnRuntime_publishMessageGRPC(t *testing.T) { - t.Run("publish success", func(t *testing.T) { - subResp := &runtimev1pb.TopicEventResponse{ - Status: runtimev1pb.TopicEventResponse_SUCCESS, - } - // init grpc server - mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) - mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(subResp, nil) - - lis := bufconn.Listen(1024 * 1024) - s := rawGRPC.NewServer() - runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) - go func() { - s.Serve(lis) - }() - - // init callback client - callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { - return lis.Dial() - })) - assert.Nil(t, err) - - cloudEvent := map[string]interface{}{ - pubsub.IDField: "id", - pubsub.SourceField: "source", - pubsub.DataContentTypeField: "content-type", - pubsub.TypeField: "type", - pubsub.SpecVersionField: "v1.0.0", - pubsub.DataBase64Field: "bGF5b3R0bw==", - } - - data, err := json.Marshal(cloudEvent) - assert.Nil(t, err) - - msg := &pubsub.NewMessage{ - Data: data, - Topic: "layotto", - Metadata: make(map[string]string), - } - - cfg := &MosnRuntimeConfig{} - m := NewMosnRuntime(cfg) - m.errInt = func(err error, format string, args ...interface{}) { - log.DefaultLogger.Errorf("[runtime] occurs an error: "+err.Error()+", "+format, args...) - } - m.AppCallbackConn = callbackClient - m.json = jsoniter.ConfigFastest - err = m.publishMessageGRPC(context.Background(), msg) - assert.Nil(t, err) - }) -} - type MockBindings struct { } @@ -398,3 +330,233 @@ func TestMosnRuntime_initOutputBinding(t *testing.T) { m.initOutputBinding(registry) assert.NotNil(t, m.outputBindings["mockOutbindings"]) } + +func TestMosnRuntime_runWithPubsub(t *testing.T) { + t.Run("normal", func(t *testing.T) { + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Return(nil) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, _ := runtimeWithCallbackConnection(t) + + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) + + t.Run("init_with_callback", func(t *testing.T) { + cloudEvent := constructCloudEvent() + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + if req.Topic == "layotto" { + return handler(context.Background(), &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: nil, + }) + } else { + return nil + } + }) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, mockAppCallbackServer := runtimeWithCallbackConnection(t) + + topicResp := &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_SUCCESS} + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(topicResp, nil) + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) + + t.Run("callback_fail_then_retry", func(t *testing.T) { + cloudEvent := constructCloudEvent() + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + if req.Topic == "layotto" { + err := handler(context.Background(), &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: nil, + }) + assert.NotNil(t, err) + return nil + } else { + return nil + } + }) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, mockAppCallbackServer := runtimeWithCallbackConnection(t) + + topicResp := &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_RETRY} + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(topicResp, nil) + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) + + t.Run("callback_drop", func(t *testing.T) { + cloudEvent := constructCloudEvent() + data, err := json.Marshal(cloudEvent) + assert.Nil(t, err) + // mock pubsub component + mockPubSub := mock_pubsub.NewMockPubSub(gomock.NewController(t)) + mockPubSub.EXPECT().Init(gomock.Any()).Return(nil) + mockPubSub.EXPECT().Subscribe(gomock.Any(), gomock.Any()).DoAndReturn(func(req pubsub.SubscribeRequest, handler pubsub.Handler) error { + if req.Topic == "layotto" { + err := handler(context.Background(), &pubsub.NewMessage{ + Data: data, + Topic: "layotto", + Metadata: nil, + }) + assert.Nil(t, err) + return nil + } else { + return nil + } + }) + f := func() pubsub.PubSub { + return mockPubSub + } + + // 2. construct runtime + rt, mockAppCallbackServer := runtimeWithCallbackConnection(t) + + topicResp := &runtimev1pb.TopicEventResponse{Status: runtimev1pb.TopicEventResponse_DROP} + mockAppCallbackServer.EXPECT().OnTopicEvent(gomock.Any(), gomock.Any()).Return(topicResp, nil) + // 3. Run + server, err := rt.Run( + // register your grpc API here + WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // PubSub + WithPubSubFactory( + mpubsub.NewFactory("mock", f), + ), + ) + // 4. assert + assert.Nil(t, err) + assert.NotNil(t, server) + + // 5. stop + rt.Stop() + }) +} + +func constructCloudEvent() map[string]interface{} { + cloudEvent := make(map[string]interface{}) + cloudEvent[pubsub.IDField] = "1" + cloudEvent[pubsub.SpecVersionField] = "1" + cloudEvent[pubsub.SourceField] = "adsdafdas" + cloudEvent[pubsub.DataContentTypeField] = "application/json" + cloudEvent[pubsub.TypeField] = "adsdafdas" + return cloudEvent +} + +func runtimeWithCallbackConnection(t *testing.T) (*MosnRuntime, *mock_appcallback.MockAppCallbackServer) { + // 1. prepare callback + // mock callback response + subResp := &runtimev1pb.ListTopicSubscriptionsResponse{ + Subscriptions: []*runtimev1pb.TopicSubscription{ + { + PubsubName: "mock", + Topic: "layotto", + Metadata: nil, + }, + }, + } + // init grpc server for callback + mockAppCallbackServer := mock_appcallback.NewMockAppCallbackServer(gomock.NewController(t)) + mockAppCallbackServer.EXPECT().ListTopicSubscriptions(gomock.Any(), gomock.Any()).Return(subResp, nil) + + lis := bufconn.Listen(1024 * 1024) + s := rawGRPC.NewServer() + runtimev1pb.RegisterAppCallbackServer(s, mockAppCallbackServer) + go func() { + s.Serve(lis) + }() + + // 2. construct those necessary fields for mosn runtime + // init callback client + callbackClient, err := rawGRPC.DialContext(context.Background(), "bufnet", rawGRPC.WithInsecure(), rawGRPC.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return lis.Dial() + })) + assert.Nil(t, err) + + // 3. construct mosn runtime + cfg := &MosnRuntimeConfig{ + PubSubManagement: map[string]mpubsub.Config{ + "mock": { + Metadata: map[string]string{ + "target": "layotto", + }, + }, + }, + } + rt := NewMosnRuntime(cfg) + rt.AppCallbackConn = callbackClient + return rt, mockAppCallbackServer +} diff --git a/pkg/wasm/imports.go b/pkg/wasm/imports.go index 6903c7aaca..736a70294a 100644 --- a/pkg/wasm/imports.go +++ b/pkg/wasm/imports.go @@ -19,7 +19,7 @@ package wasm import ( "context" anypb "github.com/golang/protobuf/ptypes/any" - "mosn.io/layotto/pkg/grpc" + "mosn.io/layotto/pkg/grpc/default_api" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" "mosn.io/mosn/pkg/wasm/abi/proxywasm010" "mosn.io/proxy-wasm-go-host/proxywasm/common" @@ -35,14 +35,12 @@ type LayottoHandler struct { var _ proxywasm.ImportsHandler = &LayottoHandler{} -var Layotto grpc.API - func (d *LayottoHandler) GetState(storeName string, key string) (string, proxywasm.WasmResult) { req := &runtimev1pb.GetStateRequest{ StoreName: storeName, Key: key, } - resp, err := Layotto.GetState(context.Background(), req) + resp, err := default_api.LayottoAPISingleton.GetState(context.Background(), req) if err != nil { return "", proxywasm.WasmResultInternalFailure } @@ -57,7 +55,7 @@ func (d *LayottoHandler) InvokeService(id string, method string, param string) ( Data: &anypb.Any{Value: []byte(param)}, }, } - resp, err := Layotto.InvokeService(context.Background(), req) + resp, err := default_api.LayottoAPISingleton.InvokeService(context.Background(), req) if err != nil { return "", proxywasm.WasmResultInternalFailure } diff --git a/pkg/wasm/imports_test.go b/pkg/wasm/imports_test.go index 07dae2ee5f..e32f2f3145 100644 --- a/pkg/wasm/imports_test.go +++ b/pkg/wasm/imports_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" "mosn.io/layotto/components/rpc" mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" - "mosn.io/layotto/pkg/grpc" + "mosn.io/layotto/pkg/grpc/default_api" mock_invoker "mosn.io/layotto/pkg/mock/components/invoker" mock_state "mosn.io/layotto/pkg/mock/components/state" proxywasm "mosn.io/proxy-wasm-go-host/proxywasm/v1" @@ -52,7 +52,7 @@ func TestGetState(t *testing.T) { Metadata: nil, } mockStore.EXPECT().Get(gomock.Any()).Return(compResp, nil) - Layotto = grpc.NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil, nil, nil) + default_api.LayottoAPISingleton = default_api.NewAPI("", nil, nil, nil, nil, map[string]state.Store{"mock": mockStore}, nil, nil, nil, nil) value, ok := d.GetState("mock", "mykey") assert.Equal(t, proxywasm.WasmResultOk, ok) @@ -74,7 +74,7 @@ func TestInvokeService(t *testing.T) { assert.Equal(t, "id_2", req.Id) return resp, nil }) - Layotto = grpc.NewAPI("", nil, nil, map[string]rpc.Invoker{mosninvoker.Name: mockInvoker}, nil, nil, nil, nil, nil, nil) + default_api.LayottoAPISingleton = default_api.NewAPI("", nil, nil, map[string]rpc.Invoker{mosninvoker.Name: mockInvoker}, nil, nil, nil, nil, nil, nil) result, ok := d.InvokeService("id_2", "", "book1") assert.Equal(t, proxywasm.WasmResultOk, ok)