Skip to content

Commit

Permalink
Merge pull request #180 from tobymao/revert_now
Browse files Browse the repository at this point in the history
fix: revert back to client side time
  • Loading branch information
tobymao authored Oct 18, 2024
2 parents 52e8fef + 80d2227 commit 9169d57
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 21 deletions.
26 changes: 14 additions & 12 deletions saq/queue/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from saq.multiplexer import Multiplexer
from saq.queue.base import Queue, logger
from saq.queue.postgres_ddl import DDL_STATEMENTS
from saq.utils import now, seconds
from saq.utils import now, now_seconds

if t.TYPE_CHECKING:
from collections.abc import Iterable
Expand Down Expand Up @@ -218,11 +218,11 @@ async def count(self, kind: CountKind) -> int:
FROM {jobs_table}
WHERE status = 'queued'
AND queue = %(queue)s
AND NOW() >= TO_TIMESTAMP(scheduled)
AND %(now)s >= scheduled
"""
)
).format(jobs_table=self.jobs_table),
{"queue": self.name},
{"queue": self.name, "now": now_seconds()},
)
elif kind == "active":
await cursor.execute(
Expand Down Expand Up @@ -282,6 +282,8 @@ async def sweep(self, lock: int = 60, abort: float = 5.0) -> list[str]:
self._has_sweep_lock = True

async with self.pool.connection() as conn, conn.cursor() as cursor:
now_ts = now_seconds()

await cursor.execute(
SQL(
dedent(
Expand All @@ -290,16 +292,14 @@ async def sweep(self, lock: int = 60, abort: float = 5.0) -> list[str]:
DELETE FROM {jobs_table}
WHERE queue = %(queue)s
AND status IN ('aborted', 'complete', 'failed')
AND NOW() >= TO_TIMESTAMP(expire_at);
AND %(now)s >= expire_at;
"""
)
).format(
jobs_table=self.jobs_table,
stats_table=self.stats_table,
),
{
"queue": self.name,
},
{"queue": self.name, "now": now_ts},
)

await cursor.execute(
Expand All @@ -308,13 +308,14 @@ async def sweep(self, lock: int = 60, abort: float = 5.0) -> list[str]:
"""
-- Delete expired stats
DELETE FROM {stats_table}
WHERE NOW() >= TO_TIMESTAMP(expire_at);
WHERE %(now)s >= expire_at;
"""
)
).format(
jobs_table=self.jobs_table,
stats_table=self.stats_table,
),
{"now": now_ts},
)

await cursor.execute(
Expand Down Expand Up @@ -570,7 +571,7 @@ async def _dequeue(self) -> None:
FROM {jobs_table}
WHERE status = 'queued'
AND queue = %(queue)s
AND NOW() >= TO_TIMESTAMP(scheduled)
AND %(now)s >= scheduled
AND priority BETWEEN %(plow)s AND %(phigh)s
AND group_key NOT IN (
SELECT DISTINCT group_key
Expand All @@ -596,6 +597,7 @@ async def _dequeue(self) -> None:
),
{
"queue": self.name,
"now": now_seconds(),
"limit": self._waiting,
"plow": self._priorities[0],
"phigh": self._priorities[1],
Expand Down Expand Up @@ -656,7 +658,7 @@ async def _enqueue(self, job: Job) -> Job | None:
"status": job.status,
"priority": job.priority,
"group_key": job.group_key,
"scheduled": job.scheduled or int(seconds(now())),
"scheduled": job.scheduled or int(now_seconds()),
},
)

Expand Down Expand Up @@ -722,7 +724,7 @@ async def _retry(self, job: Job, error: str | None) -> None:
if next_retry_delay:
scheduled = time.time() + next_retry_delay
else:
scheduled = job.scheduled or seconds(now())
scheduled = job.scheduled or now_seconds()

await self.update(job, scheduled=int(scheduled), expire_at=None)

