diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index ca957bf264..1d3482c232 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -233,7 +233,7 @@ function test_query_limits_applied { ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]' - # Test the docs limit applied when directly querying + # Test the default docs limit applied when directly querying # coordinator (docs limit set by header) echo "Test query docs limit with coordinator limit header" ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ @@ -391,46 +391,6 @@ function test_series { '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } -function test_label_query_limits_applied { - # Test that require exhaustive does nothing if limits are not hit - echo "Test label limits with require-exhaustive headers true (below limit therefore no error)" - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]' - - # the header takes precedence over the configured default series limit - echo "Test label series limit with coordinator limit header (default requires exhaustive so error)" - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' - - echo "Test label series limit with require-exhaustive headers false" - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]' - - echo "Test label series limit with require-exhaustive headers true (above limit therefore error)" - # Test that require exhaustive error is returned - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' - # Test that require exhaustive error is 4xx - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' - - echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)" - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' - - echo "Test label docs limit with require-exhaustive headers false" - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]' - - echo "Test label docs limit with require-exhaustive headers true 
(above limit therefore error)" - # Test that require exhaustive error is returned - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]' - # Test that require exhaustive error is 4xx - ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]' -} - echo "Running readiness test" test_readiness @@ -449,7 +409,6 @@ test_prometheus_query_native_timeout test_query_restrict_tags test_prometheus_remote_write_map_tags test_series -test_label_query_limits_applied echo "Running function correctness tests" test_correctness diff --git a/scripts/docker-integration-tests/simple/test.sh b/scripts/docker-integration-tests/simple/test.sh index d85be4fa92..9c32d50973 100755 --- a/scripts/docker-integration-tests/simple/test.sh +++ b/scripts/docker-integration-tests/simple/test.sh @@ -143,24 +143,18 @@ curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{ }' echo "Read data" -queryResult=$(curl -sSf -X POST 0.0.0.0:9003/query -d '{ - "namespace": "unagg", - "query": { - "regexp": { - "field": "city", - "regexp": ".*" - } - }, - "rangeStart": 0, - "rangeEnd":'"$(date +"%s")"' -}' | jq '.results | length') - -if [ "$queryResult" -lt 1 ]; then - echo "Result not found" - exit 1 -else - echo "Result found" -fi +ATTEMPTS=3 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf -X POST 0.0.0.0:9003/query -d "{ + \"namespace\": \"unagg\", + \"query\": { + \"regexp\": { + \"field\": \"city\", + \"regexp\": \".*\" + } + }, + \"rangeStart\": 0, + \"rangeEnd\":'\"$(date +\"%s\")\"' + }" | jq ".results | length")" == "1" ]' echo "Deleting placement" curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/services/m3db/placement diff --git a/site/content/includes/prometheus.yml b/site/content/includes/prometheus.yml new file mode 100644 index 0000000000..78385d88b9 --- /dev/null +++ b/site/content/includes/prometheus.yml @@ -0,0 +1,32 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + external_labels: + monitor: 'codelab-monitor' + +remote_read: + - url: "http://localhost:7201/api/v1/prom/remote/read" + # To test reading even when local Prometheus has the data + read_recent: true +remote_write: + - url: "http://localhost:7201/api/v1/prom/remote/write" + +scrape_configs: + - job_name: 'prometheus' + scrape_interval: 5s + static_configs: + - targets: ['localhost:9090'] + + - job_name: 'node' + scrape_interval: 5s + static_configs: + - targets: ['localhost:8080', 'localhost:8081'] + labels: + group: 'production' + - targets: ['localhost:8082'] + labels: + group: 'canary' + + - job_name: 'm3' + static_configs: + - targets: ['localhost:7203'] diff --git a/site/content/includes/quickstart-common-steps.md b/site/content/includes/quickstart-common-steps.md index 6deb5b521e..0d688ca7c8 100644 --- a/site/content/includes/quickstart-common-steps.md +++ b/site/content/includes/quickstart-common-steps.md @@ -276,34 +276,15 @@ You can write metrics using one of two endpoints: - _[{{% apiendpoint %}}prom/remote/write](/docs/m3coordinator/api/remote/)_ - Write a Prometheus remote write query to M3DB with a binary snappy compressed Prometheus WriteRequest protobuf message. - _{{% apiendpoint %}}json/write_ - Write a JSON payload of metrics data. 
This endpoint is quick for testing purposes but is not as performant for production usage. -For this quickstart, use the _{{% apiendpoint %}}json/write_ endpoint to write a tagged metric to M3 with the following data in the request body, all fields are required: +{{< tabs name="prom_http_write" >}} +{{< tab name="Prometheus" >}} -- `tags`: An object of at least one `name`/`value` pairs -- `timestamp`: The UNIX timestamp for the data -- `value`: The value for the data, can be of any type - -{{% notice tip %}} -The examples below use `__name__` as the name for one of the tags, which is a Prometheus reserved tag that allows you to query metrics using the value of the tag to filter results. -{{% /notice %}} - -{{% notice tip %}} -Label names may contain ASCII letters, numbers, underscores, and Unicode characters. They must match the regex `[a-zA-Z_][a-zA-Z0-9_]*`. Label names beginning with `__` are reserved for internal use. [Read more in the Prometheus documentation](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). -{{% /notice %}} - -{{< tabs name="write_metrics" >}} -{{< tab name="Command 1" >}} - -{{< codeinclude file="docs/includes/write-metrics-1.sh" language="shell" >}} - -{{< /tab >}} -{{< tab name="Command 2" >}} - -{{< codeinclude file="docs/includes/write-metrics-2.sh" language="shell" >}} +{{< fileinclude file="quickstart-prometheus-steps.md" >}} {{< /tab >}} -{{< tab name="Command 3" >}} +{{< tab name="HTTP API" >}} -{{< codeinclude file="docs/includes/write-metrics-3.sh" language="shell" >}} +{{< fileinclude file="quickstart-http-steps.md" >}} {{< /tab >}} {{< /tabs >}} @@ -443,6 +424,95 @@ curl -X "POST" -G "{{% apiendpoint %}}query_range" \ {{% /tab %}} {{< /tabs >}} +#### Values collected from Prometheus + +If you followed the steps above for collecting metrics from Prometheus, the examples above work, but don't return any results. To query those results, use the following commands to return a sum of the values. + +{{< tabs name="example_promql_sum" >}} +{{% tab name="Linux" %}} + + + +```shell +curl -X "POST" -G "{{% apiendpoint %}}query_range" \ + -d "query=third_avenue_sum" \ + -d "start=$(date "+%s" -d "45 seconds ago")" \ + -d "end=$( date +%s )" \ + -d "step=500s" | jq . +``` + +{{% /tab %}} +{{% tab name="macOS/BSD" %}} + +```shell +curl -X "POST" -G "{{% apiendpoint %}}query_range" \ + -d "query=third_avenue_sum" \ + -d "start=$( date -v -45S +%s )" \ + -d "end=$( date +%s )" \ + -d "step=500s" | jq . +``` + +{{% /tab %}} +{{% tab name="Output" %}} + +```json +{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "__name__": "third_avenue_sum", + "group": "canary", + "instance": "localhost:8082", + "job": "node", + "monitor": "codelab-monitor" + }, + "values": [ + [ + 1608737991, + "5801.45" + ] + ] + }, + { + "metric": { + "__name__": "third_avenue_sum", + "group": "production", + "instance": "localhost:8080", + "job": "node", + "monitor": "codelab-monitor" + }, + "values": [ + [ + 1608737991, + "5501.45" + ] + ] + }, + { + "metric": { + "__name__": "third_avenue_sum", + "group": "production", + "instance": "localhost:8081", + "job": "node", + "monitor": "codelab-monitor" + }, + "values": [ + [ + 1608737991, + "13480.27" + ] + ] + } + ] + } +} +``` + +{{% /tab %}} +{{< /tabs >}} + +```shell +curl -X "POST" -G "{{% apiendpoint %}}query_range" \ + -d "query=third_avenue" \ + -d "start=$(date "+%s" -d "45 seconds ago")" \ + -d "end=$( date +%s )" \ + -d "step=5s" | jq . 
+``` + +{{% /tab %}} +{{% tab name="macOS/BSD" %}} + +```shell +curl -X "POST" -G "{{% apiendpoint %}}query_range" \ + -d "query=third_avenue" \ + -d "start=$( date -v -45S +%s )" \ + -d "end=$( date +%s )" \ + -d "step=5s" | jq . +``` + +{{% /tab %}} +{{% tab name="Output" %}} + +```json +{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "__name__": "third_avenue", + "checkout": "1", + "city": "new_york" + }, + "values": [ + [ + {{% now %}}, + "3347.26" + ], + [ + {{% now %}}, + "5347.26" + ], + [ + {{% now %}}, + "7347.26" + ] + ] + } + ] + } +} +``` + +{{% /tab %}} +{{< /tabs >}} + +#### Values above a certain number + +{{< tabs name="example_promql_range" >}} +{{% tab name="Linux" %}} + + + +```shell +curl -X "POST" -G "{{% apiendpoint %}}query_range" \ + -d "query=third_avenue > 6000" \ + -d "start=$(date "+%s" -d "45 seconds ago")" \ + -d "end=$( date +%s )" \ + -d "step=5s" | jq . +``` + +{{% /tab %}} +{{% tab name="macOS/BSD" %}} + +```shell +curl -X "POST" -G "{{% apiendpoint %}}query_range" \ + -d "query=third_avenue > 6000" \ + -d "start=$(date -v -45S "+%s")" \ + -d "end=$( date +%s )" \ + -d "step=5s" | jq . +``` + +{{% /tab %}} +{{% tab name="Output" %}} + +```json +{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "__name__": "third_avenue", + "checkout": "1", + "city": "new_york" + }, + "values": [ + [ + {{% now %}}, + "7347.26" + ] + ] + } + ] + } +} +``` + +{{% /tab %}} +{{< /tabs >}} diff --git a/site/content/includes/quickstart-http-steps.md b/site/content/includes/quickstart-http-steps.md new file mode 100644 index 0000000000..c34f13172d --- /dev/null +++ b/site/content/includes/quickstart-http-steps.md @@ -0,0 +1,31 @@ +You can use the _{{% apiendpoint %}}json/write_ endpoint to write a tagged metric to M3 with the following data in the request body, all fields are required: + +- `tags`: An object of at least one `name`/`value` pairs +- `timestamp`: The UNIX timestamp for the data +- `value`: The float64 value for the data + +{{% notice tip %}} +The examples below use `__name__` as the name for one of the tags, which is a Prometheus reserved tag that allows you to query metrics using the value of the tag to filter results. +{{% /notice %}} + +{{% notice tip %}} +Label names may contain ASCII letters, numbers, underscores, and Unicode characters. They must match the regex `[a-zA-Z_][a-zA-Z0-9_]*`. Label names beginning with `__` are reserved for internal use. [Read more in the Prometheus documentation](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). 
+{{% /notice %}}
+
+{{< tabs name="write_metrics" >}}
+{{< tab name="Command 1" >}}
+
+{{< codeinclude file="docs/includes/write-metrics-1.sh" language="shell" >}}
+
+{{< /tab >}}
+{{< tab name="Command 2" >}}
+
+{{< codeinclude file="docs/includes/write-metrics-2.sh" language="shell" >}}
+
+{{< /tab >}}
+{{< tab name="Command 3" >}}
+
+{{< codeinclude file="docs/includes/write-metrics-3.sh" language="shell" >}}
+
+{{< /tab >}}
+{{< /tabs >}}
\ No newline at end of file
diff --git a/site/content/includes/quickstart-prometheus-steps.md b/site/content/includes/quickstart-prometheus-steps.md
new file mode 100644
index 0000000000..9fc5903d27
--- /dev/null
+++ b/site/content/includes/quickstart-prometheus-steps.md
@@ -0,0 +1,63 @@
+This quickstart uses [the textfile collector feature](https://github.com/prometheus/node_exporter#textfile-collector) of the Prometheus node exporter to export metrics to Prometheus that M3 then ingests. To follow the next steps, [download node_exporter](https://github.com/prometheus/node_exporter#installation-and-usage).
+
+#### Configure and Start Prometheus
+
+With M3 running and ready to receive metrics, change your Prometheus configuration to add M3 as `remote_read` and `remote_write` URLs, and as a job. With the configuration below, Prometheus scrapes metrics from two local nodes `localhost:8080` and `localhost:8081` in the `production` group, and one local node `localhost:8082` in the `canary` group:
+
+{{< codeinclude file="docs/includes/prometheus.yml" language="yaml" >}}
+
+Start Prometheus using the new configuration with the command below:
+
+```shell
+prometheus --config.file=prometheus.yml
+```
+
+#### Start Node Exporters
+
+The three commands below simulate three point of sale (POS) devices reporting an hourly sales total for the POS. The node_exporter textfile collector uses [_.prom_ files](https://prometheus.io/docs/instrumenting/exposition_formats/) for metrics, and only loads files from a directory, so this requires some extra steps for each node in this example.
+
+{{< tabs name="node_exporters" >}}
+{{< tab name="Node 1" >}}
+
+{{% notice tip %}}
+Below is the text file for the first node. Copy and paste it into a new _.prom_ file in a directory named _node-1_, or [download the file](/docs/includes/quickstart/node-1/metrics-1.prom) into that new folder.
+{{% /notice %}}
+
+{{< codeinclude file="docs/includes/quickstart/node-1/metrics-1.prom" language="text" >}}
+
+Run node_exporter from its install location, passing the directory that contains the textfile, and the public-facing address, which for this example serves as the identifier of the POS.
+
+{{< codeinclude file="docs/includes/quickstart/node-1/node_exporter-1.sh" language="shell" >}}
+
+{{< /tab >}}
+{{< tab name="Node 2" >}}
+
+{{% notice tip %}}
+Below is the text file for the second node. Copy and paste it into a new _.prom_ file in a directory named _node-2_, or [download the file](/docs/includes/quickstart/node-2/metrics-2.prom) into that new folder.
+{{% /notice %}}
+
+{{< codeinclude file="docs/includes/quickstart/node-2/metrics-2.prom" language="text" >}}
+
+Run node_exporter from its install location, passing the directory that contains the textfile, and the public-facing address, which for this example serves as the identifier of the POS.
+
+{{< codeinclude file="docs/includes/quickstart/node-2/node_exporter-2.sh" language="shell" >}}
+
+{{< /tab >}}
+{{< tab name="Node 3" >}}
+
+{{% notice tip %}}
+Below is the text file for the third node. Copy and paste it into a new _.prom_ file in a directory named _node-3_, or [download the file](/docs/includes/quickstart/node-3/metrics-3.prom) into that new folder.
+{{% /notice %}}
+
+{{< codeinclude file="docs/includes/quickstart/node-3/metrics-3.prom" language="text" >}}
+
+Run node_exporter from its install location, passing the directory that contains the textfile, and the public-facing address, which for this example serves as the identifier of the POS.
+
+{{< codeinclude file="docs/includes/quickstart/node-3/node_exporter-3.sh" language="shell" >}}
+
+{{< /tab >}}
+{{< /tabs >}}
+
+{{% notice tip %}}
+You can now confirm that the node_exporter exported metrics to Prometheus by searching for `third_avenue` in the Prometheus dashboard.
+{{% /notice %}}
diff --git a/site/content/includes/quickstart/node-1/metrics-1.prom b/site/content/includes/quickstart/node-1/metrics-1.prom
new file mode 100644
index 0000000000..63586fb075
--- /dev/null
+++ b/site/content/includes/quickstart/node-1/metrics-1.prom
@@ -0,0 +1,20 @@
+# HELP third_avenue The total number of revenue from a POS
+# TYPE third_avenue histogram
+third_avenue_sum 3347.26
+third_avenue_sum 3447.66
+third_avenue_sum 3367.34
+third_avenue_sum 4366.29
+third_avenue_sum 4566.01
+third_avenue_sum 4892.03
+third_avenue_sum 5013.18
+third_avenue_sum 5030.72
+third_avenue_sum 5057.10
+third_avenue_sum 5079.99
+third_avenue_sum 5093.93
+third_avenue_sum 5102.63
+third_avenue_sum 5130.19
+third_avenue_sum 5190.49
+third_avenue_sum 5235.01
+third_avenue_sum 5289.39
+third_avenue_sum 5390.93
+third_avenue_sum 5501.45
diff --git a/site/content/includes/quickstart/node-1/node_exporter-1.sh b/site/content/includes/quickstart/node-1/node_exporter-1.sh
new file mode 100644
index 0000000000..0305507679
--- /dev/null
+++ b/site/content/includes/quickstart/node-1/node_exporter-1.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+./node_exporter --collector.textfile.directory=/node-1/ --web.listen-address 127.0.0.1:8081
diff --git a/site/content/includes/quickstart/node-2/metrics-2.prom b/site/content/includes/quickstart/node-2/metrics-2.prom
new file mode 100644
index 0000000000..29a3a26314
--- /dev/null
+++ b/site/content/includes/quickstart/node-2/metrics-2.prom
@@ -0,0 +1,15 @@
+# HELP third_avenue The total number of revenue from a POS
+# TYPE third_avenue histogram
+third_avenue_sum 8347.26
+third_avenue_sum 9237.66
+third_avenue_sum 10111.34
+third_avenue_sum 11178.29
+third_avenue_sum 11200.09
+third_avenue_sum 12905.93
+third_avenue_sum 13004.82
+third_avenue_sum 13289.57
+third_avenue_sum 13345.19
+third_avenue_sum 13390.28
+third_avenue_sum 13412.92
+third_avenue_sum 13465.61
+third_avenue_sum 13480.27
diff --git a/site/content/includes/quickstart/node-2/node_exporter-2.sh b/site/content/includes/quickstart/node-2/node_exporter-2.sh
new file mode 100644
index 0000000000..74521c756b
--- /dev/null
+++ b/site/content/includes/quickstart/node-2/node_exporter-2.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+./node_exporter --collector.textfile.directory=/node-2/ --web.listen-address 127.0.0.1:8082
diff --git a/site/content/includes/quickstart/node-3/metrics-3.prom b/site/content/includes/quickstart/node-3/metrics-3.prom
new file mode 100644
index 0000000000..c7a1cec24a
--- /dev/null
+++ b/site/content/includes/quickstart/node-3/metrics-3.prom
@@ -0,0 +1,20 @@
+# HELP 
third_avenue The total number of revenue from a POS +# TYPE third_avenue histogram +third_avenue_sum 1347.26 +third_avenue_sum 1447.66 +third_avenue_sum 2367.34 +third_avenue_sum 3366.29 +third_avenue_sum 3566.01 +third_avenue_sum 3892.03 +third_avenue_sum 4013.18 +third_avenue_sum 4130.72 +third_avenue_sum 4219.10 +third_avenue_sum 5079.99 +third_avenue_sum 5093.93 +third_avenue_sum 5102.63 +third_avenue_sum 5330.19 +third_avenue_sum 5500.49 +third_avenue_sum 5535.01 +third_avenue_sum 5689.39 +third_avenue_sum 5790.93 +third_avenue_sum 5801.45 diff --git a/site/content/includes/quickstart/node-3/node_exporter-3.sh b/site/content/includes/quickstart/node-3/node_exporter-3.sh new file mode 100644 index 0000000000..ccbabc51eb --- /dev/null +++ b/site/content/includes/quickstart/node-3/node_exporter-3.sh @@ -0,0 +1,2 @@ +#!/bin/bash +./node_exporter --collector.textfile.directory=/node-3/ --web.listen-address 127.0.0.1:8083 diff --git a/site/content/quickstart/create-database.sh b/site/content/quickstart/create-database.sh deleted file mode 100644 index 104dce0586..0000000000 --- a/site/content/quickstart/create-database.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -curl -X POST http://localhost:7201/api/v1/database/create -d '{ - "type": "local", - "namespaceName": "default", - "retentionTime": "12h" -}' | jq . diff --git a/site/content/quickstart/write-metrics-1.sh b/site/content/quickstart/write-metrics-1.sh deleted file mode 100755 index 453100c993..0000000000 --- a/site/content/quickstart/write-metrics-1.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -curl -X POST http://localhost:7201/api/v1/json/write -d '{ - "tags": - { - "__name__": "third_avenue", - "city": "new_york", - "checkout": "1" - }, - "timestamp": '\"$(date "+%s")\"', - "value": 3347.26 -}' diff --git a/site/content/quickstart/write-metrics-2.sh b/site/content/quickstart/write-metrics-2.sh deleted file mode 100755 index eea2d30348..0000000000 --- a/site/content/quickstart/write-metrics-2.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -curl -X POST http://localhost:7201/api/v1/json/write -d '{ - "tags": - { - "__name__": "third_avenue", - "city": "new_york", - "checkout": "1" - }, - "timestamp": '\"$(date "+%s")\"', - "value": 5347.26 -}' diff --git a/site/content/quickstart/write-metrics-3.sh b/site/content/quickstart/write-metrics-3.sh deleted file mode 100755 index e6b3a5f046..0000000000 --- a/site/content/quickstart/write-metrics-3.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -curl -X POST http://localhost:7201/api/v1/json/write -d '{ - "tags": - { - "__name__": "third_avenue", - "city": "new_york", - "checkout": "1" - }, - "timestamp": '\"$(date "+%s")\"', - "value": 7347.26 -}' diff --git a/site/go.mod b/site/go.mod index c51383b444..4c78761ce7 100644 --- a/site/go.mod +++ b/site/go.mod @@ -2,4 +2,4 @@ module m3-site go 1.15 -require github.com/chronosphereio/victor v0.0.0-20201222112852-eaf9ae24e2db // indirect +require github.com/chronosphereio/victor v0.0.0-20201229142059-d2026c6102a7 // indirect diff --git a/site/go.sum b/site/go.sum index 14aceadf1b..da60e7459a 100644 --- a/site/go.sum +++ b/site/go.sum @@ -2,3 +2,5 @@ github.com/chronosphereio/victor v0.0.0-20201217171243-b2ba7c848932 h1:NcHWcnCIE github.com/chronosphereio/victor v0.0.0-20201217171243-b2ba7c848932/go.mod h1:wz1ngMsk+1D1ug2ObnI3zXs+/ZdBPrWLb6R1WQW3XNM= github.com/chronosphereio/victor v0.0.0-20201222112852-eaf9ae24e2db h1:qEMGd5zaqbT8eaKJ25n99rgakWgUhX3xnsLae1N6Si8= github.com/chronosphereio/victor v0.0.0-20201222112852-eaf9ae24e2db/go.mod 
h1:wz1ngMsk+1D1ug2ObnI3zXs+/ZdBPrWLb6R1WQW3XNM= +github.com/chronosphereio/victor v0.0.0-20201229142059-d2026c6102a7 h1:VQHAfLGF53imcPuvx0jxNPFHqS9h0vSyq4asWxEdYlU= +github.com/chronosphereio/victor v0.0.0-20201229142059-d2026c6102a7/go.mod h1:wz1ngMsk+1D1ug2ObnI3zXs+/ZdBPrWLb6R1WQW3XNM= diff --git a/src/cmd/tools/dtest/docker/harness/resources/dbnode.go b/src/cmd/tools/dtest/docker/harness/resources/dbnode.go index 2d12305ad7..f9598e5829 100644 --- a/src/cmd/tools/dtest/docker/harness/resources/dbnode.go +++ b/src/cmd/tools/dtest/docker/harness/resources/dbnode.go @@ -101,6 +101,8 @@ type Node interface { AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error) // Fetch fetches datapoints. Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) + // FetchTagged fetches datapoints by tag. + FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) // Exec executes the given commands on the node container, returning // stdout and stderr from the container. Exec(commands ...string) (string, error) @@ -267,6 +269,22 @@ func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) { return dps, nil } +func (c *dbNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { + if c.resource.closed { + return nil, errClosed + } + + logger := c.resource.logger.With(zapMethod("fetchtagged")) + result, err := c.tchanClient.TChannelClientFetchTagged(timeout, req) + if err != nil { + logger.Error("could not fetch", zap.Error(err)) + return nil, err + } + + logger.Info("fetched", zap.Int("series_count", len(result.GetElements()))) + return result, nil +} + func (c *dbNode) Restart() error { if c.resource.closed { return errClosed diff --git a/src/dbnode/client/session_fetch_test.go b/src/dbnode/client/session_fetch_test.go index c0c8afa717..b249de2f82 100644 --- a/src/dbnode/client/session_fetch_test.go +++ b/src/dbnode/client/session_fetch_test.go @@ -170,7 +170,10 @@ func testSessionFetchIDs(t *testing.T, testOpts testOptions) { } fetchBatchOps, enqueueWg := prepareTestFetchEnqueues(t, ctrl, session, expectedFetches) + valueWriteWg := sync.WaitGroup{} + valueWriteWg.Add(1) go func() { + defer valueWriteWg.Done() // Fulfill fetch ops once enqueued enqueueWg.Wait() fulfillFetchBatchOps(t, testOpts, fetches, *fetchBatchOps, 0) @@ -181,6 +184,8 @@ func testSessionFetchIDs(t *testing.T, testOpts testOptions) { results, err := session.FetchIDs(nsID, fetches.IDsIter(), start, end) if testOpts.expectedErr == nil { require.NoError(t, err) + // wait for testValues to be written to before reading to assert. + valueWriteWg.Wait() assertFetchResults(t, start, end, fetches, results, testOpts.annEqual) } else { require.Equal(t, testOpts.expectedErr, err) diff --git a/src/dbnode/integration/client.go b/src/dbnode/integration/client.go index 287d367dba..f40b293c5d 100644 --- a/src/dbnode/integration/client.go +++ b/src/dbnode/integration/client.go @@ -129,6 +129,14 @@ func (client *TestTChannelClient) TChannelClientFetch( return client.node.Fetch(ctx, req) } +// TChannelClientFetchTagged fulfills a fetch by tag request using a tchannel client. +func (client *TestTChannelClient) TChannelClientFetchTagged( + timeout time.Duration, req *rpc.FetchTaggedRequest, +) (*rpc.FetchTaggedResult_, error) { + ctx, _ := thrift.NewContext(timeout) + return client.node.FetchTagged(ctx, req) +} + // TChannelClientAggregateTiles runs a request for AggregateTiles. 
func (client *TestTChannelClient) TChannelClientAggregateTiles( timeout time.Duration, req *rpc.AggregateTilesRequest, diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 5b60d27804..b779a73051 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift" "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/convert" tterrors "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/errors" @@ -58,7 +59,6 @@ import ( xtime "github.com/m3db/m3/src/x/time" apachethrift "github.com/apache/thrift/lib/go/thrift" - "github.com/m3db/m3/src/dbnode/namespace" opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" @@ -252,6 +252,9 @@ func (p pools) CheckedBytesWrapper() xpool.CheckedBytesWrapperPool { return p.ch type Service interface { rpc.TChanNode + // FetchTaggedIter returns an iterator for the results of FetchTagged. + FetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedRequest) (FetchTaggedResultsIter, error) + // Only safe to be called one time once the service has started. SetDatabase(db storage.Database) error @@ -712,11 +715,7 @@ func (s *service) readDatapoints( } func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { - db, err := s.startReadRPCWithDB() - if err != nil { - return nil, err - } - defer s.readRPCCompleted() + callStart := s.nowFn() ctx := addSourceToContext(tctx, req.Source) ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchTagged) @@ -729,153 +728,288 @@ func (s *service) FetchTagged(tctx thrift.Context, req *rpc.FetchTaggedRequest) ) } - result, err := s.fetchTagged(ctx, db, req) + result, err := s.fetchTagged(ctx, req) if sampled && err != nil { sp.LogFields(opentracinglog.Error(err)) } sp.Finish() + s.metrics.fetchTagged.ReportSuccessOrError(err, s.nowFn().Sub(callStart)) return result, err } -func (s *service) fetchTagged(ctx context.Context, db storage.Database, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { - callStart := s.nowFn() +func (s *service) fetchTagged(ctx context.Context, req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { + iter, err := s.FetchTaggedIter(ctx, req) + if err != nil { + return nil, err + } + response := &rpc.FetchTaggedResult_{ + Elements: make([]*rpc.FetchTaggedIDResult_, 0, iter.NumIDs()), + Exhaustive: iter.Exhaustive(), + } + + for iter.Next(ctx) { + cur := iter.Current() + tagBytes, err := cur.WriteTags(nil) + if err != nil { + return nil, err + } + idResult := &rpc.FetchTaggedIDResult_{ + ID: cur.ID(), + NameSpace: iter.Namespace().Bytes(), + EncodedTags: tagBytes, + } + response.Elements = append(response.Elements, idResult) + segIter := cur.SegmentsIter() + for segIter.Next(ctx) { + idResult.Segments = append(idResult.Segments, segIter.Current()) + } + if segIter.Err() != nil { + return nil, segIter.Err() + } + } + if iter.Err() != nil { + return nil, iter.Err() + } + + return response, nil +} + +func (s *service) FetchTaggedIter(ctx context.Context, req *rpc.FetchTaggedRequest) (FetchTaggedResultsIter, error) { + db, err := s.startReadRPCWithDB() + if err != nil { + return nil, 
err + } + ctx.RegisterCloser(xresource.SimpleCloserFn(func() { + s.readRPCCompleted() + })) ns, query, opts, fetchData, err := convert.FromRPCFetchTaggedRequest(req, s.pools) if err != nil { - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return nil, tterrors.NewBadRequestError(err) } queryResult, err := db.QueryIDs(ctx, ns, query, opts) if err != nil { - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) return nil, convert.ToRPCError(err) } - results := queryResult.Results - response := &rpc.FetchTaggedResult_{ - Exhaustive: queryResult.Exhaustive, - Elements: make([]*rpc.FetchTaggedIDResult_, 0, results.Size()), - } - nsIDBytes := ns.Bytes() + tagEncoder := s.pools.tagEncoder.Get() + ctx.RegisterFinalizer(tagEncoder) - // NB(r): Step 1 if reading data then read using an asynchronous block reader, - // but don't serialize yet so that all block reader requests can - // be issued at once before waiting for their results. - var encodedDataResults [][][]xio.BlockReader - if fetchData { - encodedDataResults = make([][][]xio.BlockReader, results.Size()) - } - if err := s.fetchReadEncoded(ctx, db, response, results, ns, nsIDBytes, callStart, opts, fetchData, encodedDataResults); err != nil { - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) - return nil, err - } + return newFetchTaggedResultsIter(fetchTaggedResultsIterOpts{ + queryResult: queryResult, + queryOpts: opts, + fetchData: fetchData, + db: db, + docReader: docs.NewEncodedDocumentReader(), + nsID: ns, + tagEncoder: tagEncoder, + iOpts: s.opts.InstrumentOptions(), + }), nil +} - // Step 2: If fetching data read the results of the asynchronous block readers. - if fetchData { - if err := s.fetchReadResults(ctx, response, ns, encodedDataResults); err != nil { - s.metrics.fetchTagged.ReportError(s.nowFn().Sub(callStart)) - return nil, err - } - } +// BaseIter has common iterator methods. +type BaseIter interface { + // Next advances to the next element, returning if one exists. + // + // Iterators that embed this interface should expose a Current() function to return the element retrieved by Next. + // If an error occurs this returns false and it can be retrieved with Err. + Next(ctx context.Context) bool - s.metrics.fetchTagged.ReportSuccess(s.nowFn().Sub(callStart)) - return response, nil + // Err returns a non-nil error if an error occurred when calling Next(). + Err() error } -func (s *service) fetchReadEncoded(ctx context.Context, - db storage.Database, - response *rpc.FetchTaggedResult_, - results index.QueryResults, - nsID ident.ID, - nsIDBytes []byte, - callStart time.Time, - opts index.QueryOptions, - fetchData bool, - encodedDataResults [][][]xio.BlockReader, -) error { - ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadEncoded) - if sampled { - sp.LogFields( - opentracinglog.String("id", nsID.String()), - xopentracing.Time("callStart", callStart), - ) +// FetchTaggedResultsIter iterates over the results from FetchTagged +// The iterator is not thread safe and must only be accessed from a single goroutine. +type FetchTaggedResultsIter interface { + BaseIter + + // NumIDs returns the total number of series IDs in the result. + NumIDs() int + + // Exhaustive returns true if NumIDs is all IDs that the query could have returned. + Exhaustive() bool + + // Namespace is the namespace. + Namespace() ident.ID + + // Current returns the current IDResult fetched with Next. The result is only valid if Err is nil. 
+ Current() IDResult +} + +// SegmentsIter iterates over the Segments for an IDResult. +type SegmentsIter interface { + BaseIter + // Current returns the current Segments. The result is only valid if Err() is nil. + Current() *rpc.Segments +} + +type fetchTaggedResultsIter struct { + queryResults *index.ResultsMap + idResults []IDResult + startInclusive time.Time + endExclusive time.Time + db storage.Database + idx int + exhaustive bool + fetchData bool + cur IDResult + err error + nsID ident.ID + docReader *docs.EncodedDocumentReader + tagEncoder serialize.TagEncoder + iOpts instrument.Options +} + +type fetchTaggedResultsIterOpts struct { + queryResult index.QueryResult + queryOpts index.QueryOptions + fetchData bool + db storage.Database + docReader *docs.EncodedDocumentReader + nsID ident.ID + tagEncoder serialize.TagEncoder + iOpts instrument.Options +} + +func newFetchTaggedResultsIter(opts fetchTaggedResultsIterOpts) FetchTaggedResultsIter { //nolint: gocritic + iter := &fetchTaggedResultsIter{ + queryResults: opts.queryResult.Results.Map(), + idResults: make([]IDResult, 0, opts.queryResult.Results.Map().Len()), + exhaustive: opts.queryResult.Exhaustive, + db: opts.db, + fetchData: opts.fetchData, + startInclusive: opts.queryOpts.StartInclusive, + endExclusive: opts.queryOpts.EndExclusive, + nsID: opts.nsID, + docReader: opts.docReader, + tagEncoder: opts.tagEncoder, + iOpts: opts.iOpts, } - defer sp.Finish() - i := 0 - // Re-use reader and id for more memory-efficient processing of - // tags from doc.Metadata - reader := docs.NewEncodedDocumentReader() - for _, entry := range results.Map().Iter() { - idx := i - i++ + return iter +} - // NB(r): Use a bytes ID here so that this ID doesn't need to be - // copied by the blockRetriever in the streamRequest method when - // it checks if the ID is finalizeable or not with IsNoFinalize. - id := ident.BytesID(entry.Key()) +func (i *fetchTaggedResultsIter) NumIDs() int { + return i.queryResults.Len() +} + +func (i *fetchTaggedResultsIter) Exhaustive() bool { + return i.exhaustive +} + +func (i *fetchTaggedResultsIter) Namespace() ident.ID { + return i.nsID +} - d := entry.Value() - metadata, err := docs.MetadataFromDocument(d, reader) - if err != nil { - return err +func (i *fetchTaggedResultsIter) Next(ctx context.Context) bool { + if i.idx >= i.queryResults.Len() { + return false + } + // TODO(rhall): don't request all series blocks at once. + if i.idx == 0 { + for _, entry := range i.queryResults.Iter() { // nolint: gocritic + result := IDResult{ + queryResult: entry, + docReader: i.docReader, + tagEncoder: i.tagEncoder, + iOpts: i.iOpts, + } + if i.fetchData { + // NB(r): Use a bytes ID here so that this ID doesn't need to be + // copied by the blockRetriever in the streamRequest method when + // it checks if the ID is finalizeable or not with IsNoFinalize. 
+ id := ident.BytesID(result.queryResult.Key()) + result.blockReaders, i.err = i.db.ReadEncoded(ctx, i.nsID, id, i.startInclusive, i.endExclusive) + if i.err != nil { + return false + } + } + i.idResults = append(i.idResults, result) } - tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true}) + } + i.cur = i.idResults[i.idx] + i.idx++ + return true +} - enc := s.pools.tagEncoder.Get() - ctx.RegisterFinalizer(enc) - encodedTags, err := s.encodeTags(enc, tags) - if err != nil { // This is an invariant, should never happen - return tterrors.NewInternalError(err) - } +func (i *fetchTaggedResultsIter) Err() error { + return i.err +} - elem := &rpc.FetchTaggedIDResult_{ - NameSpace: nsIDBytes, - ID: id.Bytes(), - EncodedTags: encodedTags.Bytes(), - } - response.Elements = append(response.Elements, elem) - if !fetchData { - continue - } +func (i *fetchTaggedResultsIter) Current() IDResult { + return i.cur +} - encoded, err := db.ReadEncoded(ctx, nsID, id, - opts.StartInclusive, opts.EndExclusive) - if err != nil { - return convert.ToRPCError(err) - } else { - encodedDataResults[idx] = encoded - } +// IDResult is the FetchTagged result for a series ID. +type IDResult struct { + queryResult index.ResultsMapEntry + docReader *docs.EncodedDocumentReader + tagEncoder serialize.TagEncoder + blockReaders [][]xio.BlockReader + iOpts instrument.Options +} + +// ID returns the series ID. +func (i *IDResult) ID() []byte { + return i.queryResult.Key() +} + +// WriteTags writes the encoded tags to provided slice. Callers must use the returned reference in case the slice needs +// to grow, just like append(). +func (i *IDResult) WriteTags(dst []byte) ([]byte, error) { + metadata, err := docs.MetadataFromDocument(i.queryResult.Value(), i.docReader) + if err != nil { + return nil, err } - return nil + tags := idxconvert.ToSeriesTags(metadata, idxconvert.Opts{NoClone: true}) + encodedTags, err := encodeTags(i.tagEncoder, tags, i.iOpts) + if err != nil { // This is an invariant, should never happen + return nil, tterrors.NewInternalError(err) + } + dst = append(dst[:0], encodedTags.Bytes()...) + i.tagEncoder.Reset() + return dst, nil } -func (s *service) fetchReadResults( - ctx context.Context, - response *rpc.FetchTaggedResult_, - nsID ident.ID, - encodedDataResults [][][]xio.BlockReader, -) error { - ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadResults) - if sampled { - sp.LogFields( - opentracinglog.String("id", nsID.String()), - opentracinglog.Int("elementCount", len(response.Elements)), - ) +// SegmentsIter returns an iterator for the Segments. 
+func (i *IDResult) SegmentsIter() SegmentsIter { + return &segmentsIter{ + blockReaders: i.blockReaders, } - defer sp.Finish() +} + +type segmentsIter struct { + blockReaders [][]xio.BlockReader + idx int + cur *rpc.Segments + err error +} - for idx := range response.Elements { - segments, rpcErr := s.readEncodedResult(ctx, encodedDataResults[idx]) +func (i *segmentsIter) Next(_ context.Context) bool { + for i.idx < len(i.blockReaders) { + var rpcErr *rpc.Error + i.cur, rpcErr = readEncodedResultSegment(i.blockReaders[i.idx]) + i.idx++ if rpcErr != nil { - return rpcErr + i.err = rpcErr + return false + } + if i.cur != nil { + return true } - - response.Elements[idx].Segments = segments } - return nil + return false +} + +func (i *segmentsIter) Current() *rpc.Segments { + return i.cur +} + +func (i *segmentsIter) Err() error { + return i.err } func (s *service) Aggregate(tctx thrift.Context, req *rpc.AggregateQueryRequest) (*rpc.AggregateQueryResult_, error) { @@ -968,14 +1102,14 @@ func (s *service) AggregateRaw(tctx thrift.Context, req *rpc.AggregateQueryRawRe return response, nil } -func (s *service) encodeTags( +func encodeTags( enc serialize.TagEncoder, tags ident.TagIterator, -) (checked.Bytes, error) { + iOpts instrument.Options) (checked.Bytes, error) { if err := enc.Encode(tags); err != nil { // should never happen err = xerrors.NewRenamedError(err, fmt.Errorf("unable to encode tags")) - instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), func(l *zap.Logger) { + instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) { l.Error(err.Error()) }) return nil, err @@ -984,7 +1118,7 @@ func (s *service) encodeTags( if !ok { // should never happen err := fmt.Errorf("unable to encode tags: unable to unwrap bytes") - instrument.EmitAndLogInvariantViolation(s.opts.InstrumentOptions(), func(l *zap.Logger) { + instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) { l.Error(err.Error()) }) return nil, err @@ -1309,7 +1443,7 @@ func (s *service) getBlocksMetadataV2FromResult( if tags != nil && tags.Remaining() > 0 { enc := s.pools.tagEncoder.Get() ctx.RegisterFinalizer(enc) - encoded, err := s.encodeTags(enc, tags) + encoded, err := encodeTags(enc, tags, s.opts.InstrumentOptions()) if err != nil { return nil, err } @@ -2342,7 +2476,7 @@ func (s *service) readEncodedResult( })) for _, readers := range encoded { - segment, err := s.readEncodedResultSegment(readers) + segment, err := readEncodedResultSegment(readers) if err != nil { return nil, err } @@ -2355,7 +2489,7 @@ func (s *service) readEncodedResult( return segments, nil } -func (s *service) readEncodedResultSegment( +func readEncodedResultSegment( readers []xio.BlockReader, ) (*rpc.Segments, *rpc.Error) { converted, err := convert.ToSegments(readers) diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 81dc4be3fb..fcbb8f8a1e 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1700,11 +1700,9 @@ func TestServiceFetchTagged(t *testing.T) { sp.Finish() spans := mtr.FinishedSpans() - require.Len(t, spans, 4) - assert.Equal(t, tracepoint.FetchReadEncoded, spans[0].OperationName) - assert.Equal(t, tracepoint.FetchReadResults, spans[1].OperationName) - assert.Equal(t, tracepoint.FetchTagged, spans[2].OperationName) - assert.Equal(t, "root", spans[3].OperationName) + require.Len(t, spans, 2) + assert.Equal(t, 
tracepoint.FetchTagged, spans[0].OperationName) + assert.Equal(t, "root", spans[1].OperationName) } func TestServiceFetchTaggedIsOverloaded(t *testing.T) { diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 581d39b03f..64d285f701 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1421,7 +1421,6 @@ func (i *nsIndex) AggregateQuery( results := i.aggregateResultsPool.Get() aopts := index.AggregateResultsOptions{ SizeLimit: opts.SeriesLimit, - DocsLimit: opts.DocsLimit, FieldFilter: opts.FieldFilter, Type: opts.Type, } @@ -1688,16 +1687,7 @@ func (i *nsIndex) execBlockQueryFn( sp.LogFields(logFields...) defer sp.Finish() - docResults, ok := results.(index.DocumentResults) - if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during query", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() - return - } - - blockExhaustive, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) + blockExhaustive, err := block.Query(ctx, cancellable, query, opts, results, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in @@ -1735,16 +1725,7 @@ func (i *nsIndex) execBlockWideQueryFn( sp.LogFields(logFields...) defer sp.Finish() - docResults, ok := results.(index.DocumentResults) - if !ok { // should never happen - state.Lock() - err := fmt.Errorf("unknown results type [%T] received during wide query", results) - state.multiErr = state.multiErr.Add(err) - state.Unlock() - return - } - - _, err := block.Query(ctx, cancellable, query, opts, docResults, logFields) + _, err := block.Query(ctx, cancellable, query, opts, results, logFields) if err == index.ErrUnableToQueryBlockClosed { // NB(r): Because we query this block outside of the results lock, it's // possible this block may get closed if it slides out of retention, in diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index fd2809818e..ed0b03e093 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -21,14 +21,21 @@ package index import ( - "math" + "fmt" "sync" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst/encoding/docs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" ) +const missingDocumentFields = "invalid document fields: empty %s" + +// NB: emptyValues is an AggregateValues with no values, used for tracking +// terms only rather than terms and values. +var emptyValues = AggregateValues{hasValues: false} + type aggregatedResults struct { sync.RWMutex @@ -36,7 +43,6 @@ type aggregatedResults struct { aggregateOpts AggregateResultsOptions resultsMap *AggregateResultsMap - size int totalDocsCount int idPool ident.Pool @@ -95,109 +101,201 @@ func (r *aggregatedResults) Reset( // reset all keys in the map next r.resultsMap.Reset() r.totalDocsCount = 0 - r.size = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. 
r.Unlock() } +func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, int, error) { + r.Lock() + err := r.addDocumentsBatchWithLock(batch) + size := r.resultsMap.Len() + docsCount := r.totalDocsCount + len(batch) + r.totalDocsCount = docsCount + r.Unlock() + return size, docsCount, err +} + func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { return r.aggregateOpts } func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, int) { r.Lock() - defer r.Unlock() - - remainingDocs := math.MaxInt64 - if r.aggregateOpts.DocsLimit != 0 { - remainingDocs = r.aggregateOpts.DocsLimit - r.totalDocsCount - } - - // NB: already hit doc limit. - if remainingDocs <= 0 { - for idx := 0; idx < len(batch); idx++ { - batch[idx].Field.Finalize() - for _, term := range batch[idx].Terms { - term.Finalize() - } - } - - return r.size, r.totalDocsCount - } - - // NB: cannot insert more than max docs, so that acts as the upper bound here. - remainingInserts := remainingDocs - if r.aggregateOpts.SizeLimit != 0 { - if remaining := r.aggregateOpts.SizeLimit - r.size; remaining < remainingInserts { - remainingInserts = remaining - } - } - - docs := 0 - numInserts := 0 - for _, entry := range batch { - if docs >= remainingDocs || numInserts >= remainingInserts { - entry.Field.Finalize() - for _, term := range entry.Terms { - term.Finalize() - } - - r.size += numInserts - r.totalDocsCount += docs - return r.size, r.totalDocsCount - } - - docs++ + valueInsertions := 0 + for _, entry := range batch { //nolint:gocritic f := entry.Field aggValues, ok := r.resultsMap.Get(f) if !ok { - if remainingInserts > numInserts { - numInserts++ - aggValues = r.valuesPool.Get() + aggValues = r.valuesPool.Get() + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: false, + }) + } else { + // because we already have a entry for this field, we release the ident back to + // the underlying pool. + f.Finalize() + } + valuesMap := aggValues.Map() + for _, t := range entry.Terms { + if !valuesMap.Contains(t) { // we can avoid the copy because we assume ownership of the passed ident.ID, // but still need to finalize it. - r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{ + valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: false, }) + valueInsertions++ } else { - // this value exceeds the limit, so should be released to the underling - // pool without adding to the map. - f.Finalize() + // because we already have a entry for this term, we release the ident back to + // the underlying pool. + t.Finalize() } - } else { - // because we already have a entry for this field, we release the ident back to - // the underlying pool. - f.Finalize() } + } + size := r.resultsMap.Len() + docsCount := r.totalDocsCount + valueInsertions + r.totalDocsCount = docsCount + r.Unlock() + return size, docsCount +} - valuesMap := aggValues.Map() - for _, t := range entry.Terms { - if remainingDocs > docs { - docs++ - if !valuesMap.Contains(t) { - // we can avoid the copy because we assume ownership of the passed ident.ID, - // but still need to finalize it. 
- if remainingInserts > numInserts { - valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: false, - }) - numInserts++ - continue - } - } +func (r *aggregatedResults) addDocumentsBatchWithLock( + batch []doc.Document, +) error { + for _, doc := range batch { //nolint:gocritic + switch r.aggregateOpts.Type { + case AggregateTagNamesAndValues: + if err := r.addDocumentWithLock(doc); err != nil { + return err } - t.Finalize() + case AggregateTagNames: + // NB: if aggregating by name only, can ignore any additional documents + // after the result map size exceeds the optional size limit, since all + // incoming terms are either duplicates or new values which will exceed + // the limit. + size := r.resultsMap.Len() + if r.aggregateOpts.SizeLimit > 0 && size >= r.aggregateOpts.SizeLimit { + return nil + } + + if err := r.addDocumentTermsWithLock(doc); err != nil { + return err + } + default: + return fmt.Errorf("unsupported aggregation type: %v", r.aggregateOpts.Type) + } + } + + return nil +} + +func (r *aggregatedResults) addDocumentTermsWithLock( + container doc.Document, +) error { + document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) + if err != nil { + return fmt.Errorf("unable to decode encoded document; %w", err) + } + for _, field := range document.Fields { //nolint:gocritic + if err := r.addTermWithLock(field.Name); err != nil { + return fmt.Errorf("unable to add document terms [%+v]: %w", document, err) + } + } + + return nil +} + +func (r *aggregatedResults) addTermWithLock( + term []byte, +) error { + if len(term) == 0 { + return fmt.Errorf(missingDocumentFields, "term") + } + + // if a term filter is provided, ensure this field matches the filter, + // otherwise ignore it. + filter := r.aggregateOpts.FieldFilter + if filter != nil && !filter.Allow(term) { + return nil + } + + // NB: can cast the []byte -> ident.ID to avoid an alloc + // before we're sure we need it. + termID := ident.BytesID(term) + if r.resultsMap.Contains(termID) { + // NB: this term is already added; continue. + return nil + } + + // Set results map to an empty AggregateValues since we only care about + // existence of the term in the map, rather than its set of values. + r.resultsMap.Set(termID, emptyValues) + return nil +} + +func (r *aggregatedResults) addDocumentWithLock( + container doc.Document, +) error { + document, err := docs.MetadataFromDocument(container, &r.encodedDocReader) + if err != nil { + return fmt.Errorf("unable to decode encoded document; %w", err) + } + for _, field := range document.Fields { //nolint:gocritic + if err := r.addFieldWithLock(field.Name, field.Value); err != nil { + return fmt.Errorf("unable to add document [%+v]: %w", document, err) } } - r.size += numInserts - r.totalDocsCount += docs - return r.size, r.totalDocsCount + return nil +} + +func (r *aggregatedResults) addFieldWithLock( + term []byte, + value []byte, +) error { + if len(term) == 0 { + return fmt.Errorf(missingDocumentFields, "term") + } + + // if a term filter is provided, ensure this field matches the filter, + // otherwise ignore it. + filter := r.aggregateOpts.FieldFilter + if filter != nil && !filter.Allow(term) { + return nil + } + + // NB: can cast the []byte -> ident.ID to avoid an alloc + // before we're sure we need it. 
+ termID := ident.BytesID(term) + valueID := ident.BytesID(value) + + valueMap, found := r.resultsMap.Get(termID) + if found { + return valueMap.addValue(valueID) + } + + // NB: if over limit, do not add any new values to the map. + if r.aggregateOpts.SizeLimit > 0 && + r.resultsMap.Len() >= r.aggregateOpts.SizeLimit { + // Early return if limit enforced and we hit our limit. + return nil + } + + aggValues := r.valuesPool.Get() + if err := aggValues.addValue(valueID); err != nil { + // Return these values to the pool. + r.valuesPool.Put(aggValues) + return err + } + + r.resultsMap.Set(termID, aggValues) + return nil } func (r *aggregatedResults) Namespace() ident.ID { @@ -216,9 +314,9 @@ func (r *aggregatedResults) Map() *AggregateResultsMap { func (r *aggregatedResults) Size() int { r.RLock() - size := r.size + l := r.resultsMap.Len() r.RUnlock() - return size + return l } func (r *aggregatedResults) TotalDocsCount() int { diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index bf6368ead4..7a514b484b 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -21,211 +21,436 @@ package index import ( - "sort" + "bytes" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" -) -func entries(entries ...AggregateResultsEntry) []AggregateResultsEntry { return entries } + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) -func genResultsEntry(field string, terms ...string) AggregateResultsEntry { - entryTerms := make([]ident.ID, 0, len(terms)) - for _, term := range terms { - entryTerms = append(entryTerms, ident.StringID(term)) +func genDoc(strs ...string) doc.Document { + if len(strs)%2 != 0 { + panic("invalid test setup; need even str length") } - return AggregateResultsEntry{ - Field: ident.StringID(field), - Terms: entryTerms, + fields := make([]doc.Field, len(strs)/2) + for i := range fields { + fields[i] = doc.Field{ + Name: []byte(strs[i*2]), + Value: []byte(strs[i*2+1]), + } } + + return doc.NewDocumentFromMetadata(doc.Metadata{Fields: fields}) } -func toMap(res AggregateResults) map[string][]string { - entries := res.Map().Iter() - resultMap := make(map[string][]string, len(entries)) - for _, entry := range entries { //nolint:gocritic - terms := entry.value.Map().Iter() - resultTerms := make([]string, 0, len(terms)) - for _, term := range terms { - resultTerms = append(resultTerms, term.Key().String()) - } +func TestAggResultsInsertInvalid(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) + assert.True(t, res.EnforceLimits()) - sort.Strings(resultTerms) - resultMap[entry.Key().String()] = resultTerms - } + dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) + size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) + require.Error(t, err) + require.Equal(t, 0, size) + require.Equal(t, 1, docsCount) + + require.Equal(t, 0, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) - return resultMap + dInvalid = genDoc("", "foo") + size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) + require.Error(t, err) + require.Equal(t, 0, size) + require.Equal(t, 2, docsCount) + + require.Equal(t, 0, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) } -func TestAggResultsInsertWithRepeatedFields(t 
*testing.T) { +func TestAggResultsInsertEmptyTermValue(t *testing.T) { res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - entries := entries(genResultsEntry("foo", "baz", "baz", "baz", "qux")) - size, docsCount := res.AddFields(entries) - require.Equal(t, 3, size) - require.Equal(t, 5, docsCount) - require.Equal(t, 3, res.Size()) - require.Equal(t, 5, res.TotalDocsCount()) - - expected := map[string][]string{ - "foo": {"baz", "qux"}, + dValidEmptyTerm := genDoc("foo", "") + size, docsCount, err := res.AddDocuments([]doc.Document{dValidEmptyTerm}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) +} + +func TestAggResultsInsertBatchOfTwo(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) + d1 := genDoc("d1", "") + d2 := genDoc("d2", "") + size, docsCount, err := res.AddDocuments([]doc.Document{d1, d2}) + require.NoError(t, err) + require.Equal(t, 2, size) + require.Equal(t, 2, docsCount) + + require.Equal(t, 2, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) +} + +func TestAggResultsTermOnlyInsert(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{ + Type: AggregateTagNames, + }, testOpts) + dInvalid := doc.NewDocumentFromMetadata(doc.Metadata{Fields: []doc.Field{{}}}) + size, docsCount, err := res.AddDocuments([]doc.Document{dInvalid}) + require.Error(t, err) + require.Equal(t, 0, size) + require.Equal(t, 1, docsCount) + + require.Equal(t, 0, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + + dInvalid = genDoc("", "foo") + size, docsCount, err = res.AddDocuments([]doc.Document{dInvalid}) + require.Error(t, err) + require.Equal(t, 0, size) + require.Equal(t, 2, docsCount) + + require.Equal(t, 0, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) + + valid := genDoc("foo", "") + size, docsCount, err = res.AddDocuments([]doc.Document{valid}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 3, docsCount) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 3, res.TotalDocsCount()) +} + +func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { + dValid := genDoc("foo", "bar") + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + + size, docsCount, err = res.AddDocuments([]doc.Document{dValid}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 2, docsCount) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) +} + +func TestAggResultsInsertIdempotency(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) + testAggResultsInsertIdempotency(t, res) +} + +func TestAggResultsTermOnlyInsertIdempotency(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{ + Type: AggregateTagNames, + }, testOpts) + testAggResultsInsertIdempotency(t, res) +} + +func TestInvalidAggregateType(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{ + Type: 100, + }, testOpts) + dValid := genDoc("foo", "bar") + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) + require.Error(t, err) + require.Equal(t, 0, size) + require.Equal(t, 1, docsCount) +} + +func TestAggResultsSameName(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, 
testOpts) + d1 := genDoc("foo", "bar") + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) + + rMap := res.Map() + aggVals, ok := rMap.Get(ident.StringID("foo")) + require.True(t, ok) + require.Equal(t, 1, aggVals.Size()) + assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) + + d2 := genDoc("foo", "biz") + size, docsCount, err = res.AddDocuments([]doc.Document{d2}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 2, docsCount) + + aggVals, ok = rMap.Get(ident.StringID("foo")) + require.True(t, ok) + require.Equal(t, 2, aggVals.Size()) + assert.True(t, aggVals.Map().Contains(ident.StringID("bar"))) + assert.True(t, aggVals.Map().Contains(ident.StringID("biz"))) +} + +func assertNoValuesInNameOnlyAggregate(t *testing.T, v AggregateValues) { + assert.False(t, v.hasValues) + assert.Nil(t, v.valuesMap) + assert.Nil(t, v.pool) + + assert.Equal(t, 0, v.Size()) + assert.Nil(t, v.Map()) + assert.False(t, v.HasValues()) +} + +func TestAggResultsTermOnlySameName(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{ + Type: AggregateTagNames, + }, testOpts) + d1 := genDoc("foo", "bar") + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) + + rMap := res.Map() + aggVals, ok := rMap.Get(ident.StringID("foo")) + require.True(t, ok) + assertNoValuesInNameOnlyAggregate(t, aggVals) + + d2 := genDoc("foo", "biz") + size, docsCount, err = res.AddDocuments([]doc.Document{d2}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 2, docsCount) + + aggVals, ok = rMap.Get(ident.StringID("foo")) + require.True(t, ok) + require.False(t, aggVals.hasValues) + assertNoValuesInNameOnlyAggregate(t, aggVals) +} + +func addMultipleDocuments(t *testing.T, res AggregateResults) (int, int) { + _, _, err := res.AddDocuments([]doc.Document{ + genDoc("foo", "bar"), + genDoc("fizz", "bar"), + genDoc("buzz", "bar"), + }) + require.NoError(t, err) + + _, _, err = res.AddDocuments([]doc.Document{ + genDoc("foo", "biz"), + genDoc("fizz", "bar"), + }) + require.NoError(t, err) + + size, docsCount, err := res.AddDocuments([]doc.Document{ + genDoc("foo", "baz", "buzz", "bag", "qux", "qaz"), + }) + + require.NoError(t, err) + return size, docsCount +} + +func toFilter(strs ...string) AggregateFieldFilter { + b := make([][]byte, len(strs)) + for i, s := range strs { + b[i] = []byte(s) } - assert.Equal(t, expected, toMap(res)) + return AggregateFieldFilter(b) } -func TestWithLimits(t *testing.T) { - tests := []struct { - name string - entries []AggregateResultsEntry - sizeLimit int - docLimit int - exSeries int - exDocs int - expected map[string][]string - }{ - { - name: "single term", - entries: entries(genResultsEntry("foo")), - exSeries: 1, - exDocs: 1, - expected: map[string][]string{"foo": {}}, - }, - { - name: "same term", - entries: entries(genResultsEntry("foo"), genResultsEntry("foo")), - exSeries: 1, - exDocs: 2, - expected: map[string][]string{"foo": {}}, +var mergeTests = []struct { + name string + opts AggregateResultsOptions + expected map[string][]string +}{ + { + name: "no limit no filter", + opts: AggregateResultsOptions{}, + expected: map[string][]string{ + "foo": {"bar", "biz", "baz"}, + "fizz": {"bar"}, + "buzz": {"bar", "bag"}, + "qux": {"qaz"}, }, - { - name: "multiple terms", - entries: entries(genResultsEntry("foo"), genResultsEntry("bar")), - 
exSeries: 2, - exDocs: 2, - expected: map[string][]string{"foo": {}, "bar": {}}, + }, + { + name: "with limit no filter", + opts: AggregateResultsOptions{SizeLimit: 2}, + expected: map[string][]string{ + "foo": {"bar", "biz", "baz"}, + "fizz": {"bar"}, }, - { - name: "single entry", - entries: entries(genResultsEntry("foo", "bar")), - exSeries: 2, - exDocs: 2, - expected: map[string][]string{"foo": {"bar"}}, + }, + { + name: "no limit empty filter", + opts: AggregateResultsOptions{FieldFilter: toFilter()}, + expected: map[string][]string{ + "foo": {"bar", "biz", "baz"}, + "fizz": {"bar"}, + "buzz": {"bar", "bag"}, + "qux": {"qaz"}, }, - { - name: "single entry multiple fields", - entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "baz", "qux")), - exSeries: 4, - exDocs: 6, - expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + }, + { + name: "no limit matchless filter", + opts: AggregateResultsOptions{FieldFilter: toFilter("zig")}, + expected: map[string][]string{}, + }, + { + name: "empty limit with filter", + opts: AggregateResultsOptions{FieldFilter: toFilter("buzz")}, + expected: map[string][]string{ + "buzz": {"bar", "bag"}, }, - { - name: "multiple entry multiple fields", - entries: entries( - genResultsEntry("foo", "bar", "baz"), - genResultsEntry("foo", "baz", "baz", "qux")), - exSeries: 4, - exDocs: 7, - expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, + }, + { + name: "with limit with filter", + opts: AggregateResultsOptions{ + SizeLimit: 2, FieldFilter: toFilter("buzz", "qux", "fizz"), }, - { - name: "multiple entries", - entries: entries(genResultsEntry("foo", "baz"), genResultsEntry("bar", "baz", "qux")), - exSeries: 5, - exDocs: 5, - expected: map[string][]string{"foo": {"baz"}, "bar": {"baz", "qux"}}, + expected: map[string][]string{ + "fizz": {"bar"}, + "buzz": {"bar", "bag"}, }, + }, +} - { - name: "single entry query at size limit", - entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), - sizeLimit: 4, - exSeries: 4, - exDocs: 5, - expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, - }, - { - name: "single entry query at doc limit", - entries: entries(genResultsEntry("foo", "bar", "baz", "baz", "qux")), - docLimit: 5, - exSeries: 4, - exDocs: 5, - expected: map[string][]string{"foo": {"bar", "baz", "qux"}}, - }, +func TestAggResultsMerge(t *testing.T) { + for _, tt := range mergeTests { + t.Run(tt.name, func(t *testing.T) { + res := NewAggregateResults(nil, tt.opts, testOpts) + size, docsCount := addMultipleDocuments(t, res) + + require.Equal(t, len(tt.expected), size) + require.Equal(t, 6, docsCount) + ac := res.Map() + require.Equal(t, len(tt.expected), ac.Len()) + for k, v := range tt.expected { + aggVals, ok := ac.Get(ident.StringID(k)) + require.True(t, ok) + require.Equal(t, len(v), aggVals.Size()) + for _, actual := range v { + require.True(t, aggVals.Map().Contains(ident.StringID(actual))) + } + } + }) + } +} - { - name: "single entry query below size limit", - entries: entries(genResultsEntry("foo", "bar", "baz", "qux")), - sizeLimit: 3, - exSeries: 3, - exDocs: 4, - expected: map[string][]string{"foo": {"bar", "baz"}}, - }, - { - name: "single entry query below doc limit", - entries: entries(genResultsEntry("foo", "bar", "bar", "bar", "baz")), - docLimit: 3, - exSeries: 2, - exDocs: 3, - expected: map[string][]string{"foo": {"bar"}}, - }, +func TestAggResultsMergeNameOnly(t *testing.T) { + for _, tt := range mergeTests { + t.Run(tt.name+" name only", func(t *testing.T) { + tt.opts.Type = 
AggregateTagNames + res := NewAggregateResults(nil, tt.opts, testOpts) + size, docsCount := addMultipleDocuments(t, res) + + require.Equal(t, len(tt.expected), size) + require.Equal(t, 6, docsCount) + + ac := res.Map() + require.Equal(t, len(tt.expected), ac.Len()) + for k := range tt.expected { + aggVals, ok := ac.Get(ident.StringID(k)) + require.True(t, ok) + assertNoValuesInNameOnlyAggregate(t, aggVals) + } + }) + } +} - { - name: "multiple entry query below series limit", - entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), - sizeLimit: 3, - exSeries: 3, - exDocs: 4, - expected: map[string][]string{"foo": {"bar"}, "baz": {}}, - }, - { - name: "multiple entry query below doc limit", - entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), - docLimit: 3, - exSeries: 3, - exDocs: 3, - expected: map[string][]string{"foo": {"bar"}, "baz": {}}, - }, +func TestAggResultsInsertCopies(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) + dValid := genDoc("foo", "bar") - { - name: "multiple entry query both limits", - entries: entries(genResultsEntry("foo", "bar"), genResultsEntry("baz", "qux")), - docLimit: 3, - sizeLimit: 10, - exSeries: 3, - exDocs: 3, - expected: map[string][]string{"foo": {"bar"}, "baz": {}}, - }, + d, ok := dValid.Metadata() + require.True(t, ok) + name := d.Fields[0].Name + value := d.Fields[0].Value + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) + + found := false + + // our genny generated maps don't provide access to MapEntry directly, + // so we iterate over the map to find the added entry. Could avoid this + // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` + for _, entry := range res.Map().Iter() { + // see if this key has the same value as the added document's ID. + n := entry.Key().Bytes() + if !bytes.Equal(name, n) { + continue + } + // ensure the underlying []byte for ID/Fields is at a different address + // than the original. + require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) + v := entry.Value() + for _, f := range v.Map().Iter() { + v := f.Key().Bytes() + if !bytes.Equal(value, v) { + continue + } + + found = true + // ensure the underlying []byte for ID/Fields is at a different address + // than the original. + require.False(t, xtest.ByteSlicesBackedBySameData(v, value)) + } } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - res := NewAggregateResults(nil, AggregateResultsOptions{ - SizeLimit: tt.sizeLimit, - DocsLimit: tt.docLimit, - }, testOpts) - - size, docsCount := res.AddFields(tt.entries) - assert.Equal(t, tt.exSeries, size) - assert.Equal(t, tt.exDocs, docsCount) - assert.Equal(t, tt.exSeries, res.Size()) - assert.Equal(t, tt.exDocs, res.TotalDocsCount()) - - assert.Equal(t, tt.expected, toMap(res)) - }) + require.True(t, found) +} + +func TestAggResultsNameOnlyInsertCopies(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{ + Type: AggregateTagNames, + }, testOpts) + dValid := genDoc("foo", "bar") + d, ok := dValid.Metadata() + require.True(t, ok) + name := d.Fields[0].Name + size, docsCount, err := res.AddDocuments([]doc.Document{dValid}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) + + found := false + // our genny generated maps don't provide access to MapEntry directly, + // so we iterate over the map to find the added entry. 
Could avoid this + // in the future if we expose `func (m *Map) Entry(k Key) Entry {}` + for _, entry := range res.Map().Iter() { + // see if this key has the same value as the added document's ID. + n := entry.Key().Bytes() + if !bytes.Equal(name, n) { + continue + } + + // ensure the underlying []byte for ID/Fields is at a different address + // than the original. + require.False(t, xtest.ByteSlicesBackedBySameData(n, name)) + found = true + assertNoValuesInNameOnlyAggregate(t, entry.Value()) } + + require.True(t, found) } func TestAggResultsReset(t *testing.T) { res := NewAggregateResults(ident.StringID("qux"), AggregateResultsOptions{}, testOpts) - size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) - require.Equal(t, 2, size) - require.Equal(t, 2, docsCount) + d1 := genDoc("foo", "bar") + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) aggVals, ok := res.Map().Get(ident.StringID("foo")) require.True(t, ok) @@ -271,9 +496,11 @@ func TestAggResultsResetNamespaceClones(t *testing.T) { func TestAggResultFinalize(t *testing.T) { // Create a Results and insert some data. res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) - size, docsCount := res.AddFields(entries(genResultsEntry("foo", "bar"))) - require.Equal(t, 2, size) - require.Equal(t, 2, docsCount) + d1 := genDoc("foo", "bar") + size, docsCount, err := res.AddDocuments([]doc.Document{d1}) + require.NoError(t, err) + require.Equal(t, 1, size) + require.Equal(t, 1, docsCount) // Ensure the data is present. rMap := res.Map() diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 86061f3a9c..1b02e60b53 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -122,6 +122,13 @@ func (s shardRangesSegmentsByVolumeType) forEachSegmentGroup(cb func(group block return nil } +type addAggregateResultsFn func( + cancellable *xresource.CancellableLifetime, + results AggregateResults, + batch []AggregateResultsEntry, + source []byte, +) ([]AggregateResultsEntry, int, int, error) + // nolint: maligned type block struct { sync.RWMutex @@ -133,6 +140,7 @@ type block struct { shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn newExecutorWithRLockFn newExecutorFn + addAggregateResultsFn addAggregateResultsFn blockStart time.Time blockEnd time.Time blockSize time.Duration @@ -141,6 +149,7 @@ type block struct { blockOpts BlockOptions nsMD namespace.Metadata namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager + queryLimits limits.QueryLimits docsLimit limits.LookbackLimit metrics blockMetrics @@ -250,10 +259,12 @@ func NewBlock( namespaceRuntimeOptsMgr: namespaceRuntimeOptsMgr, metrics: newBlockMetrics(scope), logger: iopts.Logger(), + queryLimits: opts.QueryLimits(), docsLimit: opts.QueryLimits().DocsLimit(), } b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator b.newExecutorWithRLockFn = b.executorWithRLock + b.addAggregateResultsFn = b.addAggregateResults return b, nil } @@ -401,7 +412,7 @@ func (b *block) Query( cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results DocumentResults, + results BaseResults, logFields []opentracinglog.Field, ) (bool, error) { ctx, sp := ctx.StartTraceSpan(tracepoint.BlockQuery) @@ -421,7 +432,7 @@ func (b *block) queryWithSpan( cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results 
DocumentResults, + results BaseResults, sp opentracing.Span, logFields []opentracinglog.Field, ) (bool, error) { @@ -530,7 +541,7 @@ func (b *block) closeAsync(closer io.Closer) { func (b *block) addQueryResults( cancellable *xresource.CancellableLifetime, - results DocumentResults, + results BaseResults, batch []doc.Document, source []byte, ) ([]doc.Document, int, int, error) { @@ -548,7 +559,7 @@ func (b *block) addQueryResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the resource. + // try to add the docs to the xresource. size, docsCount, err := results.AddDocuments(batch) // immediately release the checkout on the lifetime of query. @@ -639,17 +650,15 @@ func (b *block) aggregateWithSpan( } var ( - source = opts.Source - size = results.Size() - resultCount = results.TotalDocsCount() - batch = b.opts.AggregateResultsEntryArrayPool().Get() - maxBatch = cap(batch) - iterClosed = false // tracking whether we need to free the iterator at the end. - currBatchSize int - numAdded int + source = opts.Source + size = results.Size() + docsCount = results.TotalDocsCount() + batch = b.opts.AggregateResultsEntryArrayPool().Get() + batchSize = cap(batch) + iterClosed = false // tracking whether we need to free the iterator at the end. ) - if maxBatch == 0 { - maxBatch = defaultAggregateResultsEntryBatchSize + if batchSize == 0 { + batchSize = defaultAggregateResultsEntryBatchSize } // cleanup at the end @@ -675,16 +684,8 @@ func (b *block) aggregateWithSpan( })) } - if opts.SeriesLimit < maxBatch { - maxBatch = opts.SeriesLimit - } - - if opts.DocsLimit < maxBatch { - maxBatch = opts.DocsLimit - } - for _, reader := range readers { - if opts.LimitsExceeded(size, resultCount) { + if opts.LimitsExceeded(size, docsCount) { break } @@ -693,26 +694,22 @@ func (b *block) aggregateWithSpan( return false, err } iterClosed = false // only once the iterator has been successfully Reset(). + for iter.Next() { - if opts.LimitsExceeded(size, resultCount) { + if opts.LimitsExceeded(size, docsCount) { break } field, term := iter.Current() - batch, numAdded = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) - currBatchSize += numAdded - - // continue appending to the batch until we hit our max batch size - if currBatchSize < maxBatch { + batch = b.appendFieldAndTermToBatch(batch, field, term, iterateTerms) + if len(batch) < batchSize { continue } - batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source, currBatchSize) + batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } - - currBatchSize = 0 } if err := iter.Err(); err != nil { @@ -726,23 +723,21 @@ func (b *block) aggregateWithSpan( } // Add last batch to results if remaining. - for len(batch) > 0 { - batch, size, resultCount, err = b.addAggregateResults(cancellable, results, batch, source, currBatchSize) + if len(batch) > 0 { + batch, size, docsCount, err = b.addAggregateResultsFn(cancellable, results, batch, source) if err != nil { return false, err } } - return opts.exhaustive(size, resultCount), nil + return opts.exhaustive(size, docsCount), nil } -// appendFieldAndTermToBatch adds the provided field / term onto the batch, -// optionally reusing the last element of the batch if it pertains to the same field. 
func (b *block) appendFieldAndTermToBatch( batch []AggregateResultsEntry, field, term []byte, includeTerms bool, -) ([]AggregateResultsEntry, int) { +) []AggregateResultsEntry { // NB(prateek): we make a copy of the (field, term) entries returned // by the iterator during traversal, because the []byte are only valid per entry during // the traversal (i.e. calling Next() invalidates the []byte). We choose to do this @@ -755,7 +750,6 @@ func (b *block) appendFieldAndTermToBatch( lastField []byte lastFieldIsValid bool reuseLastEntry bool - numAppended int ) // we are iterating multiple segments so we may receive duplicates (same field/term), but // as we are iterating one segment at a time, and because the underlying index structures @@ -778,7 +772,6 @@ func (b *block) appendFieldAndTermToBatch( reuseLastEntry = true entry = batch[len(batch)-1] // avoid alloc cause we already have the field } else { - numAppended++ // allocate id because this is the first time we've seen it // NB(r): Iterating fields FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -787,7 +780,6 @@ func (b *block) appendFieldAndTermToBatch( } if includeTerms { - numAppended++ // terms are always new (as far we know without checking the map for duplicates), so we allocate // NB(r): Iterating terms FST, this byte slice is only temporarily available // since we are pushing/popping characters from the stack as we iterate @@ -800,8 +792,7 @@ func (b *block) appendFieldAndTermToBatch( } else { batch = append(batch, entry) } - - return batch, numAppended + return batch } func (b *block) pooledID(id []byte) ident.ID { @@ -812,18 +803,15 @@ func (b *block) pooledID(id []byte) ident.ID { return b.opts.IdentifierPool().BinaryID(data) } -// addAggregateResults adds the fields on the batch -// to the provided results and resets the batch to be reused. func (b *block) addAggregateResults( cancellable *xresource.CancellableLifetime, results AggregateResults, batch []AggregateResultsEntry, source []byte, - currBatchSize int, ) ([]AggregateResultsEntry, int, int, error) { // update recently queried docs to monitor memory. if results.EnforceLimits() { - if err := b.docsLimit.Inc(currBatchSize, source); err != nil { + if err := b.docsLimit.Inc(len(batch), source); err != nil { return batch, 0, 0, err } } @@ -835,8 +823,8 @@ func (b *block) addAggregateResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the resource. - size, resultCount := results.AddFields(batch) + // try to add the docs to the xresource. + size, docsCount := results.AddFields(batch) // immediately release the checkout on the lifetime of query. cancellable.ReleaseCheckout() @@ -849,7 +837,7 @@ func (b *block) addAggregateResults( batch = batch[:0] // return results. 
- return batch, size, resultCount, nil + return batch, size, docsCount, nil } func (b *block) AddResults( diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 0624a3923a..b4c32160c5 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -27,24 +27,31 @@ import ( "fmt" "math/rand" "os" + "sort" "testing" "time" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/limits" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/fst" + "github.com/m3db/m3/src/m3ninx/index/segment/mem" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/m3ninx/search/proptest" "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xresource "github.com/m3db/m3/src/x/resource" "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) var ( @@ -197,3 +204,204 @@ func newPropTestBlock(t *testing.T, blockStart time.Time, nsMeta namespace.Metad require.NoError(t, err) return blk, nil } + +type testFields struct { + name string + values []string +} + +func genField() gopter.Gen { + return gopter.CombineGens( + gen.AlphaString(), + gen.SliceOf(gen.AlphaString()), + ).Map(func(input []interface{}) testFields { + var ( + name = input[0].(string) + values = input[1].([]string) + ) + + return testFields{ + name: name, + values: values, + } + }) +} + +type propTestSegment struct { + metadata doc.Metadata + exCount int64 + segmentMap segmentMap +} + +type testValuesSet map[string]struct{} //nolint:gofumpt +type segmentMap map[string]testValuesSet //nolint:gofumpt + +func genTestSegment() gopter.Gen { + return gen.SliceOf(genField()).Map(func(input []testFields) propTestSegment { + segMap := make(segmentMap, len(input)) + for _, field := range input { //nolint:gocritic + for _, value := range field.values { + exVals, found := segMap[field.name] + if !found { + exVals = make(testValuesSet) + } + exVals[value] = struct{}{} + segMap[field.name] = exVals + } + } + + fields := make([]testFields, 0, len(input)) + for name, valSet := range segMap { + vals := make([]string, 0, len(valSet)) + for val := range valSet { + vals = append(vals, val) + } + + sort.Strings(vals) + fields = append(fields, testFields{name: name, values: vals}) + } + + sort.Slice(fields, func(i, j int) bool { + return fields[i].name < fields[j].name + }) + + docFields := []doc.Field{} + for _, field := range fields { //nolint:gocritic + for _, val := range field.values { + docFields = append(docFields, doc.Field{ + Name: []byte(field.name), + Value: []byte(val), + }) + } + } + + return propTestSegment{ + metadata: doc.Metadata{Fields: docFields}, + exCount: int64(len(segMap)), + segmentMap: segMap, + } + }) +} + +func verifyResults( + t *testing.T, + results AggregateResults, + exMap segmentMap, +) { + resultMap := make(segmentMap, results.Map().Len()) + for _, field := range results.Map().Iter() { //nolint:gocritic + name := field.Key().String() + _, found := resultMap[name] + require.False(t, found, "duplicate values in results map") + + values := make(testValuesSet, field.value.Map().Len()) + for _, value := range 
field.value.Map().Iter() { + val := value.Key().String() + _, found := values[val] + require.False(t, found, "duplicate values in results map") + + values[val] = struct{}{} + } + + resultMap[name] = values + } + + require.Equal(t, resultMap, exMap) +} + +func TestAggregateDocLimits(t *testing.T) { + var ( + parameters = gopter.DefaultTestParameters() + seed = time.Now().UnixNano() + reporter = gopter.NewFormatedReporter(true, 160, os.Stdout) + ) + + parameters.MinSuccessfulTests = 1000 + parameters.MinSize = 5 + parameters.MaxSize = 10 + parameters.Rng = rand.New(rand.NewSource(seed)) //nolint:gosec + properties := gopter.NewProperties(parameters) + + properties.Property("segments dedupe and have correct docs counts", prop.ForAll( + func(testSegment propTestSegment) (bool, error) { + seg, err := mem.NewSegment(mem.NewOptions()) + if err != nil { + return false, err + } + + _, err = seg.Insert(testSegment.metadata) + if err != nil { + return false, err + } + + err = seg.Seal() + if err != nil { + return false, err + } + + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits) + + testMD := newTestNSMetadata(t) + start := time.Now().Truncate(time.Hour) + blk, err := NewBlock(start, testMD, BlockOptions{}, + namespace.NewRuntimeOptionsManager("foo"), testOpts) + if err != nil { + return false, err + } + + b, ok := blk.(*block) + if !ok { + return false, errors.New("bad block type") + } + + b.mutableSegments.foregroundSegments = []*readableSeg{ + newReadableSeg(seg, testOpts), + } + + results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + Type: AggregateTagNamesAndValues, + }, testOpts) + + ctx := context.NewContext() + defer ctx.BlockingClose() + + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{}, + results, + emptyLogFields) + + if err != nil { + return false, err + } + + require.True(t, exhaustive, errors.New("not exhaustive")) + verifyResults(t, results, testSegment.segmentMap) + found := false + for _, c := range scope.Snapshot().Counters() { + if c.Name() == "query-limit.total-docs-matched" { + require.Equal(t, testSegment.exCount, c.Value(), "docs count mismatch") + found = true + break + } + } + + require.True(t, found, "counter not found in metrics") + return true, nil + }, + genTestSegment(), + )) + + if !properties.Run(reporter) { + t.Errorf("failed with initial seed: %d", seed) + } +} diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index a45687da68..cedac63b67 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -23,6 +23,7 @@ package index import ( stdlibctx "context" "fmt" + "sort" "testing" "time" @@ -30,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/limits" "github.com/m3db/m3/src/dbnode/test" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" @@ -40,15 +42,19 @@ import ( "github.com/m3db/m3/src/m3ninx/search" 
"github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" xresource "github.com/m3db/m3/src/x/resource" + "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" opentracing "github.com/opentracing/opentracing-go" opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -1866,8 +1872,6 @@ func TestBlockAggregate(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - // NB: seriesLimit must be higher than the number of fields to be exhaustive. - seriesLimit := 10 testMD := newTestNSMetadata(t) start := time.Now().Truncate(time.Hour) blk, err := NewBlock(start, testMD, BlockOptions{}, @@ -1890,7 +1894,7 @@ func TestBlockAggregate(t *testing.T) { } results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ - SizeLimit: seriesLimit, + SizeLimit: 3, Type: AggregateTagNamesAndValues, }, testOpts) @@ -1902,23 +1906,24 @@ func TestBlockAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - iter.EXPECT().Reset(reader, gomock.Any()).Return(nil) - iter.EXPECT().Next().Return(true) - iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")) - iter.EXPECT().Next().Return(true) - iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")) - iter.EXPECT().Next().Return(true) - iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")) - iter.EXPECT().Next().Return(true) - iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")) - iter.EXPECT().Next().Return(false) - iter.EXPECT().Err().Return(nil) - iter.EXPECT().Close().Return(nil) - + gomock.InOrder( + iter.EXPECT().Reset(reader, gomock.Any()).Return(nil), + iter.EXPECT().Next().Return(true), + iter.EXPECT().Current().Return([]byte("f1"), []byte("t1")), + iter.EXPECT().Next().Return(true), + iter.EXPECT().Current().Return([]byte("f1"), []byte("t2")), + iter.EXPECT().Next().Return(true), + iter.EXPECT().Current().Return([]byte("f2"), []byte("t1")), + iter.EXPECT().Next().Return(true), + iter.EXPECT().Current().Return([]byte("f1"), []byte("t3")), + iter.EXPECT().Next().Return(false), + iter.EXPECT().Err().Return(nil), + iter.EXPECT().Close().Return(nil), + ) exhaustive, err := b.Aggregate( ctx, xresource.NewCancellableLifetime(), - QueryOptions{SeriesLimit: seriesLimit}, + QueryOptions{SeriesLimit: 3}, results, emptyLogFields) require.NoError(t, err) @@ -2001,7 +2006,7 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { require.False(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ - "f1": {}, + "f1": {"t1"}, }, results) sp.Finish() @@ -2242,3 +2247,199 @@ func testDoc3() doc.Metadata { }, } } + +func optionsWithAggResultsPool(capacity int) Options { + pool := NewAggregateResultsEntryArrayPool( + AggregateResultsEntryArrayPoolOpts{ + Capacity: capacity, + }, + ) + + pool.Init() + return testOpts.SetAggregateResultsEntryArrayPool(pool) +} + +func buildSegment(t *testing.T, term string, fields []string, opts mem.Options) *readableSeg { + seg, err := mem.NewSegment(opts) + require.NoError(t, err) + + docFields := make([]doc.Field, 0, len(fields)) + sort.Strings(fields) + for _, field := range fields { + docFields = append(docFields, doc.Field{Name: []byte(term), Value: []byte(field)}) + } + + _, err = 
seg.Insert(doc.Metadata{Fields: docFields}) + require.NoError(t, err) + + require.NoError(t, seg.Seal()) + return newReadableSeg(seg, testOpts) +} + +func TestBlockAggregateBatching(t *testing.T) { + memOpts := mem.NewOptions() + + var ( + batchSizeMap = make(map[string][]string) + batchSizeSegments = make([]*readableSeg, 0, defaultQueryDocsBatchSize) + ) + + for i := 0; i < defaultQueryDocsBatchSize; i++ { + fields := make([]string, 0, defaultQueryDocsBatchSize) + for j := 0; j < defaultQueryDocsBatchSize; j++ { + fields = append(fields, fmt.Sprintf("bar_%d", j)) + } + + if i == 0 { + batchSizeMap["foo"] = fields + } + + batchSizeSegments = append(batchSizeSegments, buildSegment(t, "foo", fields, memOpts)) + } + + tests := []struct { + name string + batchSize int + segments []*readableSeg + expectedDocsMatched int64 + expected map[string][]string + }{ + { + name: "single term multiple fields duplicated across readers", + batchSize: 3, + segments: []*readableSeg{ + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + }, + expectedDocsMatched: 1, + expected: map[string][]string{ + "foo": {"bar", "baz"}, + }, + }, + { + name: "multiple term multiple fields", + batchSize: 3, + segments: []*readableSeg{ + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bag", "bat"}, memOpts), + buildSegment(t, "qux", []string{"bar", "baz"}, memOpts), + }, + expectedDocsMatched: 2, + expected: map[string][]string{ + "foo": {"bag", "bar", "bat", "baz"}, + "qux": {"bar", "baz"}, + }, + }, + { + name: "term present in first and third reader", + // NB: expecting three batches due to the way batches are split (on the + // first different term ID in a batch), will look like this: + // [{foo [bar baz]} {dog [bar baz]} {qux [bar]} + // [{qux [baz]} {qaz [bar baz]} {foo [bar]}] + // [{foo [baz]}] + batchSize: 3, + segments: []*readableSeg{ + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + buildSegment(t, "dog", []string{"bar", "baz"}, memOpts), + buildSegment(t, "qux", []string{"bar", "baz"}, memOpts), + buildSegment(t, "qaz", []string{"bar", "baz"}, memOpts), + buildSegment(t, "foo", []string{"bar", "baz"}, memOpts), + }, + expectedDocsMatched: 7, + expected: map[string][]string{ + "foo": {"bar", "baz"}, + "dog": {"bar", "baz"}, + "qux": {"bar", "baz"}, + "qaz": {"bar", "baz"}, + }, + }, + { + name: "batch size case", + batchSize: defaultQueryDocsBatchSize, + segments: batchSizeSegments, + expectedDocsMatched: 1, + expected: batchSizeMap, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scope := tally.NewTestScope("", nil) + iOpts := instrument.NewOptions().SetMetricsScope(scope) + limitOpts := limits.NewOptions(). + SetInstrumentOptions(iOpts). + SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}). + SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}) + queryLimits, err := limits.NewQueryLimits((limitOpts)) + require.NoError(t, err) + testOpts = optionsWithAggResultsPool(tt.batchSize). + SetInstrumentOptions(iOpts). 
+ SetQueryLimits(queryLimits) + + testMD := newTestNSMetadata(t) + start := time.Now().Truncate(time.Hour) + blk, err := NewBlock(start, testMD, BlockOptions{}, + namespace.NewRuntimeOptionsManager("foo"), testOpts) + require.NoError(t, err) + + b, ok := blk.(*block) + require.True(t, ok) + + // NB: wrap existing aggregate results fn to more easily inspect batch size. + addAggregateResultsFn := b.addAggregateResultsFn + b.addAggregateResultsFn = func( + cancellable *xresource.CancellableLifetime, + results AggregateResults, + batch []AggregateResultsEntry, + source []byte, + ) ([]AggregateResultsEntry, int, int, error) { + // NB: since both terms and values count towards the batch size, initialize + // this with batch size to account for terms. + count := len(batch) + for _, entry := range batch { + count += len(entry.Terms) + } + + // FIXME: this currently fails, but will be fixed after + // https://github.com/m3db/m3/pull/3133 is reverted. + // require.True(t, count <= tt.batchSize, + // fmt.Sprintf("batch %v exceeds batchSize %d", batch, tt.batchSize)) + + return addAggregateResultsFn(cancellable, results, batch, source) + } + + b.mutableSegments.foregroundSegments = tt.segments + results := NewAggregateResults(ident.StringID("ns"), AggregateResultsOptions{ + Type: AggregateTagNamesAndValues, + }, testOpts) + + ctx := context.NewContext() + defer ctx.BlockingClose() + + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{}, + results, + emptyLogFields) + require.NoError(t, err) + require.True(t, exhaustive) + + snap := scope.Snapshot() + tallytest.AssertCounterValue(t, tt.expectedDocsMatched, snap, "query-limit.total-docs-matched", nil) + resultsMap := make(map[string][]string, results.Map().Len()) + for _, res := range results.Map().Iter() { + vals := make([]string, 0, res.Value().valuesMap.Len()) + for _, val := range res.Value().valuesMap.Iter() { + vals = append(vals, val.Key().String()) + } + + sort.Strings(vals) + resultsMap[res.Key().String()] = vals + } + + assert.Equal(t, tt.expected, resultsMap) + }) + } +} diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index ab6af6aded..0c44cf9614 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -128,123 +128,32 @@ func (mr *MockBaseResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockBaseResults)(nil).EnforceLimits)) } -// Finalize mocks base method -func (m *MockBaseResults) Finalize() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Finalize") -} - -// Finalize indicates an expected call of Finalize -func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) -} - -// MockDocumentResults is a mock of DocumentResults interface -type MockDocumentResults struct { - ctrl *gomock.Controller - recorder *MockDocumentResultsMockRecorder -} - -// MockDocumentResultsMockRecorder is the mock recorder for MockDocumentResults -type MockDocumentResultsMockRecorder struct { - mock *MockDocumentResults -} - -// NewMockDocumentResults creates a new mock instance -func NewMockDocumentResults(ctrl *gomock.Controller) *MockDocumentResults { - mock := &MockDocumentResults{ctrl: ctrl} - mock.recorder = &MockDocumentResultsMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows 
the caller to indicate expected use -func (m *MockDocumentResults) EXPECT() *MockDocumentResultsMockRecorder { - return m.recorder -} - -// Namespace mocks base method -func (m *MockDocumentResults) Namespace() ident.ID { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Namespace") - ret0, _ := ret[0].(ident.ID) - return ret0 -} - -// Namespace indicates an expected call of Namespace -func (mr *MockDocumentResultsMockRecorder) Namespace() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Namespace", reflect.TypeOf((*MockDocumentResults)(nil).Namespace)) -} - -// Size mocks base method -func (m *MockDocumentResults) Size() int { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Size") - ret0, _ := ret[0].(int) - return ret0 -} - -// Size indicates an expected call of Size -func (mr *MockDocumentResultsMockRecorder) Size() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockDocumentResults)(nil).Size)) -} - -// TotalDocsCount mocks base method -func (m *MockDocumentResults) TotalDocsCount() int { +// AddDocuments mocks base method +func (m *MockBaseResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TotalDocsCount") + ret := m.ctrl.Call(m, "AddDocuments", batch) ret0, _ := ret[0].(int) - return ret0 -} - -// TotalDocsCount indicates an expected call of TotalDocsCount -func (mr *MockDocumentResultsMockRecorder) TotalDocsCount() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockDocumentResults)(nil).TotalDocsCount)) -} - -// EnforceLimits mocks base method -func (m *MockDocumentResults) EnforceLimits() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "EnforceLimits") - ret0, _ := ret[0].(bool) - return ret0 + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } -// EnforceLimits indicates an expected call of EnforceLimits -func (mr *MockDocumentResultsMockRecorder) EnforceLimits() *gomock.Call { +// AddDocuments indicates an expected call of AddDocuments +func (mr *MockBaseResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockDocumentResults)(nil).EnforceLimits)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockBaseResults)(nil).AddDocuments), batch) } // Finalize mocks base method -func (m *MockDocumentResults) Finalize() { +func (m *MockBaseResults) Finalize() { m.ctrl.T.Helper() m.ctrl.Call(m, "Finalize") } // Finalize indicates an expected call of Finalize -func (mr *MockDocumentResultsMockRecorder) Finalize() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockDocumentResults)(nil).Finalize)) -} - -// AddDocuments mocks base method -func (m *MockDocumentResults) AddDocuments(batch []doc.Document) (int, int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddDocuments", batch) - ret0, _ := ret[0].(int) - ret1, _ := ret[1].(int) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// AddDocuments indicates an expected call of AddDocuments -func (mr *MockDocumentResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { +func (mr *MockBaseResultsMockRecorder) Finalize() *gomock.Call { mr.mock.ctrl.T.Helper() - return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockDocumentResults)(nil).AddDocuments), batch) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockBaseResults)(nil).Finalize)) } // MockQueryResults is a mock of QueryResults interface @@ -326,18 +235,6 @@ func (mr *MockQueryResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockQueryResults)(nil).EnforceLimits)) } -// Finalize mocks base method -func (m *MockQueryResults) Finalize() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Finalize") -} - -// Finalize indicates an expected call of Finalize -func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) -} - // AddDocuments mocks base method func (m *MockQueryResults) AddDocuments(batch []doc.Document) (int, int, error) { m.ctrl.T.Helper() @@ -354,6 +251,18 @@ func (mr *MockQueryResultsMockRecorder) AddDocuments(batch interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockQueryResults)(nil).AddDocuments), batch) } +// Finalize mocks base method +func (m *MockQueryResults) Finalize() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Finalize") +} + +// Finalize indicates an expected call of Finalize +func (mr *MockQueryResultsMockRecorder) Finalize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Finalize", reflect.TypeOf((*MockQueryResults)(nil).Finalize)) +} + // Reset mocks base method func (m *MockQueryResults) Reset(nsID ident.ID, opts QueryResultsOptions) { m.ctrl.T.Helper() @@ -520,6 +429,22 @@ func (mr *MockAggregateResultsMockRecorder) EnforceLimits() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnforceLimits", reflect.TypeOf((*MockAggregateResults)(nil).EnforceLimits)) } +// AddDocuments mocks base method +func (m *MockAggregateResults) AddDocuments(batch []doc.Document) (int, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddDocuments", batch) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// AddDocuments indicates an expected call of AddDocuments +func (mr *MockAggregateResultsMockRecorder) AddDocuments(batch interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockAggregateResults)(nil).AddDocuments), batch) +} + // Finalize mocks base method func (m *MockAggregateResults) Finalize() { m.ctrl.T.Helper() @@ -849,7 +774,7 @@ func (mr *MockBlockMockRecorder) WriteBatch(inserts interface{}) *gomock.Call { } // Query mocks base method -func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results DocumentResults, logFields []log.Field) (bool, error) { +func (m *MockBlock) Query(ctx context.Context, cancellable *resource.CancellableLifetime, query Query, opts QueryOptions, results BaseResults, logFields []log.Field) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Query", ctx, cancellable, query, opts, results, logFields) ret0, _ := ret[0].(bool) diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index e54acefc2c..9e5e07ec07 100644 --- a/src/dbnode/storage/index/types.go +++ 
b/src/dbnode/storage/index/types.go @@ -159,29 +159,22 @@ type BaseResults interface { // EnforceLimits returns whether this should enforce and increment limits. EnforceLimits() bool - // Finalize releases any resources held by the Results object, - // including returning it to a backing pool. - Finalize() -} - -// DocumentResults is a collection of query results that allow accumulation of -// document values, it is synchronized when access to the results set is used -// as documented by the methods. -type DocumentResults interface { - BaseResults - // AddDocuments adds the batch of documents to the results set, it will // take a copy of the bytes backing the documents so the original can be // modified after this function returns without affecting the results map. // TODO(r): We will need to change this behavior once index fields are // mutable and the most recent need to shadow older entries. AddDocuments(batch []doc.Document) (size, docsCount int, err error) + + // Finalize releases any resources held by the Results object, + // including returning it to a backing pool. + Finalize() } // QueryResults is a collection of results for a query, it is synchronized // when access to the results set is used as documented by the methods. type QueryResults interface { - DocumentResults + BaseResults // Reset resets the Results object to initial state. Reset(nsID ident.ID, opts QueryResultsOptions) @@ -266,9 +259,6 @@ type AggregateResultsOptions struct { // overflown will return early successfully. SizeLimit int - // DocsLimit limits the amount of documents - DocsLimit int - // Type determines what result is required. Type AggregationType @@ -367,7 +357,7 @@ type Block interface { cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, - results DocumentResults, + results BaseResults, logFields []opentracinglog.Field, ) (bool, error) diff --git a/src/dbnode/storage/index/wide_query_results.go b/src/dbnode/storage/index/wide_query_results.go index cb5ad85312..af6b707584 100644 --- a/src/dbnode/storage/index/wide_query_results.go +++ b/src/dbnode/storage/index/wide_query_results.go @@ -38,8 +38,6 @@ var ErrWideQueryResultsExhausted = errors.New("no more values to add to wide que type shardFilterFn func(ident.ID) (uint32, bool) -var _ DocumentResults = (*wideResults)(nil) - type wideResults struct { sync.RWMutex size int @@ -74,7 +72,7 @@ func NewWideQueryResults( shardFilter shardFilterFn, collector chan *ident.IDBatch, opts WideQueryOptions, -) DocumentResults { +) BaseResults { batchSize := opts.BatchSize results := &wideResults{ nsID: namespaceID, diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 5438bcebb9..a4cbadcdfa 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -829,7 +829,7 @@ func TestLimits(t *testing.T) { cancellable interface{}, query interface{}, opts interface{}, - results index.DocumentResults, + results index.BaseResults, logFields interface{}) (bool, error) { _, _, err = results.AddDocuments([]doc.Document{ // Results in size=1 and docs=2. 
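Aside (not part of the patch): the interface consolidation in the types.go hunks above is easier to read as the resulting shape. The sketch below abbreviates the method sets to those visible in this diff and uses a standalone package name; it is illustrative only, not the full interface.

package sketch

import "github.com/m3db/m3/src/m3ninx/doc"

// BaseResults now carries AddDocuments and Finalize directly, so both query
// and aggregate results satisfy the same accumulation contract.
type BaseResults interface {
	// EnforceLimits returns whether this should enforce and increment limits.
	EnforceLimits() bool

	// AddDocuments adds the batch of documents to the results set, copying
	// the bytes backing the documents so the originals can be reused.
	AddDocuments(batch []doc.Document) (size, docsCount int, err error)

	// Finalize releases any resources held by the results object.
	Finalize()
}

// QueryResults simply embeds BaseResults after this change; Reset and the
// remaining methods are unchanged and elided here.
type QueryResults interface {
	BaseResults
}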
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index d5667ef6d5..a51bd9c5ce 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2810,6 +2810,35 @@ func (s *dbShard) LatestVolume(blockStart time.Time) (int, error) { return s.namespaceReaderMgr.latestVolume(s.shard, blockStart) } +func (s *dbShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) { + latestVolume, err := s.LatestVolume(blockStart) + if err != nil { + return nil, err + } + + reader, err := s.newReaderFn(s.opts.BytesPool(), s.opts.CommitLogOptions().FilesystemOptions()) + if err != nil { + return nil, err + } + + openOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: s.namespace.ID(), + Shard: s.ID(), + BlockStart: blockStart, + VolumeIndex: latestVolume, + }, + FileSetType: persist.FileSetFlushType, + StreamingEnabled: true, + } + + if err := reader.Open(openOpts); err != nil { + return nil, err + } + + return reader, nil +} + func (s *dbShard) ScanData( blockStart time.Time, processor fs.DataEntryProcessor, diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 5e2d9ec294..f957b4f35b 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -1954,6 +1954,43 @@ func TestShardAggregateTilesVerifySliceLengths(t *testing.T) { require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)") } +func TestOpenStreamingReader(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + var ( + blockStart = time.Now().Truncate(time.Hour) + testOpts = DefaultTestOptions() + ) + + shard := testDatabaseShard(t, testOpts) + defer assert.NoError(t, shard.Close()) + + latestSourceVolume, err := shard.LatestVolume(blockStart) + require.NoError(t, err) + + openOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: shard.namespace.ID(), + Shard: shard.ID(), + BlockStart: blockStart, + VolumeIndex: latestSourceVolume, + }, + FileSetType: persist.FileSetFlushType, + StreamingEnabled: true, + } + + reader := fs.NewMockDataFileSetReader(ctrl) + reader.EXPECT().Open(openOpts).Return(nil) + + shard.newReaderFn = func(pool.CheckedBytesPool, fs.Options) (fs.DataFileSetReader, error) { + return reader, nil + } + + _, err = shard.OpenStreamingReader(blockStart) + require.NoError(t, err) +} + func TestShardScan(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 3a3ef7e947..ee619f0e00 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1110,6 +1110,22 @@ func (mr *MockNamespaceMockRecorder) Shards() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shards", reflect.TypeOf((*MockNamespace)(nil).Shards)) } +// ReadableShardAt mocks base method +func (m *MockNamespace) ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadableShardAt", shardID) + ret0, _ := ret[0].(databaseShard) + ret1, _ := ret[1].(namespace.Context) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ReadableShardAt indicates an expected call of ReadableShardAt +func (mr *MockNamespaceMockRecorder) ReadableShardAt(shardID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadableShardAt", reflect.TypeOf((*MockNamespace)(nil).ReadableShardAt), 
shardID) +} + // SetIndex mocks base method func (m *MockNamespace) SetIndex(reverseIndex NamespaceIndex) error { m.ctrl.T.Helper() @@ -1331,6 +1347,22 @@ func (mr *MockdatabaseNamespaceMockRecorder) Shards() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shards", reflect.TypeOf((*MockdatabaseNamespace)(nil).Shards)) } +// ReadableShardAt mocks base method +func (m *MockdatabaseNamespace) ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadableShardAt", shardID) + ret0, _ := ret[0].(databaseShard) + ret1, _ := ret[1].(namespace.Context) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ReadableShardAt indicates an expected call of ReadableShardAt +func (mr *MockdatabaseNamespaceMockRecorder) ReadableShardAt(shardID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadableShardAt", reflect.TypeOf((*MockdatabaseNamespace)(nil).ReadableShardAt), shardID) +} + // SetIndex mocks base method func (m *MockdatabaseNamespace) SetIndex(reverseIndex NamespaceIndex) error { m.ctrl.T.Helper() @@ -1822,22 +1854,6 @@ func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(ctx, sourceNs, opts return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), ctx, sourceNs, opts) } -// ReadableShardAt mocks base method -func (m *MockdatabaseNamespace) ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReadableShardAt", shardID) - ret0, _ := ret[0].(databaseShard) - ret1, _ := ret[1].(namespace.Context) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// ReadableShardAt indicates an expected call of ReadableShardAt -func (mr *MockdatabaseNamespaceMockRecorder) ReadableShardAt(shardID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadableShardAt", reflect.TypeOf((*MockdatabaseNamespace)(nil).ReadableShardAt), shardID) -} - // MockShard is a mock of Shard interface type MockShard struct { ctrl *gomock.Controller @@ -1931,6 +1947,21 @@ func (mr *MockShardMockRecorder) ScanData(blockStart, processor interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanData", reflect.TypeOf((*MockShard)(nil).ScanData), blockStart, processor) } +// OpenStreamingReader mocks base method +func (m *MockShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OpenStreamingReader", blockStart) + ret0, _ := ret[0].(fs.DataFileSetReader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OpenStreamingReader indicates an expected call of OpenStreamingReader +func (mr *MockShardMockRecorder) OpenStreamingReader(blockStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenStreamingReader", reflect.TypeOf((*MockShard)(nil).OpenStreamingReader), blockStart) +} + // MockdatabaseShard is a mock of databaseShard interface type MockdatabaseShard struct { ctrl *gomock.Controller @@ -2024,6 +2055,21 @@ func (mr *MockdatabaseShardMockRecorder) ScanData(blockStart, processor interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanData", reflect.TypeOf((*MockdatabaseShard)(nil).ScanData), blockStart, processor) } +// OpenStreamingReader mocks base method +func (m *MockdatabaseShard) 
OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OpenStreamingReader", blockStart) + ret0, _ := ret[0].(fs.DataFileSetReader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OpenStreamingReader indicates an expected call of OpenStreamingReader +func (mr *MockdatabaseShardMockRecorder) OpenStreamingReader(blockStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenStreamingReader", reflect.TypeOf((*MockdatabaseShard)(nil).OpenStreamingReader), blockStart) +} + // OnEvictedFromWiredList mocks base method func (m *MockdatabaseShard) OnEvictedFromWiredList(id ident.ID, blockStart time.Time) { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index be916df8f3..e39b8169d4 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -283,6 +283,9 @@ type Namespace interface { // Shards returns the shard description. Shards() []Shard + // ReadableShardAt returns a readable (bootstrapped) shard by id. + ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) + // SetIndex sets and enables reverse index for this namespace. SetIndex(reverseIndex NamespaceIndex) error @@ -475,9 +478,6 @@ type databaseNamespace interface { sourceNs databaseNamespace, opts AggregateTilesOptions, ) (int64, error) - - // ReadableShardAt returns a shard of this namespace by shardID. - ReadableShardAt(shardID uint32) (databaseShard, namespace.Context, error) } // SeriesReadWriteRef is a read/write reference for a series, @@ -515,6 +515,10 @@ type Shard interface { blockStart time.Time, processor fs.DataEntryProcessor, ) error + + // OpenStreamingDataReader creates and opens a streaming fs.DataFileSetReader + // on the latest volume of the given block. + OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) } type databaseShard interface { diff --git a/src/dbnode/tracepoint/tracepoint.go b/src/dbnode/tracepoint/tracepoint.go index a81fb5ba15..7690d91189 100644 --- a/src/dbnode/tracepoint/tracepoint.go +++ b/src/dbnode/tracepoint/tracepoint.go @@ -34,12 +34,6 @@ const ( // Query is the operation name for the tchannelthrift Query path. Query = "tchannelthrift/node.service.Query" - // FetchReadEncoded is the operation name for the tchannelthrift FetchReadEncoded path. - FetchReadEncoded = "tchannelthrift/node.service.FetchReadEncoded" - - // FetchReadResults is the operation name for the tchannelthrift FetchReadResults path. - FetchReadResults = "tchannelthrift/node.service.FetchReadResults" - // FetchReadSingleResult is the operation name for the tchannelthrift FetchReadSingleResult path. FetchReadSingleResult = "tchannelthrift/node.service.FetchReadSingleResult" diff --git a/src/m3ninx/index/segment/builder/multi_segments_builder.go b/src/m3ninx/index/segment/builder/multi_segments_builder.go index 7c8822b711..975f267d91 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_builder.go +++ b/src/m3ninx/index/segment/builder/multi_segments_builder.go @@ -82,6 +82,17 @@ func (b *builderFromSegments) Reset() { } func (b *builderFromSegments) AddSegments(segments []segment.Segment) error { + // Order by largest -> smallest so that the first segment + // is the largest when iterating over term postings lists + // (which means it can be directly copied into the merged postings + // list via a union rather than needing to shift posting list + // IDs to take into account for duplicates). 
+ // Note: This must be done first so that offset is correctly zero + // for the largest segment. + sort.Slice(segments, func(i, j int) bool { + return segments[i].Size() > segments[j].Size() + }) + // numMaxDocs can sometimes be larger than the actual number of documents // since some are duplicates numMaxDocs := 0 @@ -138,12 +149,6 @@ func (b *builderFromSegments) AddSegments(segments []segment.Segment) error { b.segmentsOffset += postings.ID(added) } - // Sort segments in descending order in terms of size so the multi segments - // terms iter more efficiently builds its postings list. - sort.Slice(b.segments, func(i, j int) bool { - return b.segments[i].segment.Size() > b.segments[j].segment.Size() - }) - // Make sure the terms iter has all the segments to combine data from b.termsIter.reset(b.segments) diff --git a/src/m3ninx/index/segment/builder/multi_segments_builder_test.go b/src/m3ninx/index/segment/builder/multi_segments_builder_test.go index b11097ecdd..04c286f083 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_builder_test.go +++ b/src/m3ninx/index/segment/builder/multi_segments_builder_test.go @@ -21,12 +21,15 @@ package builder import ( + "fmt" "sort" + "strconv" "testing" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" @@ -45,13 +48,24 @@ func TestMultiSegmentsBuilderSortedBySizeDescending(t *testing.T) { for i := 1; i <= numSegments; i++ { pi := postings.NewMockIterator(ctrl) gomock.InOrder( + pi.EXPECT().Next().Return(true), + pi.EXPECT().Current().Return(postings.ID(0)), pi.EXPECT().Next().Return(false), pi.EXPECT().Close().Return(nil), ) r := segment.NewMockReader(ctrl) - it := index.NewIDDocIterator(nil, pi) + it := index.NewIDDocIterator(r, pi) gomock.InOrder( r.EXPECT().AllDocs().Return(it, nil), + r.EXPECT().Metadata(postings.ID(0)).Return(doc.Metadata{ + ID: []byte("foo" + strconv.Itoa(i)), + Fields: []doc.Field{ + { + Name: []byte("bar"), + Value: []byte("baz"), + }, + }, + }, nil), r.EXPECT().Close().Return(nil), ) @@ -73,7 +87,18 @@ func TestMultiSegmentsBuilderSortedBySizeDescending(t *testing.T) { actualSegs = append(actualSegs, segMD.segment) } - for i := 0; i < numSegments; i++ { - require.Equal(t, segs[i].Size(), actualSegs[i].Size()) + for i, segMD := range b.segments { + actualSize := segMD.segment.Size() + require.Equal(t, segs[i].Size(), actualSize) + + if i > 0 { + // Check that offset is going up. + actualPrevOffset := b.segments[i-1].offset + actualCurrOffset := b.segments[i].offset + require.True(t, actualCurrOffset > actualPrevOffset, + fmt.Sprintf("expected=(actualCurrOffset > actualPrevOffset)\n"+ + "actual=(actualCurrOffset=%d, actualPrevOffset=%d)\n", + actualCurrOffset, actualPrevOffset)) + } } }
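Aside (not part of the patch): a standalone sketch of the largest-first ordering that AddSegments now applies before assigning postings ID offsets. The segment type, names, and sizes below are hypothetical stand-ins; only the sort predicate mirrors the builder change above.

package main

import (
	"fmt"
	"sort"
)

// fakeSegment stands in for segment.Segment; only Size matters to the sort.
type fakeSegment struct {
	name string
	size int64
}

func (s fakeSegment) Size() int64 { return s.size }

func main() {
	segments := []fakeSegment{
		{name: "small", size: 10},
		{name: "large", size: 1000},
		{name: "medium", size: 100},
	}

	// Largest first, mirroring builderFromSegments.AddSegments: the first
	// (largest) segment keeps a zero postings ID offset, so its postings
	// lists can be unioned into the merged lists without shifting IDs.
	sort.Slice(segments, func(i, j int) bool {
		return segments[i].Size() > segments[j].Size()
	})

	for i, s := range segments {
		fmt.Printf("order=%d segment=%s size=%d\n", i, s.name, s.size)
	}
}

Running this prints large, medium, small in that order, which is the ordering the builder relies on when it subsequently accumulates per-segment offsets.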