diff --git a/.vscode/arduino.json b/.vscode/arduino.json
new file mode 100644
index 0000000..ef52cd9
--- /dev/null
+++ b/.vscode/arduino.json
@@ -0,0 +1,3 @@
+{
+    "port": "COM5"
+}
\ No newline at end of file
diff --git a/AzureSetup.md b/AzureSetup.md
index 54341a2..01e3217 100644
--- a/AzureSetup.md
+++ b/AzureSetup.md
@@ -11,4 +11,4 @@ The following operations can all be done from the Azure Portal. I'm showing the
 3. Create an IoT Device with the command `az iot hub device-identity create -g RES_GROUP_NAME -n IOTHUB_NAME --device-id NAME_YOUR_DEVICE`. This creates an IoT device configuration on the IoT Hub, which you'll use to push readings to it. By default the authentication method is Shared Access Key.
-4. Copy the IoT device's Connection String, with the command: `az iot hub device-identity show-connection-string -g RES_GROUP_NAME -n IOTHUB_NAME --device-id NAME_YOUR_DEVICE`. Copy the string starting with "Hostname=..." to a text editor for later use.
\ No newline at end of file
+4. Copy the IoT device's Connection String, with the command: `az iot hub device-identity show-connection-string -g RES_GROUP_NAME -n IOTHUB_NAME --device-id NAME_YOUR_DEVICE`. Copy the string starting with "Hostname=..." to a text editor for later use.
diff --git a/BME680Setup.md b/BME680Setup.md
index 93334b4..0945b55 100644
--- a/BME680Setup.md
+++ b/BME680Setup.md
@@ -48,4 +48,25 @@ This is a Python client library to talk with Azure IoT Hub. There's also a C ver
 The SDK's documentation is here: https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-sdks (scroll down to the "Azure IoT Hub device SDK for Python" section).
 
-And that's it. You now have a working Zero, the sensor is working, and all the requirements are installed. The next step includes the remaining steps to [get the Zero to do readings in an interesting format and then upload them to Azure](DeviceUploadData.md).
\ No newline at end of file
+And that's it. You now have a working Zero, the sensor is working, and all the requirements are installed. The next step includes the remaining steps to [get the Zero to do readings in an interesting format and then upload them to Azure](DeviceUploadData.md).
+
+
+## Other links
+
+- Bosch forum - Explanation on static IAQ, breath VOC and CO2 equivalent - https://community.bosch-sensortec.com/t5/MEMS-sensors-forum/Explanation-on-static-IAQ-breath-VOC-and-CO2-equivalent/td-p/7413
+
+  - Static IAQ: The main difference between IAQ and static IAQ (sIAQ) lies in the scaling factor calculated based on the recent sensor history. The sIAQ output has been optimized for stationary applications (e.g. fixed indoor devices) whereas the IAQ output is ideal for mobile applications (e.g. carry-on devices).
+  - bVOCeq estimate: The breath VOC equivalent output (bVOCeq) estimates the total VOC concentration [ppm] in the environment. It is calculated based on the sIAQ output and derived from lab tests.
+  - CO2eq estimate: Estimates a CO2-equivalent (CO2eq) concentration [ppm] in the environment. It is also calculated based on the sIAQ output and derived from VOC measurements and correlation from field studies.
+  - Since bVOCeq and CO2eq are based on the sIAQ output, they are expected to perform optimally in stationary applications where the main source of VOCs in the environment comes from human activity (e.g. in a bedroom).
+
+
+- Bosch forum - BME680: IAQ accuracy definition - https://community.bosch-sensortec.com/t5/MEMS-sensors-forum/BME680-IAQ-accuracy-definition/td-p/5920
+
+  - IAQ Accuracy=0 could either mean: BSEC was just started, and the sensor is stabilizing (this lasts normally 5min in LP mode or 20min in ULP mode), or there was a timing violation (i.e. BSEC was called too early or too late), which should be indicated by a warning/error flag by BSEC,
+  - IAQ Accuracy=1 means the background history of BSEC is uncertain. This typically means the gas sensor data was too stable for BSEC to clearly define its references,
+  - IAQ Accuracy=2 means BSEC found new calibration data and is currently calibrating,
+  - IAQ Accuracy=3 means BSEC calibrated successfully.
+
+- BlueDot - Air Quality Measurement (IAQ) with the BME680 - https://www.bluedot.space/tutorials/air-quality-measurement-with-the-bme680/
\ No newline at end of file
diff --git a/README.md b/README.md
index 5981e9a..fb1dff6 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ The process is divided in these steps:
 3. **Azure**: data received in an Azure IoT Hub is read by Azure Stream Analytics
 4. **Azure**: an Azure Stream Analytics job pushes the data into an Azure SQL Database
+   - Raw data is stored as "Bronze"
+   - Further processing is done to aggregate and remove peaks, for Silver data
 5. **PowerPI** is used to do visualizations on the data read from the Azure SQL Database.
 
 This is the big picture overview as of writing. I've noticed from data I've already gathered that there are both "bad readings" (reported by the sensor) that have to be ignored, and anomalies that have to either removed from the data or smoothed out. I'll add steps above (probably in Stream Analytics) to handle these.
@@ -41,6 +43,8 @@ To do this, follow steps [4 - Save and upload readings](DeviceUploadData.md).
 
 ## Process the incoming data in Azure
 
-And finally, what I'm doing in Azure, of course :)
+Desired processing steps over the incoming data stream are essentially filtering and aggregation. I store the "bronze" data (i.e. data as is received, without any changes) in a table, and then do filtering/aggregation and store the result in another table. I'm aggregating/averaging every 30 seconds, but more aggregations can be added simply by adding their configuration. For this processing I'm using Azure Stream Analytics, and [the detail of what I'm doing is here](StreamProcessing.md).
 
-**TBD**
\ No newline at end of file
+## Links
+
+- BlueDot - Air Quality Measurement (IAQ) with the BME680 - https://www.bluedot.space/tutorials/air-quality-measurement-with-the-bme680/
\ No newline at end of file
diff --git a/StreamProcessing.md b/StreamProcessing.md
new file mode 100644
index 0000000..c847e60
--- /dev/null
+++ b/StreamProcessing.md
@@ -0,0 +1,162 @@
+# Stream Processing of Incoming Sensor Telemetry
+
+## Setup - creating output tables
+
+I'm using [Azure SQL Database](https://docs.microsoft.com/en-us/azure/azure-sql/) (tier S2 to S4) to store this data.
+
+1. Bronze table: `RawSensorReadings` - unchanged data, stored as is received without any processing:
+
+```sql
+SET ANSI_NULLS ON
+GO
+SET QUOTED_IDENTIFIER ON
+GO
+CREATE TABLE [dbo].[RawSensorReadings](
+    [DeviceId] [varchar](255) NOT NULL,
+    [ReadingTime] [datetime] NOT NULL,
+    [IAQ] [float] NOT NULL,
+    [IAQState] [tinyint] NOT NULL,
+    [Temperature] [float] NOT NULL,
+    [RelativeHumidity] [float] NOT NULL,
+    [Pressure] [float] NOT NULL,
+    [Gas] [int] NOT NULL,
+    [SensorStatus] [smallint] NOT NULL,
+    [StaticIAQ] [float] NOT NULL,
+    [eCO2] [real] NOT NULL,
+    [bVOCe] [real] NOT NULL
+) ON [PRIMARY]
+GO
+```
+
+2. Silver table: `SensorReadingsFilteredAveraged` - with some anomalies filtered out and 30-second averaging in a tumbling window:
+
+```sql
+SET ANSI_NULLS ON
+GO
+SET QUOTED_IDENTIFIER ON
+GO
+CREATE TABLE [dbo].[SensorReadingsFilteredAveraged](
+    [DeviceId] [varchar](255) NOT NULL,
+    [ReadingTime] [datetime] NOT NULL,
+    [IAQ] [float] NOT NULL,
+    [IAQState] [tinyint] NOT NULL,
+    [Temperature] [float] NOT NULL,
+    [RelativeHumidity] [float] NOT NULL,
+    [Pressure] [float] NOT NULL,
+    [Gas] [int] NOT NULL,
+    [StaticIAQ] [float] NOT NULL,
+    [eCO2] [real] NOT NULL,
+    [bVOCe] [real] NOT NULL
+) ON [PRIMARY]
+GO
+CREATE NONCLUSTERED INDEX [IDX_SensorReadingsFilteredAveraged_ReadingTime] ON [dbo].[SensorReadingsFilteredAveraged]
+(
+    [ReadingTime] ASC
+)WITH (STATISTICS_NORECOMPUTE = OFF, DROP_EXISTING = OFF, ONLINE = OFF) ON [PRIMARY]
+GO
+```
+
+Note the differences between the tables:
+
+- The silver table doesn't have the `SensorStatus` column (because all the sensor statuses signifying errors have been filtered out);
+- The silver table has an index on `ReadingTime`, because the reading time effectively serves as an Id (see the example query after this list). Remember that for a single device (which is my case) I get 1 new record every 30 seconds (24*60*2 = 2880/day).
+- The values in the silver table are all averages over 30-second intervals, not direct readings from the sensor.
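+
+As an illustration of why that index is there (just a sketch, not something the stream job itself needs), a typical report query against the silver table filters on a `ReadingTime` range, e.g. the last 24 hours of averaged readings:
+
+```sql
+-- Hypothetical example: fetch the last 24 hours of averaged readings.
+-- The range predicate on ReadingTime is exactly what the nonclustered index serves.
+SELECT DeviceId, ReadingTime, IAQ, Temperature, RelativeHumidity, Pressure
+FROM [dbo].[SensorReadingsFilteredAveraged]
+WHERE ReadingTime >= DATEADD(HOUR, -24, GETUTCDATE())
+ORDER BY ReadingTime;
+```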
+
+## Stream processing
+
+The stream processing is done in [Azure Stream Analytics](https://docs.microsoft.com/en-us/azure/stream-analytics/) with an ASA Job:
+
+![stream analytics job overview](asa-job.png)
+
+Some relevant configuration options for the job are:
+
+- **Event ordering**: I accept late events up to 10 seconds, out of order events up to 10 seconds, and handle other events with "Adjust" (instead of drop)
+- **Error policy** (when writing to output): Retry
+- **Compatibility level**: 1.2 (which should be the default, instead of 1.1)
+
+
+The job has these inputs/outputs:
+
+- `iothub-sensor-readings` - source of data, my IoT Hub. I configured a consumer group just for my source sensor, and have it using the Messaging endpoint and the CSV format (as generated by the C code on my Raspberry Pi);
+- `bronze-readings` - data destination, simply refers to the `RawSensorReadings` table;
+- `silver-readings` - data destination, simply refers to the `SensorReadingsFilteredAveraged` table;
+
+The query that processes the data is the following:
+
+```sql
+WITH
+  ConvertedReadings AS (
+    SELECT
+        deviceid as DeviceId, CAST(readingtime AS DATETIME) as ReadingTime, CAST(iaqstate AS float) as IAQ, CAST(iaqaccuracy AS bigint) as IAQState,
+        CAST(temperature AS float) as Temperature, CAST(humidity as FLOAT) as RelativeHumidity,
+        CAST(pressure as Float) AS Pressure, CAST(gas as BIGINT) as Gas,
+        CAST(status AS BIGINT) as SensorStatus,
+        CAST(staticiaq AS FLOAT) as StaticIAQ, CAST(eco2ppm AS FLOAT) as eCO2, CAST(bvocppm AS FLOAT) as bVOCe,
+        PartitionId -- required to be here and throughout the query because of ASA
+    FROM [iothub-sensor-readings] TIMESTAMP BY CAST(readingtime AS DATETIME) OVER DeviceId, PartitionId
+    PARTITION BY PartitionId
+  ),
+  FilteredReadings AS (
+    SELECT DeviceId, ReadingTime, IAQ, IAQState, Temperature, RelativeHumidity, Pressure, Gas, StaticIAQ, eCO2, bVOCe, PartitionId -- removing SensorStatus
+    FROM ConvertedReadings
+    PARTITION BY PartitionId
+    WHERE SensorStatus = 0 AND Pressure > 800 AND Temperature BETWEEN 12 AND 26 -- filters out most common cases of wrong values, w/ exception of RelativeHumidity/Gas variations
+  )
+
+-- Bronze data
+SELECT DeviceId, ReadingTime, IAQ, IAQState, Temperature, RelativeHumidity, Pressure, Gas, SensorStatus, StaticIAQ, eCO2, bVOCe -- leaving out PartitionId
+INTO [bronze-readings]
+FROM ConvertedReadings
+PARTITION BY PartitionId
+
+-- Silver data: filtered, averaged and rounded
+SELECT DeviceId, MAX(ReadingTime) as ReadingTime, ROUND(AVG(IAQ),2) as IAQ, cast(AVG(IAQSTATE) as BIGINT) AS IAQState, ROUND(AVG(Temperature),2) as Temperature, ROUND(AVG(RelativeHumidity),2) as RelativeHumidity, ROUND(AVG(Pressure),2) as Pressure,
+    cast(AVG(Gas) as BIGINT) as Gas, ROUND(AVG(StaticIAQ),2) as StaticIAQ, ROUND(AVG(eCO2),10) as eCO2, ROUND(AVG(bVOCe),10) as bVOCe -- leaving out PartitionId
+INTO [silver-readings]
+FROM FilteredReadings
+PARTITION BY PartitionId
+GROUP BY TumblingWindow(second, 30), DeviceId, PartitionId
+```
+
+Some notes on the above:
+
+- I have two sub-queries in the `WITH` section.
+  - The *first* does type conversion and specifies `TIMESTAMP BY`, because we want to use the `ReadingTime` as the message's timestamp, not the arrival time at the IoT Hub (remember they are sent up in bulk by the `scoop_up_data.py` script every few seconds). You'll also notice the `PARTITION BY`, which is mandatory when you use `TIMESTAMP BY ... OVER` -- this is also why you'll see the `PartitionId` present in the different queries ([see this for more information](https://docs.microsoft.com/en-us/stream-analytics-query/timestamp-by-azure-stream-analytics#limitations-and-restrictions)).
+  - The *second* sub-query does simple filtering of anomalous data based on the sensor status, pressure and temperature. These filters are pretty crude but built based on anomalies I've observed, and I'm simply dropping the entire row if one of these comes in. There are smarter ways to do this, [such as this one using Machine Learning](https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-machine-learning-anomaly-detection). The `RelativeHumidity` occasionally has several spikes and dips when things get unstable; averaging could be the best way to handle those (future work? :-).
+- After the sub-queries, I write the result of the first sub-query to the Bronze table (I list out all the fields because I explicitly need to leave out `PartitionId`).
+- I then write the result of the second sub-query to the Silver table. Again I leave out the `PartitionId` column, but more importantly apply a Tumbling window of 30 seconds (which should hold 10 readings -- the sensor writes one every 3 seconds) and average the values over that period, rounding/casting as necessary. More aggregation granularities could be added the same way (see the sketch after this list).
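+
+For example, a coarser 5-minute aggregation could be added as one more statement in the same job. This is only a sketch: the `gold-readings` output (and a matching table) doesn't exist in my setup, it just illustrates how another tumbling window would slot in:
+
+```sql
+-- Hypothetical extra aggregation: 5-minute averages into a separate output.
+-- Assumes an output alias 'gold-readings' has been defined for the job.
+SELECT DeviceId, MAX(ReadingTime) as ReadingTime, ROUND(AVG(IAQ),2) as IAQ, ROUND(AVG(Temperature),2) as Temperature,
+    ROUND(AVG(RelativeHumidity),2) as RelativeHumidity, ROUND(AVG(Pressure),2) as Pressure
+INTO [gold-readings]
+FROM FilteredReadings
+PARTITION BY PartitionId
+GROUP BY TumblingWindow(minute, 5), DeviceId, PartitionId
+```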
+
+And that's it! After starting the job the readings started flowing into my two tables, and as a curiosity my job diagram in Stream Analytics shows:
+
+![Stream Analytics Job Diagram](asa-job-diagram.png)
+
+## Word of caution
+
+Be careful with the options you set for *Event Ordering*. I'm using this:
+
+![ASA Job - Event Ordering](asa-event-ordering.png)
+
+I actually changed from Adjust to Drop for part of the development, and noticed that even when increasing the period for the Out of Order events, I'd still have nothing in the output. This was due to my misunderstanding of what was happening here and a mixup between the timestamp in the message and the `ReadingTime` column inside the received CSV. You can see more information [in the documentation page](https://docs.microsoft.com/en-us/azure/stream-analytics/event-ordering). But if you see no errors but also no outputs, check these options.
+
+Also don't forget to add the `PARTITION BY` clause, or you'll have an error such as this: `The streaming job failed: Stream Analytics job has validation errors: If TIMESTAMP BY OVER clause is used with partitioned streams, 'PartitionId' must be used as partition key. Please use \"PARTITION BY PartitionId\" clause for the input 'iothub-bme680-readings'`. You only get this when you start the job; it's not detected in the editor (unlike other errors).
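+
+If you hit that "no errors but no output" situation, one quick check is whether rows are actually landing in the tables at all. A minimal sanity-check query (just a sketch, run against the Azure SQL Database using the two tables above) could be:
+
+```sql
+-- Compare how many rows arrived in the bronze vs. silver tables over the last hour.
+-- With one reading every 3 seconds, bronze should be roughly 10x silver (30-second averages).
+SELECT 'bronze' AS TableName, COUNT(*) AS RowsLastHour
+FROM [dbo].[RawSensorReadings]
+WHERE ReadingTime >= DATEADD(HOUR, -1, GETUTCDATE())
+UNION ALL
+SELECT 'silver', COUNT(*)
+FROM [dbo].[SensorReadingsFilteredAveraged]
+WHERE ReadingTime >= DATEADD(HOUR, -1, GETUTCDATE());
+```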
+
+## Further links
+
+The following documentation links give additional helpful information:
+
+- Stream data as input into Stream Analytics - https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-define-inputs
+- Understand time handling in Azure Stream Analytics - https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-time-handling
+- Introduction to Stream Analytics windowing functions - https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-window-functions
+- Configuring event ordering policies for Azure Stream Analytics - https://docs.microsoft.com/en-us/azure/stream-analytics/event-ordering
+- Common query patterns in Azure Stream Analytics - https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#send-data-to-multiple-outputs
+- New metric in Azure Stream Analytics tracks latency of your streaming pipeline - https://azure.microsoft.com/en-us/blog/new-metric-in-azure-stream-analytics-tracks-latency-of-your-streaming-pipeline/
+- Anomaly detection in Azure Stream Analytics - https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-machine-learning-anomaly-detection
+- TIMESTAMP BY (Azure Stream Analytics) - https://docs.microsoft.com/en-us/stream-analytics-query/timestamp-by-azure-stream-analytics
+
+Some other links were helpful:
+
+- https://www.purplefrogsystems.com/paul/2016/09/azure-stream-analytics-windowing-queries/
+
+- https://stackoverflow.com/questions/60460530/azure-stream-analytics-query-with-tumbling-window-using-by-timestamp-works-fine
+
+- https://stackoverflow.com/questions/38084352/in-azure-stream-analytics-query-i-am-getting-an-error-when-using-timestamp-by
diff --git a/asa-event-ordering.png b/asa-event-ordering.png
new file mode 100644
index 0000000..81a9453
Binary files /dev/null and b/asa-event-ordering.png differ
diff --git a/asa-job-diagram.png b/asa-job-diagram.png
new file mode 100644
index 0000000..4dded98
Binary files /dev/null and b/asa-job-diagram.png differ
diff --git a/asa-job.png b/asa-job.png
new file mode 100644
index 0000000..63c32ab
Binary files /dev/null and b/asa-job.png differ