Skip to content

Commit

Permalink
fix: made heartbeats threadsafe (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare authored Nov 8, 2022
1 parent 1d6d24e commit 92997c2
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions aw_server/rest.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import json
import traceback
from datetime import datetime, timedelta
from functools import wraps
from threading import Lock
from typing import Dict
import traceback
import json

from flask import request, Blueprint, jsonify, current_app, make_response
from flask_restx import Api, Resource, fields
import iso8601
from datetime import datetime, timedelta

from aw_core import schema
from aw_core.models import Event
from aw_query.exceptions import QueryException
from flask import (
Blueprint,
current_app,
jsonify,
make_response,
request,
)
from flask_restx import Api, Resource, fields

from . import logger
from .api import ServerAPI
Expand Down Expand Up @@ -268,6 +274,10 @@ def delete(self, bucket_id: str, event_id: int):

@api.route("/0/buckets/<string:bucket_id>/heartbeat")
class HeartbeatResource(Resource):
def __init__(self, *args, **kwargs):
self.lock = Lock()
super().__init__(*args, **kwargs)

@api.expect(event, validate=True)
@api.param(
"pulsetime", "Largest timewindow allowed between heartbeats for them to merge"
Expand All @@ -281,7 +291,18 @@ def post(self, bucket_id):
else:
raise BadRequest("MissingParameter", "Missing required parameter pulsetime")

event = current_app.api.heartbeat(bucket_id, heartbeat, pulsetime)
# This lock is meant to ensure that only one heartbeat is processed at a time,
# as the heartbeat function is not thread-safe.
# This should maybe be moved into the api.py file instead (but would be very messy).
aquired = self.lock.acquire(timeout=1)
if not aquired:
logger.warning(
"Heartbeat lock could not be aquired within a reasonable time, this likely indicates a bug."
)
try:
event = current_app.api.heartbeat(bucket_id, heartbeat, pulsetime)
finally:
self.lock.release()
return event.to_json_dict(), 200


Expand Down

0 comments on commit 92997c2

Please sign in to comment.