Added integration tests for eventhubs-streamanalytics-eventhubs (#54)
* Create test_spec.json

* Report throughput for multiple event hubs

* Changed ASA EH output to array of events

* Update README.md
algattik authored Oct 3, 2019
1 parent 4cf0d8b commit e7d6788
Showing 7 changed files with 82 additions and 10 deletions.
28 changes: 21 additions & 7 deletions components/azure-event-hubs/report-throughput.sh
@@ -6,20 +6,34 @@ set -euo pipefail
 REPORT_THROUGHPUT_MINUTES=${REPORT_THROUGHPUT_MINUTES:-30}
 
 ofs=2
-eh_resource=$(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE" --query id -o tsv)
+eh_resources=$(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE" --query id -o tsv)
+if [ -n "${EVENTHUB_NAMESPACE_OUT:-}" ]; then
+  eh_resources="$eh_resources $(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE_OUT" --query id -o tsv)"
+fi
 eh_capacity=$(az eventhubs namespace show -g $RESOURCE_GROUP -n "$EVENTHUB_NAMESPACE" --query sku.capacity -o tsv)
 metric_names="IncomingMessages IncomingBytes OutgoingMessages OutgoingBytes ThrottledRequests"
-fmt="%28s%20s%20s%20s%20s%20s\n"
+fmt="%28s%12s%20s%20s%20s%20s%20s\n"
 echo "Event Hub capacity: $eh_capacity throughput units (this determines MAX VALUE below)."
 echo "Reporting aggregate metrics per minute, offset by $ofs minutes, for $REPORT_THROUGHPUT_MINUTES minutes."
-printf "$fmt" "" $metric_names
+printf "$fmt" "" "Event Hub #" $metric_names
 PER_MIN=60
 MB=1000000
-printf "$fmt" "" $(tr -C " " "-" <<<$metric_names)
-printf "$fmt" "MAX VALUE" "$((eh_capacity*1000*PER_MIN))" "$((eh_capacity*1*MB*PER_MIN))" "$((eh_capacity*4096*PER_MIN))" "$((eh_capacity*2*MB*PER_MIN))" "-"
-printf "$fmt" "" $(tr -C " " "-" <<<$metric_names)
+printf "$fmt" "" "-----------" $(tr -C " " "-" <<<$metric_names)
+printf "$fmt" "MAX VALUE" "" "$((eh_capacity*1000*PER_MIN))" "$((eh_capacity*1*MB*PER_MIN))" "$((eh_capacity*4096*PER_MIN))" "$((eh_capacity*2*MB*PER_MIN))" "-"
+printf "$fmt" "" "-----------" $(tr -C " " "-" <<<$metric_names)
 
 for i in $(seq 1 $REPORT_THROUGHPUT_MINUTES) ; do
-  printf "$fmt" "$(date +%Y-%m-%dT%H:%M:%S%z)" $(az monitor metrics list --resource "$eh_resource" --interval PT1M --metrics $(tr " " "," <<< $metric_names) --offset ${ofs}M --query 'value[].timeseries[0].data[0].floor(total)' -o tsv)
+  eh_number=0
+  date=$(date +%Y-%m-%dT%H:%M:%S%z)
+  for eh in $eh_resources; do
+    eh_number=$((eh_number+1))
+    metrics=$(
+      az monitor metrics list --resource "$eh" --interval PT1M \
+        --metrics $(tr " " "," <<< $metric_names) --offset ${ofs}M \
+        --query 'value[].timeseries[0].data[0].floor(total)' -o tsv
+    )
+    printf "$fmt" "$date" "$eh_number" $metrics
+  done
 
   # sleep until next full minute. "10#" is to force base 10 if string is e.g. "09"
   sleep "$((60 - 10#$(date +%S) ))"
31 changes: 31 additions & 0 deletions eventhubs-streamanalytics-eventhubs/README.md
@@ -130,6 +130,37 @@ The above settings has been chosen to sustain a 1000 msg/sec stream.

Please use the Metrics pane in Stream Analytics: check "Input/Output Events" for throughput, and the "Watermark Delay" metric to see whether the job is keeping up with the input rate. You can also use the Event Hub "Metrics" pane to check for any "Throttled Requests" and adjust the Throughput Units accordingly. "Watermark Delay" is one of the key metrics that helps you understand whether Stream Analytics is keeping up with the incoming data. If the delay is constantly increasing, check whether the destination can keep up with the rate, or whether you need to increase the number of Streaming Units (SU): https://azure.microsoft.com/en-us/blog/new-metric-in-azure-stream-analytics-tracks-latency-of-your-streaming-pipeline/.
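
As a quick command-line alternative to the portal, a sketch like the following can poll the Watermark Delay metric through Azure Monitor. It is not part of the deployment scripts, and the metric name `OutputWatermarkDelaySeconds` and the `$PROC_JOB_NAME` variable are assumptions to adapt to your deployment.

```
# Sketch only: poll the Stream Analytics "Watermark Delay" metric from the CLI.
# Assumptions: the job name is in $PROC_JOB_NAME and the Azure Monitor metric
# is named OutputWatermarkDelaySeconds; adjust both to your deployment.
asa_resource=$(az resource show -g "$RESOURCE_GROUP" -n "$PROC_JOB_NAME" \
  --resource-type Microsoft.StreamAnalytics/streamingjobs --query id -o tsv)
az monitor metrics list --resource "$asa_resource" --interval PT1M \
  --metrics OutputWatermarkDelaySeconds --aggregation Maximum \
  --query 'value[].timeseries[0].data[].[timeStamp,maximum]' -o tsv
```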


The deployment script will also report performance, by default every minute for 30 minutes:

```
***** [M] Starting METRICS reporting
Event Hub capacity: 2 throughput units (this determines MAX VALUE below).
Reporting aggregate metrics per minute, offset by 2 minutes, for 30 minutes.
Event Hub # IncomingMessages IncomingBytes OutgoingMessages OutgoingBytes ThrottledRequests
----------- ---------------- ------------- ---------------- ------------- -----------------
MAX VALUE 120000 120000000 491520 240000000 -
----------- ---------------- ------------- ---------------- ------------- -----------------
2019-10-03T07:57:00 1 0 0 0 0 0
2019-10-03T07:57:00 2 0 0 0 0 0
2019-10-03T07:58:00 1 24050 22809797 24050 22809797 0
2019-10-03T07:58:00 2 0 0 0 0 0
2019-10-03T07:59:01 1 60037 56940526 60037 56940526 0
2019-10-03T07:59:01 2 341 62393762 0 0 0
2019-10-03T08:00:00 1 60090 56989878 60090 56989878 0
2019-10-03T08:00:00 2 375 65683281 0 0 0
2019-10-03T08:01:00 1 60036 56940643 60036 56940643 0
2019-10-03T08:01:00 2 376 65708824 0 0 0
```
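
For reference, the MAX VALUE row is computed by report-throughput.sh from the per-throughput-unit Event Hubs quotas (1,000 messages/s and 1 MB/s ingress, 4,096 messages/s and 2 MB/s egress), scaled to one minute. A quick check of the figures for the 2-TU namespace above:

```
# Same formulas as report-throughput.sh, for a 2-TU namespace, per minute:
eh_capacity=2; PER_MIN=60; MB=1000000
echo "IncomingMessages: $((eh_capacity*1000*PER_MIN))"   # 120000
echo "IncomingBytes:    $((eh_capacity*1*MB*PER_MIN))"   # 120000000
echo "OutgoingMessages: $((eh_capacity*4096*PER_MIN))"   # 491520
echo "OutgoingBytes:    $((eh_capacity*2*MB*PER_MIN))"   # 240000000
```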

In column "Event Hub #", 1 refers to the Event Hub used as input to Stream
Analytics, and 2 to the Event Hub used as output. After a few minutes of
ramp-up, the metrics for Event Hub 1 will show around 60k events/min
(depending on selected event rate, here 1k events/s). As Stream Analytics
batches up messages when outputting to Event Hubs, the rate in events/minute
on Event Hub 2 will be much lower, but you can see from the Incoming Bytes
metric that the data rate on both event hubs is similar.
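
A rough calculation on the 07:59 sample above shows why: the input hub receives one small message per event, while the output hub receives far fewer but much larger messages, each carrying a JSON array of events.

```
# Rough check using the 07:59 sample above:
echo "Event Hub 1 bytes per message: $((56940526 / 60037))"  # ~948    (one event per message)
echo "Event Hub 2 bytes per message: $((62393762 / 341))"    # ~182972 (one array of events per message)
```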

## Stream Analytics

Note that the solution configurations have been verified with compatibility level 1.2. The deployed Stream Analytics solution doesn't do any analytics or projection; it just injects an additional field using a simple JavaScript UDF:
@@ -114,7 +114,8 @@
 "serialization": {
   "type": "JSON",
   "properties": {
-    "encoding": "UTF8"
+    "encoding": "UTF8",
+    "format": "Array"
   }
 },
 "datasource": {
13 changes: 13 additions & 0 deletions eventhubs-streamanalytics-eventhubs/test_spec.json
@@ -0,0 +1,13 @@
[
  {
    "stage": "2",
    "short": "ese1",
    "steps": "CIPTMV",
    "minutes": "10",
    "throughput": "1",
    "extra_args": [
      "a",
      "simple"
    ]
  }
]
12 changes: 11 additions & 1 deletion integration-tests/azure-pipelines.yml
@@ -10,7 +10,7 @@ jobs:
       azureSubscription: ARMConnection
       scriptLocation: 'inlineScript'
       inlineScript: az vm start -g "$AGENT_VM_RESOURCE_GROUP" -n "$AGENT_VM_NAME"
-    displayName: 'start agent'
+    displayName: 'start agent VM'
 
 - job: run_tests
   dependsOn: start_agent
@@ -57,3 +57,13 @@
       # Provide service principal (for Azure Data Explorer RBAC setup)
       addSpnToEnvironment: true
     displayName: 'pytest stage 3'
+
+- job: stop_agent
+  dependsOn: run_tests
+  steps:
+  - task: AzureCLI@1
+    inputs:
+      azureSubscription: ARMConnection
+      scriptLocation: 'inlineScript'
+      inlineScript: az vm deallocate -g "$AGENT_VM_RESOURCE_GROUP" -n "$AGENT_VM_NAME" --no-wait
+    displayName: 'stop agent VM'
1 change: 1 addition & 0 deletions streaming/databricks/job/run-databricks-job.sh
@@ -22,6 +22,7 @@ wait_for_run () {
       sleep 10
     fi
   done
+  echo
 
   result_state=$(jq -r ".state.result_state" <<< "$run_info")
   state_message=$(jq -r ".state.state_message" <<< "$run_info")
4 changes: 3 additions & 1 deletion streaming/databricks/notebooks/verify-eventhubs.scala
@@ -37,10 +37,12 @@ val schema = StructType(
   StructField("processedAt", TimestampType) ::
   Nil)
 
+val arrayOfEventsSchema = ArrayType(schema)
+
 val stagingTable = "tempresult_" + randomUUID().toString.replace("-","_")
 
 var query = streamingData
-  .select(from_json(decode($"body", "UTF-8"), schema).as("eventData"), $"*")
+  .select(explode(from_json(decode($"body", "UTF-8"), arrayOfEventsSchema)).as("eventData"), $"*")
   // When consuming from the output of eventhubs-streamanalytics-eventhubs pipeline, 'enqueuedAt' will haven been
   // set when reading from the first eventhub, and the enqueued timestamp of the second eventhub is then the 'storedAt' time
   .select($"eventData.*", $"offset", $"sequenceNumber", $"publisher", $"partitionKey", $"enqueuedTime".as("storedAt"))
