Skip to content

Commit

Permalink
[batch] Re-create Batch Api client on every request
Browse files Browse the repository at this point in the history
  • Loading branch information
dinvlad committed Apr 6, 2020
1 parent 296f55d commit 5019a67
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions batch/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@ def render_job(name: str, spec: str, job_input: str) -> V1Job:
)


def callback(batch_v1: BatchV1Api, namespace: str, spec: str) -> Callable[[Message], None]:
def callback(namespace: str, spec: str) -> Callable[[Message], None]:
def cb(m: Message):
try:
job_name = md5(m.message_id.encode('utf-8')).hexdigest()
job_input = m.data.decode('utf-8')
log.info(f'Submitting job {job_name} with input "{job_input}"')

job = render_job(job_name, spec, job_input)
batch_v1.create_namespaced_job(namespace, job)
get_batch_v1().create_namespaced_job(namespace, job)
log.info(f'Submitted job {job_name}')
except Exception:
log.exception('PubSub subscriber callback')
m.ack()
return cb


def listen(subscription: str, batch_v1: BatchV1Api, namespace: str, spec: str) -> None:
def listen(subscription: str, namespace: str, spec: str) -> None:
subscriber = pubsub_v1.SubscriberClient()
with subscriber:
cb = callback(batch_v1, namespace, spec)
cb = callback(namespace, spec)
streaming_pull = subscriber.subscribe(subscription, cb)
log.info(f'Listening to subscription {subscription}')
try:
Expand All @@ -57,7 +57,7 @@ def load_job_spec(spec_path: str) -> str:
return f.read()


def get_batch_v1():
def get_batch_v1() -> BatchV1Api:
try:
load_kube_config()
except:
Expand All @@ -74,9 +74,7 @@ def main():
log.basicConfig(level=log_level)

spec = load_job_spec(spec_path)
batch_v1 = get_batch_v1()

listen(subscription, batch_v1, namespace, spec)
listen(subscription, namespace, spec)


if __name__ == '__main__':
Expand Down

0 comments on commit 5019a67

Please sign in to comment.