Implement scheduled tasks

This commit is contained in:
Kijin Sung 2024-12-12 00:49:18 +09:00
parent 2f0ec84cc2
commit 53cd6e807d
7 changed files with 419 additions and 122 deletions

View file

@ -25,7 +25,7 @@ class Queue
}
/**
* Get the default driver.
* Get a Queue driver instance.
*
* @param string $name
* @return ?Drivers\QueueInterface
@ -49,6 +49,16 @@ class Queue
}
}
/**
* Get the DB driver instance, for managing scheduled tasks.
*
* @return Drivers\Queue\DB
*/
public static function getDbDriver(): Drivers\Queue\DB
{
return self::getDriver('db');
}
/**
* Get the list of supported Queue drivers.
*
@ -86,7 +96,9 @@ class Queue
}
/**
* Add a task.
* Add a task to the queue.
*
* The queued task will be executed as soon as possible.
*
* The handler can be in one of the following formats:
* - Global function, e.g. myHandler
@ -128,29 +140,58 @@ class Queue
}
/**
* Get the first task to execute immediately.
* Add a task to be executed at a specific time.
*
* If no tasks are pending, this method will return null.
* Detailed scheduling of tasks will be handled by each driver.
* The queued task will be executed once at the configured time.
* The rest is identical to addTask().
*
* @param int $blocking
* @return ?object
* @param int $time
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public static function getTask(int $blocking = 0): ?object
public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
{
$driver_name = config('queue.driver');
if (!$driver_name)
if (!config('queue.enabled'))
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
$driver = self::getDriver($driver_name);
if (!$driver)
// This feature always uses the DB driver.
$driver = self::getDbDriver();
return $driver->addTaskAt($time, $handler, $args, $options);
}
/**
* Add a task to be executed at an interval.
*
* The queued task will be executed repeatedly at the scheduled interval.
* The synax for specifying the interval is the same as crontab.
* The rest is identical to addTask().
*
* @param string $interval
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public static function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
{
if (!config('queue.enabled'))
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
return $driver->getTask($blocking);
// Validate the interval syntax.
if (!self::checkIntervalSyntax($interval))
{
throw new Exceptions\InvalidRequest('Invalid interval syntax: ' . $interval);
}
// This feature always uses the DB driver.
$driver = self::getDbDriver();
return $driver->addTaskAtInterval($interval, $handler, $args, $options);
}
/**
@ -165,7 +206,80 @@ class Queue
}
/**
* Process the queue.
* Check the interval syntax.
*
* This method returns true if the interval string is well-formed,
* and false otherwise. However, it does not check that all the numbers
* are in the correct range (e.g. 0-59 for minutes).
*
* @param string $interval
* @return bool
*/
public static function checkIntervalSyntax(string $interval): bool
{
$parts = preg_split('/\s+/', $interval);
if (!$parts || count($parts) !== 5)
{
return false;
}
foreach ($parts as $part)
{
if (!preg_match('!^(?:\\*(?:/\d+)?|\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$!', $part))
{
return false;
}
}
return true;
}
/**
* Parse an interval string check it against a timestamp.
*
* This method returns true if the interval covers the given timestamp,
* and false otherwise.
*
* @param string $interval
* @param ?int $time
* @return bool
*/
public static function parseInterval(string $interval, ?int $time): bool
{
$parts = preg_split('/\s+/', $interval);
if (!$parts || count($parts) !== 5)
{
return false;
}
$current_time = explode(' ', ltrim(date('i G j n N', $time ?? time()), '0'));
foreach ($parts as $i => $part)
{
$subparts = explode(',', $part);
foreach ($subparts as $subpart)
{
if ($subpart === '*' || $subpart === $current_time[$i])
{
continue 2;
}
if ($subpart === '7' && $i === 4 && $current_time[$i] === '0')
{
continue 2;
}
if (preg_match('!^\\*/(\d+)?$!', $subpart, $matches) && ($div = intval($matches[1])) && (intval($current_time[$i]) % $div === 0))
{
continue 2;
}
if (preg_match('!^(\d+)-(\d+)$!', $subpart, $matches) && $current_time[$i] >= intval($matches[1]) && $current_time[$i] <= intval($matches[2]))
{
continue 2;
}
}
return false;
}
return true;
}
/**
* Process queued and scheduled tasks.
*
* This will usually be called by a separate script, run every minute
* through an external scheduler such as crontab or systemd.
@ -173,115 +287,54 @@ class Queue
* If you are on a shared hosting service, you may also call a URL
* using a "web cron" service provider.
*
* @param int $index
* @param int $count
* @param int $timeout
* @return void
*/
public static function process(int $timeout): void
public static function process(int $index, int $count, int $timeout): void
{
// This part will run in a loop until timeout.
$process_start_time = microtime(true);
// Get default driver instance.
$driver_name = config('queue.driver');
$driver = self::getDriver($driver_name);
if (!$driver_name || !$driver)
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
// Process scheduled tasks.
if ($index === 0)
{
$db_driver = self::getDbDriver();
$tasks = $db_driver->getScheduledTasks('once');
foreach ($tasks as $task)
{
self::_executeTask($task);
}
}
if ($index === 1 || $count < 2)
{
$db_driver = self::getDbDriver();
$tasks = $db_driver->getScheduledTasks('interval');
foreach ($tasks as $task)
{
$db_driver->updateLastRunTimestamp($task->id);
self::_executeTask($task);
}
}
// Process queued tasks.
while (true)
{
// Get a task from the driver.
// Get a task from the driver, with a 1 second delay at maximum.
$loop_start_time = microtime(true);
$task = self::getTask(1);
// Wait 1 second and loop back.
$task = $driver->getNextTask(1);
if ($task)
{
// Find the handler for the task.
$task->handler = trim($task->handler, '\\()');
$handler = null;
try
{
if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$method_name = $matches[2];
if (class_exists($class_name) && method_exists($class_name, $method_name))
{
$handler = [$class_name, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$initializer_name = $matches[2];
$method_name = $matches[3];
if (class_exists($class_name) && method_exists($class_name, $initializer_name))
{
$obj = $class_name::$initializer_name();
if (method_exists($obj, $method_name))
{
$handler = [$obj, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$method_name = $matches[2];
if (class_exists($class_name) && method_exists($class_name, $method_name))
{
$obj = new $class_name();
$handler = [$obj, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
else
{
if (function_exists('\\' . $task->handler))
{
$handler = '\\' . $task->handler;
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
}
catch (\Throwable $th)
{
error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [
$task->handler,
get_class($th),
$th->getFile(),
$th->getLine(),
]));
}
// Call the handler.
try
{
if ($handler)
{
call_user_func($handler, $task->args, $task->options);
}
}
catch (\Throwable $th)
{
error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [
$task->handler,
get_class($th),
$th->getFile(),
$th->getLine(),
]));
}
self::_executeTask($task);
}
// If the timeout is imminent, break the loop.
@ -299,4 +352,107 @@ class Queue
}
}
}
/**
* Execute a task.
*
* @param object $task
* @return void
*/
protected static function _executeTask(object $task): void
{
// Find the handler for the task.
$task->handler = trim($task->handler, '\\()');
$handler = null;
try
{
if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$method_name = $matches[2];
if (class_exists($class_name) && method_exists($class_name, $method_name))
{
$handler = [$class_name, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$initializer_name = $matches[2];
$method_name = $matches[3];
if (class_exists($class_name) && method_exists($class_name, $initializer_name))
{
$obj = $class_name::$initializer_name();
if (method_exists($obj, $method_name))
{
$handler = [$obj, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$method_name = $matches[2];
if (class_exists($class_name) && method_exists($class_name, $method_name))
{
$obj = new $class_name();
$handler = [$obj, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
else
{
if (function_exists('\\' . $task->handler))
{
$handler = '\\' . $task->handler;
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
}
catch (\Throwable $th)
{
error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [
$task->handler,
get_class($th),
$th->getFile(),
$th->getLine(),
]));
}
// Call the handler.
try
{
if ($handler)
{
call_user_func($handler, $task->args, $task->options);
}
}
catch (\Throwable $th)
{
error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [
$task->handler,
get_class($th),
$th->getFile(),
$th->getLine(),
]));
}
}
}

View file

@ -62,10 +62,10 @@ interface QueueInterface
public function addTask(string $handler, ?object $args = null, ?object $options = null): int;
/**
* Get the first task.
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object;
public function getNextTask(int $blocking = 0): ?object;
}

View file

@ -4,6 +4,7 @@ namespace Rhymix\Framework\Drivers\Queue;
use Rhymix\Framework\DB as RFDB;
use Rhymix\Framework\Drivers\QueueInterface;
use Rhymix\Framework\Queue;
/**
* The DB queue driver.
@ -99,12 +100,68 @@ class DB implements QueueInterface
}
/**
* Get the first task.
* Add a task to be executed at a specific time.
*
* @param int $time
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
{
$oDB = RFDB::getInstance();
$stmt = $oDB->prepare(trim(<<<END
INSERT INTO task_schedule
(type, first_run, handler, args, options, regdate)
VALUES (?, ?, ?, ?, ?, ?)
END));
$result = $stmt->execute([
'once',
date('Y-m-d H:i:s', $time),
$handler,
serialize($args),
serialize($options),
date('Y-m-d H:i:s'),
]);
return $result ? $oDB->getInsertID() : 0;
}
/**
* Add a task to be executed at an interval.
*
* @param string $interval
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
{
$oDB = RFDB::getInstance();
$stmt = $oDB->prepare(trim(<<<END
INSERT INTO task_schedule
(type, `interval`, handler, args, options, regdate)
VALUES (?, ?, ?, ?, ?, ?)
END));
$result = $stmt->execute([
'interval',
$interval,
$handler,
serialize($args),
serialize($options),
date('Y-m-d H:i:s'),
]);
return $result ? $oDB->getInsertID() : 0;
}
/**
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
public function getNextTask(int $blocking = 0): ?object
{
$oDB = RFDB::getInstance();
$oDB->beginTransaction();
@ -128,4 +185,76 @@ class DB implements QueueInterface
return null;
}
}
/**
* Get scheduled tasks.
*
* @param string $type
* @return array
*/
public function getScheduledTasks(string $type): array
{
$oDB = RFDB::getInstance();
$tasks = [];
$ids = [];
// Get tasks to be executed once at the current time.
if ($type === 'once')
{
$oDB->beginTransaction();
$stmt = $oDB->query("SELECT * FROM task_schedule WHERE `type` = 'once' AND `first_run` <= ? ORDER BY id FOR UPDATE", [$timestamp]);
while ($task = $stmt->fetchObject())
{
$task->args = unserialize($task->args);
$task->options = unserialize($task->options);
$tasks[] = $task;
$ids[] = $task->id;
}
if (count($ids))
{
$stmt = $oDB->prepare('DELETE FROM task_schedule WHERE id IN (' . implode(', ', array_fill(0, count($ids), '?')) . ')');
$stmt->execute($ids);
}
$oDB->commit();
}
// Get tasks to be executed at an interval.
if ($type === 'interval')
{
$stmt = $oDB->query("SELECT id, `interval` FROM task_schedule WHERE `type` = 'interval' ORDER BY id");
while ($task = $stmt->fetchObject())
{
if (Queue::parseInterval($task->interval, \RX_TIME))
{
$ids[] = $task->id;
}
}
if (count($ids))
{
$stmt = $oDB->prepare('SELECT * FROM task_schedule WHERE id IN (' . implode(', ', array_fill(0, count($ids), '?')) . ')');
$stmt->execute($ids);
while ($task = $stmt->fetchObject())
{
$task->args = unserialize($task->args);
$task->options = unserialize($task->options);
$tasks[] = $task;
}
}
}
return $tasks;
}
/**
* Update the last executed timestamp of a scheduled task.
*
* @param int $id
* @return void
*/
public function updateLastRunTimestamp(int $id): void
{
$oDB = RFDB::getInstance();
$stmt = $oDB->prepare('UPDATE task_schedule SET last_run = ?, run_count = run_count + 1 WHERE id = ?');
$stmt->execute([date('Y-m-d H:i:s'), $id]);
}
}

View file

@ -105,12 +105,12 @@ class Dummy implements QueueInterface
}
/**
* Get the first task.
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
public function getNextTask(int $blocking = 0): ?object
{
$result = $this->_dummy_queue;
$this->_dummy_queue = null;

View file

@ -155,12 +155,12 @@ class Redis implements QueueInterface
}
/**
* Get the first task.
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
public function getNextTask(int $blocking = 0): ?object
{
if ($this->_conn)
{

View file

@ -73,7 +73,7 @@ if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') &&
}
elseif ($pid == 0)
{
Rhymix\Framework\Queue::process($timeout);
Rhymix\Framework\Queue::process($i, $process_count, $timeout);
exit;
}
else
@ -96,7 +96,7 @@ if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') &&
}
else
{
Rhymix\Framework\Queue::process($timeout);
Rhymix\Framework\Queue::process(0, 1, $timeout);
}
// If called over the network, display a simple OK message to indicate success.

View file

@ -0,0 +1,12 @@
<table name="task_schedule">
<column name="id" type="bigint" notnull="notnull" primary_key="primary_key" auto_increment="auto_increment" />
<column name="type" type="varchar" size="40" index="idx_type" />
<column name="interval" type="varchar" size="191" index="idx_interval" />
<column name="first_run" type="datetime" index="idx_first_run" />
<column name="last_run" type="datetime" index="idx_last_run" />
<column name="run_count" type="bigint" notnull="notnull" default="0" index="idx_run_count" />
<column name="handler" type="varchar" size="191" notnull="notnull" />
<column name="args" type="longtext" notnull="notnull" />
<column name="options" type="longtext" notnull="notnull" />
<column name="regdate" type="datetime" notnull="notnull" index="idx_regdate" />
</table>