Expand All @@ -741,7 +743,7 @@ async def _finish(
connection
) if connection else self.pool.connection() as conn, conn.cursor() as cursor:
if job.ttl >= 0:
expire_at = seconds(now()) + job.ttl if job.ttl > 0 else None
expire_at = now_seconds() + job.ttl if job.ttl > 0 else None
await self.update(job, status=status, expire_at=expire_at, connection=conn)
else:
await cursor.execute(
Expand Down
4 changes: 2 additions & 2 deletions saq/queue/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)
from saq.multiplexer import Multiplexer
from saq.queue.base import Queue, logger
from saq.utils import millis, now, seconds
from saq.utils import millis, now, now_seconds

try:
from redis import asyncio as aioredis
Expand Down Expand Up @@ -211,7 +211,7 @@ async def schedule(self, lock: int = 1) -> t.List[str]:
job_id.decode("utf-8")
for job_id in await self._schedule_script(
keys=[self._schedule, self._incomplete, self._queued],
args=[lock, seconds(now())],
args=[lock, now_seconds()],
)
or []
]
Expand Down
4 changes: 4 additions & 0 deletions saq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def now() -> int:
return int(time.time() * 1000)


def now_seconds() -> float:
return time.time()


def uuid1() -> str:
"""Generates a string representation of a UUID1"""
return str(uuid.uuid1())
Expand Down
4 changes: 2 additions & 2 deletions saq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from saq.job import Status
from saq.queue import Queue
from saq.utils import cancel_tasks, millis, now, seconds
from saq.utils import cancel_tasks, millis, now, now_seconds

if t.TYPE_CHECKING:
from asyncio import Task
Expand Down Expand Up @@ -174,7 +174,7 @@ async def schedule(self, lock: int = 1) -> None:
kwargs = cron_job.__dict__.copy()
function = kwargs.pop("function").__qualname__
kwargs["key"] = f"cron:{function}" if kwargs.pop("unique") else None
scheduled = croniter(kwargs.pop("cron"), seconds(now())).get_next()
scheduled = croniter(kwargs.pop("cron"), now_seconds()).get_next()

await self.queue.enqueue(
function,
Expand Down
11 changes: 8 additions & 3 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ async def test_sweep(self, mock_time: MagicMock) -> None:

@mock.patch("saq.utils.time")
async def test_sweep_stuck(self, mock_time: MagicMock) -> None:
mock_time.time.return_value = 0
job1 = await self.queue.enqueue("test")
assert job1
job = await self.dequeue()
Expand Down Expand Up @@ -718,9 +719,13 @@ async def test_finish_ttl_negative(self, mock_time: MagicMock) -> None:
result = await cursor.fetchone()
self.assertIsNone(result)

async def test_cron_job_close_to_target(self) -> None:
await self.enqueue("test", scheduled=time.time() + 0.5)
job = await self.queue.dequeue(timeout=0.1)
@mock.patch("saq.utils.time")
async def test_cron_job_close_to_target(self, mock_time: MagicMock) -> None:
mock_time.time.return_value = 1000.5
await self.enqueue("test", scheduled=1001)
# The job is scheduled to run at 1001, but we're running at 1000.5
# so it should not be picked up
job = await self.queue.dequeue(timeout=1)
assert not job

async def test_bad_connection(self) -> None:
Expand Down
3 changes: 1 addition & 2 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,9 +537,8 @@ async def test_cron(self, mock_logger: MagicMock) -> None:
self.assertEqual(await self.queue.count("queued"), 0)
self.assertEqual(await self.queue.count("incomplete"), 0)
await worker.schedule()
self.assertEqual(await self.queue.count("queued"), 0)
self.assertEqual(await self.queue.count("incomplete"), 1)
await asyncio.sleep(1.5)
await asyncio.sleep(1)

self.assertEqual(await self.queue.count("queued"), 1)
self.assertEqual(await self.queue.count("incomplete"), 1)
Expand Down

0 comments on commit 9169d57

Please sign in to comment.