Skip to content

Commit

Permalink
Refactor to move command queuing logic to MysqlClient
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Dec 2, 2023
1 parent 81a08e5 commit a398f6e
Show file tree
Hide file tree
Showing 6 changed files with 1,269 additions and 99 deletions.
18 changes: 17 additions & 1 deletion src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class Connection extends EventEmitter
*/
private $stream;

/** @var Parser */
private $parser;

/** @var LoopInterface */
private $loop;

Expand All @@ -57,13 +60,15 @@ class Connection extends EventEmitter
*
* @param SocketConnectionInterface $stream
* @param Executor $executor
* @param Parser $parser
* @param LoopInterface $loop
* @param ?float $idlePeriod
*/
public function __construct(SocketConnectionInterface $stream, Executor $executor, LoopInterface $loop, $idlePeriod)
public function __construct(SocketConnectionInterface $stream, Executor $executor, Parser $parser, LoopInterface $loop, $idlePeriod)
{
$this->stream = $stream;
$this->executor = $executor;
$this->parser = $parser;

$this->loop = $loop;
if ($idlePeriod !== null) {
Expand All @@ -74,6 +79,17 @@ public function __construct(SocketConnectionInterface $stream, Executor $executo
$stream->on('close', [$this, 'handleConnectionClosed']);
}

/**
* busy executing some command such as query or ping
*
* @return bool
* @throws void
*/
public function isBusy()
{
return $this->parser->isBusy() || !$this->executor->isIdle();
}

/**
* {@inheritdoc}
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Io/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public function createConnection(
$executor = new Executor();
$parser = new Parser($stream, $executor);

$connection = new Connection($stream, $executor, $this->loop, $idlePeriod);
$connection = new Connection($stream, $executor, $parser, $this->loop, $idlePeriod);
$command = $executor->enqueue($authCommand);
$parser->start();

Expand Down
11 changes: 11 additions & 0 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ public function __construct(DuplexStreamInterface $stream, Executor $executor)
});
}

/**
* busy executing some command such as query or ping
*
* @return bool
* @throws void
*/
public function isBusy()
{
return $this->currCommand !== null;
}

public function start()
{
$this->stream->on('data', [$this, 'handleData']);
Expand Down
150 changes: 105 additions & 45 deletions src/MysqlClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\EventLoop\LoopInterface;
use React\Mysql\Io\Connection;
use React\Mysql\Io\Factory;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Socket\ConnectorInterface;
use React\Stream\ReadableStreamInterface;
Expand Down Expand Up @@ -58,6 +59,13 @@ class MysqlClient extends EventEmitter
/** @var ?Connection */
private $connection;

/**
* array of outstanding connection requests to send next commands once a connection becomes ready
*
* @var array<int,Deferred<Connection>>
*/
private $pending = [];

/**
* set to true only between calling `quit()` and the connection closing in response
*
Expand All @@ -77,44 +85,6 @@ public function __construct(
$this->uri = $uri;
}

/**
* @return PromiseInterface<Connection>
*/
private function getConnection()
{
// happy path: reuse existing connection unless it is already closing after an idle timeout
if ($this->connection !== null && ($this->quitting || $this->connection->state !== Connection::STATE_CLOSING)) {
return \React\Promise\resolve($this->connection);
}

if ($this->connecting !== null) {
return $this->connecting;
}

// force-close connection if still waiting for previous disconnection
if ($this->connection !== null) {
assert($this->connection->state === Connection::STATE_CLOSING);
$this->connection->close();
}

// create new connection if not already connected or connecting
$this->connecting = $connecting = $this->factory->createConnection($this->uri);
$this->connecting->then(function (Connection $connection) {
$this->connection = $connection;
$this->connecting = null;

// connection completed => remember only until closed
$connection->on('close', function () {
$this->connection = null;
});
}, function () {
// connection failed => discard connection attempt
$this->connecting = null;
});

return $connecting;
}

/**
* Performs an async query.
*
Expand Down Expand Up @@ -176,12 +146,18 @@ private function getConnection()
*/
public function query($sql, array $params = [])
{
if ($this->closed) {
if ($this->closed || $this->quitting) {
return \React\Promise\reject(new Exception('Connection closed'));
}

return $this->getConnection()->then(function (Connection $connection) use ($sql, $params) {
return $connection->query($sql, $params);
return $connection->query($sql, $params)->then(function (MysqlResult $result) use ($connection) {
$this->handleConnectionReady($connection);
return $result;
}, function (\Exception $e) use ($connection) {
$this->handleConnectionReady($connection);
throw $e;
});
});
}

Expand Down Expand Up @@ -246,13 +222,22 @@ public function query($sql, array $params = [])
*/
public function queryStream($sql, $params = [])
{
if ($this->closed) {
if ($this->closed || $this->quitting) {
throw new Exception('Connection closed');
}

return \React\Promise\Stream\unwrapReadable(
$this->getConnection()->then(function (Connection $connection) use ($sql, $params) {
return $connection->queryStream($sql, $params);
$stream = $connection->queryStream($sql, $params);

$stream->on('end', function () use ($connection) {
$this->handleConnectionReady($connection);
});
$stream->on('error', function () use ($connection) {
$this->handleConnectionReady($connection);
});

return $stream;
})
);
}
Expand All @@ -279,12 +264,17 @@ public function queryStream($sql, $params = [])
*/
public function ping()
{
if ($this->closed) {
if ($this->closed || $this->quitting) {
return \React\Promise\reject(new Exception('Connection closed'));
}

return $this->getConnection()->then(function (Connection $connection) {
return $connection->ping();
return $connection->ping()->then(function () use ($connection) {
$this->handleConnectionReady($connection);
}, function (\Exception $e) use ($connection) {
$this->handleConnectionReady($connection);
throw $e;
});
});
}

Expand Down Expand Up @@ -312,7 +302,7 @@ public function ping()
*/
public function quit()
{
if ($this->closed) {
if ($this->closed || $this->quitting) {
return \React\Promise\reject(new Exception('Connection closed'));
}

Expand Down Expand Up @@ -379,7 +369,77 @@ public function close()
$this->connecting = null;
}

// clear all outstanding commands
foreach ($this->pending as $deferred) {
$deferred->reject(new \RuntimeException('Connection closed'));
}
$this->pending = [];

$this->emit('close');
$this->removeAllListeners();
}


/**
* @return PromiseInterface<Connection>
*/
private function getConnection()
{
$deferred = new Deferred();

// force-close connection if still waiting for previous disconnection due to idle timer
if ($this->connection !== null && $this->connection->state === Connection::STATE_CLOSING) {
$this->connection->close();
$this->connection = null;
}

// happy path: reuse existing connection unless it is currently busy executing another command
if ($this->connection !== null && !$this->connection->isBusy()) {
$deferred->resolve($this->connection);
return $deferred->promise();
}

// queue pending connection request until connection becomes ready
$this->pending[] = $deferred;

// create new connection if not already connected or connecting
if ($this->connection === null && $this->connecting === null) {
$this->connecting = $this->factory->createConnection($this->uri);
$this->connecting->then(function (Connection $connection) {
// connection completed => remember only until closed
$this->connecting = null;
$this->connection = $connection;
$connection->on('close', function () {
$this->connection = null;
});

// handle first command from queue when connection is ready
$this->handleConnectionReady($connection);
}, function (\Exception $e) {
// connection failed => discard connection attempt
$this->connecting = null;

foreach ($this->pending as $key => $deferred) {
$deferred->reject($e);
unset($this->pending[$key]);
}
});
}

return $deferred->promise();
}

private function handleConnectionReady(Connection $connection)
{
$deferred = \reset($this->pending);
if ($deferred === false) {
// nothing to do if there are no outstanding connection requests
return;
}

assert($deferred instanceof Deferred);
unset($this->pending[\key($this->pending)]);

$deferred->resolve($connection);
}
}
Loading

0 comments on commit a398f6e

Please sign in to comment.