diff --git a/aw_server/rest.py b/aw_server/rest.py index 4184eaff..6895ce4a 100644 --- a/aw_server/rest.py +++ b/aw_server/rest.py @@ -1,16 +1,22 @@ +import json +import traceback +from datetime import datetime, timedelta from functools import wraps +from threading import Lock from typing import Dict -import traceback -import json -from flask import request, Blueprint, jsonify, current_app, make_response -from flask_restx import Api, Resource, fields import iso8601 -from datetime import datetime, timedelta - from aw_core import schema from aw_core.models import Event from aw_query.exceptions import QueryException +from flask import ( + Blueprint, + current_app, + jsonify, + make_response, + request, +) +from flask_restx import Api, Resource, fields from . import logger from .api import ServerAPI @@ -268,6 +274,10 @@ def delete(self, bucket_id: str, event_id: int): @api.route("/0/buckets//heartbeat") class HeartbeatResource(Resource): + def __init__(self, *args, **kwargs): + self.lock = Lock() + super().__init__(*args, **kwargs) + @api.expect(event, validate=True) @api.param( "pulsetime", "Largest timewindow allowed between heartbeats for them to merge" @@ -281,7 +291,18 @@ def post(self, bucket_id): else: raise BadRequest("MissingParameter", "Missing required parameter pulsetime") - event = current_app.api.heartbeat(bucket_id, heartbeat, pulsetime) + # This lock is meant to ensure that only one heartbeat is processed at a time, + # as the heartbeat function is not thread-safe. + # This should maybe be moved into the api.py file instead (but would be very messy). + aquired = self.lock.acquire(timeout=1) + if not aquired: + logger.warning( + "Heartbeat lock could not be aquired within a reasonable time, this likely indicates a bug." + ) + try: + event = current_app.api.heartbeat(bucket_id, heartbeat, pulsetime) + finally: + self.lock.release() return event.to_json_dict(), 200