Skip to content

Commit

Permalink
[dtest] endpoint to fetch tagged (#3138)
Browse files Browse the repository at this point in the history
  • Loading branch information
gediminasgu authored Feb 1, 2021
1 parent 25fbe60 commit 54474fd
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
30 changes: 12 additions & 18 deletions scripts/docker-integration-tests/simple/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,18 @@ curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{
}'

echo "Read data"
queryResult=$(curl -sSf -X POST 0.0.0.0:9003/query -d '{
"namespace": "unagg",
"query": {
"regexp": {
"field": "city",
"regexp": ".*"
}
},
"rangeStart": 0,
"rangeEnd":'"$(date +"%s")"'
}' | jq '.results | length')

if [ "$queryResult" -lt 1 ]; then
echo "Result not found"
exit 1
else
echo "Result found"
fi
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff \
'[ "$(curl -sSf -X POST 0.0.0.0:9003/query -d "{
\"namespace\": \"unagg\",
\"query\": {
\"regexp\": {
\"field\": \"city\",
\"regexp\": \".*\"
}
},
\"rangeStart\": 0,
\"rangeEnd\":'\"$(date +\"%s\")\"'
}" | jq ".results | length")" == "1" ]'

echo "Deleting placement"
curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/services/m3db/placement
Expand Down
18 changes: 18 additions & 0 deletions src/cmd/tools/dtest/docker/harness/resources/dbnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type Node interface {
AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error)
// Fetch fetches datapoints.
Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error)
// FetchTagged fetches datapoints by tag.
FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error)
// Exec executes the given commands on the node container, returning
// stdout and stderr from the container.
Exec(commands ...string) (string, error)
Expand Down Expand Up @@ -267,6 +269,22 @@ func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) {
return dps, nil
}

func (c *dbNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) {
if c.resource.closed {
return nil, errClosed
}

logger := c.resource.logger.With(zapMethod("fetchtagged"))
result, err := c.tchanClient.TChannelClientFetchTagged(timeout, req)
if err != nil {
logger.Error("could not fetch", zap.Error(err))
return nil, err
}

logger.Info("fetched", zap.Int("series_count", len(result.GetElements())))
return result, nil
}

func (c *dbNode) Restart() error {
if c.resource.closed {
return errClosed
Expand Down
8 changes: 8 additions & 0 deletions src/dbnode/integration/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ func (client *TestTChannelClient) TChannelClientFetch(
return client.node.Fetch(ctx, req)
}

// TChannelClientFetchTagged fulfills a fetch by tag request using a tchannel client.
func (client *TestTChannelClient) TChannelClientFetchTagged(
timeout time.Duration, req *rpc.FetchTaggedRequest,
) (*rpc.FetchTaggedResult_, error) {
ctx, _ := thrift.NewContext(timeout)
return client.node.FetchTagged(ctx, req)
}

// TChannelClientAggregateTiles runs a request for AggregateTiles.
func (client *TestTChannelClient) TChannelClientAggregateTiles(
timeout time.Duration, req *rpc.AggregateTilesRequest,
Expand Down

0 comments on commit 54474fd

Please sign in to comment.