Skip to content

Commit

Permalink
add sns driver + use profile to establish connection
Browse files Browse the repository at this point in the history
  • Loading branch information
fbaudry committed Feb 5, 2021
1 parent 3f19930 commit eee800d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
11 changes: 8 additions & 3 deletions pkg/enqueue/Resources.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Sns\SnsConnectionFactory;
use Enqueue\SnsQs\SnsQsConnectionFactory;
use Enqueue\Sqs\SqsConnectionFactory;
use Enqueue\Stomp\StompConnectionFactory;
Expand Down Expand Up @@ -42,7 +43,7 @@ public static function getAvailableConnections(): array

$availableMap = [];
foreach ($map as $connectionClass => $item) {
if (class_exists($connectionClass)) {
if (\class_exists($connectionClass)) {
$availableMap[$connectionClass] = $item;
}
}
Expand Down Expand Up @@ -156,6 +157,10 @@ public static function getKnownConnections(): array
'schemes' => ['sqs'],
'supportedSchemeExtensions' => [],
'package' => 'enqueue/sqs', ];
$map[SnsConnectionFactory::class] = [
'schemes' => ['sns'],
'supportedSchemeExtensions' => [],
'package' => 'enqueue/sns', ];
$map[SnsQsConnectionFactory::class] = [
'schemes' => ['snsqs'],
'supportedSchemeExtensions' => [],
Expand Down Expand Up @@ -183,9 +188,9 @@ public static function getKnownConnections(): array

public static function addConnection(string $connectionFactoryClass, array $schemes, array $extensions, string $package): void
{
if (class_exists($connectionFactoryClass)) {
if (\class_exists($connectionFactoryClass)) {
if (false == (new \ReflectionClass($connectionFactoryClass))->implementsInterface(ConnectionFactory::class)) {
throw new \InvalidArgumentException(sprintf('The connection factory class "%s" must implement "%s" interface.', $connectionFactoryClass, ConnectionFactory::class));
throw new \InvalidArgumentException(\sprintf('The connection factory class "%s" must implement "%s" interface.', $connectionFactoryClass, ConnectionFactory::class));
}
}

Expand Down
23 changes: 12 additions & 11 deletions pkg/sns/SnsConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public function __construct($config = 'sns:')

if (empty($config)) {
$config = [];
} elseif (is_string($config)) {
} elseif (\is_string($config)) {
$config = $this->parseDsn($config);
} elseif (is_array($config)) {
if (array_key_exists('dsn', $config)) {
$config = array_replace_recursive($config, $this->parseDsn($config['dsn']));
} elseif (\is_array($config)) {
if (\array_key_exists('dsn', $config)) {
$config = \array_replace_recursive($config, $this->parseDsn($config['dsn']));

unset($config['dsn']);
}
} else {
throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AwsSnsClient::class));
throw new \LogicException(\sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AwsSnsClient::class));
}

$this->config = array_replace($this->defaultConfig(), $config);
$this->config = \array_replace($this->defaultConfig(), $config);
}

/**
Expand All @@ -89,6 +89,10 @@ private function establishConnection(): SnsClient
$config['endpoint'] = $this->config['endpoint'];
}

if (isset($this->config['profile'])) {
$config['profile'] = $this->config['profile'];
}

if ($this->config['key'] && $this->config['secret']) {
$config['credentials'] = [
'key' => $this->config['key'],
Expand Down Expand Up @@ -117,13 +121,10 @@ private function parseDsn(string $dsn): array
$dsn = Dsn::parseFirst($dsn);

if ('sns' !== $dsn->getSchemeProtocol()) {
throw new \LogicException(sprintf(
'The given scheme protocol "%s" is not supported. It must be "sns"',
$dsn->getSchemeProtocol()
));
throw new \LogicException(\sprintf('The given scheme protocol "%s" is not supported. It must be "sns"', $dsn->getSchemeProtocol()));
}

return array_filter(array_replace($dsn->getQuery(), [
return \array_filter(\array_replace($dsn->getQuery(), [
'key' => $dsn->getString('key'),
'secret' => $dsn->getString('secret'),
'token' => $dsn->getString('token'),
Expand Down

0 comments on commit eee800d

Please sign in to comment.