CSV 、JSON 形式のデータを Google Cloud Platform のデータウェアハウス「BigQuery」に ETL「Extract・Transform・Load」する為の Cloud Functions です。
- ETLの流れ
1. BigQuery に連携したいデータ(CSV or JSON)を Cloud Storage にアップロード
2. Cloud Storage トリガーにより Cloud Functions が発火、Cloud Dataflow にジョブを作成
3. Cloud Dataflow がアップロードされたデータをルールに従って加工し、BigQuery に保存
<バケット名>/
└── <データセット名>
├── raw/ // 連携したいデータのアップロード用ディレクトリ
│ └── <テーブル名>_yyyymmdd.csv
├── schema/
│ └── <テーブル名>_schema.json // BigQuery テーブルのスキーマ定義ファイル
└── udf/
└── <テーブル名>_udf.json // データ加工用 Javascript ファイル
1-1. Google Platform プロジェクトの作成
プロジェクト ID を取得
1-2. Cloud Storage にデータアップロード用の Bucket を作成
バケット名の取得
- データセット名: exampledataset
- テーブル名: exampletable
「2-1.」で決めた名前の通り、BigQuery にデータセット「exampledataset」を作成する
データアップロード用 Bucket に下記の通りフォルダを作成する
<bucket name>/
└── exampledataset
├── raw/
├── schema/
└── udf/
exampletable_schema.json
{
"BigQuery Schema": [
{
"description": "ユーザーID",
"name": "id",
"type": "INT64"
},
{
"description": "名前",
"name": "name",
"type": "STRING"
},
{
"description": "ふりがな",
"name": "ruby",
"type": "STRING"
},
{
"description": "メールアドレス",
"name": "email",
"type": "STRING"
},
{
"description": "性別",
"name": "sex",
"type": "STRING"
},
{
"description": "年齢",
"name": "age",
"type": "INT64"
},
{
"description": "誕生日",
"name": "birthday",
"type": "DATE"
},
{
"description": "都道府県",
"name": "prefectures",
"type": "STRING"
},
{
"description": "電話番号",
"name": "tel",
"type": "STRING"
}
]
}
exampletable_udf.json
function transformCSV(line) {
var values = line.split(',');
var obj = new Object();
// Edit the below line
obj.id = values[0];
obj.name = values[1];
obj.ruby = values[2];
obj.email = values[3];
obj.sex = values[4];
obj.age = values[5];
obj.birthday = values[6];
obj.prefectures = values[7];
obj.tel = values[8];
// Edit the above line
var jsonString = JSON.stringify(obj);
return jsonString;
}
function transformJSON(line) {
return line;
}
「2-3.」で作成したディレクトリに「exampletable_schema.json」と「exampletable_udf.json」を配置する。
<bucket name>/
└── exampledataset
├── raw/
├── schema/
│ └── exampletable_schema.json
└── udf/
└── exampletable_udf.json
handleRawDataFileUploaded/constants.js
module.exports = Object.freeze({
PROJECT_ID: '<使用するGCPプロジェクトID>',
BUCKET_NAME: '<データアップロード用バケット名>'
});
gcloud auth login
gcloud config set project <使用するGCPプロジェクトID>
cd handleRawDataFileUploaded
gcloud functions deploy handleRawDataFileUploaded --region asia-northeast1 --runtime nodejs8 --trigger-resource <データアップロード用バケット名> --trigger-event google.storage.object.finalize