Skip to content

Commit

Permalink
Extend the gRPC/HTTP Gateway API to support ETag in the resourceIdFil…
Browse files Browse the repository at this point in the history
…ter.

This update enables the API to handle ETag values in the resourceIdFilter.
When using gRPC or HTTP streaming, the response for an unmodified resource
will have `data.status` set to `NOT_MODIFIED`, and `data.content.data`
will be empty. Additionally, for HTTP GET requests to
`/api/v1/devices/{deviceId}/resources/{resourceHref}` with an ETag in
the header, the response will return an HTTP status code indicating that
the resource has not been modified.
  • Loading branch information
jkralik committed Oct 24, 2023
1 parent 9e88317 commit 20a1ead
Show file tree
Hide file tree
Showing 68 changed files with 2,294 additions and 1,083 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ jobs:
TEST_IDENTITY_STORE_LOG_LEVEL=${{ matrix.identityStore.log.level }} TEST_IDENTITY_STORE_LOG_DUMP_BODY=${{ matrix.identityStore.log.dumpBody }} \
${{ matrix.args }}
- name: Remove simulators container
if: ${{ always() }}
run: |
make simulators/remove
- name: Collect cloud server logs when the test fails
if: ${{ failure() }}
run: |
cat .tmp/devsim*/*.log
- name: Prepare upload files
run: |
mkdir -p ./outputs
Expand Down
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,9 @@ issues:
- text: "var-naming:|exported:"
linters:
- revive
- path: grpc-gateway/client/subscription.go
text: "SA1019:"
linters:
- staticcheck
# Fix found issues (if it's supported by the linter).
# fix: true
13 changes: 11 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,20 @@ define RUN-DOCKER-DEVICE
endef

define CLEAN-DOCKER-DEVICE
docker rm -f $(1) || true
sudo rm -rf $(WORKING_DIRECTORY)/.tmp/$(1) || true
endef

simulators/clean:
define REMOVE-DOCKER-DEVICE
docker stop --time 300 $(1) || true
docker rm -f $(1) || true
endef

simulators/remove:
$(call REMOVE-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME))
$(call REMOVE-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME))
.PHONY: simulators/remove

simulators/clean: simulators/remove
$(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_NAME))
$(call CLEAN-DOCKER-DEVICE,$(DEVICE_SIMULATOR_RES_OBSERVABLE_NAME))
.PHONY: simulators/clean
Expand Down
6 changes: 4 additions & 2 deletions bundle/client/grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,11 @@ func getResource(ctx context.Context, client pbGW.GrpcGatewayClient, deviceID, h
if deviceID != "" {
deviceIdFilter = append(deviceIdFilter, deviceID)
}
var resourceIdFilter []string
var resourceIdFilter []*pbGW.ResourceIdFilter
if href != "" {
resourceIdFilter = append(resourceIdFilter, commands.NewResourceID(deviceID, href).ToString())
resourceIdFilter = append(resourceIdFilter, &pbGW.ResourceIdFilter{
ResourceId: commands.NewResourceID(deviceID, href),
})
}
getClient, err := client.GetResources(ctx, &pbGW.GetResourcesRequest{
ResourceIdFilter: resourceIdFilter,
Expand Down
2 changes: 1 addition & 1 deletion cloud2cloud-gateway/service/retrieveDevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func unmarshalContent(c *commands.Content) (interface{}, error) {
return m, nil
}

func (rh *RequestHandler) RetrieveResources(ctx context.Context, resourceIdFilter []string, deviceIdFilter []string) (map[string][]Representation, error) {
func (rh *RequestHandler) RetrieveResources(ctx context.Context, resourceIdFilter []*pbGRPC.ResourceIdFilter, deviceIdFilter []string) (map[string][]Representation, error) {
client, err := rh.gwClient.GetResources(ctx, &pbGRPC.GetResourcesRequest{
DeviceIdFilter: deviceIdFilter,
ResourceIdFilter: resourceIdFilter,
Expand Down
7 changes: 6 additions & 1 deletion cloud2cloud-gateway/service/retrieveResource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ import (
"strings"

"github.com/gorilla/mux"
pbGRPC "github.com/plgd-dev/hub/v2/grpc-gateway/pb"
kitNetHttp "github.com/plgd-dev/hub/v2/pkg/net/http"
"github.com/plgd-dev/hub/v2/resource-aggregate/commands"
)

func (rh *RequestHandler) RetrieveResourceBase(ctx context.Context, w http.ResponseWriter, resourceID *commands.ResourceId, encoder responseWriterEncoderFunc) (int, error) {
allResources, err := rh.RetrieveResources(ctx, []string{resourceID.ToString()}, nil)
allResources, err := rh.RetrieveResources(ctx, []*pbGRPC.ResourceIdFilter{
{
ResourceId: resourceID,
},
}, nil)
if err != nil {
return kitNetHttp.ErrToStatusWithDef(err, http.StatusForbidden), err
}
Expand Down
2 changes: 1 addition & 1 deletion cloud2cloud-gateway/service/subscribeToDevices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestRequestHandlerSubscribeToDevicesOffline(t *testing.T) {
defer tearDown()
coapgwCfg := coapgwTest.MakeConfig(t)
coapgwCfg.Log.DumpBody = true
coapgwCfg.APIs.COAP.Addr = "localhost:45684"
coapgwCfg.APIs.COAP.Addr = "localhost:45685"
gwShutdown := coapgwTest.New(t, coapgwCfg)
ctx = kitNetGrpc.CtxWithToken(ctx, oauthTest.GetDefaultAccessToken(t))

Expand Down
1 change: 1 addition & 0 deletions cloud2cloud-gateway/service/subscribeToResource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestRequestHandlerSubscribeToResourceTokenTimeout(t *testing.T) {
time.Sleep(time.Second * 10)
// stop and start c2c-gw and let it try reestablish resource subscription with expired token
c2cgwShutdown()
time.Sleep(time.Second)
c2cgwShutdown = c2cTest.SetUp(t)
defer c2cgwShutdown()

Expand Down
2 changes: 2 additions & 0 deletions cloud2cloud-gateway/service/updateResource.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func statusToHttpStatus(status commands.Status) int {
return http.StatusServiceUnavailable
case commands.Status_NOT_IMPLEMENTED:
return http.StatusNotImplemented
case commands.Status_NOT_MODIFIED:
return http.StatusNotModified
case commands.Status_ACCEPTED:
return http.StatusAccepted
case commands.Status_ERROR:
Expand Down
46 changes: 36 additions & 10 deletions coap-gateway/coapconv/coapconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package coapconv
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -34,6 +35,8 @@ func StatusToCoapCode(status commands.Status, operation Operation) codes.Code {
}
case commands.Status_CREATED:
return codes.Created
case commands.Status_NOT_MODIFIED:
return codes.Valid
case commands.Status_ACCEPTED:
return codes.Valid
case commands.Status_BAD_REQUEST:
Expand All @@ -52,11 +55,14 @@ func StatusToCoapCode(status commands.Status, operation Operation) codes.Code {
return codes.BadRequest
}

func CoapCodeToStatus(code codes.Code) commands.Status {
func CoapCodeToStatus(code codes.Code, operation Operation) commands.Status {
switch code {
case codes.Changed, codes.Content, codes.Deleted:
return commands.Status_OK
case codes.Valid:
if operation == Retrieve {
return commands.Status_NOT_MODIFIED
}
return commands.Status_ACCEPTED
case codes.BadRequest:
return commands.Status_BAD_REQUEST
Expand Down Expand Up @@ -133,8 +139,8 @@ func NewCoapResourceRetrieveRequest(ctx context.Context, messagePool *pool.Pool,
if event.GetResourceInterface() != "" {
req.AddOptionString(message.URIQuery, "if="+event.GetResourceInterface())
}
if len(event.GetEtag()) > 0 {
if err := req.AddETag(event.GetEtag()); err != nil {
for _, etag := range event.GetEtag() {
if err := req.AddETag(etag); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -199,14 +205,34 @@ func getETagFromMessage(msg interface{ ETag() ([]byte, error) }) []byte {
return etag
}

func getETagsFromMessage(msg interface{ ETags(b [][]byte) (int, error) }) [][]byte {
if msg == nil {
return nil
}
etags := make([][]byte, 32)
for {
n, err := msg.ETags(etags)
if errors.Is(err, message.ErrTooSmall) {
etags = make([][]byte, len(etags)*2)
continue
}
if err != nil {
return nil
}
etags = etags[:n]
break
}
return etags
}

func NewConfirmResourceRetrieveRequest(resourceID *commands.ResourceId, correlationID, connectionID string, req *pool.Message) *commands.ConfirmResourceRetrieveRequest {
content := NewContent(req.Options(), req.Body())
metadata := NewCommandMetadata(req.Sequence(), connectionID)

return &commands.ConfirmResourceRetrieveRequest{
ResourceId: resourceID,
CorrelationId: correlationID,
Status: CoapCodeToStatus(req.Code()),
Status: CoapCodeToStatus(req.Code(), Retrieve),
Content: content,
CommandMetadata: metadata,
Etag: getETagFromMessage(req),
Expand All @@ -220,7 +246,7 @@ func NewConfirmResourceUpdateRequest(resourceID *commands.ResourceId, correlatio
return &commands.ConfirmResourceUpdateRequest{
ResourceId: resourceID,
CorrelationId: correlationID,
Status: CoapCodeToStatus(req.Code()),
Status: CoapCodeToStatus(req.Code(), Update),
Content: content,
CommandMetadata: metadata,
}
Expand Down Expand Up @@ -255,7 +281,7 @@ func NewConfirmResourceDeleteRequest(resourceID *commands.ResourceId, correlatio
return &commands.ConfirmResourceDeleteRequest{
ResourceId: resourceID,
CorrelationId: correlationID,
Status: CoapCodeToStatus(req.Code()),
Status: CoapCodeToStatus(req.Code(), Delete),
Content: content,
CommandMetadata: metadata,
}
Expand All @@ -269,7 +295,7 @@ func NewNotifyResourceChangedRequest(resourceID *commands.ResourceId, connection
ResourceId: resourceID,
Content: content,
CommandMetadata: metadata,
Status: CoapCodeToStatus(req.Code()),
Status: CoapCodeToStatus(req.Code(), Update),
Etag: getETagFromMessage(req),
}
}
Expand Down Expand Up @@ -322,7 +348,7 @@ func NewNotifyResourceChangedRequestsFromBatchResourceDiscovery(deviceID, connec
}
ct := contentFormat
data := r.Content
code := CoapCodeToStatus(req.Code())
code := CoapCodeToStatus(req.Code(), Retrieve)
if isEmpty {
// if we gets empty content we consider it as not found. Empty message is send when resource is deleted/acls don't allows as to access.
ct = -1
Expand Down Expand Up @@ -406,7 +432,7 @@ func NewRetrieveResourceRequest(resourceID *commands.ResourceId, req *mux.Messag
CorrelationId: correlationID,
ResourceInterface: resourceInterface,
CommandMetadata: metadata,
Etag: getETagFromMessage(req),
Etag: getETagsFromMessage(req),
}, nil
}

Expand Down Expand Up @@ -434,7 +460,7 @@ func NewConfirmResourceCreateRequest(resourceID *commands.ResourceId, correlatio
return &commands.ConfirmResourceCreateRequest{
ResourceId: resourceID,
CorrelationId: correlationID,
Status: CoapCodeToStatus(req.Code()),
Status: CoapCodeToStatus(req.Code(), Create),
Content: content,
CommandMetadata: metadata,
}
Expand Down
15 changes: 11 additions & 4 deletions coap-gateway/service/clientObserveHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ func (s *resourceSubscription) eventHandler(e *pb.Event) error {
func (s *resourceSubscription) Init(ctx context.Context) error {
res := &commands.ResourceId{DeviceId: s.deviceID, Href: s.href}
client, err := s.client.server.rdClient.GetResources(ctx, &pb.GetResourcesRequest{
ResourceIdFilter: []string{res.ToString()},
ResourceIdFilter: []*pb.ResourceIdFilter{
{
ResourceId: res,
},
},
})
if err != nil {
return err
Expand Down Expand Up @@ -222,10 +226,13 @@ func newResourceSubscription(req *mux.Message, client *session, authCtx *authori
seqNum: 2,
}

res := &commands.ResourceId{DeviceId: deviceID, Href: href}
sub := subscription.New(r.eventHandler, req.Token().String(), &pb.SubscribeToEvents_CreateSubscription{
ResourceIdFilter: []string{res.ToString()},
EventFilter: []pb.SubscribeToEvents_CreateSubscription_Event{pb.SubscribeToEvents_CreateSubscription_RESOURCE_CHANGED, pb.SubscribeToEvents_CreateSubscription_UNREGISTERED, pb.SubscribeToEvents_CreateSubscription_RESOURCE_UNPUBLISHED},
ResourceIdFilter: []*pb.ResourceIdFilter{
{
ResourceId: &commands.ResourceId{DeviceId: deviceID, Href: href},
},
},
EventFilter: []pb.SubscribeToEvents_CreateSubscription_Event{pb.SubscribeToEvents_CreateSubscription_RESOURCE_CHANGED, pb.SubscribeToEvents_CreateSubscription_UNREGISTERED, pb.SubscribeToEvents_CreateSubscription_RESOURCE_UNPUBLISHED},
})
r.sub = sub

Expand Down
6 changes: 4 additions & 2 deletions coap-gateway/service/clientRetrieveHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ func clientRetrieveHandler(req *mux.Message, client *session) (*pool.Message, er

func clientRetrieveFromResourceTwinHandler(ctx context.Context, client *session, deviceID, href string, etag []byte) (*commands.Content, coapCodes.Code, error) {
RetrieveResourcesClient, err := client.server.rdClient.GetResources(ctx, &pbGRPC.GetResourcesRequest{
ResourceIdFilter: []string{
commands.NewResourceID(deviceID, href).ToString(),
ResourceIdFilter: []*pbGRPC.ResourceIdFilter{
{
ResourceId: commands.NewResourceID(deviceID, href),
},
},
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ require (
github.com/pseudomuto/protoc-gen-doc v1.5.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.17.0
github.com/tidwall/sjson v1.2.5
github.com/ugorji/go/codec v1.2.11
github.com/vincent-petithory/dataurl v1.0.0
go.mongodb.org/mongo-driver v1.12.1
Expand Down Expand Up @@ -105,6 +107,8 @@ require (
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
Expand Down
8 changes: 5 additions & 3 deletions grpc-gateway/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (c *Client) GetResourceLinksIterator(ctx context.Context, deviceIDs []strin
// }
// if it.Err != nil {
// }
func (c *Client) GetResourcesIterator(ctx context.Context, resourceIDs []string, deviceIDs []string, resourceTypes ...string) *kitNetGrpc.Iterator {
func (c *Client) GetResourcesIterator(ctx context.Context, resourceIDs []*pb.ResourceIdFilter, deviceIDs []string, resourceTypes ...string) *kitNetGrpc.Iterator {
r := pb.GetResourcesRequest{ResourceIdFilter: resourceIDs, DeviceIdFilter: deviceIDs, TypeFilter: resourceTypes}
return kitNetGrpc.NewIterator(c.gateway.GetResources(ctx, &r))
}
Expand All @@ -244,10 +244,12 @@ func (c *Client) GetResourcesByResourceIDs(
resourceIDsCallbacks ...ResourceIDCallback,
) error {
tc := make(map[string]func(*pb.Resource), len(resourceIDsCallbacks))
resourceIDs := make([]string, 0, len(resourceIDsCallbacks))
resourceIDs := make([]*pb.ResourceIdFilter, 0, len(resourceIDsCallbacks))
for _, c := range resourceIDsCallbacks {
tc[c.ResourceID.GetDeviceId()+c.ResourceID.GetHref()] = c.Callback
resourceIDs = append(resourceIDs, c.ResourceID.ToString())
resourceIDs = append(resourceIDs, &pb.ResourceIdFilter{
ResourceId: c.ResourceID,
})
}

it := c.GetResourcesIterator(ctx, resourceIDs, nil)
Expand Down
8 changes: 6 additions & 2 deletions grpc-gateway/client/resourceSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ func NewResourceSubscription(ctx context.Context, resourceID *commands.ResourceI
err = client.Send(&pb.SubscribeToEvents{
Action: &pb.SubscribeToEvents_CreateSubscription_{
CreateSubscription: &pb.SubscribeToEvents_CreateSubscription{
ResourceIdFilter: []string{resourceID.ToString()},
EventFilter: filterEvents,
ResourceIdFilter: []*pb.ResourceIdFilter{
{
ResourceId: resourceID,
},
},
EventFilter: filterEvents,
},
},
})
Expand Down
Loading

0 comments on commit 20a1ead

Please sign in to comment.