From 465d1122843e7e4151490559fc2a3d460500d072 Mon Sep 17 00:00:00 2001 From: Pierlou Date: Tue, 12 Mar 2024 09:32:19 +0100 Subject: [PATCH] feat: remove custom operators --- plugins/operators/.gitignore | 2 - plugins/operators/clean_folder.py | 32 ----- plugins/operators/elastic_create_index.py | 84 ------------- plugins/operators/elastic_fill_index.py | 134 -------------------- plugins/operators/mail_datagouv.py | 90 -------------- plugins/operators/mattermost.py | 45 ------- plugins/operators/papermill_minio.py | 143 ---------------------- plugins/operators/papermill_only.py | 80 ------------ plugins/operators/python_minio.py | 80 ------------ plugins/operators/simple_papermill.py | 80 ------------ 10 files changed, 770 deletions(-) delete mode 100755 plugins/operators/.gitignore delete mode 100755 plugins/operators/clean_folder.py delete mode 100755 plugins/operators/elastic_create_index.py delete mode 100755 plugins/operators/elastic_fill_index.py delete mode 100755 plugins/operators/mail_datagouv.py delete mode 100755 plugins/operators/mattermost.py delete mode 100755 plugins/operators/papermill_minio.py delete mode 100755 plugins/operators/papermill_only.py delete mode 100755 plugins/operators/python_minio.py delete mode 100755 plugins/operators/simple_papermill.py diff --git a/plugins/operators/.gitignore b/plugins/operators/.gitignore deleted file mode 100755 index a1b9cbb..0000000 --- a/plugins/operators/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -.env -__pycache__/ \ No newline at end of file diff --git a/plugins/operators/clean_folder.py b/plugins/operators/clean_folder.py deleted file mode 100755 index e3a9270..0000000 --- a/plugins/operators/clean_folder.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import shutil -from typing import Optional - -from airflow.models import BaseOperator - - -class CleanFolderOperator(BaseOperator): - """ - Clean tmp folder - :param folder_path: path of folder to clean - :type folder_path: str - - """ - - supports_lineage = True - - template_fields = ("folder_path",) - - def __init__( - self, - *, - folder_path: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.folder_path = folder_path - - def execute(self, context): - if os.path.exists(self.folder_path) and os.path.isdir(self.folder_path): - shutil.rmtree(self.folder_path) diff --git a/plugins/operators/elastic_create_index.py b/plugins/operators/elastic_create_index.py deleted file mode 100755 index 7c08f47..0000000 --- a/plugins/operators/elastic_create_index.py +++ /dev/null @@ -1,84 +0,0 @@ -import json -import logging -from typing import Optional - -import requests -from airflow.models import BaseOperator - - -class ElasticCreateIndexOperator(BaseOperator): - """ - Create elasticsearch Index - :param elastic_url: endpoint url of elasticsearch - :type elastic_url: str - :param elastic_index: index to create - :type elastic_index: str - :param elastic_index_shards: number of shards for index - :type elastic_index_shards: int - :param elastic_user: user for elasticsearch - :type elastic_user: str - :param elastic_password: password for elasticsearch - :type elastic_password: str - """ - - supports_lineage = True - - template_fields = ( - "elastic_url", - "elastic_index", - "elastic_user", - "elastic_password", - "elastic_index_shards", - ) - - def __init__( - self, - *, - elastic_url: Optional[str] = None, - elastic_index: Optional[str] = None, - elastic_user: Optional[str] = None, - elastic_password: Optional[str] = None, - elastic_index_shards: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.elastic_url = elastic_url - self.elastic_index = elastic_index - self.elastic_user = elastic_user - self.elastic_password = elastic_password - self.elastic_index_shards = elastic_index_shards - - def execute(self, context): - if not self.elastic_url: - raise ValueError("Please provide elasticsearch url endpoint") - - try: - r = requests.delete( - self.elastic_url + self.elastic_index, - auth=(self.elastic_user, self.elastic_password), - ) - except ( - requests.exceptions.ConnectionError, - requests.exceptions.RequestException, - ): - pass - - if self.elastic_index_shards is not None: - settings = { - "settings": {"index": {"number_of_shards": self.elastic_index_shards}} - } - else: - settings = {} - - headers = {"Content-Type": "application/json"} - - r = requests.put( - self.elastic_url + self.elastic_index, - headers=headers, - data=json.dumps(settings), - auth=(self.elastic_user, self.elastic_password), - ) - - logging.info(r.json()) - assert r.json()["acknowledged"] is True diff --git a/plugins/operators/elastic_fill_index.py b/plugins/operators/elastic_fill_index.py deleted file mode 100755 index d140006..0000000 --- a/plugins/operators/elastic_fill_index.py +++ /dev/null @@ -1,134 +0,0 @@ -import json -import logging -from typing import Optional - -import pandas as pd -import requests -from airflow.models import BaseOperator -from minio import Minio - - -class ElasticFillIndexOperator(BaseOperator): - """ - Fill elasticsearch Index - :param elastic_url: endpoint url of elasticsearch - :type elastic_url: str - :param elastic_index: index to create - :type elastic_index: str - :param elastic_user: user for elasticsearch - :type elastic_user: str - :param elastic_password: password for elasticsearch - :type elastic_password: str - :param elastic_bulk_size: size of bulk for indexation - :type elastic_bulk_size: int - :param minio_url: minio url where report should be store - :type minio_url: str - :param minio_bucket: minio bucket where report should be store - :type minio_bucket: str - :param minio_user: minio user which will store report - :type minio_user: str - :param minio_password: minio password of minio user - :type minio_password: str - :param minio_filepath: complete filepath where to store report - :type minio_filepath: str - :param column_id: column which will be used for id in elasticsearch - :type column_id: str - """ - - supports_lineage = True - - template_fields = ( - "elastic_url", - "elastic_index", - "elastic_user", - "elastic_password", - "elastic_bulk_size", - "minio_url", - "minio_bucket", - "minio_user", - "minio_password", - "minio_filepath", - "column_id", - ) - - def __init__( - self, - *, - elastic_url: Optional[str] = None, - elastic_index: Optional[str] = None, - elastic_user: Optional[str] = None, - elastic_password: Optional[str] = None, - minio_url: Optional[str] = None, - minio_bucket: Optional[str] = None, - minio_user: Optional[str] = None, - minio_password: Optional[str] = None, - minio_filepath: Optional[str] = None, - column_id: Optional[str] = None, - elastic_bulk_size: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.elastic_url = elastic_url - self.elastic_index = elastic_index - self.elastic_user = elastic_user - self.elastic_password = elastic_password - self.minio_url = minio_url - self.minio_bucket = minio_bucket - self.minio_user = minio_user - self.minio_password = minio_password - self.minio_filepath = minio_filepath - self.column_id = column_id - self.elastic_bulk_size = elastic_bulk_size - - def execute(self, context): - if not self.elastic_url: - raise ValueError("Please provide elasticsearch url endpoint") - - client = Minio( - self.minio_url, - access_key=self.minio_user, - secret_key=self.minio_password, - secure=True, - ) - obj = client.get_object( - self.minio_bucket, - self.minio_filepath, - ) - df = pd.read_csv(obj, dtype=str) - - logging.info("Retrieve file ok - " + str(df.shape[0]) + " documents to process") - - df["_id"] = df[self.column_id] - df_as_json = df.to_json(orient="records", lines=True) - - cpt = 0 - final_json_string = "" - for json_document in df_as_json.split("\n"): - cpt = cpt + 1 - if json_document != "": - jdict = json.loads(json_document) - metadata = json.dumps({"index": {"_id": jdict["_id"]}}) - jdict.pop("_id") - final_json_string += metadata + "\n" + json.dumps(jdict) + "\n" - if cpt % self.elastic_bulk_size == 0: - if cpt % (self.elastic_bulk_size * 3) == 0: - logging.info(str(cpt) + " indexed documents") - headers = {"Content-type": "application/json", "Accept": "text/plain"} - requests.post( - self.elastic_url + self.elastic_index + "/_bulk", - data=final_json_string, - headers=headers, - timeout=60, - auth=(self.elastic_user, self.elastic_password), - ) - final_json_string = "" - - headers = {"Content-type": "application/json", "Accept": "text/plain"} - requests.post( - self.elastic_url + self.elastic_index + "/_bulk", - data=final_json_string, - headers=headers, - timeout=60, - auth=(self.elastic_user, self.elastic_password), - ) diff --git a/plugins/operators/mail_datagouv.py b/plugins/operators/mail_datagouv.py deleted file mode 100755 index b15dec6..0000000 --- a/plugins/operators/mail_datagouv.py +++ /dev/null @@ -1,90 +0,0 @@ -import logging -from typing import Optional - -import emails -from airflow.models import BaseOperator - - -class MailDatagouvOperator(BaseOperator): - """ - Send a mail - :param email_user: sender of email - :type email_user: str - :param email_password: sender of email - :type email_password: str - :param email_recipients: recipients of email - :type email_recipients: list - :param subject: subject of email - :type subject: str - :param message: corpus of email - :type message: str - :param attachment_path: path of attachment for email - :type attachment_path: str - - """ - - supports_lineage = True - - template_fields = ( - "email_user", - "email_password", - "email_recipients", - "subject", - "message", - "attachment_path", - ) - - def __init__( - self, - *, - email_user: Optional[str] = None, - email_password: Optional[str] = None, - email_recipients: Optional[list] = None, - subject: Optional[str] = None, - message: Optional[str] = None, - attachment_path: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.email_user = email_user - self.email_password = email_password - self.email_recipients = email_recipients - self.subject = subject - self.message = message - self.attachment_path = attachment_path - - def execute(self, context): - if not self.email_user or not self.email_password: - raise ValueError("Not enough information to send message") - - sender = self.email_user - message = self.message - subject = self.subject - message = emails.html( - html="

%s

" % message, subject=subject, mail_from=sender - ) - - smtp = { - "host": "mail.data.gouv.fr", - "port": 587, - "tls": True, - "user": self.email_user, - "password": self.email_password, - "timeout": 60, - } - if self.attachment_path: - message.attach( - data=open(self.attachment_path), - filename=self.attachment_path.split("/")[-1], - ) - - retry = True - tries = 0 - while retry: - r = message.send(to=self.email_recipients, smtp=smtp) - logging.info(r) - tries = tries + 1 - if (r.status_code == 250) | (tries == 5): - retry = False - assert r.status_code == 250 diff --git a/plugins/operators/mattermost.py b/plugins/operators/mattermost.py deleted file mode 100755 index 9903a40..0000000 --- a/plugins/operators/mattermost.py +++ /dev/null @@ -1,45 +0,0 @@ -from typing import Optional - -import requests -from airflow.models import BaseOperator - - -class MattermostOperator(BaseOperator): - """ - Executes a mattermost message - :param mattermost_endpoint: endpoint mattersmost for bot - :type mattermost_endpoint: str - :param text: text for mattermost message - :type text: str - :param image_url: url of image to post - :type image_url: str - """ - - supports_lineage = True - - template_fields = ("mattermost_endpoint", "text", "image_url") - - def __init__( - self, - *, - mattermost_endpoint: Optional[str] = None, - text: Optional[str] = None, - image_url: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.mattermost_endpoint = mattermost_endpoint - self.text = text - self.image_url = image_url - - def execute(self, context): - if not self.mattermost_endpoint or not self.text: - raise ValueError("Not enough information to send message") - - data = {} - data["text"] = self.text - if self.image_url: - data["attachments"] = [{"image_url": self.image_url}] - - requests.post(self.mattermost_endpoint, json=data) diff --git a/plugins/operators/papermill_minio.py b/plugins/operators/papermill_minio.py deleted file mode 100755 index 153ad4e..0000000 --- a/plugins/operators/papermill_minio.py +++ /dev/null @@ -1,143 +0,0 @@ -import codecs -import os -from typing import Dict, Optional - -import attr -import nbformat -import papermill as pm -from airflow.lineage.entities import File -from airflow.models import BaseOperator -from minio import Minio -from nbconvert import HTMLExporter - - -@attr.s(auto_attribs=True) -class NoteBook(File): - """Jupyter notebook""" - - type_hint: Optional[str] = "jupyter_notebook" - parameters: Optional[Dict] = {} - - meta_schema: str = __name__ + ".NoteBook" - - -class PapermillMinioOperator(BaseOperator): - """ - Executes a jupyter notebook through papermill that is annotated with parameters - :param input_nb: input notebook (can also be a NoteBook or a File inlet) - :type input_nb: str - :param output_nb: output notebook (can also be a NoteBook or File outlet) - :type output_nb: str - :param tmp_path: tmp path to store report during processing - :type tmp_path: str - :param minio_url: minio url where report should be store - :type minio_url: str - :param minio_bucket: minio bucket where report should be store - :type minio_bucket: str - :param minio_user: minio user which will store report - :type minio_user: str - :param minio_password: minio password of minio user - :type minio_password: str - :param minio_output_filepath: complete filepath where to store report - :type minio_output_filepath: str - :param parameters: the notebook parameters to set - :type parameters: dict - """ - - supports_lineage = True - - template_fields = ( - "input_nb", - "output_nb", - "tmp_path", - "minio_url", - "minio_bucket", - "minio_user", - "minio_password", - "minio_output_filepath", - "parameters", - ) - - def __init__( - self, - *, - input_nb: Optional[str] = None, - output_nb: Optional[str] = None, - tmp_path: Optional[str] = None, - minio_url: Optional[str] = None, - minio_bucket: Optional[str] = None, - minio_user: Optional[str] = None, - minio_password: Optional[str] = None, - minio_output_filepath: Optional[str] = None, - parameters: Optional[Dict] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.input_nb = input_nb - self.output_nb = output_nb - self.tmp_path = tmp_path - self.minio_url = minio_url - self.minio_bucket = minio_bucket - self.minio_user = minio_user - self.minio_password = minio_password - self.minio_output_filepath = minio_output_filepath - self.parameters = parameters - - def execute(self, context): - if not self.input_nb or not self.output_nb: - raise ValueError("Input notebook or output notebook is not specified") - - os.makedirs(os.path.dirname(self.tmp_path + "output/"), exist_ok=True) - - pm.execute_notebook( - self.input_nb, - self.tmp_path + self.output_nb, - parameters=self.parameters, - progress_bar=False, - report_mode=True, - ) - - exporter = HTMLExporter() - # read_file is '.ipynb', output_report is '.html' - output_report = os.path.splitext(self.tmp_path + self.output_nb)[0] + ".html" - output_notebook = nbformat.read(self.tmp_path + self.output_nb, as_version=4) - output, resources = exporter.from_notebook_node(output_notebook) - codecs.open(output_report, "w", encoding="utf-8").write(output) - - client = Minio( - self.minio_url, - access_key=self.minio_user, - secret_key=self.minio_password, - secure=True, - ) - - # check if bucket exists. - found = client.bucket_exists(self.minio_bucket) - if found: - client.fput_object( - self.minio_bucket, - self.minio_output_filepath + output_report.split("/")[-1], - output_report, - content_type="text/html; charset=utf-8", - metadata={"Content-Disposition": "inline"}, - ) - - for path, subdirs, files in os.walk(self.tmp_path + "output/"): - for name in files: - print(os.path.join(path, name)) - isFile = os.path.isfile(os.path.join(path, name)) - if isFile: - client.fput_object( - self.minio_bucket, - self.minio_output_filepath - + os.path.join(path, name).replace(self.tmp_path, ""), - os.path.join(path, name), - ) - - report_url = "https://{}/{}/{}".format( - self.minio_url, - self.minio_bucket, - self.minio_output_filepath + output_report.split("/")[-1], - ) - context["ti"].xcom_push(key="report_url", value=report_url) diff --git a/plugins/operators/papermill_only.py b/plugins/operators/papermill_only.py deleted file mode 100755 index be1aa66..0000000 --- a/plugins/operators/papermill_only.py +++ /dev/null @@ -1,80 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from typing import Dict, Optional - -import attr -import papermill as pm -from airflow.lineage.entities import File -from airflow.models import BaseOperator - - -@attr.s(auto_attribs=True) -class NoteBook(File): - """Jupyter notebook""" - - type_hint: Optional[str] = "jupyter_notebook" - parameters: Optional[Dict] = {} - - meta_schema: str = __name__ + ".NoteBook" - - -class PapermillOnlyOperator(BaseOperator): - """ - Executes a jupyter notebook through papermill that is annotated with parameters - :param input_nb: input notebook (can also be a NoteBook or a File inlet) - :type input_nb: str - :param output_nb: output notebook (can also be a NoteBook or File outlet) - :type output_nb: str - :param parameters: the notebook parameters to set - :type parameters: dict - """ - - supports_lineage = True - - template_fields = ("input_nb", "output_nb", "parameters") - - def __init__( - self, - *, - input_nb: Optional[str] = None, - output_nb: Optional[str] = None, - parameters: Optional[Dict] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.input_nb = input_nb - self.output_nb = output_nb - self.parameters = parameters - if input_nb: - self.inlets.append(NoteBook(url=input_nb, parameters=self.parameters)) - if output_nb: - self.outlets.append(NoteBook(url=output_nb)) - - def execute(self, context): - if not self.inlets or not self.outlets: - raise ValueError("Input notebook or output notebook is not specified") - - for i, item in enumerate(self.inlets): - pm.execute_notebook( - item.url, - self.outlets[i].url, - parameters=item.parameters, - progress_bar=False, - report_mode=True, - ) diff --git a/plugins/operators/python_minio.py b/plugins/operators/python_minio.py deleted file mode 100755 index 2d227d6..0000000 --- a/plugins/operators/python_minio.py +++ /dev/null @@ -1,80 +0,0 @@ -import os -from typing import Optional - -from airflow.operators.python import PythonOperator -from minio import Minio - - -class PythonMinioOperator(PythonOperator): - """ - Executes a Python function and uploads contents of tmp_path to Minio. - :param tmp_path: tmp path to store report during processing - :type tmp_path: str - :param minio_url: minio url where report should be store - :type minio_url: str - :param minio_bucket: minio bucket where report should be store - :type minio_bucket: str - :param minio_user: minio user which will store report - :type minio_user: str - :param minio_password: minio password of minio user - :type minio_password: str - :param minio_output_filepath: complete filepath where to store report - :type minio_output_filepath: str - """ - - template_fields = ( - "tmp_path", - "minio_url", - "minio_bucket", - "minio_user", - "minio_password", - "minio_output_filepath", - *PythonOperator.template_fields, - ) - - def __init__( - self, - *, - tmp_path: Optional[str] = None, - minio_url: Optional[str] = None, - minio_bucket: Optional[str] = None, - minio_user: Optional[str] = None, - minio_password: Optional[str] = None, - minio_output_filepath: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.tmp_path = tmp_path - self.minio_url = minio_url - self.minio_bucket = minio_bucket - self.minio_user = minio_user - self.minio_password = minio_password - self.minio_output_filepath = minio_output_filepath - - def execute(self, context): - os.makedirs(os.path.dirname(self.tmp_path + "output/"), exist_ok=True) - - super().execute(context) - - client = Minio( - self.minio_url, - access_key=self.minio_user, - secret_key=self.minio_password, - secure=True, - ) - - # check if bucket exists. - found = client.bucket_exists(self.minio_bucket) - if found: - for path, subdirs, files in os.walk(self.tmp_path + "output/"): - for name in files: - print(os.path.join(path, name)) - isFile = os.path.isfile(os.path.join(path, name)) - if isFile: - client.fput_object( - self.minio_bucket, - self.minio_output_filepath - + os.path.join(path, name).replace(self.tmp_path, ""), - os.path.join(path, name), - ) diff --git a/plugins/operators/simple_papermill.py b/plugins/operators/simple_papermill.py deleted file mode 100755 index f1d6f45..0000000 --- a/plugins/operators/simple_papermill.py +++ /dev/null @@ -1,80 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from typing import Dict, Optional - -import attr -import papermill as pm -from airflow.lineage.entities import File -from airflow.models import BaseOperator - - -@attr.s(auto_attribs=True) -class NoteBook(File): - """Jupyter notebook""" - - type_hint: Optional[str] = "jupyter_notebook" - parameters: Optional[Dict] = {} - - meta_schema: str = __name__ + ".NoteBook" - - -class SimplePapermillOperator(BaseOperator): - """ - Executes a jupyter notebook through papermill that is annotated with parameters - :param input_nb: input notebook (can also be a NoteBook or a File inlet) - :type input_nb: str - :param output_nb: output notebook (can also be a NoteBook or File outlet) - :type output_nb: str - :param parameters: the notebook parameters to set - :type parameters: dict - """ - - supports_lineage = True - - template_fields = ("input_nb", "output_nb", "parameters") - - def __init__( - self, - *, - input_nb: Optional[str] = None, - output_nb: Optional[str] = None, - parameters: Optional[Dict] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - - self.input_nb = input_nb - self.output_nb = output_nb - self.parameters = parameters - if input_nb: - self.inlets.append(NoteBook(url=input_nb, parameters=self.parameters)) - if output_nb: - self.outlets.append(NoteBook(url=output_nb)) - - def execute(self, context): - if not self.inlets or not self.outlets: - raise ValueError("Input notebook or output notebook is not specified") - - for i, item in enumerate(self.inlets): - pm.execute_notebook( - item.url, - self.outlets[i].url, - parameters=item.parameters, - progress_bar=False, - report_mode=True, - )