Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Version #6

Merged
merged 6 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
<a href="https://github.com/yiisoft" target="_blank">
<img src="https://yiisoft.github.io/docs/images/yii_logo.svg" height="100px">
</a>
<h1 align="center">Yii _____</h1>
<h1 align="center">Yii Queue Database Adapter</h1>
<br>
</p>

[![Latest Stable Version](https://poser.pugx.org/yiisoft/_____/v/stable.png)](https://packagist.org/packages/yiisoft/_____)
[![Total Downloads](https://poser.pugx.org/yiisoft/_____/downloads.png)](https://packagist.org/packages/yiisoft/_____)
[![Build status](https://github.com/yiisoft/_____/workflows/build/badge.svg)](https://github.com/yiisoft/_____/actions?query=workflow%3Abuild)
[![Code Coverage](https://codecov.io/gh/yiisoft/_____/branch/master/graph/badge.svg)](https://codecov.io/gh/yiisoft/_____)
[![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgit.luolix.top%2Fyiisoft%2F_____%2Fmaster)](https://dashboard.stryker-mutator.io/reports/github.com/yiisoft/_____/master)
[![static analysis](https://github.com/yiisoft/_____/workflows/static%20analysis/badge.svg)](https://github.com/yiisoft/_____/actions?query=workflow%3A%22static+analysis%22)
[![type-coverage](https://shepherd.dev/github/yiisoft/_____/coverage.svg)](https://shepherd.dev/github/yiisoft/_____)
[![psalm-level](https://shepherd.dev/github/yiisoft/_____/level.svg)](https://shepherd.dev/github/yiisoft/_____)
[![Latest Stable Version](https://poser.pugx.org/yiisoft/queue-db/v/stable.png)](https://packagist.org/packages/yiisoft/queue-db)
[![Total Downloads](https://poser.pugx.org/yiisoft/queue-db/downloads.png)](https://packagist.org/packages/yiisoft/queue-db)
[![Build status](https://github.com/yiisoft/queue-db/workflows/build/badge.svg)](https://github.com/yiisoft/queue-db/actions?query=workflow%3Abuild)
[![Code Coverage](https://codecov.io/gh/yiisoft/queue-db/branch/master/graph/badge.svg)](https://codecov.io/gh/yiisoft/queue-db)
[![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgit.luolix.top%2Fyiisoft%2Fqueue-db%2Fmaster)](https://dashboard.stryker-mutator.io/reports/github.com/yiisoft/queue-db/master)
[![static analysis](https://github.com/yiisoft/queue-db/workflows/static%20analysis/badge.svg)](https://github.com/yiisoft/queue-db/actions?query=workflow%3A%22static+analysis%22)
[![type-coverage](https://shepherd.dev/github/yiisoft/queue-db/coverage.svg)](https://shepherd.dev/github/yiisoft/queue-db)
[![psalm-level](https://shepherd.dev/github/yiisoft/queue-db/level.svg)](https://shepherd.dev/github/yiisoft/queue-db)

Yii Queue Database Adapter implemention for [Yii Queue](https://github.com/yiisoft/queue).
Works with databases implemented for [Yii Database](https://github.com/yiisoft/queue).


The package ...

## Requirements

Expand All @@ -26,7 +29,7 @@ The package ...
The package could be installed with composer:

```shell
composer require yiisoft/_____
composer require yiisoft/queue-db
```

## General usage
Expand Down Expand Up @@ -74,7 +77,7 @@ Use [ComposerRequireChecker](https://github.com/maglnet/ComposerRequireChecker)

## License

The Yii _____ is free software. It is released under the terms of the BSD License.
The Yii Queue Database Adapter is free software. It is released under the terms of the BSD License.
Please see [`LICENSE`](./LICENSE.md) for more information.

Maintained by [Yii Software](https://www.yiiframework.com/).
Expand Down
33 changes: 25 additions & 8 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
{
"name": "yiisoft/_____",
"name": "yiisoft/queue-db",
"type": "library",
"description": "_____",
"description": "Yii Queue DB adapter using yiisoft/db",
"keywords": [
"_____"
"yii",
"queue",
"sql",
"database",
"async"
],
"homepage": "https://www.yiiframework.com/",
"license": "BSD-3-Clause",
"support": {
"issues": "https://github.com/yiisoft/_____/issues?state=open",
"issues": "https://github.com/yiisoft/queue-db/issues?state=open",
"forum": "https://www.yiiframework.com/forum/",
"wiki": "https://www.yiiframework.com/wiki/",
"irc": "ircs://irc.libera.chat:6697/yii",
"chat": "https://t.me/yii3en",
"source": "https://github.com/yiisoft/_____"
"source": "https://github.com/yiisoft/queue-db"
},
"funding": [
{
Expand All @@ -28,7 +32,9 @@
"minimum-stability": "dev",
"prefer-stable": true,
"require": {
"php": "^8.1"
"php": "^8.1",
"yiisoft/db": "^1.0",
"yiisoft/queue": "dev-master"
},
"require-dev": {
"maglnet/composer-require-checker": "^4.7",
Expand All @@ -40,12 +46,23 @@
},
"autoload": {
"psr-4": {
"Yiisoft\\_____\\": "src"
"Yiisoft\\Queue\\Db\\": "src"
}
},
"autoload-dev": {
"psr-4": {
"Yiisoft\\_____\\Tests\\": "tests"
"Yiisoft\\Queue\\Db\\Tests\\": "tests"
}
},
"extra": {
"branch-alias": {
"dev-master": "3.0.x-dev"
},
"config-plugin-options": {
"source-directory": "config"
},
"config-plugin": {
"di": "di.php"
}
},
"config": {
Expand Down
197 changes: 197 additions & 0 deletions src/Adapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Db;

use InvalidArgumentException;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Db\Connection\ConnectionInterface;
use Yiisoft\Db\Query\Query;

final class Adapter implements AdapterInterface
{
/**
* @var int timeout
*/
public $mutexTimeout = 3;
/**
* @var string table name
*/
public $tableName = '{{%queue}}';
/**
* @var bool ability to delete released messages from table
*/
public $deleteReleased = true;


public function __construct(

Check warning on line 32 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L32

Added line #L32 was not covered by tests
private ConnectionInterface $db,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
) {
}

Check warning on line 36 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L36

Added line #L36 was not covered by tests

public function runExisting(callable $handlerCallback): void

Check warning on line 38 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L38

Added line #L38 was not covered by tests
{
$result = true;
while (($payload = $this->reserve()) && ($result === true)) {
if ($result = $handlerCallback(\unserialize($payload['job']))) {
$this->release($payload);

Check warning on line 43 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L40-L43

Added lines #L40 - L43 were not covered by tests
}
}
}

public function status(string|int $id): JobStatus

Check warning on line 48 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L48

Added line #L48 was not covered by tests
{
$id = (int) $id;

Check warning on line 50 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L50

Added line #L50 was not covered by tests

$payload = (new Query($this->db))
->from($this->tableName)
->where(['id' => $id])
->one();

Check warning on line 55 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L52-L55

Added lines #L52 - L55 were not covered by tests

if (!$payload) {
if ($this->deleteReleased) {
return JobStatus::done();

Check warning on line 59 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L57-L59

Added lines #L57 - L59 were not covered by tests
}

throw new InvalidArgumentException("Unknown message ID: $id.");

Check warning on line 62 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L62

Added line #L62 was not covered by tests
}

if (!$payload['reserved_at']) {
return JobStatus::waiting();

Check warning on line 66 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L65-L66

Added lines #L65 - L66 were not covered by tests
}

if (!$payload['done_at']) {
return JobStatus::reserved();

Check warning on line 70 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L69-L70

Added lines #L69 - L70 were not covered by tests
}

return JobStatus::done();

Check warning on line 73 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L73

Added line #L73 was not covered by tests
}

public function push(MessageInterface $message): MessageInterface

Check warning on line 76 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L76

Added line #L76 was not covered by tests
{
$metadata = $message->getMetadata();
$this->db->createCommand()->insert($this->tableName, [
'channel' => $this->channel,
'job' => \serialize($message),
'pushed_at' => time(),
'ttr' => $metadata['ttr'] ?? 300,
'delay' => $metadata['delay'] ?? 0,
'priority' => $metadata['priority'] ?? 1024,
])->execute();
$tableSchema = $this->db->getTableSchema($this->tableName);
$key = $tableSchema ? $this->db->getLastInsertID($tableSchema->getSequenceName()) : $tableSchema;

Check warning on line 88 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L78-L88

Added lines #L78 - L88 were not covered by tests

return new IdEnvelope($message, $key);

Check warning on line 90 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L90

Added line #L90 was not covered by tests
}

public function subscribe(callable $handlerCallback): void

Check warning on line 93 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L93

Added line #L93 was not covered by tests
{
$this->runExisting($handlerCallback);

Check warning on line 95 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L95

Added line #L95 was not covered by tests
}

public function withChannel(string $channel): self

Check warning on line 98 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L98

Added line #L98 was not covered by tests
{
if ($channel === $this->channel) {
return $this;

Check warning on line 101 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L100-L101

Added lines #L100 - L101 were not covered by tests
}

$new = clone $this;
$new->channel = $channel;

Check warning on line 105 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L104-L105

Added lines #L104 - L105 were not covered by tests

return $new;

Check warning on line 107 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L107

Added line #L107 was not covered by tests
}

/**
* Takes one message from waiting list and reserves it for handling.
*
* @return array|null payload
* @throws \Exception in case it hasn't waited the lock
*/
protected function reserve(): array|null

Check warning on line 116 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L116

Added line #L116 was not covered by tests
{
// TWK TODO ??? return $this->db->useMaster(function () {
// TWK TODO ??? if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) {
// TWK TODO ??? throw new \Exception('Has not waited the lock.');
// TWK TODO ??? }

try {
$this->moveExpired();

Check warning on line 124 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L124

Added line #L124 was not covered by tests

// Reserve one message
$payload = (new Query($this->db))
->from($this->tableName)
->andWhere(['channel' => $this->channel, 'reserved_at' => null])
->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
->limit(1)
->one();
if (is_array($payload)) {
$payload['reserved_at'] = time();
$payload['attempt'] = (int) $payload['attempt'] + 1;
$this->db->createCommand()->update($this->tableName, [
'reserved_at' => $payload['reserved_at'],
'attempt' => $payload['attempt'],
], [
'id' => $payload['id'],
])->execute();

Check warning on line 142 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L127-L142

Added lines #L127 - L142 were not covered by tests

// pgsql
if (is_resource($payload['job'])) {
$payload['job'] = stream_get_contents($payload['job']);

Check warning on line 146 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L145-L146

Added lines #L145 - L146 were not covered by tests
}
}
} finally {
// TWK TODO ??? $this->mutex->release(__CLASS__ . $this->channel);
}

return $payload;

Check warning on line 153 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L153

Added line #L153 was not covered by tests
// TWK TODO ??? });
}

/**
* @param array $payload
*/
protected function release($payload): void

Check warning on line 160 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L160

Added line #L160 was not covered by tests
{
if ($this->deleteReleased) {
$this->db->createCommand()->delete(
$this->tableName,
['id' => $payload['id']]
)->execute();

Check warning on line 166 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L162-L166

Added lines #L162 - L166 were not covered by tests
} else {
$this->db->createCommand()->update(
$this->tableName,
['done_at' => time()],
['id' => $payload['id']]
)->execute();

Check warning on line 172 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L168-L172

Added lines #L168 - L172 were not covered by tests
}
}

/**
* Moves expired messages into waiting list.
*/
private function moveExpired(): void

Check warning on line 179 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L179

Added line #L179 was not covered by tests
{
if ($this->reserveTime !== time()) {
$this->reserveTime = time();
$this->db->createCommand()->update(
$this->tableName,
['reserved_at' => null],
'[[reserved_at]] < :time - [[ttr]] and [[reserved_at]] is not null and [[done_at]] is null',
[':time' => $this->reserveTime]
)->execute();

Check warning on line 188 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L181-L188

Added lines #L181 - L188 were not covered by tests
}
}

/**
* @var int reserve time
*/
private $reserveTime = 0;

}
Loading