Look into columnar data store #521

Closed

jflasher opened this issue Sep 24, 2018 · 6 comments

@jflasher
Contributor

Building upon #455, we likely want to store the data (or some version of it) in http://parquet.apache.org/ or https://orc.apache.org/. This should greatly decrease the cost of large queries across the data using tools like Athena, as well as increase performance. We'd need to figure out the right format as well as partition schemes and aggregation levels (bin these up by fetch, hour, day, week, etc.?).
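
For example (purely illustrative; raw_fetches is just a placeholder for whatever table points at the raw realtime archive), a daily roll-up stored as Parquet and partitioned by parameter might look something like:

CREATE TABLE measurements_daily_parquet
WITH (
      format = 'PARQUET',
      partitioned_by = ARRAY['parameter'])
AS SELECT substr(date.local, 1, 10) AS day_local,
          location, city, country,
          avg(value) AS avg_value,
          count(*) AS measurement_count,
          parameter
FROM raw_fetches
GROUP BY substr(date.local, 1, 10), location, city, country, parameter;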

@jflasher
Contributor Author

jflasher commented Jan 15, 2020

Overview and Results

The goal of this work was to see how storing the data on S3 in different formats would change the performance of the queries run via Amazon Athena. This is the main service we use to do large-scale data aggregations (as well as allow full archive queries), so optimizing this can lead to great gains.

I created a number of sample queries and table definitions (both described below). You can see the results summarized in the table below, with some discussion after that.

| Table | locations | latest | global avg | parameter avg | date bound |
| --- | --- | --- | --- | --- | --- |
| fetches_realtime | 1:07, 246.36 | 4:36, 492.68 | 0:55, 246.35 | 0:53, 246.35 | 0:49, 246.37 |
| fetches_realtime_gzipped | 0:51, 8.51 | 3:54, 17.02 | 0:40, 8.51 | 0:37, 8.51 | 0:32, 8.51 |
| fetches_realtime_parquet | 0:35, 2.33 | 3:00, 3.36 | 0:14, 1.62 | 0:08, 0.22 | 0:05, 0.25 |
| fetches_realtime_parquet_snappy | 0:35, 2.33 | 3:02, 3.75 | 0:13, 1.98 | 0:06, 0.27 | 0:05, 0.37 |
| fetches_realtime_parquet_nobuckets | 0:15, 3.54 | 1:09, 4.67 | 0:05, 1.55 | 0:03, 0.23 | 0:05, 0.28 |
| fetches_realtime_parquet_snappy_nobuckets | 0:13, 5.23 | 1:10, 6.93 | 0:06, 1.85 | 0:06, 0.28 | 0:03, 0.40 |
| fetches_realtime_orc | 0:19, 3.27 | 0:49, 6.19 | 0:10, 2.93 | 0:08, 0.27 | 0:05, 0.29 |
| fetches_realtime_orc_snappy | 0:16, 4.27 | 0:39, 7.25 | 0:14, 2.91 | 0:04, 0.38 | 0:04, 0.27 |

Values in the table are formatted as Time (min:sec), Data Scanned (GB).

fetches_realtime and fetches_realtime_gzipped behaved as expected: since there are no partitions or buckets, every query scans the same amount of data, no matter what it asks for. Among the Parquet-formatted tables, there does not seem to be a big difference between SNAPPY and GZIP compression (based on https://docs.aws.amazon.com/athena/latest/ug/ctas-examples.html#ctas-example-compression it looks like GZIP is the default; if that's not true, the _snappy tables above are not actually different from their counterparts). SNAPPY compression with no buckets looks to perform the worst overall. Parquet with date_local buckets, partitioned on parameter, and using the default compression (which should be GZIP) seems to generally perform the best.

Curiously, the last query, which was date bound, seems to scan more data. Because we're using date_local as a bucket, I would have thought it would have only scanned the buckets it needed.
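
One possible explanation (not verified against the Athena docs): buckets are assigned by hashing the exact date_local value, so at best the engine could skip buckets for equality predicates on that column; a BETWEEN range can't be mapped to a subset of hash buckets. On top of that, Parquet/ORC are columnar, so referencing date_local in the WHERE clause means that column has to be read at all, which would account for the slightly larger scan. Roughly:

-- Date-bound query against the bucketed Parquet table: the range predicate
-- can't rule out any hash bucket, and the date_local column now has to be
-- scanned in addition to value/location/city/country.
select count(*), avg(value), location, city, country
from fetches_realtime_parquet
where parameter = 'pm25'
  and date_local between '2019-01-01' and '2019-12-31'
group by location, city, country;

-- An equality predicate on the bucketed column (the literal below is purely
-- illustrative) is the kind of filter bucket pruning could actually help with.
select count(*)
from fetches_realtime_parquet
where parameter = 'pm25'
  and date_local = '2019-06-01T00:00:00-05:00';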

But the big thing here is that, just for those two big aggregations (locations and latest), going from the gzipped tables we scan today (8.51 GB + 17.02 GB) to Parquet (2.33 GB + 3.36 GB) would mean a ~75% cost reduction for Athena.

EDIT: I just added runs for ORC (GZIP and SNAPPY compressions). At a quick scan, ORC looks to be much more performant, but at the cost of reading a bit more data.

Queries

Locations Aggregation

SELECT UPPER(country) AS country,
       round(coordinates.longitude, 5) AS lon,
       round(coordinates.latitude, 5) AS lat,
       min(date.local) AS firstUpdated,
       max(date.local) AS lastUpdated,
       sourcename AS sourceName,
       sourcetype AS sourceType,
       city,
       location
FROM TABLE_NAME
WHERE (coordinates.longitude BETWEEN -180 AND 180)
  AND (coordinates.latitude BETWEEN -90 AND 90)
GROUP BY UPPER(country), round(coordinates.longitude, 5), round(coordinates.latitude, 5), sourcename, sourcetype, city, location
ORDER BY country, firstUpdated

Latest Aggregation

select *
from TABLE_NAME as db
join (
    select max(from_iso8601_timestamp(date.utc)) max_date, location, city, parameter
    from TABLE_NAME
    group by location, city, parameter
) as temp
  on db.location = temp.location
 and db.city = temp.city
 and db.parameter = temp.parameter
 and from_iso8601_timestamp(db.date.utc) = max_date
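
(As an aside, the same "latest" result could probably also be expressed with a window function instead of the self-join; untested here and shown only for reference:)

select *
from (
    select t.*,
           row_number() over (
               partition by location, city, parameter
               order by from_iso8601_timestamp(t.date.utc) desc
           ) as rn
    from TABLE_NAME t
) as ranked
where rn = 1;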

Global Averages Example

select count(*), avg(value), location, city, country from TABLE_NAME group by location, city, country;

Single Parameter Averages Example

select count(*), avg(value), location, city, country from TABLE_NAME where parameter = 'pm25' group by location, city, country;

Date-bound Example

select count(*), avg(value), location, city, country from TABLE_NAME where parameter = 'pm25' and date.local between '2019-01-01' and '2019-12-31' group by location, city, country;

Table Creation

Parquet (GZIP, no buckets)

(Run time: 3 minutes 27 seconds, Data scanned: 246.34 GB)

CREATE TABLE fetches_realtime_parquet_nobuckets
WITH (
      format = 'PARQUET',
      partitioned_by = ARRAY['parameter']
     )
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;

Parquet (GZIP, 10 date_local buckets)

(Run time: 9 minutes 7 seconds, Data scanned: 246.34 GB)

CREATE TABLE fetches_realtime_parquet
WITH (
      format = 'PARQUET',
      partitioned_by = ARRAY['parameter'],
      bucketed_by = ARRAY['date_local'],
      bucket_count = 10)
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;

Parquet (SNAPPY, 10 date_local buckets)

(Run time: 8 minutes 37 seconds, Data scanned: 246.36 GB)

CREATE TABLE fetches_realtime_parquet_snappy
WITH (
      format = 'PARQUET',
      parquet_compression = 'SNAPPY',
      partitioned_by = ARRAY['parameter'],
      bucketed_by = ARRAY['date_local'],
      bucket_count = 10)
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;

Parquet (SNAPPY, no buckets)

(Run time: 3 minutes 11 seconds, Data scanned: 246.36 GB)

CREATE TABLE fetches_realtime_parquet_snappy_nobuckets
WITH (
      format = 'PARQUET',
      parquet_compression = 'SNAPPY',
      partitioned_by = ARRAY['parameter']
     )
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;

ORC (GZIP, 10 date_local buckets)

(Run time: 7 minutes 35 seconds, Data scanned: 272.4 GB)

CREATE TABLE fetches_realtime_orc
WITH (
      format = 'ORC',
      partitioned_by = ARRAY['parameter'],
      bucketed_by = ARRAY['date_local'],
      bucket_count = 10)
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;

ORC (SNAPPY, 10 date_local buckets)

(Run time: 7 minutes 30 seconds, Data scanned: 272.4 GB)

CREATE TABLE fetches_realtime_orc_snappy
WITH (
      format = 'ORC',
      orc_compression = 'SNAPPY',
      partitioned_by = ARRAY['parameter'],
      bucketed_by = ARRAY['date_local'],
      bucket_count = 10)
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;

@jflasher
Contributor Author

Updated the table to include ORC (GZIP and SNAPPY compressions).

@sruti
Contributor

sruti commented Mar 18, 2020

Added some color-coding to more easily compare between different format and compression options:
[screenshot: color-coded version of the results table above]

  • Because cost is calculated by the amount of data read ($5/TB), I think it makes sense to choose Parquet over ORC: less data is read and the run time is not too different
  • For both ORC and Parquet, GZIP compression seems to be the better option over SNAPPY (which is great because we already use GZIP anyway)
  • Buckets seem to reduce the amount of data read, but Athena doesn't allow buckets when using INSERT INTO (which we plan to use for adding new data, as outlined in the comments below); the cost of skipping them isn't substantial, and some queries are actually faster without buckets
  • Not sure if the data is currently partitioned by parameter, but it makes sense to do that

Parquet with GZIP compression partitioned by parameter and no buckets seems to be the way to go.
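
For reference, combining the pieces above, that layout would be roughly the following CTAS (a sketch only; the table name is a placeholder, and the compression is spelled out explicitly since it wasn't entirely clear above whether GZIP is in fact the default):

CREATE TABLE fetches_realtime_parquet_final
WITH (
      format = 'PARQUET',
      parquet_compression = 'GZIP',
      partitioned_by = ARRAY['parameter']
     )
AS SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_realtime;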

@sruti
Contributor

sruti commented Mar 18, 2020

INSERT INTO investigation

  1. Created an S3 bucket with a single gzipped realtime data file (original size ~6 MB, gzipped ~200 KB)
  2. Ran a CREATE EXTERNAL TABLE query on it, 1.1 seconds, 0 KB scanned:
CREATE EXTERNAL TABLE `fetches_newdata`(
  `date` struct<utc:string,local:string> COMMENT 'from deserializer',
  `parameter` string COMMENT 'from deserializer',
  `location` string COMMENT 'from deserializer',
  `value` float COMMENT 'from deserializer',
  `unit` string COMMENT 'from deserializer',
  `city` string COMMENT 'from deserializer',
  `attribution` array<struct<name:string,url:string>> COMMENT 'from deserializer',
  `averagingperiod` struct<unit:string,value:float> COMMENT 'from deserializer',
  `coordinates` struct<latitude:float,longitude:float> COMMENT 'from deserializer',
  `country` string COMMENT 'from deserializer',
  `sourcename` string COMMENT 'from deserializer',
  `sourcetype` string COMMENT 'from deserializer',
  `mobile` string COMMENT 'from deserializer')
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://smtest-temp-newfetchdata/'

(pretty much the same as openaq.ddl)

  3. Ran an INSERT INTO query, 7.8 seconds, 194.49 KB scanned:
INSERT INTO fetches_realtime_parquet
SELECT date.utc as date_utc, date.local as date_local, location, country, value, unit, city, attribution, averagingperiod, coordinates, sourcename, sourcetype, mobile, parameter
FROM fetches_newdata;

You don't have to specify the format because the insert automatically uses the format and partitioning of the table being inserted into.

Seems like this is a viable option for adding data as it comes in.
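
A couple of quick sanity checks that could be run after an insert (standard Athena statements; not part of the timings above):

-- Confirm the partitions on the target table picked up the new data's
-- parameter values:
SHOW PARTITIONS fetches_realtime_parquet;

-- Spot-check row counts for the parameters present in the staging table:
SELECT parameter, count(*) AS n
FROM fetches_realtime_parquet
WHERE parameter IN (SELECT DISTINCT parameter FROM fetches_newdata)
GROUP BY parameter;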

@sruti
Contributor

sruti commented Mar 18, 2020

As for how to switch the data to the new format, here's a proposed plan that won't significantly change our current processes:
[diagram: proposed data flow for moving to the new format]

  • A new Lambda function runs a CTAS query (like the one @jflasher has above) to create a Parquet table, partitioned by parameter, from the existing data. Repeat this process weekly.
  • Modify the existing Lambda function that currently runs gzip compression after every fetch to additionally store the data in a temporary bucket.
  • A new Lambda function uses INSERT INTO on the temporary bucket to add the data into the existing table, then deletes the file from the temporary bucket.

@jflasher
Contributor Author

I think this looks like a great plan! Do you think we could just use the existing data bucket and have a realtime-parquet prefix, and even the realtime-temp prefix in there as well? For the Lambda IAM role that does the INSERT INTO and delete, we just need to make sure it only has permission to delete from the realtime-temp prefix, so we don't accidentally delete the wrong data.
