diff --git a/common/framework/Queue.php b/common/framework/Queue.php index 56410e13f..98aa15ce7 100644 --- a/common/framework/Queue.php +++ b/common/framework/Queue.php @@ -25,7 +25,7 @@ class Queue } /** - * Get the default driver. + * Get a Queue driver instance. * * @param string $name * @return ?Drivers\QueueInterface @@ -49,6 +49,16 @@ class Queue } } + /** + * Get the DB driver instance, for managing scheduled tasks. + * + * @return Drivers\Queue\DB + */ + public static function getDbDriver(): Drivers\Queue\DB + { + return self::getDriver('db'); + } + /** * Get the list of supported Queue drivers. * @@ -86,7 +96,9 @@ class Queue } /** - * Add a task. + * Add a task to the queue. + * + * The queued task will be executed as soon as possible. * * The handler can be in one of the following formats: * - Global function, e.g. myHandler @@ -128,29 +140,58 @@ class Queue } /** - * Get the first task to execute immediately. + * Add a task to be executed at a specific time. * - * If no tasks are pending, this method will return null. - * Detailed scheduling of tasks will be handled by each driver. + * The queued task will be executed once at the configured time. + * The rest is identical to addTask(). * - * @param int $blocking - * @return ?object + * @param int $time + * @param string $handler + * @param ?object $args + * @param ?object $options + * @return int */ - public static function getTask(int $blocking = 0): ?object + public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int { - $driver_name = config('queue.driver'); - if (!$driver_name) + if (!config('queue.enabled')) { throw new Exceptions\FeatureDisabled('Queue not configured'); } - $driver = self::getDriver($driver_name); - if (!$driver) + // This feature always uses the DB driver. + $driver = self::getDbDriver(); + return $driver->addTaskAt($time, $handler, $args, $options); + } + + /** + * Add a task to be executed at an interval. + * + * The queued task will be executed repeatedly at the scheduled interval. + * The synax for specifying the interval is the same as crontab. + * The rest is identical to addTask(). + * + * @param string $interval + * @param string $handler + * @param ?object $args + * @param ?object $options + * @return int + */ + public static function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int + { + if (!config('queue.enabled')) { throw new Exceptions\FeatureDisabled('Queue not configured'); } - return $driver->getTask($blocking); + // Validate the interval syntax. + if (!self::checkIntervalSyntax($interval)) + { + throw new Exceptions\InvalidRequest('Invalid interval syntax: ' . $interval); + } + + // This feature always uses the DB driver. + $driver = self::getDbDriver(); + return $driver->addTaskAtInterval($interval, $handler, $args, $options); } /** @@ -165,7 +206,80 @@ class Queue } /** - * Process the queue. + * Check the interval syntax. + * + * This method returns true if the interval string is well-formed, + * and false otherwise. However, it does not check that all the numbers + * are in the correct range (e.g. 0-59 for minutes). + * + * @param string $interval + * @return bool + */ + public static function checkIntervalSyntax(string $interval): bool + { + $parts = preg_split('/\s+/', $interval); + if (!$parts || count($parts) !== 5) + { + return false; + } + foreach ($parts as $part) + { + if (!preg_match('!^(?:\\*(?:/\d+)?|\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$!', $part)) + { + return false; + } + } + return true; + } + + /** + * Parse an interval string check it against a timestamp. + * + * This method returns true if the interval covers the given timestamp, + * and false otherwise. + * + * @param string $interval + * @param ?int $time + * @return bool + */ + public static function parseInterval(string $interval, ?int $time): bool + { + $parts = preg_split('/\s+/', $interval); + if (!$parts || count($parts) !== 5) + { + return false; + } + + $current_time = explode(' ', ltrim(date('i G j n N', $time ?? time()), '0')); + foreach ($parts as $i => $part) + { + $subparts = explode(',', $part); + foreach ($subparts as $subpart) + { + if ($subpart === '*' || $subpart === $current_time[$i]) + { + continue 2; + } + if ($subpart === '7' && $i === 4 && $current_time[$i] === '0') + { + continue 2; + } + if (preg_match('!^\\*/(\d+)?$!', $subpart, $matches) && ($div = intval($matches[1])) && (intval($current_time[$i]) % $div === 0)) + { + continue 2; + } + if (preg_match('!^(\d+)-(\d+)$!', $subpart, $matches) && $current_time[$i] >= intval($matches[1]) && $current_time[$i] <= intval($matches[2])) + { + continue 2; + } + } + return false; + } + return true; + } + + /** + * Process queued and scheduled tasks. * * This will usually be called by a separate script, run every minute * through an external scheduler such as crontab or systemd. @@ -173,115 +287,54 @@ class Queue * If you are on a shared hosting service, you may also call a URL * using a "web cron" service provider. * + * @param int $index + * @param int $count * @param int $timeout * @return void */ - public static function process(int $timeout): void + public static function process(int $index, int $count, int $timeout): void { // This part will run in a loop until timeout. $process_start_time = microtime(true); + + // Get default driver instance. + $driver_name = config('queue.driver'); + $driver = self::getDriver($driver_name); + if (!$driver_name || !$driver) + { + throw new Exceptions\FeatureDisabled('Queue not configured'); + } + + // Process scheduled tasks. + if ($index === 0) + { + $db_driver = self::getDbDriver(); + $tasks = $db_driver->getScheduledTasks('once'); + foreach ($tasks as $task) + { + self::_executeTask($task); + } + } + if ($index === 1 || $count < 2) + { + $db_driver = self::getDbDriver(); + $tasks = $db_driver->getScheduledTasks('interval'); + foreach ($tasks as $task) + { + $db_driver->updateLastRunTimestamp($task->id); + self::_executeTask($task); + } + } + + // Process queued tasks. while (true) { - // Get a task from the driver. + // Get a task from the driver, with a 1 second delay at maximum. $loop_start_time = microtime(true); - $task = self::getTask(1); - - // Wait 1 second and loop back. + $task = $driver->getNextTask(1); if ($task) { - // Find the handler for the task. - $task->handler = trim($task->handler, '\\()'); - $handler = null; - try - { - if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches)) - { - $class_name = '\\' . $matches[1]; - $method_name = $matches[2]; - if (class_exists($class_name) && method_exists($class_name, $method_name)) - { - $handler = [$class_name, $method_name]; - } - else - { - error_log('RxQueue: task handler not found: ' . $task->handler); - } - } - elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches)) - { - $class_name = '\\' . $matches[1]; - $initializer_name = $matches[2]; - $method_name = $matches[3]; - if (class_exists($class_name) && method_exists($class_name, $initializer_name)) - { - $obj = $class_name::$initializer_name(); - if (method_exists($obj, $method_name)) - { - $handler = [$obj, $method_name]; - } - else - { - error_log('RxQueue: task handler not found: ' . $task->handler); - } - } - else - { - error_log('RxQueue: task handler not found: ' . $task->handler); - } - } - elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches)) - { - $class_name = '\\' . $matches[1]; - $method_name = $matches[2]; - if (class_exists($class_name) && method_exists($class_name, $method_name)) - { - $obj = new $class_name(); - $handler = [$obj, $method_name]; - } - else - { - error_log('RxQueue: task handler not found: ' . $task->handler); - } - } - else - { - if (function_exists('\\' . $task->handler)) - { - $handler = '\\' . $task->handler; - } - else - { - error_log('RxQueue: task handler not found: ' . $task->handler); - } - } - } - catch (\Throwable $th) - { - error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [ - $task->handler, - get_class($th), - $th->getFile(), - $th->getLine(), - ])); - } - - // Call the handler. - try - { - if ($handler) - { - call_user_func($handler, $task->args, $task->options); - } - } - catch (\Throwable $th) - { - error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [ - $task->handler, - get_class($th), - $th->getFile(), - $th->getLine(), - ])); - } + self::_executeTask($task); } // If the timeout is imminent, break the loop. @@ -299,4 +352,107 @@ class Queue } } } + + /** + * Execute a task. + * + * @param object $task + * @return void + */ + protected static function _executeTask(object $task): void + { + // Find the handler for the task. + $task->handler = trim($task->handler, '\\()'); + $handler = null; + try + { + if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches)) + { + $class_name = '\\' . $matches[1]; + $method_name = $matches[2]; + if (class_exists($class_name) && method_exists($class_name, $method_name)) + { + $handler = [$class_name, $method_name]; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches)) + { + $class_name = '\\' . $matches[1]; + $initializer_name = $matches[2]; + $method_name = $matches[3]; + if (class_exists($class_name) && method_exists($class_name, $initializer_name)) + { + $obj = $class_name::$initializer_name(); + if (method_exists($obj, $method_name)) + { + $handler = [$obj, $method_name]; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches)) + { + $class_name = '\\' . $matches[1]; + $method_name = $matches[2]; + if (class_exists($class_name) && method_exists($class_name, $method_name)) + { + $obj = new $class_name(); + $handler = [$obj, $method_name]; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + else + { + if (function_exists('\\' . $task->handler)) + { + $handler = '\\' . $task->handler; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + } + catch (\Throwable $th) + { + error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [ + $task->handler, + get_class($th), + $th->getFile(), + $th->getLine(), + ])); + } + + // Call the handler. + try + { + if ($handler) + { + call_user_func($handler, $task->args, $task->options); + } + } + catch (\Throwable $th) + { + error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [ + $task->handler, + get_class($th), + $th->getFile(), + $th->getLine(), + ])); + } + } } diff --git a/common/framework/drivers/QueueInterface.php b/common/framework/drivers/QueueInterface.php index 71d71a452..ebcbafacb 100644 --- a/common/framework/drivers/QueueInterface.php +++ b/common/framework/drivers/QueueInterface.php @@ -62,10 +62,10 @@ interface QueueInterface public function addTask(string $handler, ?object $args = null, ?object $options = null): int; /** - * Get the first task. + * Get the next task from the queue. * * @param int $blocking * @return ?object */ - public function getTask(int $blocking = 0): ?object; + public function getNextTask(int $blocking = 0): ?object; } diff --git a/common/framework/drivers/queue/db.php b/common/framework/drivers/queue/db.php index a55bd71ad..43545523a 100644 --- a/common/framework/drivers/queue/db.php +++ b/common/framework/drivers/queue/db.php @@ -4,6 +4,7 @@ namespace Rhymix\Framework\Drivers\Queue; use Rhymix\Framework\DB as RFDB; use Rhymix\Framework\Drivers\QueueInterface; +use Rhymix\Framework\Queue; /** * The DB queue driver. @@ -99,12 +100,68 @@ class DB implements QueueInterface } /** - * Get the first task. + * Add a task to be executed at a specific time. + * + * @param int $time + * @param string $handler + * @param ?object $args + * @param ?object $options + * @return int + */ + public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int + { + $oDB = RFDB::getInstance(); + $stmt = $oDB->prepare(trim(<<execute([ + 'once', + date('Y-m-d H:i:s', $time), + $handler, + serialize($args), + serialize($options), + date('Y-m-d H:i:s'), + ]); + return $result ? $oDB->getInsertID() : 0; + } + + /** + * Add a task to be executed at an interval. + * + * @param string $interval + * @param string $handler + * @param ?object $args + * @param ?object $options + * @return int + */ + public function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int + { + $oDB = RFDB::getInstance(); + $stmt = $oDB->prepare(trim(<<execute([ + 'interval', + $interval, + $handler, + serialize($args), + serialize($options), + date('Y-m-d H:i:s'), + ]); + return $result ? $oDB->getInsertID() : 0; + } + + /** + * Get the next task from the queue. * * @param int $blocking * @return ?object */ - public function getTask(int $blocking = 0): ?object + public function getNextTask(int $blocking = 0): ?object { $oDB = RFDB::getInstance(); $oDB->beginTransaction(); @@ -128,4 +185,76 @@ class DB implements QueueInterface return null; } } + + /** + * Get scheduled tasks. + * + * @param string $type + * @return array + */ + public function getScheduledTasks(string $type): array + { + $oDB = RFDB::getInstance(); + $tasks = []; + $ids = []; + + // Get tasks to be executed once at the current time. + if ($type === 'once') + { + $oDB->beginTransaction(); + $stmt = $oDB->query("SELECT * FROM task_schedule WHERE `type` = 'once' AND `first_run` <= ? ORDER BY id FOR UPDATE", [$timestamp]); + while ($task = $stmt->fetchObject()) + { + $task->args = unserialize($task->args); + $task->options = unserialize($task->options); + $tasks[] = $task; + $ids[] = $task->id; + } + if (count($ids)) + { + $stmt = $oDB->prepare('DELETE FROM task_schedule WHERE id IN (' . implode(', ', array_fill(0, count($ids), '?')) . ')'); + $stmt->execute($ids); + } + $oDB->commit(); + } + + // Get tasks to be executed at an interval. + if ($type === 'interval') + { + $stmt = $oDB->query("SELECT id, `interval` FROM task_schedule WHERE `type` = 'interval' ORDER BY id"); + while ($task = $stmt->fetchObject()) + { + if (Queue::parseInterval($task->interval, \RX_TIME)) + { + $ids[] = $task->id; + } + } + if (count($ids)) + { + $stmt = $oDB->prepare('SELECT * FROM task_schedule WHERE id IN (' . implode(', ', array_fill(0, count($ids), '?')) . ')'); + $stmt->execute($ids); + while ($task = $stmt->fetchObject()) + { + $task->args = unserialize($task->args); + $task->options = unserialize($task->options); + $tasks[] = $task; + } + } + } + + return $tasks; + } + + /** + * Update the last executed timestamp of a scheduled task. + * + * @param int $id + * @return void + */ + public function updateLastRunTimestamp(int $id): void + { + $oDB = RFDB::getInstance(); + $stmt = $oDB->prepare('UPDATE task_schedule SET last_run = ?, run_count = run_count + 1 WHERE id = ?'); + $stmt->execute([date('Y-m-d H:i:s'), $id]); + } } diff --git a/common/framework/drivers/queue/dummy.php b/common/framework/drivers/queue/dummy.php index 0abb74792..76d39dc63 100644 --- a/common/framework/drivers/queue/dummy.php +++ b/common/framework/drivers/queue/dummy.php @@ -105,12 +105,12 @@ class Dummy implements QueueInterface } /** - * Get the first task. + * Get the next task from the queue. * * @param int $blocking * @return ?object */ - public function getTask(int $blocking = 0): ?object + public function getNextTask(int $blocking = 0): ?object { $result = $this->_dummy_queue; $this->_dummy_queue = null; diff --git a/common/framework/drivers/queue/redis.php b/common/framework/drivers/queue/redis.php index 58c122184..cd4b8256e 100644 --- a/common/framework/drivers/queue/redis.php +++ b/common/framework/drivers/queue/redis.php @@ -155,12 +155,12 @@ class Redis implements QueueInterface } /** - * Get the first task. + * Get the next task from the queue. * * @param int $blocking * @return ?object */ - public function getTask(int $blocking = 0): ?object + public function getNextTask(int $blocking = 0): ?object { if ($this->_conn) { diff --git a/common/scripts/cron.php b/common/scripts/cron.php index 19ea7b1c9..971f889a4 100644 --- a/common/scripts/cron.php +++ b/common/scripts/cron.php @@ -73,7 +73,7 @@ if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') && } elseif ($pid == 0) { - Rhymix\Framework\Queue::process($timeout); + Rhymix\Framework\Queue::process($i, $process_count, $timeout); exit; } else @@ -96,7 +96,7 @@ if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') && } else { - Rhymix\Framework\Queue::process($timeout); + Rhymix\Framework\Queue::process(0, 1, $timeout); } // If called over the network, display a simple OK message to indicate success. diff --git a/modules/module/schemas/task_schedule.xml b/modules/module/schemas/task_schedule.xml new file mode 100644 index 000000000..35e29db0b --- /dev/null +++ b/modules/module/schemas/task_schedule.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + +