Skip to content

Commit

Permalink
[feat]deploy with k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
tuancamtbtx committed Jul 2, 2024
1 parent 3f120b4 commit 4465369
Show file tree
Hide file tree
Showing 25 changed files with 769 additions and 1,615 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,6 @@ pip-selfcheck.json
.envrc
.direnv

.pylintrc
.pylintrc

airflow-ssh-secret.yaml
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN mkdir -p /lakehouse/airlake
COPY --chown=airflow:airflow . /lakehouse
RUN mkdir -p /bigdata/airlake
COPY --chown=airflow:airflow . /bigdata
COPY requirements.txt /

ENV PYTHONPATH=$PYTHONPATH:/lakehouse
ENV PYTHONPATH=$PYTHONPATH:/bigdata

USER airflow
RUN pip install --no-cache-dir -r /requirements.txt
Expand Down
11 changes: 4 additions & 7 deletions airlake/operators/bq_to_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from airflow.models import BaseOperator
from airflow.hooks.base import BaseHook
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook as GoogleCloudStorageHook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


from elasticsearch import Elasticsearch
from elasticsearch import helpers
from airflow.utils.context import Context
from typing import Any
import gzip
import json
import os
import io
from airlake.hooks.elasticsearch_hook import ElasticSearchHook


Expand All @@ -32,7 +29,7 @@ def __init__(
sql="",
destination_cloud_storage_uris="",
source_project_dataset_table="",
source_bucket="tiki-brain",
source_bucket="gcs-stg",
es_index="",
gcp_conn_id="bigquery_default",
gcs_bucket="",
Expand Down Expand Up @@ -75,7 +72,7 @@ def __init__(
self.size_bulk = size_bulk
# validate input from config dag

def execute(self, context):
def execute(self, context: Context) -> Any:
self.log.info(
"Sync table %s to elasticsearch", self.source_project_dataset_table
)
Expand Down
30 changes: 28 additions & 2 deletions airlake/operators/bq_to_gcs.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,28 @@
class BigqueryToGcsOperator:
pass
from typing import Any
from airflow.models import BaseOperator
from airflow.utils.context import Context

class BigqueryToGcsOperator(BaseOperator):
template_fields = (
"source_project_dataset_table",
"destination_cloud_storage_uris",
"gcs_bucket",
"gcs_prefix",
"gcp_conn_id",
"labels",
)
def __init__(
self,
source_project_dataset_table: str = None,
destination_cloud_storage_uris: str = None,
compression: str = "",
export_format: str = "CSV",
):
super(BigqueryToGcsOperator, self).__init__()
self.source_project_dataset_table = source_project_dataset_table
self.destination_cloud_storage_uris = destination_cloud_storage_uris
self.compression = compression
self.export_format = export_format

def execute(self, context: Context) -> Any:
return super().execute(context)
29 changes: 27 additions & 2 deletions airlake/operators/bq_to_sheet.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,27 @@
class BigqueryToSheetOperator:
pass
from typing import Any
from airflow.models import BaseOperator
from airflow.utils.context import Context

class BigqueryToSheetOperator(BaseOperator):
template_fields = (
"source_project_dataset_table",
"destination_cloud_storage_uris",
"gcs_bucket",
"gcs_prefix",
"gcp_conn_id",
"labels",
)
def __init__(
self,
source_project_dataset_table: str = None,
destination_cloud_storage_uris: str = None,
compression: str = "",
export_format: str = "CSV",
):
self.source_project_dataset_table = source_project_dataset_table
self.destination_cloud_storage_uris = destination_cloud_storage_uris
self.compression = compression
self.export_format = export_format

def execute(self, context: Context) -> Any:
return super().execute(context)
Loading

0 comments on commit 4465369

Please sign in to comment.