[add] some hook and operators
tuancamtbtx committed Jun 30, 2024
1 parent c703da4 commit faa9246
Showing 21 changed files with 1,173 additions and 200 deletions.
12 changes: 10 additions & 2 deletions README.md
@@ -1,4 +1,4 @@
# Airflow Data Engineer
# Airflow For Data Engineer

[![Docker Pulls](https://badgen.net/docker/pulls/vantuan12345/airlake?icon=docker&label=pulls)](https://hub.docker.com/r/vantuan12345/airlake/)
[![Docker Stars](https://badgen.net/docker/stars/vantuan12345/spark-generator?icon=docker&label=stars)](https://hub.docker.com/r/vantuan12345/airlake/)
@@ -9,7 +9,15 @@
![Github last-commit](https://img.shields.io/github/last-commit/tuancamtbtx/airflow-example)

## Supported Operators

| Operator                     |
|------------------------------|
| BigqueryToSheetOperator      |
| BigQueryToGCSOperator        |
| SFTPGetMultipleFilesOperator |
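
A minimal wiring sketch for the BigQuery-to-GCS export is shown below. The import path and constructor arguments are assumptions that mirror a stock provider-style interface; check the repository's operator module for the real signature.

```python
# Hypothetical usage sketch: the module path and parameters below are assumptions,
# not the repo's confirmed API.
from datetime import datetime

from airflow import DAG
from airlake.operators.bigquery_to_gcs import BigQueryToGCSOperator  # assumed import path

with DAG(
    dag_id="bq_table_to_gcs_example",
    start_date=datetime(2024, 6, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    export_table = BigQueryToGCSOperator(
        task_id="export_table",
        source_project_dataset_table="my-project.my_dataset.my_table",  # example table
        destination_cloud_storage_uris=["gs://my-bucket/exports/my_table-*.csv"],
        export_format="CSV",
    )
```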

## Airflow on k8s

![Airflow k8s design](./assets/arch-diag-kubernetes.png)

## Flow Design

@@ -63,7 +71,7 @@ python3 -m pip install -r requirements_airflow.txt --constraint ./constraints.tx
# spark
python3 -m pip install -r ./requirements_nodeps.txt --constraint ./constraints.txt --no-deps --use-deprecated=legacy-resolver

# extra libs used in aircake
# extra libs used in airlake
python3 -m pip install -r requirements.txt --constraint ./constraints.txt --use-deprecated=legacy-resolver
```
**Setup Local Airflow**
29 changes: 29 additions & 0 deletions airlake/console/import_dags.py
@@ -0,0 +1,29 @@
import time
import logging
import argparse
import os

from airflow.configuration import conf


logging.getLogger("airflow").setLevel(logging.WARN)
logging.getLogger("schedule").setLevel(logging.WARN)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--path", required=True, help="Path to local git dir")
    parser.add_argument("--git_layout", default="git/dags_conf")
    parser.add_argument(
        "--sync_roles",
        default=False,
        action="store_true",
    )
    parser.add_argument(
        "--num_process", default=8, type=int, help="Number of processes used to parse DAG files"
    )
    args = parser.parse_args()


if __name__ == "__main__":
    main()
Empty file added airlake/factory/dagbuilder.py
Empty file.
39 changes: 39 additions & 0 deletions airlake/factory/render.py
@@ -0,0 +1,39 @@
"""
Render the config into the read to run dag
"""
from typing import Dict, Any
from datetime import datetime

import jinja2


DAG_TEMPLATE = jinja2.Template(
"""
'''
Generated by Airflow Datalake! Do not edit!
Timestamp {{ timestamp }}
If your have problem with google cloud account permissions,
make sure you include your team email.
Check detail at https://docs.google.com/spreadsheets/d/1Lsal8SDy8Np_8BNbgeou-rEzZfKv5qXIgHeGaLxNm5Q
'''
from airflow import DAG
from airlake.dynamic.render import render
conf = {{ json_conf }}
name = '{{ name }}'
render(globals(), name, conf)"""
)




def dump_to_py(name: str, conf: Dict[str, Any]):
"""Dumps configs from dict into python dag file"""
return DAG_TEMPLATE.render(
name=name,
json_conf=conf,
timestamp=datetime.now().astimezone().isoformat(),
)
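
A quick usage sketch for `dump_to_py`, assuming a hypothetical config dict and DAGs folder path (both are illustrative, not taken from the repo):

```python
# Sketch: render a DAG config into a generated .py file that Airflow can pick up.
# The config keys and the target path are assumptions for illustration.
from airlake.factory.render import dump_to_py

conf = {"schedule_interval": "@daily", "tasks": []}  # hypothetical config
dag_source = dump_to_py("example_daily_dag", conf)

with open("/opt/airflow/dags/example_daily_dag.py", "w") as f:  # assumed DAGs folder
    f.write(dag_source)
```
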
120 changes: 120 additions & 0 deletions airlake/hooks/elasticsearch_hook.py
@@ -0,0 +1,120 @@
import time

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from airflow.providers.http.hooks.http import HttpHook


class ElasticSearchHook(HttpHook):
    def __init__(self, elasticsearch_conn_id="", index=""):
        self.elasticsearch_conn_id = elasticsearch_conn_id
        self.index = index
        conn = self.get_connection(elasticsearch_conn_id)
        self.host = conn.host
        self.port = conn.port
        self.user = conn.login
        self.password = conn.password

    def get_client(self):
        return Elasticsearch(
            [self.host],
            scheme="https",
            port=self.port,
            http_auth=(self.user, self.password),
            use_ssl=False,
        )

    def parse_data(self, data, es_index):
        actions = []
        for obj in data:
            if "id" in obj:
                actions.append(
                    {
                        "_index": es_index,
                        "_id": obj["id"],
                        "_source": obj,
                    }
                )
        return actions

    def build_schema_to_elasticseach_schema(self, bq_schema, is_update):
        TYPE_MAPPING = {
            "INTEGER": "long",
            "INT64": "long",
            "FLOAT": "double",
            "FLOAT64": "double",
            "NUMERIC": "double",
            "STRING": "text",
        }
        mapping_template = {"properties": {}}
        mapping_create_template = {"mappings": {"properties": {}}}
        fields = bq_schema.get("fields")
        if is_update:
            for obj in fields:
                name = obj.get("name")
                dtype = obj.get("type")
                if dtype.upper() in TYPE_MAPPING:
                    mapping_template["properties"][name] = {
                        "type": TYPE_MAPPING[dtype.upper()]
                    }
            return mapping_template
        for obj in fields:
            name = obj.get("name")
            dtype = obj.get("type")
            if dtype.upper() in TYPE_MAPPING:
                mapping_create_template["mappings"]["properties"][name] = {
                    "type": TYPE_MAPPING[dtype.upper()]
                }
        return mapping_create_template

    def bulk(self, data, es_index):
        es_client = self.get_client()
        actions = self.parse_data(data, es_index)
        helpers.bulk(es_client, actions)

    def remove_index(self, es_index):
        es_client = self.get_client()
        self.log.info("remove index: %s", es_index)
        es_client.indices.delete(index=es_index, ignore=[400, 404])

    def alias_index(self, es_index, alias_index):
        es_client = self.get_client()
        if not es_client.indices.exists_alias(alias_index):
            self.log.info("alias does not exist yet, creating: %s", alias_index)
            resp = es_client.indices.put_alias(index=es_index, name=alias_index, ignore=[400, 404])
            if "status" in resp:
                return False
        else:
            self.log.info("alias already exists, re-pointing: %s", alias_index)
            self.remove_index(alias_index)
            resp = es_client.indices.put_alias(index=es_index, name=alias_index, ignore=[400, 404])
            if "status" in resp:
                return False
        return True

    def generate_alias_index_by_time(self, es_index):
        ts = time.time()
        return es_index + "_" + str(ts)

    def get_key_exist(self, keys, prefix):
        return [key for key in keys if key.startswith(prefix)]

    def get_alias_by_prefix(self, prefix_index):
        es_client = self.get_client()
        alias_dict = es_client.indices.get_alias("*")
        return self.get_key_exist(alias_dict.keys(), prefix_index)
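
A minimal sketch of how this hook might be used, assuming an Airflow connection id `elasticsearch_default`, a BigQuery-style schema dict, and example index names; only the hook's own methods are taken from the code above:

```python
# Illustrative only: the connection id, index names, and sample rows are assumptions.
from airlake.hooks.elasticsearch_hook import ElasticSearchHook

hook = ElasticSearchHook(elasticsearch_conn_id="elasticsearch_default")

# Build an ES mapping from a BigQuery-style schema (create mode).
bq_schema = {"fields": [{"name": "id", "type": "INTEGER"}, {"name": "title", "type": "STRING"}]}
mapping = hook.build_schema_to_elasticseach_schema(bq_schema, is_update=False)

# Index into a time-suffixed index, then point a stable alias at it.
index_name = hook.generate_alias_index_by_time("products")
hook.get_client().indices.create(index=index_name, body=mapping, ignore=[400])
hook.bulk([{"id": 1, "title": "foo"}, {"id": 2, "title": "bar"}], index_name)
hook.alias_index(index_name, "products")
```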

110 changes: 110 additions & 0 deletions airlake/hooks/gcs_hook.py
@@ -0,0 +1,110 @@
from typing import Iterable
import logging

import pandas as pd
from airflow.providers.google.common.hooks.base_google import (
    GoogleBaseHook as GoogleCloudBaseHook,
)
from airflow.providers.google.cloud.hooks.bigquery import split_tablename
from google.cloud import bigquery, bigquery_storage
from google.cloud.bigquery import job
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import BigQueryReadClient


class BigQueryNativeHookMixing:
    def get_client(self) -> bigquery.Client:
        credentials = self._get_credentials()
        return bigquery.Client(credentials=credentials, project=credentials.project_id)

    @property
    def bqstorage_client(self):
        return bigquery_storage.BigQueryReadClient(
            credentials=self._get_credentials(),
        )

    def get_bigquery_storage_client_v1(self) -> BigQueryReadClient:
        return BigQueryReadClient(credentials=self._get_credentials())

    def pandas_df(self, sql: str, labels: dict = None, *args, **kwargs) -> pd.DataFrame:
        query_job: bigquery.QueryJob = self.job(sql, labels=labels)
        return query_job.result().to_dataframe(bqstorage_client=self.bqstorage_client)

    def stream(self, sql: str) -> Iterable[dict]:
        return self.job(sql).result()

    def total_rows(self, sql: str) -> int:
        r = self.job(sql).result()
        return r.total_rows

    def job(self, sql: str, labels: dict = None) -> bigquery.QueryJob:
        logging.info("Executing `%s`", sql)
        job_config = bigquery.QueryJobConfig()
        job_config.use_legacy_sql = False
        job_config.use_query_cache = False
        job_config.labels = labels if labels is not None else {}
        query_job = self.get_client().query(sql, job_config=job_config)
        logging.info(
            "Got job id `%s`. Destination table `%s`",
            query_job.job_id,
            query_job.destination,
        )
        return query_job

    def run_extract(
        self,
        source_project_dataset_table,
        destination_cloud_storage_uris,
        compression="NONE",
        export_format="CSV",
        field_delimiter=",",
        print_header=True,
        labels=None,
    ):
        client = self.get_client()
        source_project, source_dataset, source_table = split_tablename(
            table_input=source_project_dataset_table,
            default_project_id=client.project,
            var_name="source_project_dataset_table",
        )
        dataset_ref = client.dataset(source_dataset, project=source_project)
        table_ref = dataset_ref.table(source_table)

        logging.info("Exporting to %s", destination_cloud_storage_uris)
        conf = job.ExtractJobConfig()
        conf.destination_format = export_format
        conf.field_delimiter = field_delimiter
        conf.print_header = print_header
        conf.compression = compression

        extract_job = client.extract_table(
            table_ref,
            destination_cloud_storage_uris,
            job_config=conf,
        )
        extract_job.result()  # Wait for the extract job to complete

    def create_read_streams(
        self,
        project,
        dataset,
        table,
        data_format,
        read_options,
        max_streams=0,
    ):
        requested_session = types.ReadSession(
            table=f"projects/{project}/datasets/{dataset}/tables/{table}",
            data_format=data_format,
            read_options=read_options,
        )

        return self.get_bigquery_storage_client_v1().create_read_session(
            parent=f"projects/{project}",
            read_session=requested_session,
            max_stream_count=max_streams,
        )


class BigQueryNativeHook(GoogleCloudBaseHook, BigQueryNativeHookMixing):
    pass
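
A short usage sketch for the hook above; the connection id, SQL, and bucket URI are placeholder assumptions:

```python
# Illustrative only: gcp_conn_id, query, and GCS URI are placeholder assumptions.
from airlake.hooks.gcs_hook import BigQueryNativeHook

hook = BigQueryNativeHook(gcp_conn_id="google_cloud_default")

# Run a query and pull the result into pandas via the BigQuery Storage API.
df = hook.pandas_df("SELECT name, value FROM `my-project.my_dataset.my_table` LIMIT 100")

# Export a table to GCS as gzipped CSV.
hook.run_extract(
    source_project_dataset_table="my-project.my_dataset.my_table",
    destination_cloud_storage_uris=["gs://my-bucket/exports/my_table-*.csv.gz"],
    compression="GZIP",
    export_format="CSV",
)
```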