Skip to content

Commit

Permalink
Merge pull request #154 from jeromegamez/introduce-enums
Browse files Browse the repository at this point in the history
Replace `*Enum` classes with a native Enums
  • Loading branch information
WyriHaximus authored May 24, 2024
2 parents 3a2923f + f827bab commit ad5b979
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 85 deletions.
90 changes: 43 additions & 47 deletions src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ class Channel implements ChannelInterface, EventEmitterInterface
/** @var Buffer */
private $bodyBuffer;

/** @var int */
private $state = ChannelStateEnum::READY;
private ChannelState $state = ChannelState::Ready;

/** @var int */
private $mode = ChannelModeEnum::REGULAR;
private ChannelMode $mode = ChannelMode::Regular;

/** @var Deferred */
private $closeDeferred;
Expand Down Expand Up @@ -119,10 +117,8 @@ public function getChannelId(): int

/**
* Returns the channel mode.
*
* @return int
*/
public function getMode(): int
public function getMode(): ChannelMode
{
return $this->mode;
}
Expand Down Expand Up @@ -165,7 +161,7 @@ public function removeReturnListener(callable $callback)
*/
public function addAckListener(callable $callback)
{
if ($this->mode !== ChannelModeEnum::CONFIRM) {
if ($this->mode !== ChannelMode::Confirm) {
throw new ChannelException("Ack/nack listener can be added when channel in confirm mode.");
}

Expand All @@ -182,7 +178,7 @@ public function addAckListener(callable $callback)
*/
public function removeAckListener(callable $callback)
{
if ($this->mode !== ChannelModeEnum::CONFIRM) {
if ($this->mode !== ChannelMode::Confirm) {
throw new ChannelException("Ack/nack listener can be removed when channel in confirm mode.");
}

Expand All @@ -202,16 +198,16 @@ public function removeAckListener(callable $callback)
*/
public function close(int $replyCode = 0, string $replyText = ""): void
{
if ($this->state === ChannelStateEnum::CLOSED) {
if ($this->state === ChannelState::Closed) {
throw new ChannelException("Trying to close already closed channel #{$this->channelId}.");
}

if ($this->state === ChannelStateEnum::CLOSING) {
if ($this->state === ChannelState::Closing) {
await($this->closePromise);
return;
}

$this->state = ChannelStateEnum::CLOSING;
$this->state = ChannelState::Closing;

$this->connection->channelClose($this->channelId, $replyCode, 0, 0, $replyText);
$this->closeDeferred = new Deferred();
Expand Down Expand Up @@ -279,12 +275,12 @@ public function get(string $queue = "", bool $noAck = false): Message|null
return null;

} elseif ($response instanceof MethodBasicGetOkFrame) {
$this->state = ChannelStateEnum::AWAITING_HEADER;
$this->state = ChannelState::AwaitingHeader;

$headerFrame = $this->connection->awaitContentHeader($this->getChannelId());
$this->headerFrame = $headerFrame;
$this->bodySizeRemaining = $headerFrame->bodySize;
$this->state = ChannelStateEnum::AWAITING_BODY;
$this->state = ChannelState::AwaitingBody;

while ($this->bodySizeRemaining > 0) {
$bodyFrame = $this->connection->awaitContentBody($this->getChannelId());
Expand All @@ -293,13 +289,13 @@ public function get(string $queue = "", bool $noAck = false): Message|null
$this->bodySizeRemaining -= $bodyFrame->payloadSize;

if ($this->bodySizeRemaining < 0) {
$this->state = ChannelStateEnum::ERROR;
$this->state = ChannelState::Error;
$this->connection->disconnect(Constants::STATUS_SYNTAX_ERROR, $errorMessage = "Body overflow, received " . (-$this->bodySizeRemaining) . " more bytes.");
throw new ChannelException($errorMessage);
}
}

$this->state = ChannelStateEnum::READY;
$this->state = ChannelState::Ready;

$message = new Message(
null,
Expand Down Expand Up @@ -327,7 +323,7 @@ public function publish($body, array $headers = [], string $exchange = '', strin
{
$response = $this->publishImpl($body, $headers, $exchange, $routingKey, $mandatory, $immediate);

if ($this->mode === ChannelModeEnum::CONFIRM) {
if ($this->mode === ChannelMode::Confirm) {
return ++$this->deliveryTag;
} else {
return $response;
Expand All @@ -349,12 +345,12 @@ public function cancel(string $consumerTag, bool $nowait = false): bool|\Bunny\P
*/
public function txSelect(): \Bunny\Protocol\MethodTxSelectOkFrame
{
if ($this->mode !== ChannelModeEnum::REGULAR) {
if ($this->mode !== ChannelMode::Regular) {
throw new ChannelException("Channel not in regular mode, cannot change to transactional mode.");
}

$response = $this->txSelectImpl();
$this->mode = ChannelModeEnum::TRANSACTIONAL;
$this->mode = ChannelMode::Transactional;

return $response;
}
Expand All @@ -364,7 +360,7 @@ public function txSelect(): \Bunny\Protocol\MethodTxSelectOkFrame
*/
public function txCommit(): \Bunny\Protocol\MethodTxCommitOkFrame
{
if ($this->mode !== ChannelModeEnum::TRANSACTIONAL) {
if ($this->mode !== ChannelMode::Transactional) {
throw new ChannelException("Channel not in transactional mode, cannot call 'tx.commit'.");
}

Expand All @@ -376,7 +372,7 @@ public function txCommit(): \Bunny\Protocol\MethodTxCommitOkFrame
*/
public function txRollback(): \Bunny\Protocol\MethodTxRollbackOkFrame
{
if ($this->mode !== ChannelModeEnum::TRANSACTIONAL) {
if ($this->mode !== ChannelMode::Transactional) {
throw new ChannelException("Channel not in transactional mode, cannot call 'tx.rollback'.");
}

Expand All @@ -388,7 +384,7 @@ public function txRollback(): \Bunny\Protocol\MethodTxRollbackOkFrame
*/
public function confirmSelect(callable $callback = null, bool $nowait = false): \Bunny\Protocol\MethodConfirmSelectOkFrame
{
if ($this->mode !== ChannelModeEnum::REGULAR) {
if ($this->mode !== ChannelMode::Regular) {
throw new ChannelException("Channel not in regular mode, cannot change to transactional mode.");
}

Expand All @@ -400,7 +396,7 @@ public function confirmSelect(callable $callback = null, bool $nowait = false):

private function enterConfirmMode(callable $callback = null): void
{
$this->mode = ChannelModeEnum::CONFIRM;
$this->mode = ChannelMode::Confirm;
$this->deliveryTag = 0;

if ($callback) {
Expand All @@ -415,26 +411,26 @@ private function enterConfirmMode(callable $callback = null): void
*/
public function onFrameReceived(AbstractFrame $frame): void
{
if ($this->state === ChannelStateEnum::ERROR) {
if ($this->state === ChannelState::Error) {
throw new ChannelException("Channel in error state.");
}

if ($this->state === ChannelStateEnum::CLOSED) {
if ($this->state === ChannelState::Closed) {
throw new ChannelException("Received frame #{$frame->type} on closed channel #{$this->channelId}.");
}

if ($frame instanceof MethodFrame) {
if ($this->state === ChannelStateEnum::CLOSING && !($frame instanceof MethodChannelCloseOkFrame)) {
if ($this->state === ChannelState::Closing && !($frame instanceof MethodChannelCloseOkFrame)) {
// drop frames in closing state
return;

} elseif ($this->state !== ChannelStateEnum::READY && !($frame instanceof MethodChannelCloseOkFrame)) {
} elseif ($this->state !== ChannelState::Ready && !($frame instanceof MethodChannelCloseOkFrame)) {
$currentState = $this->state;
$this->state = ChannelStateEnum::ERROR;
$this->state = ChannelState::Error;

if ($currentState === ChannelStateEnum::AWAITING_HEADER) {
if ($currentState === ChannelState::AwaitingHeader) {
$msg = "Got method frame, expected header frame.";
} elseif ($currentState === ChannelStateEnum::AWAITING_BODY) {
} elseif ($currentState === ChannelState::AwaitingBody) {
$msg = "Got method frame, expected body frame.";
} else {
throw new \LogicException("Unhandled channel state.");
Expand All @@ -446,7 +442,7 @@ public function onFrameReceived(AbstractFrame $frame): void
}

if ($frame instanceof MethodChannelCloseOkFrame) {
$this->state = ChannelStateEnum::CLOSED;
$this->state = ChannelState::Closed;

if ($this->closeDeferred !== null) {
$this->closeDeferred->resolve($this->channelId);
Expand All @@ -459,11 +455,11 @@ public function onFrameReceived(AbstractFrame $frame): void

} elseif ($frame instanceof MethodBasicReturnFrame) {
$this->returnFrame = $frame;
$this->state = ChannelStateEnum::AWAITING_HEADER;
$this->state = ChannelState::AwaitingHeader;

} elseif ($frame instanceof MethodBasicDeliverFrame) {
$this->deliverFrame = $frame;
$this->state = ChannelStateEnum::AWAITING_HEADER;
$this->state = ChannelState::AwaitingHeader;

} elseif ($frame instanceof MethodBasicAckFrame) {
foreach ($this->ackCallbacks as $callback) {
Expand All @@ -482,17 +478,17 @@ public function onFrameReceived(AbstractFrame $frame): void
}

} elseif ($frame instanceof ContentHeaderFrame) {
if ($this->state === ChannelStateEnum::CLOSING) {
if ($this->state === ChannelState::Closing) {
// drop frames in closing state
return;

} elseif ($this->state !== ChannelStateEnum::AWAITING_HEADER) {
} elseif ($this->state !== ChannelState::AwaitingHeader) {
$currentState = $this->state;
$this->state = ChannelStateEnum::ERROR;
$this->state = ChannelState::Error;

if ($currentState === ChannelStateEnum::READY) {
if ($currentState === ChannelState::Ready) {
$msg = "Got header frame, expected method frame.";
} elseif ($currentState === ChannelStateEnum::AWAITING_BODY) {
} elseif ($currentState === ChannelState::AwaitingBody) {
$msg = "Got header frame, expected content frame.";
} else {
throw new \LogicException("Unhandled channel state.");
Expand All @@ -507,24 +503,24 @@ public function onFrameReceived(AbstractFrame $frame): void
$this->bodySizeRemaining = $frame->bodySize;

if ($this->bodySizeRemaining > 0) {
$this->state = ChannelStateEnum::AWAITING_BODY;
$this->state = ChannelState::AwaitingBody;
} else {
$this->state = ChannelStateEnum::READY;
$this->state = ChannelState::Ready;
$this->onBodyComplete();
}

} elseif ($frame instanceof ContentBodyFrame) {
if ($this->state === ChannelStateEnum::CLOSING) {
if ($this->state === ChannelState::Closing) {
// drop frames in closing state
return;

} elseif ($this->state !== ChannelStateEnum::AWAITING_BODY) {
} elseif ($this->state !== ChannelState::AwaitingBody) {
$currentState = $this->state;
$this->state = ChannelStateEnum::ERROR;
$this->state = ChannelState::Error;

if ($currentState === ChannelStateEnum::READY) {
if ($currentState === ChannelState::Ready) {
$msg = "Got body frame, expected method frame.";
} elseif ($currentState === ChannelStateEnum::AWAITING_HEADER) {
} elseif ($currentState === ChannelState::AwaitingHeader) {
$msg = "Got body frame, expected header frame.";
} else {
throw new \LogicException("Unhandled channel state.");
Expand All @@ -539,11 +535,11 @@ public function onFrameReceived(AbstractFrame $frame): void
$this->bodySizeRemaining -= $frame->payloadSize;

if ($this->bodySizeRemaining < 0) {
$this->state = ChannelStateEnum::ERROR;
$this->state = ChannelState::Error;
$this->connection->disconnect(Constants::STATUS_SYNTAX_ERROR, "Body overflow, received " . (-$this->bodySizeRemaining) . " more bytes.");

} elseif ($this->bodySizeRemaining === 0) {
$this->state = ChannelStateEnum::READY;
$this->state = ChannelState::Ready;
$this->onBodyComplete();
}

Expand Down
5 changes: 2 additions & 3 deletions src/ChannelInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ interface ChannelInterface
{
/**
* Returns the channel mode.
*
* @return int
*/
public function getMode(): int;
public function getMode(): ChannelMode;

/**
* Listener is called whenever 'basic.return' frame is received with arguments (Message $returnedMessage, MethodBasicReturnFrame $frame)
*
Expand Down
11 changes: 4 additions & 7 deletions src/ChannelModeEnum.php → src/ChannelMode.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,21 @@
*
* @author Jakub Kulhan <jakub.kulhan@gmail.com>
*/
final class ChannelModeEnum
enum ChannelMode
{

/**
* Regular AMQP guarantees of published messages delivery.
*/
const REGULAR = 1;
case Regular;

/**
* Messages are published after 'tx.commit'.
*/
const TRANSACTIONAL = 2;
case Transactional;

/**
* Broker sends asynchronously 'basic.ack's for delivered messages.
*/
const CONFIRM = 3;



case Confirm;
}
16 changes: 7 additions & 9 deletions src/ChannelStateEnum.php → src/ChannelState.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,35 @@
*
* @author Jakub Kulhan <jakub.kulhan@gmail.com>
*/
final class ChannelStateEnum
enum ChannelState
{

/**
* Channel is ready to receive messages.
*/
const READY = 1;
case Ready;

/**
* Channel got method that is followed by header/content frames and now waits for header frame.
*/
const AWAITING_HEADER = 2;
case AwaitingHeader;

/**
* Channel got method and header frame and now waits for body frame.
*/
const AWAITING_BODY = 3;
case AwaitingBody;

/**
* An error occurred on channel.
*/
const ERROR = 4;
case Error;

/**
* Channel is being closed.
*/
const CLOSING = 5;
case Closing;

/**
* Channel has received channel.close-ok frame.
*/
const CLOSED = 6;

case Closed;
}
Loading

0 comments on commit ad5b979

Please sign in to comment.