From d75608f36425c01e43c638c2a698025c25ea3887 Mon Sep 17 00:00:00 2001 From: Zach <51114270+zach030@users.noreply.github.com> Date: Thu, 13 Jan 2022 17:17:21 +0800 Subject: [PATCH] feature: support dapr state api (#377) * dapr/state:add get/save state dapr api Signed-off-by: zach * dapr/state:add query/delete state Signed-off-by: zach * dapr/state_api:replace cel-go with v0.5.1 Signed-off-by: zach * dapr/state_api:fix state api ut Signed-off-by: zach * dapr/state_api:init slice with cap Signed-off-by: zach * dapr/state_api: remove encrypt dependency Signed-off-by: zach * remove unused code * add comments Co-authored-by: seeflood <349895584@qq.com> --- go.mod | 8 + go.sum | 33 +- pkg/converter/state.go | 116 ------ pkg/converter/state_test.go | 125 ------- pkg/grpc/dapr/dapr_api_state.go | 490 ++++++++++++++++++++++++++ pkg/grpc/dapr/dapr_api_unimplement.go | 29 +- pkg/grpc/default_api/api.go | 268 +------------- pkg/grpc/default_api/api_state.go | 192 ++++++++++ pkg/messages/api_errors.go | 5 + pkg/runtime/state/compatibility.go | 23 -- 10 files changed, 715 insertions(+), 574 deletions(-) delete mode 100644 pkg/converter/state.go delete mode 100644 pkg/converter/state_test.go create mode 100644 pkg/grpc/dapr/dapr_api_state.go create mode 100644 pkg/grpc/default_api/api_state.go diff --git a/go.mod b/go.mod index f2fe6a6bda..bfcc931f2e 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,21 @@ module mosn.io/layotto go 1.14 require ( + github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect + github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect + github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b github.com/alicebob/miniredis/v2 v2.16.0 github.com/dapr/components-contrib v1.5.1-rc.1 github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 + github.com/dimchansky/utfbom v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/gammazero/workerpool v1.1.2 github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/json-iterator/go v1.1.11 github.com/phayes/freeport v0.0.0-20171002181615-b8543db493a5 github.com/pkg/errors v0.9.1 @@ -20,6 +25,8 @@ require ( github.com/stretchr/testify v1.7.0 github.com/urfave/cli v1.22.1 github.com/valyala/fasthttp v1.28.0 + go.uber.org/automaxprocs v1.4.0 // indirect + golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/grpc v1.39.0 @@ -31,6 +38,7 @@ require ( mosn.io/mosn v0.25.1-0.20211217125944-69b50c40af81 mosn.io/pkg v0.0.0-20211217101631-d914102d1baf mosn.io/proxy-wasm-go-host v0.1.1-0.20210524020952-3fb13ba763a6 + nhooyr.io/websocket v1.8.7 // indirect ) replace ( diff --git a/go.sum b/go.sum index afa806347e..650acaec7e 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,9 @@ github.com/Azure/go-autorest/autorest/adal v0.9.5 h1:Y3bBUV4rTuxenJJs41HU3qmqsb+ github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= github.com/Azure/go-autorest/autorest/azure/auth v0.4.2 h1:iM6UAvjR97ZIeR93qTcwpKNMpV+/FTWjwEbuPD495Tk= github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM= -github.com/Azure/go-autorest/autorest/azure/cli v0.3.1 h1:LXl088ZQlP0SBppGFsRZonW6hSvwgL5gRByMbvUbx8U= github.com/Azure/go-autorest/autorest/azure/cli v0.3.1/go.mod h1:ZG5p860J94/0kI9mNJVoIoLgXcirM2gF5i2kWloofxw= +github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 h1:dMOmEJfkLKW/7JsokJqkyoYSgmR08hi9KrhjZb+JALY= +github.com/Azure/go-autorest/autorest/azure/cli v0.4.2/go.mod h1:7qkJkT+j6b+hIpzMOwPChJhTqS8VbsqqgULzMNRugoM= github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g= github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= @@ -100,11 +101,13 @@ github.com/Azure/go-autorest/autorest/to v0.3.0/go.mod h1:MgwOyqaIuKdG4TL/2ywSsI github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= github.com/Azure/go-autorest/autorest/validation v0.2.0/go.mod h1:3EEqHnBxQGHXRYq3HT1WyXAvT7LLY3tl70hw6tQIbjI= -github.com/Azure/go-autorest/autorest/validation v0.3.0 h1:3I9AAI63HfcLtphd9g39ruUwRI+Ca+z/f36KHPFRUss= github.com/Azure/go-autorest/autorest/validation v0.3.0/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= +github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= +github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= -github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE= github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= +github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= +github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= @@ -299,7 +302,6 @@ github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoC github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY= github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= -github.com/colinmarc/hdfs/v2 v2.2.0 h1:4AaIlTq+/sWmeqYhI0dX8bD4YrMQM990tRjm636FkGM= github.com/colinmarc/hdfs/v2 v2.2.0/go.mod h1:Wss6n3mtaZyRwWaqtSH+6ge01qT0rw9dJJmvoUnIQ/E= github.com/containerd/containerd v1.4.1/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= @@ -364,8 +366,9 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/didip/tollbooth v4.0.2+incompatible/go.mod h1:A9b0665CE6l1KmzpDws2++elm/CsuWBMa5Jv4WY0PEY= -github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TRo4= github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= +github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= +github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= @@ -709,8 +712,9 @@ github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iP github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs= github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4= -github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.0.1 h1:4OtAfUGbnKC6yS48p0CtMX2oFYtzFZVv6rok3cRWgnE= github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= @@ -801,9 +805,7 @@ github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0f github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= -github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= -github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= @@ -811,11 +813,8 @@ github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= -github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= -github.com/jcmturner/gokrb5/v8 v8.4.1 h1:IGSJfqBzMS6TA0oJ7DxXdyzPK563QHa8T2IqER2ggyQ= github.com/jcmturner/gokrb5/v8 v8.4.1/go.mod h1:T1hnNppQsBtxW0tCHMHTkAt8n/sABdzZgZdoFrZaZNM= -github.com/jcmturner/rpc/v2 v2.0.2 h1:gMB4IwRXYsWw4Bc6o/az2HJgFUA1ffSh90i26ZJ6Xl0= github.com/jcmturner/rpc/v2 v2.0.2/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= github.com/jinzhu/copier v0.3.2/go.mod h1:24xnZezI2Yqac9J61UC6/dG/k76ttpq0DdJI3QmUvro= @@ -1384,11 +1383,8 @@ github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxt github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= github.com/zouyx/agollo/v4 v4.0.7 h1:aUwdnCz+1jExZbZvN1nZ+kKjn62Ula5joekZHTccj2E= github.com/zouyx/agollo/v4 v4.0.7/go.mod h1:unhojnZiGLlT4gLpWz3Oa7sGcChZWv/1DJBkV6s8uAE= -go.beyondstorage.io/endpoint v1.2.0 h1:/7mgKquTykeqJ9op82hso2+WQfECeywGd/Lda1N3tF4= go.beyondstorage.io/endpoint v1.2.0/go.mod h1:oZ7Z7HZ7mAo337JBLjuCF/DM66HVEUu6+hw68c3UcLs= -go.beyondstorage.io/services/hdfs v0.3.0 h1:DvPxjEpUIkqyW1aMoj9C5/TDzhosxLMSqF7wUMXCE7E= go.beyondstorage.io/services/hdfs v0.3.0/go.mod h1:yU0eL80JczPPuR5hoK21Dck0H9gXDBu3WN51Od7wtlo= -go.beyondstorage.io/v5 v5.0.0 h1:k9Axfgbt+oZXoDwSBVCl1XANHSL4rkNTGP2Lz9YdJe0= go.beyondstorage.io/v5 v5.0.0/go.mod h1:3wV9gCQnqu7tD/3LMeo2yimUKIeTSHpTc6wHSb0yY20= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1464,8 +1460,9 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.8.0 h1:CUhrE4N1rqSE6FM9ecihEjRkLQu8cDfgDyoOs83mEY4= go.uber.org/atomic v1.8.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/automaxprocs v1.3.0 h1:II28aZoGdaglS5vVNnspf28lnZpXScxtIozx1lAjdb0= go.uber.org/automaxprocs v1.3.0/go.mod h1:9CWT6lKIep8U41DDaPiH6eFscnTyjfTANNQNx6LrIcA= +go.uber.org/automaxprocs v1.4.0 h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0= +go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -1520,8 +1517,9 @@ golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= -golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -2071,8 +2069,9 @@ mosn.io/mosn v0.25.1-0.20211217125944-69b50c40af81 h1:fzaxZAsG0JC3PBTQ6M8aNoCA4p mosn.io/mosn v0.25.1-0.20211217125944-69b50c40af81/go.mod h1:JwLkls6oMaap0+P1uZ1d1ccdLPigdK8xH8gDSm3SEq4= mosn.io/pkg v0.0.0-20211217101631-d914102d1baf h1:PaYMeKbmtMnhnzzQyKQifxAtkKrCv5uti8Tr00WvX+Y= mosn.io/pkg v0.0.0-20211217101631-d914102d1baf/go.mod h1:tK3Vbw6CcVeJ9H/BGjJ1wn6hRXt4Oxjfq1+gkOM0zG8= -nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= +nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= +nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/converter/state.go b/pkg/converter/state.go deleted file mode 100644 index e03381d896..0000000000 --- a/pkg/converter/state.go +++ /dev/null @@ -1,116 +0,0 @@ -// -// Copyright 2021 Layotto Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package converter - -import ( - "github.com/dapr/components-contrib/state" - "mosn.io/layotto/pkg/common" - runtime_state "mosn.io/layotto/pkg/runtime/state" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" -) - -func GetResponse2GetStateResponse(compResp *state.GetResponse) *runtimev1pb.GetStateResponse { - resp := &runtimev1pb.GetStateResponse{} - if compResp != nil { - resp.Etag = common.PointerToString(compResp.ETag) - resp.Data = compResp.Data - resp.Metadata = compResp.Metadata - } - return resp -} - -func GetResponse2BulkStateItem(compResp *state.GetResponse, key string) *runtimev1pb.BulkStateItem { - resp := &runtimev1pb.BulkStateItem{} - resp.Key = key - if compResp != nil { - resp.Data = compResp.Data - resp.Etag = common.PointerToString(compResp.ETag) - resp.Metadata = compResp.Metadata - } - return resp -} - -func BulkGetResponse2BulkStateItem(compResp *state.BulkGetResponse) *runtimev1pb.BulkStateItem { - if compResp == nil { - return &runtimev1pb.BulkStateItem{} - } - return &runtimev1pb.BulkStateItem{ - Key: runtime_state.GetOriginalStateKey(compResp.Key), - Data: compResp.Data, - Etag: common.PointerToString(compResp.ETag), - Metadata: compResp.Metadata, - Error: compResp.Error, - } -} - -func StateItem2SetRequest(grpcReq *runtimev1pb.StateItem, key string) *state.SetRequest { - req := &state.SetRequest{ - Key: key, - } - if grpcReq == nil { - return req - } - req.Metadata = grpcReq.Metadata - req.Value = grpcReq.Value - if grpcReq.Etag != nil { - req.ETag = &grpcReq.Etag.Value - } - if grpcReq.Options != nil { - req.Options = state.SetStateOption{ - Consistency: runtime_state.StateConsistencyToString(grpcReq.Options.Consistency), - Concurrency: runtime_state.StateConcurrencyToString(grpcReq.Options.Concurrency), - } - } - return req -} - -func DeleteStateRequest2DeleteRequest(grpcReq *runtimev1pb.DeleteStateRequest, key string) *state.DeleteRequest { - req := &state.DeleteRequest{ - Key: key, - } - if grpcReq == nil { - return req - } - req.Metadata = grpcReq.Metadata - if grpcReq.Etag != nil { - req.ETag = &grpcReq.Etag.Value - } - if grpcReq.Options != nil { - req.Options = state.DeleteStateOption{ - Concurrency: runtime_state.StateConcurrencyToString(grpcReq.Options.Concurrency), - Consistency: runtime_state.StateConsistencyToString(grpcReq.Options.Consistency), - } - } - return req -} - -func StateItem2DeleteRequest(grpcReq *runtimev1pb.StateItem, key string) *state.DeleteRequest { - req := &state.DeleteRequest{ - Key: key, - } - if grpcReq == nil { - return req - } - req.Metadata = grpcReq.Metadata - if grpcReq.Etag != nil { - req.ETag = &grpcReq.Etag.Value - } - if grpcReq.Options != nil { - req.Options = state.DeleteStateOption{ - Concurrency: runtime_state.StateConcurrencyToString(grpcReq.Options.Concurrency), - Consistency: runtime_state.StateConsistencyToString(grpcReq.Options.Consistency), - } - } - return req -} diff --git a/pkg/converter/state_test.go b/pkg/converter/state_test.go deleted file mode 100644 index 7f12008480..0000000000 --- a/pkg/converter/state_test.go +++ /dev/null @@ -1,125 +0,0 @@ -// -// Copyright 2021 Layotto Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package converter - -import ( - "github.com/dapr/components-contrib/state" - "github.com/stretchr/testify/assert" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" - "testing" -) - -func TestGetResponse2GetStateResponse(t *testing.T) { - resp := GetResponse2GetStateResponse(&state.GetResponse{ - Data: []byte("v"), - ETag: nil, - Metadata: make(map[string]string), - }) - assert.Equal(t, resp.Data, []byte("v")) - assert.Equal(t, resp.Etag, "") - assert.True(t, len(resp.Metadata) == 0) -} - -func TestGetResponse2BulkStateItem(t *testing.T) { - itm := GetResponse2BulkStateItem(&state.GetResponse{ - Data: []byte("v"), - ETag: nil, - Metadata: make(map[string]string), - }, "key") - assert.Equal(t, itm.Key, "key") - assert.Equal(t, itm.Data, []byte("v")) - assert.Equal(t, itm.Etag, "") - assert.Equal(t, itm.Error, "") - assert.True(t, len(itm.Metadata) == 0) -} - -func TestBulkGetResponse2BulkStateItem(t *testing.T) { - t.Run("convert nil", func(t *testing.T) { - itm := BulkGetResponse2BulkStateItem(nil) - assert.NotNil(t, itm) - }) - t.Run("normal", func(t *testing.T) { - itm := BulkGetResponse2BulkStateItem(&state.BulkGetResponse{ - Key: "key", - Data: []byte("v"), - ETag: nil, - Metadata: nil, - Error: "", - }) - assert.Equal(t, itm.Key, "key") - assert.Equal(t, itm.Data, []byte("v")) - assert.Equal(t, itm.Etag, "") - assert.Equal(t, itm.Error, "") - assert.True(t, len(itm.Metadata) == 0) - }) -} - -func TestStateItem2SetRequest(t *testing.T) { - req := StateItem2SetRequest(&runtimev1pb.StateItem{ - Key: "", - Value: []byte("v"), - Etag: nil, - Metadata: nil, - Options: &runtimev1pb.StateOptions{ - Concurrency: runtimev1pb.StateOptions_CONCURRENCY_UNSPECIFIED, - Consistency: runtimev1pb.StateOptions_CONSISTENCY_UNSPECIFIED, - }, - }, "appid||key") - assert.Equal(t, req.Key, "appid||key") - assert.Equal(t, req.Value, []byte("v")) - assert.Nil(t, req.ETag) - assert.Equal(t, req.Options.Consistency, "") - assert.Equal(t, req.Options.Concurrency, "") -} - -func TestDeleteStateRequest2DeleteRequest(t *testing.T) { - t.Run("nil", func(t *testing.T) { - req := DeleteStateRequest2DeleteRequest(nil, "") - assert.NotNil(t, req) - }) - t.Run("normal", func(t *testing.T) { - req := DeleteStateRequest2DeleteRequest(&runtimev1pb.DeleteStateRequest{ - StoreName: "redis", - Key: "", - Etag: nil, - Options: &runtimev1pb.StateOptions{ - Concurrency: runtimev1pb.StateOptions_CONCURRENCY_LAST_WRITE, - Consistency: runtimev1pb.StateOptions_CONSISTENCY_EVENTUAL, - }, - Metadata: nil, - }, "appid||key") - assert.Equal(t, req.Key, "appid||key") - assert.Nil(t, req.ETag) - assert.Equal(t, req.Options.Consistency, "eventual") - assert.Equal(t, req.Options.Concurrency, "last-write") - }) -} - -func TestStateItem2DeleteRequest(t *testing.T) { - req := StateItem2DeleteRequest(&runtimev1pb.StateItem{ - Key: "", - Value: []byte("v"), - Etag: nil, - Metadata: nil, - Options: &runtimev1pb.StateOptions{ - Concurrency: runtimev1pb.StateOptions_CONCURRENCY_LAST_WRITE, - Consistency: runtimev1pb.StateOptions_CONSISTENCY_EVENTUAL, - }, - }, "appid||key") - assert.Equal(t, req.Key, "appid||key") - assert.Nil(t, req.ETag) - assert.Nil(t, req.ETag) - assert.Equal(t, req.Options.Consistency, "eventual") - assert.Equal(t, req.Options.Concurrency, "last-write") -} diff --git a/pkg/grpc/dapr/dapr_api_state.go b/pkg/grpc/dapr/dapr_api_state.go new file mode 100644 index 0000000000..1e8faa1795 --- /dev/null +++ b/pkg/grpc/dapr/dapr_api_state.go @@ -0,0 +1,490 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dapr + +import ( + "context" + + "github.com/dapr/components-contrib/state" + "github.com/gammazero/workerpool" + "github.com/golang/protobuf/ptypes/empty" + jsoniter "github.com/json-iterator/go" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + "mosn.io/layotto/pkg/common" + dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" + dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" + "mosn.io/layotto/pkg/messages" + state2 "mosn.io/layotto/pkg/runtime/state" + "mosn.io/pkg/log" +) + +func (d *daprGrpcAPI) SaveState(ctx context.Context, in *dapr_v1pb.SaveStateRequest) (*emptypb.Empty, error) { + // 1. get store + store, err := d.getStateStore(in.StoreName) + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.SaveState] error: %v", err) + return &emptypb.Empty{}, err + } + // 2. convert requests + var reqs []state.SetRequest + for _, s := range in.States { + key, err := state2.GetModifiedStateKey(s.Key, in.StoreName, d.appId) + if err != nil { + return &emptypb.Empty{}, err + } + reqs = append(reqs, *StateItem2SetRequest(s, key)) + } + // 3. query + err = store.BulkSet(reqs) + // 4. check result + if err != nil { + err = d.wrapDaprComponentError(err, messages.ErrStateSave, in.StoreName, err.Error()) + log.DefaultLogger.Errorf("[runtime] [grpc.SaveState] error: %v", err) + return &emptypb.Empty{}, err + } + return &emptypb.Empty{}, nil +} + +// GetState obtains the state for a specific key. +func (d *daprGrpcAPI) GetState(ctx context.Context, request *dapr_v1pb.GetStateRequest) (*dapr_v1pb.GetStateResponse, error) { + // 1. get store + store, err := d.getStateStore(request.StoreName) + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.GetState] error: %v", err) + return nil, err + } + // 2. generate the actual key + key, err := state2.GetModifiedStateKey(request.Key, request.StoreName, d.appId) + if err != nil { + return &dapr_v1pb.GetStateResponse{}, err + } + req := &state.GetRequest{ + Key: key, + Metadata: request.GetMetadata(), + Options: state.GetStateOption{ + Consistency: StateConsistencyToString(request.Consistency), + }, + } + // 3. query + compResp, err := store.Get(req) + // 4. check result + if err != nil { + err = status.Errorf(codes.Internal, messages.ErrStateGet, request.Key, request.StoreName, err.Error()) + log.DefaultLogger.Errorf("[runtime] [grpc.GetState] %v", err) + return &dapr_v1pb.GetStateResponse{}, err + } + return GetResponse2GetStateResponse(compResp), nil +} + +func (d *daprGrpcAPI) GetBulkState(ctx context.Context, request *dapr_v1pb.GetBulkStateRequest) (*dapr_v1pb.GetBulkStateResponse, error) { + // 1. get store + store, err := d.getStateStore(request.StoreName) + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.GetBulkState] error: %v", err) + return &dapr_v1pb.GetBulkStateResponse{}, err + } + + bulkResp := &dapr_v1pb.GetBulkStateResponse{} + if len(request.Keys) == 0 { + return bulkResp, nil + } + + // 2. store.BulkGet + // 2.1. convert reqs + reqs := make([]state.GetRequest, len(request.Keys)) + for i, k := range request.Keys { + key, err := state2.GetModifiedStateKey(k, request.StoreName, d.appId) + if err != nil { + return &dapr_v1pb.GetBulkStateResponse{}, err + } + r := state.GetRequest{ + Key: key, + Metadata: request.GetMetadata(), + } + reqs[i] = r + } + // 2.2. query + support, responses, err := store.BulkGet(reqs) + if err != nil { + return bulkResp, err + } + // 2.3. parse and return result if store supports this method + if support { + for i := 0; i < len(responses); i++ { + bulkResp.Items = append(bulkResp.Items, BulkGetResponse2BulkStateItem(&responses[i])) + } + return bulkResp, nil + } + + // 3. Simulate the method if the store doesn't support it + n := len(reqs) + pool := workerpool.New(int(request.Parallelism)) + resultCh := make(chan *dapr_v1pb.BulkStateItem, n) + for i := 0; i < n; i++ { + pool.Submit(generateGetStateTask(store, &reqs[i], resultCh)) + } + pool.StopWait() + for { + select { + case item, ok := <-resultCh: + if !ok { + return bulkResp, nil + } + bulkResp.Items = append(bulkResp.Items, item) + default: + return bulkResp, nil + } + } +} + +func (d *daprGrpcAPI) QueryStateAlpha1(ctx context.Context, request *dapr_v1pb.QueryStateRequest) (*dapr_v1pb.QueryStateResponse, error) { + ret := &dapr_v1pb.QueryStateResponse{} + + // 1. get state store component + store, err := d.getStateStore(request.StoreName) + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + return ret, err + } + + // 2. check if this store has the query feature + querier, ok := store.(state.Querier) + if !ok { + err = status.Errorf(codes.Unimplemented, messages.ErrNotFound, "Query") + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + return ret, err + } + + // 3. Unmarshal query dsl + var req state.QueryRequest + if err = jsoniter.Unmarshal([]byte(request.GetQuery()), &req.Query); err != nil { + err = status.Errorf(codes.InvalidArgument, messages.ErrMalformedRequest, err.Error()) + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + return ret, err + } + req.Metadata = request.GetMetadata() + + // 4. delegate to the store + resp, err := querier.Query(&req) + // 5. convert response + if err != nil { + err = status.Errorf(codes.Internal, messages.ErrStateQuery, request.GetStoreName(), err.Error()) + log.DefaultLogger.Errorf("[runtime] [grpc.QueryStateAlpha1] error: %v", err) + return ret, err + } + if resp == nil || len(resp.Results) == 0 { + return ret, nil + } + + ret.Results = make([]*dapr_v1pb.QueryStateItem, len(resp.Results)) + ret.Token = resp.Token + ret.Metadata = resp.Metadata + + for i := range resp.Results { + ret.Results[i] = &dapr_v1pb.QueryStateItem{ + Key: state2.GetOriginalStateKey(resp.Results[i].Key), + Data: resp.Results[i].Data, + } + } + return ret, nil +} + +func (d *daprGrpcAPI) DeleteState(ctx context.Context, request *dapr_v1pb.DeleteStateRequest) (*emptypb.Empty, error) { + // 1. get store + store, err := d.getStateStore(request.StoreName) + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) + return &emptypb.Empty{}, err + } + // 2. generate the actual key + key, err := state2.GetModifiedStateKey(request.Key, request.StoreName, d.appId) + if err != nil { + return &empty.Empty{}, err + } + // 3. convert and send request + err = store.Delete(DeleteStateRequest2DeleteRequest(request, key)) + // 4. check result + if err != nil { + err = d.wrapDaprComponentError(err, messages.ErrStateDelete, request.Key, err.Error()) + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) + return &empty.Empty{}, err + } + return &empty.Empty{}, nil +} + +func (d *daprGrpcAPI) DeleteBulkState(ctx context.Context, request *dapr_v1pb.DeleteBulkStateRequest) (*emptypb.Empty, error) { + // 1. get store + store, err := d.getStateStore(request.StoreName) + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) + return &empty.Empty{}, err + } + // 2. convert request + reqs := make([]state.DeleteRequest, 0, len(request.States)) + for _, item := range request.States { + key, err := state2.GetModifiedStateKey(item.Key, request.StoreName, d.appId) + if err != nil { + return &empty.Empty{}, err + } + reqs = append(reqs, *StateItem2DeleteRequest(item, key)) + } + // 3. send request + err = store.BulkDelete(reqs) + // 4. check result + if err != nil { + log.DefaultLogger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) + return &emptypb.Empty{}, err + } + return &emptypb.Empty{}, nil +} + +func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *dapr_v1pb.ExecuteStateTransactionRequest) (*emptypb.Empty, error) { + // 1. check params + if d.stateStores == nil || len(d.stateStores) == 0 { + err := status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + return &emptypb.Empty{}, err + } + storeName := request.StoreName + if d.stateStores[storeName] == nil { + err := status.Errorf(codes.InvalidArgument, messages.ErrStateStoreNotFound, storeName) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + return &emptypb.Empty{}, err + } + // 2. find store + store, ok := d.transactionalStateStores[storeName] + if !ok { + err := status.Errorf(codes.Unimplemented, messages.ErrStateStoreNotSupported, storeName) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + return &emptypb.Empty{}, err + } + // 3. convert request + operations := []state.TransactionalStateOperation{} + for _, op := range request.Operations { + // 3.1. extract and validate fields + var operation state.TransactionalStateOperation + var req = op.Request + // tolerant npe + if req == nil { + log.DefaultLogger.Warnf("[runtime] [grpc.ExecuteStateTransaction] one of TransactionalStateOperation.Request is nil") + continue + } + key, err := state2.GetModifiedStateKey(req.Key, request.StoreName, d.appId) + if err != nil { + return &emptypb.Empty{}, err + } + // 3.2. prepare TransactionalStateOperation struct according to the operation type + switch state.OperationType(op.OperationType) { + case state.Upsert: + operation = state.TransactionalStateOperation{ + Operation: state.Upsert, + Request: *StateItem2SetRequest(req, key), + } + case state.Delete: + operation = state.TransactionalStateOperation{ + Operation: state.Delete, + Request: *StateItem2DeleteRequest(req, key), + } + default: + err := status.Errorf(codes.Unimplemented, messages.ErrNotSupportedStateOperation, op.OperationType) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + return &emptypb.Empty{}, err + } + operations = append(operations, operation) + } + // 4. submit transactional request + err := store.Multi(&state.TransactionalStateRequest{ + Operations: operations, + Metadata: request.Metadata, + }) + // 5. check result + if err != nil { + err = status.Errorf(codes.Internal, messages.ErrStateTransaction, err.Error()) + log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) + return &emptypb.Empty{}, err + } + return &emptypb.Empty{}, nil +} + +func (d *daprGrpcAPI) getStateStore(name string) (state.Store, error) { + if d.stateStores == nil || len(d.stateStores) == 0 { + return nil, status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured) + } + + if d.stateStores[name] == nil { + return nil, status.Errorf(codes.InvalidArgument, messages.ErrStateStoreNotFound, name) + } + return d.stateStores[name], nil +} + +// wrapDaprComponentError parse and wrap error from dapr component +func (d *daprGrpcAPI) wrapDaprComponentError(err error, format string, args ...interface{}) error { + e, ok := err.(*state.ETagError) + if !ok { + return status.Errorf(codes.Internal, format, args...) + } + switch e.Kind() { + case state.ETagMismatch: + return status.Errorf(codes.Aborted, format, args...) + case state.ETagInvalid: + return status.Errorf(codes.InvalidArgument, format, args...) + } + + return status.Errorf(codes.Internal, format, args...) +} + +func StateItem2SetRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *state.SetRequest { + req := &state.SetRequest{ + Key: key, + } + if grpcReq == nil { + return req + } + req.Metadata = grpcReq.Metadata + req.Value = grpcReq.Value + if grpcReq.Etag != nil { + req.ETag = &grpcReq.Etag.Value + } + if grpcReq.Options != nil { + req.Options = state.SetStateOption{ + Consistency: StateConsistencyToString(grpcReq.Options.Consistency), + Concurrency: StateConcurrencyToString(grpcReq.Options.Concurrency), + } + } + return req +} + +func GetResponse2GetStateResponse(compResp *state.GetResponse) *dapr_v1pb.GetStateResponse { + resp := &dapr_v1pb.GetStateResponse{} + if compResp != nil { + resp.Etag = common.PointerToString(compResp.ETag) + resp.Data = compResp.Data + resp.Metadata = compResp.Metadata + } + return resp +} + +func StateConsistencyToString(c dapr_common_v1pb.StateOptions_StateConsistency) string { + switch c { + case dapr_common_v1pb.StateOptions_CONSISTENCY_EVENTUAL: + return "eventual" + case dapr_common_v1pb.StateOptions_CONSISTENCY_STRONG: + return "strong" + } + return "" +} + +func StateConcurrencyToString(c dapr_common_v1pb.StateOptions_StateConcurrency) string { + switch c { + case dapr_common_v1pb.StateOptions_CONCURRENCY_FIRST_WRITE: + return "first-write" + case dapr_common_v1pb.StateOptions_CONCURRENCY_LAST_WRITE: + return "last-write" + } + + return "" +} + +func generateGetStateTask(store state.Store, req *state.GetRequest, resultCh chan *dapr_v1pb.BulkStateItem) func() { + return func() { + // get + r, err := store.Get(req) + // convert + var item *dapr_v1pb.BulkStateItem + if err != nil { + item = &dapr_v1pb.BulkStateItem{ + Key: state2.GetOriginalStateKey(req.Key), + Error: err.Error(), + } + } else { + item = GetResponse2BulkStateItem(r, state2.GetOriginalStateKey(req.Key)) + } + // collect result + select { + case resultCh <- item: + default: + //never happen + log.DefaultLogger.Errorf("[api.generateGetStateTask] can not push result to the resultCh. item: %+v", item) + } + } +} + +func BulkGetResponse2BulkStateItem(compResp *state.BulkGetResponse) *dapr_v1pb.BulkStateItem { + if compResp == nil { + return &dapr_v1pb.BulkStateItem{} + } + return &dapr_v1pb.BulkStateItem{ + Key: state2.GetOriginalStateKey(compResp.Key), + Data: compResp.Data, + Etag: common.PointerToString(compResp.ETag), + Metadata: compResp.Metadata, + Error: compResp.Error, + } +} + +func GetResponse2BulkStateItem(compResp *state.GetResponse, key string) *dapr_v1pb.BulkStateItem { + resp := &dapr_v1pb.BulkStateItem{} + resp.Key = key + if compResp != nil { + resp.Data = compResp.Data + resp.Etag = common.PointerToString(compResp.ETag) + resp.Metadata = compResp.Metadata + } + return resp +} + +func DeleteStateRequest2DeleteRequest(grpcReq *dapr_v1pb.DeleteStateRequest, key string) *state.DeleteRequest { + req := &state.DeleteRequest{ + Key: key, + } + if grpcReq == nil { + return req + } + req.Metadata = grpcReq.Metadata + if grpcReq.Etag != nil { + req.ETag = &grpcReq.Etag.Value + } + if grpcReq.Options != nil { + req.Options = state.DeleteStateOption{ + Concurrency: StateConcurrencyToString(grpcReq.Options.Concurrency), + Consistency: StateConsistencyToString(grpcReq.Options.Consistency), + } + } + return req +} + +func StateItem2DeleteRequest(grpcReq *dapr_common_v1pb.StateItem, key string) *state.DeleteRequest { + req := &state.DeleteRequest{ + Key: key, + } + if grpcReq == nil { + return req + } + req.Metadata = grpcReq.Metadata + if grpcReq.Etag != nil { + req.ETag = &grpcReq.Etag.Value + } + if grpcReq.Options != nil { + req.Options = state.DeleteStateOption{ + Concurrency: StateConcurrencyToString(grpcReq.Options.Concurrency), + Consistency: StateConsistencyToString(grpcReq.Options.Consistency), + } + } + return req +} diff --git a/pkg/grpc/dapr/dapr_api_unimplement.go b/pkg/grpc/dapr/dapr_api_unimplement.go index affb6e0cf2..04633b63f6 100644 --- a/pkg/grpc/dapr/dapr_api_unimplement.go +++ b/pkg/grpc/dapr/dapr_api_unimplement.go @@ -18,38 +18,11 @@ package dapr import ( "context" + "google.golang.org/protobuf/types/known/emptypb" "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" ) -func (d *daprGrpcAPI) GetState(ctx context.Context, request *runtime.GetStateRequest) (*runtime.GetStateResponse, error) { - panic("implement me") -} - -func (d *daprGrpcAPI) GetBulkState(ctx context.Context, request *runtime.GetBulkStateRequest) (*runtime.GetBulkStateResponse, error) { - panic("implement me") -} - -func (d *daprGrpcAPI) SaveState(ctx context.Context, request *runtime.SaveStateRequest) (*emptypb.Empty, error) { - panic("implement me") -} - -func (d *daprGrpcAPI) QueryStateAlpha1(ctx context.Context, request *runtime.QueryStateRequest) (*runtime.QueryStateResponse, error) { - panic("implement me") -} - -func (d *daprGrpcAPI) DeleteState(ctx context.Context, request *runtime.DeleteStateRequest) (*emptypb.Empty, error) { - panic("implement me") -} - -func (d *daprGrpcAPI) DeleteBulkState(ctx context.Context, request *runtime.DeleteBulkStateRequest) (*emptypb.Empty, error) { - panic("implement me") -} - -func (d *daprGrpcAPI) ExecuteStateTransaction(ctx context.Context, request *runtime.ExecuteStateTransactionRequest) (*emptypb.Empty, error) { - panic("implement me") -} - func (d *daprGrpcAPI) PublishEvent(ctx context.Context, request *runtime.PublishEventRequest) (*emptypb.Empty, error) { panic("implement me") } diff --git a/pkg/grpc/default_api/api.go b/pkg/grpc/default_api/api.go index 88c1e7c450..5d958885be 100644 --- a/pkg/grpc/default_api/api.go +++ b/pkg/grpc/default_api/api.go @@ -21,13 +21,14 @@ import ( "errors" "fmt" "io" + "strings" + "sync" + grpc_api "mosn.io/layotto/pkg/grpc" "mosn.io/layotto/pkg/grpc/dapr" dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" - "strings" - "sync" l8_comp_pubsub "mosn.io/layotto/components/pubsub" @@ -38,7 +39,6 @@ import ( _ "net/http/pprof" "github.com/dapr/components-contrib/state" - "github.com/gammazero/workerpool" "github.com/golang/protobuf/ptypes/empty" "mosn.io/layotto/components/file" @@ -62,7 +62,6 @@ import ( "mosn.io/layotto/components/rpc" "mosn.io/layotto/components/sequencer" "mosn.io/layotto/pkg/messages" - runtime_state "mosn.io/layotto/pkg/runtime/state" runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" "mosn.io/pkg/log" ) @@ -485,38 +484,6 @@ func (a *api) doPublishEvent(ctx context.Context, pubsubName string, topic strin return &emptypb.Empty{}, nil } -// GetState obtains the state for a specific key. -func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) { - // 1. get store - store, err := a.getStateStore(in.StoreName) - if err != nil { - log.DefaultLogger.Errorf("[runtime] [grpc.GetState] error: %v", err) - return nil, err - } - // 2. generate the actual key - key, err := runtime_state.GetModifiedStateKey(in.Key, in.StoreName, a.appId) - if err != nil { - return &runtimev1pb.GetStateResponse{}, err - } - req := state.GetRequest{ - Key: key, - Metadata: in.Metadata, - Options: state.GetStateOption{ - Consistency: runtime_state.StateConsistencyToString(in.Consistency), - }, - } - // 3. query - compResp, err := store.Get(&req) - // 4. check result - if err != nil { - err = status.Errorf(codes.Internal, messages.ErrStateGet, in.Key, in.StoreName, err.Error()) - log.DefaultLogger.Errorf("[runtime] [grpc.GetState] %v", err) - return &runtimev1pb.GetStateResponse{}, err - } - - return converter.GetResponse2GetStateResponse(compResp), nil -} - func (a *api) getStateStore(name string) (state.Store, error) { if a.stateStores == nil || len(a.stateStores) == 0 { return nil, status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured) @@ -528,118 +495,6 @@ func (a *api) getStateStore(name string) (state.Store, error) { return a.stateStores[name], nil } -func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) { - // 1. get store - store, err := a.getStateStore(in.StoreName) - if err != nil { - log.DefaultLogger.Errorf("[runtime] [grpc.GetBulkState] error: %v", err) - return &runtimev1pb.GetBulkStateResponse{}, err - } - - bulkResp := &runtimev1pb.GetBulkStateResponse{} - if len(in.Keys) == 0 { - return bulkResp, nil - } - - // 2. store.BulkGet - // 2.1. convert reqs - reqs := make([]state.GetRequest, len(in.Keys)) - for i, k := range in.Keys { - key, err := runtime_state.GetModifiedStateKey(k, in.StoreName, a.appId) - if err != nil { - return &runtimev1pb.GetBulkStateResponse{}, err - } - r := state.GetRequest{ - Key: key, - Metadata: in.Metadata, - } - reqs[i] = r - } - // 2.2. query - support, responses, err := store.BulkGet(reqs) - if err != nil { - return bulkResp, err - } - // 2.3. parse and return result if store supports this method - if support { - for i := 0; i < len(responses); i++ { - bulkResp.Items = append(bulkResp.Items, converter.BulkGetResponse2BulkStateItem(&responses[i])) - } - return bulkResp, nil - } - - // 3. Simulate the method if the store doesn't support it - n := len(reqs) - pool := workerpool.New(int(in.Parallelism)) - resultCh := make(chan *runtimev1pb.BulkStateItem, n) - for i := 0; i < n; i++ { - pool.Submit(generateGetStateTask(store, &reqs[i], resultCh)) - } - pool.StopWait() - for { - select { - case item, ok := <-resultCh: - if !ok { - return bulkResp, nil - } - bulkResp.Items = append(bulkResp.Items, item) - default: - return bulkResp, nil - } - } -} - -func generateGetStateTask(store state.Store, req *state.GetRequest, resultCh chan *runtimev1pb.BulkStateItem) func() { - return func() { - // get - r, err := store.Get(req) - // convert - var item *runtimev1pb.BulkStateItem - if err != nil { - item = &runtimev1pb.BulkStateItem{ - Key: runtime_state.GetOriginalStateKey(req.Key), - Error: err.Error(), - } - } else { - item = converter.GetResponse2BulkStateItem(r, runtime_state.GetOriginalStateKey(req.Key)) - } - // collect result - select { - case resultCh <- item: - default: - //never happen - log.DefaultLogger.Errorf("[api.generateGetStateTask] can not push result to the resultCh. item: %+v", item) - } - } -} - -func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*emptypb.Empty, error) { - // 1. get store - store, err := a.getStateStore(in.StoreName) - if err != nil { - log.DefaultLogger.Errorf("[runtime] [grpc.SaveState] error: %v", err) - return &emptypb.Empty{}, err - } - // 2. convert requests - reqs := []state.SetRequest{} - for _, s := range in.States { - key, err := runtime_state.GetModifiedStateKey(s.Key, in.StoreName, a.appId) - if err != nil { - return &emptypb.Empty{}, err - } - reqs = append(reqs, *converter.StateItem2SetRequest(s, key)) - } - // 3. query - err = store.BulkSet(reqs) - // 4. check result - if err != nil { - err = a.wrapDaprComponentError(err, messages.ErrStateSave, in.StoreName, err.Error()) - log.DefaultLogger.Errorf("[runtime] [grpc.SaveState] error: %v", err) - return &emptypb.Empty{}, err - } - return &emptypb.Empty{}, nil -} - // wrapDaprComponentError parse and wrap error from dapr component func (a *api) wrapDaprComponentError(err error, format string, args ...interface{}) error { e, ok := err.(*state.ETagError) @@ -656,123 +511,6 @@ func (a *api) wrapDaprComponentError(err error, format string, args ...interface return status.Errorf(codes.Internal, format, args...) } -func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*emptypb.Empty, error) { - // 1. get store - store, err := a.getStateStore(in.StoreName) - if err != nil { - log.DefaultLogger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) - return &emptypb.Empty{}, err - } - // 2. generate the actual key - key, err := runtime_state.GetModifiedStateKey(in.Key, in.StoreName, a.appId) - if err != nil { - return &empty.Empty{}, err - } - // 3. convert and send request - err = store.Delete(converter.DeleteStateRequest2DeleteRequest(in, key)) - // 4. check result - if err != nil { - err = a.wrapDaprComponentError(err, messages.ErrStateDelete, in.Key, err.Error()) - log.DefaultLogger.Errorf("[runtime] [grpc.DeleteState] error: %v", err) - return &empty.Empty{}, err - } - return &empty.Empty{}, nil -} - -func (a *api) DeleteBulkState(ctx context.Context, in *runtimev1pb.DeleteBulkStateRequest) (*empty.Empty, error) { - // 1. get store - store, err := a.getStateStore(in.StoreName) - if err != nil { - log.DefaultLogger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) - return &empty.Empty{}, err - } - // 2. convert request - reqs := make([]state.DeleteRequest, 0, len(in.States)) - for _, item := range in.States { - key, err := runtime_state.GetModifiedStateKey(item.Key, in.StoreName, a.appId) - if err != nil { - return &empty.Empty{}, err - } - reqs = append(reqs, *converter.StateItem2DeleteRequest(item, key)) - } - // 3. send request - err = store.BulkDelete(reqs) - // 4. check result - if err != nil { - log.DefaultLogger.Errorf("[runtime] [grpc.DeleteBulkState] error: %v", err) - return &emptypb.Empty{}, err - } - return &emptypb.Empty{}, nil -} - -func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*emptypb.Empty, error) { - // 1. check params - if a.stateStores == nil || len(a.stateStores) == 0 { - err := status.Error(codes.FailedPrecondition, messages.ErrStateStoresNotConfigured) - log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) - return &emptypb.Empty{}, err - } - storeName := in.StoreName - if a.stateStores[storeName] == nil { - err := status.Errorf(codes.InvalidArgument, messages.ErrStateStoreNotFound, storeName) - log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) - return &emptypb.Empty{}, err - } - // 2. find store - store, ok := a.transactionalStateStores[storeName] - if !ok { - err := status.Errorf(codes.Unimplemented, messages.ErrStateStoreNotSupported, storeName) - log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) - return &emptypb.Empty{}, err - } - // 3. convert request - operations := []state.TransactionalStateOperation{} - for _, op := range in.Operations { - // 3.1. extract and validate fields - var operation state.TransactionalStateOperation - var req = op.Request - // tolerant npe - if req == nil { - log.DefaultLogger.Warnf("[runtime] [grpc.ExecuteStateTransaction] one of TransactionalStateOperation.Request is nil") - continue - } - key, err := runtime_state.GetModifiedStateKey(req.Key, in.StoreName, a.appId) - if err != nil { - return &emptypb.Empty{}, err - } - // 3.2. prepare TransactionalStateOperation struct according to the operation type - switch state.OperationType(op.OperationType) { - case state.Upsert: - operation = state.TransactionalStateOperation{ - Operation: state.Upsert, - Request: *converter.StateItem2SetRequest(req, key), - } - case state.Delete: - operation = state.TransactionalStateOperation{ - Operation: state.Delete, - Request: *converter.StateItem2DeleteRequest(req, key), - } - default: - err := status.Errorf(codes.Unimplemented, messages.ErrNotSupportedStateOperation, op.OperationType) - log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) - return &emptypb.Empty{}, err - } - operations = append(operations, operation) - } - // 4. submit transactional request - err := store.Multi(&state.TransactionalStateRequest{ - Operations: operations, - Metadata: in.Metadata, - }) - // 5. check result - if err != nil { - err = status.Errorf(codes.Internal, messages.ErrStateTransaction, err.Error()) - log.DefaultLogger.Errorf("[runtime] [grpc.ExecuteStateTransaction] error: %v", err) - return &emptypb.Empty{}, err - } - return &emptypb.Empty{}, nil -} - func (a *api) GetFile(req *runtimev1pb.GetFileRequest, stream runtimev1pb.Runtime_GetFileServer) error { if a.fileOps[req.StoreName] == nil { return status.Errorf(codes.InvalidArgument, "not supported store type: %+v", req.StoreName) diff --git a/pkg/grpc/default_api/api_state.go b/pkg/grpc/default_api/api_state.go new file mode 100644 index 0000000000..836141b439 --- /dev/null +++ b/pkg/grpc/default_api/api_state.go @@ -0,0 +1,192 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package default_api + +import ( + "context" + _ "net/http/pprof" + + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + dapr_common_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/common/v1" + dapr_v1pb "mosn.io/layotto/pkg/grpc/dapr/proto/runtime/v1" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" +) + +// GetState obtains the state for a specific key. +func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) { + if in == nil { + return &runtimev1pb.GetStateResponse{}, status.Error(codes.InvalidArgument, "GetStateRequest is nil") + } + daprReq := &dapr_v1pb.GetStateRequest{ + StoreName: in.GetStoreName(), + Key: in.GetKey(), + Consistency: dapr_common_v1pb.StateOptions_StateConsistency(in.GetConsistency()), + Metadata: in.GetMetadata(), + } + resp, err := a.daprAPI.GetState(ctx, daprReq) + if err != nil { + return &runtimev1pb.GetStateResponse{}, err + } + return &runtimev1pb.GetStateResponse{ + Data: resp.GetData(), + Etag: resp.GetEtag(), + Metadata: resp.GetMetadata(), + }, nil +} + +func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*emptypb.Empty, error) { + if in == nil { + return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "SaveStateRequest is nil") + } + // convert request + daprReq := &dapr_v1pb.SaveStateRequest{ + StoreName: in.StoreName, + States: convertStatesToDaprPB(in.States), + } + // delegate to dapr api implementation + return a.daprAPI.SaveState(ctx, daprReq) +} + +func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) { + if in == nil { + return &runtimev1pb.GetBulkStateResponse{}, status.Error(codes.InvalidArgument, "GetBulkStateRequest is nil") + } + daprReq := &dapr_v1pb.GetBulkStateRequest{ + StoreName: in.GetStoreName(), + Keys: in.GetKeys(), + Parallelism: in.GetParallelism(), + Metadata: in.GetMetadata(), + } + resp, err := a.daprAPI.GetBulkState(ctx, daprReq) + if err != nil { + return &runtimev1pb.GetBulkStateResponse{}, err + } + ret := &runtimev1pb.GetBulkStateResponse{Items: make([]*runtimev1pb.BulkStateItem, 0)} + for _, item := range resp.Items { + ret.Items = append(ret.Items, &runtimev1pb.BulkStateItem{ + Key: item.GetKey(), + Data: item.GetData(), + Etag: item.GetEtag(), + Error: item.GetError(), + Metadata: item.GetMetadata(), + }) + } + return ret, nil +} + +func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*emptypb.Empty, error) { + if in == nil { + return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "DeleteStateRequest is nil") + } + daprReq := &dapr_v1pb.DeleteStateRequest{ + StoreName: in.GetStoreName(), + Key: in.GetKey(), + Etag: convertEtagToDaprPB(in.Etag), + Options: convertOptionsToDaprPB(in.Options), + Metadata: in.GetMetadata(), + } + return a.daprAPI.DeleteState(ctx, daprReq) +} + +func (a *api) DeleteBulkState(ctx context.Context, in *runtimev1pb.DeleteBulkStateRequest) (*empty.Empty, error) { + if in == nil { + return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "DeleteBulkStateRequest is nil") + } + daprReq := &dapr_v1pb.DeleteBulkStateRequest{ + StoreName: in.GetStoreName(), + States: convertStatesToDaprPB(in.States), + } + return a.daprAPI.DeleteBulkState(ctx, daprReq) +} + +func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*emptypb.Empty, error) { + if in == nil { + return &emptypb.Empty{}, status.Error(codes.InvalidArgument, "ExecuteStateTransactionRequest is nil") + } + daprReq := &dapr_v1pb.ExecuteStateTransactionRequest{ + StoreName: in.GetStoreName(), + Operations: convertTransactionalStateOperationToDaprPB(in.Operations), + Metadata: in.GetMetadata(), + } + return a.daprAPI.ExecuteStateTransaction(ctx, daprReq) +} + +// some code for converting from runtimev1pb to dapr_common_v1pb + +func convertEtagToDaprPB(etag *runtimev1pb.Etag) *dapr_common_v1pb.Etag { + if etag == nil { + return &dapr_common_v1pb.Etag{} + } + return &dapr_common_v1pb.Etag{Value: etag.GetValue()} +} + +func convertOptionsToDaprPB(op *runtimev1pb.StateOptions) *dapr_common_v1pb.StateOptions { + if op == nil { + return &dapr_common_v1pb.StateOptions{} + } + return &dapr_common_v1pb.StateOptions{ + Concurrency: dapr_common_v1pb.StateOptions_StateConcurrency(op.Concurrency), + Consistency: dapr_common_v1pb.StateOptions_StateConsistency(op.Consistency), + } +} + +func convertStatesToDaprPB(states []*runtimev1pb.StateItem) []*dapr_common_v1pb.StateItem { + dStates := make([]*dapr_common_v1pb.StateItem, 0, len(states)) + if states == nil { + return dStates + } + for _, s := range states { + ds := &dapr_common_v1pb.StateItem{ + Key: s.Key, + Value: s.Value, + Metadata: s.Metadata, + } + if s.Etag != nil { + ds.Etag = convertEtagToDaprPB(s.Etag) + } + if s.Options != nil { + ds.Options = convertOptionsToDaprPB(s.Options) + } + dStates = append(dStates, ds) + } + return dStates +} + +func convertTransactionalStateOperationToDaprPB(ops []*runtimev1pb.TransactionalStateOperation) []*dapr_v1pb.TransactionalStateOperation { + ret := make([]*dapr_v1pb.TransactionalStateOperation, 0, len(ops)) + for i := 0; i < len(ops); i++ { + op := ops[i] + var req *dapr_common_v1pb.StateItem + if op.Request != nil { + req = &dapr_common_v1pb.StateItem{ + Key: op.GetRequest().GetKey(), + Value: op.GetRequest().GetValue(), + Etag: convertEtagToDaprPB(op.GetRequest().GetEtag()), + Metadata: op.GetRequest().GetMetadata(), + Options: convertOptionsToDaprPB(op.GetRequest().GetOptions()), + } + } + ret = append(ret, &dapr_v1pb.TransactionalStateOperation{ + OperationType: op.OperationType, + Request: req, + }) + } + return ret +} diff --git a/pkg/messages/api_errors.go b/pkg/messages/api_errors.go index 6ff828a844..4bf3323c8b 100644 --- a/pkg/messages/api_errors.go +++ b/pkg/messages/api_errors.go @@ -24,12 +24,17 @@ const ( ErrPubsubCloudEventsSer = "error when marshalling cloud event envelope for topic %s pubsub %s: %s" ErrPubsubPublishMessage = "error when publish to topic %s in pubsub %s: %s" ErrPubsubCloudEventCreation = "cannot create cloudevent: %s" + // Http. + ErrNotFound = "method %q is not found" + ErrMalformedRequest = "failed deserializing HTTP body: %s" + ErrMalformedRequestData = "can't serialize request data field: %s" // State ErrStateStoresNotConfigured = "state store is not configured" ErrStateStoreNotFound = "state store %s is not found" ErrStateGet = "fail to get %s from state store %s: %s" ErrStateDelete = "failed deleting state with key %s: %s" ErrStateSave = "failed saving state in state store %s: %s" + ErrStateQuery = "failed query in state store %s: %s" // StateTransaction ErrStateStoreNotSupported = "state store %s doesn't support transaction" ErrNotSupportedStateOperation = "operation type %s not supported" diff --git a/pkg/runtime/state/compatibility.go b/pkg/runtime/state/compatibility.go index d6d9cb0a37..612fb2d97f 100644 --- a/pkg/runtime/state/compatibility.go +++ b/pkg/runtime/state/compatibility.go @@ -17,7 +17,6 @@ package state import ( "fmt" - runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" "strings" "github.com/pkg/errors" @@ -100,25 +99,3 @@ func checkKeyIllegal(key string) error { } return nil } - -func StateConsistencyToString(c runtimev1pb.StateOptions_StateConsistency) string { - switch c { - case runtimev1pb.StateOptions_CONSISTENCY_EVENTUAL: - return "eventual" - case runtimev1pb.StateOptions_CONSISTENCY_STRONG: - return "strong" - } - - return "" -} - -func StateConcurrencyToString(c runtimev1pb.StateOptions_StateConcurrency) string { - switch c { - case runtimev1pb.StateOptions_CONCURRENCY_FIRST_WRITE: - return "first-write" - case runtimev1pb.StateOptions_CONCURRENCY_LAST_WRITE: - return "last-write" - } - - return "" -}