Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop the usage of metaclass in the XCom backend. #570

Merged
merged 1 commit into from
Nov 4, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions python/vineyard/contrib/airflow/xcom/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import logging
import os
import sys
from typing import Any

from airflow.configuration import conf
Expand Down Expand Up @@ -50,26 +49,24 @@ def _resolve_vineyard_xcom_options():
return options


class __VineyardXComMeta:
def __init__(cls, *args, **kwargs):
cls.__options = None

@property
def options(cls):
if cls.__options is None:
cls.__options = _resolve_vineyard_xcom_options()
return cls.__options


class VineyardXCom(BaseXCom, metaclass=__VineyardXComMeta):
class VineyardXCom(BaseXCom):
"""
Custom Backend Serving to use Vineyard.

Setup your airflow environment by specifying the following
environment varable:
the environment varable:

export AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXCom
"""

__options = None

@classmethod
def options(cls):
if cls.__options is None:
cls.__options = _resolve_vineyard_xcom_options()
return cls.__options

@reconstructor
def init_on_load(self):
"""
Expand Down Expand Up @@ -98,7 +95,7 @@ def set(cls, key, value, execution_date, task_id, dag_id, session=None):
if targets:
logger.info("Drop duplicates from vineyard: %s", targets)
try:
client = vineyard.connect(cls.options['ipc_socket'])
client = vineyard.connect(cls.options()['ipc_socket'])
client.delete(targets)
except Exception as e:
logger.error('Failed to drop duplicates from vineyard: %s', e)
Expand Down Expand Up @@ -126,7 +123,7 @@ def delete(cls, xcoms, session=None):
session.delete(xcom)
logger.info("Drop from vineyard: %s", targets)
try:
client = vineyard.connect(cls.options['ipc_socket'])
client = vineyard.connect(cls.options()['ipc_socket'])
client.delete(targets)
except Exception as e:
logger.error('Failed to drop from vineyard: %s', e)
Expand All @@ -152,17 +149,17 @@ def clear(
if targets:
logger.info("Drop from vineyard: %s", targets)
try:
client = vineyard.connect(cls.options['ipc_socket'])
client = vineyard.connect(cls.options()['ipc_socket'])
client.delete(targets)
except Exception as e:
logger.error('Failed to drop from vineyard: %s', e)
query.delete()

@staticmethod
def serialize_value(value: Any):
client = vineyard.connect(VineyardXCom.options['ipc_socket'])
client = vineyard.connect(VineyardXCom.options()['ipc_socket'])
value_id = client.put(value)
if VineyardXCom.options['persist']:
if VineyardXCom.options()['persist']:
client.persist(value_id)
logger.debug("serialize_value: %s -> %r", value, value_id)
return BaseXCom.serialize_value(repr(value_id))
Expand All @@ -187,7 +184,7 @@ def post_resolve_value(result: "XCom", value: Any, session: Session = None) -> A
It will also record the migrated xcom value into the db as well to make
sure it can be dropped properly.
'''
client = vineyard.connect(VineyardXCom.options['ipc_socket'])
client = vineyard.connect(VineyardXCom.options()['ipc_socket'])
object_id = vineyard.ObjectID(value)

meta = client.get_meta(object_id)
Expand Down