Skip to content

Commit

Permalink
Drop the usage of metaclass in the XCom backend. (#570)
Browse files Browse the repository at this point in the history
Further addresses issue #566.

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored Nov 4, 2021
1 parent b08fb62 commit f91b53f
Showing 1 changed file with 17 additions and 20 deletions.
37 changes: 17 additions & 20 deletions python/vineyard/contrib/airflow/xcom/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import logging
import os
import sys
from typing import Any

from airflow.configuration import conf
Expand Down Expand Up @@ -50,26 +49,24 @@ def _resolve_vineyard_xcom_options():
return options


class __VineyardXComMeta:
def __init__(cls, *args, **kwargs):
cls.__options = None

@property
def options(cls):
if cls.__options is None:
cls.__options = _resolve_vineyard_xcom_options()
return cls.__options


class VineyardXCom(BaseXCom, metaclass=__VineyardXComMeta):
class VineyardXCom(BaseXCom):
"""
Custom Backend Serving to use Vineyard.
Setup your airflow environment by specifying the following
environment varable:
the environment varable:
export AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXCom
"""

__options = None

@classmethod
def options(cls):
if cls.__options is None:
cls.__options = _resolve_vineyard_xcom_options()
return cls.__options

@reconstructor
def init_on_load(self):
"""
Expand Down Expand Up @@ -98,7 +95,7 @@ def set(cls, key, value, execution_date, task_id, dag_id, session=None):
if targets:
logger.info("Drop duplicates from vineyard: %s", targets)
try:
client = vineyard.connect(cls.options['ipc_socket'])
client = vineyard.connect(cls.options()['ipc_socket'])
client.delete(targets)
except Exception as e:
logger.error('Failed to drop duplicates from vineyard: %s', e)
Expand Down Expand Up @@ -126,7 +123,7 @@ def delete(cls, xcoms, session=None):
session.delete(xcom)
logger.info("Drop from vineyard: %s", targets)
try:
client = vineyard.connect(cls.options['ipc_socket'])
client = vineyard.connect(cls.options()['ipc_socket'])
client.delete(targets)
except Exception as e:
logger.error('Failed to drop from vineyard: %s', e)
Expand All @@ -152,17 +149,17 @@ def clear(
if targets:
logger.info("Drop from vineyard: %s", targets)
try:
client = vineyard.connect(cls.options['ipc_socket'])
client = vineyard.connect(cls.options()['ipc_socket'])
client.delete(targets)
except Exception as e:
logger.error('Failed to drop from vineyard: %s', e)
query.delete()

@staticmethod
def serialize_value(value: Any):
client = vineyard.connect(VineyardXCom.options['ipc_socket'])
client = vineyard.connect(VineyardXCom.options()['ipc_socket'])
value_id = client.put(value)
if VineyardXCom.options['persist']:
if VineyardXCom.options()['persist']:
client.persist(value_id)
logger.debug("serialize_value: %s -> %r", value, value_id)
return BaseXCom.serialize_value(repr(value_id))
Expand All @@ -187,7 +184,7 @@ def post_resolve_value(result: "XCom", value: Any, session: Session = None) -> A
It will also record the migrated xcom value into the db as well to make
sure it can be dropped properly.
'''
client = vineyard.connect(VineyardXCom.options['ipc_socket'])
client = vineyard.connect(VineyardXCom.options()['ipc_socket'])
object_id = vineyard.ObjectID(value)

meta = client.get_meta(object_id)
Expand Down

0 comments on commit f91b53f

Please sign in to comment.