mirror of
https://github.com/Lastorder-DC/rhymix.git
synced 2026-01-06 10:11:38 +09:00
Merge branch 'pr/scheduled-tasks'
This commit is contained in:
commit
b66b31e8e7
17 changed files with 647 additions and 139 deletions
|
|
@ -97,7 +97,7 @@ class DisplayHandler extends Handler
|
|||
}
|
||||
else
|
||||
{
|
||||
if($responseMethod == 'JSON' || $responseMethod == 'JS_CALLBACK' || isset($_SERVER['HTTP_X_AJAX_COMPAT']) || isset($_POST['_rx_ajax_compat']))
|
||||
if($responseMethod == 'JSON' || isset($_SERVER['HTTP_X_AJAX_COMPAT']) || isset($_POST['_rx_ajax_compat']))
|
||||
{
|
||||
self::_printJSONHeader();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class Queue
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the default driver.
|
||||
* Get a Queue driver instance.
|
||||
*
|
||||
* @param string $name
|
||||
* @return ?Drivers\QueueInterface
|
||||
|
|
@ -49,6 +49,16 @@ class Queue
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the DB driver instance, for managing scheduled tasks.
|
||||
*
|
||||
* @return Drivers\Queue\DB
|
||||
*/
|
||||
public static function getDbDriver(): Drivers\Queue\DB
|
||||
{
|
||||
return self::getDriver('db');
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of supported Queue drivers.
|
||||
*
|
||||
|
|
@ -86,7 +96,9 @@ class Queue
|
|||
}
|
||||
|
||||
/**
|
||||
* Add a task.
|
||||
* Add a task to the queue.
|
||||
*
|
||||
* The queued task will be executed as soon as possible.
|
||||
*
|
||||
* The handler can be in one of the following formats:
|
||||
* - Global function, e.g. myHandler
|
||||
|
|
@ -128,29 +140,82 @@ class Queue
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the first task to execute immediately.
|
||||
* Add a task to be executed at a specific time.
|
||||
*
|
||||
* If no tasks are pending, this method will return null.
|
||||
* Detailed scheduling of tasks will be handled by each driver.
|
||||
* The queued task will be executed once at the configured time.
|
||||
* The rest is identical to addTask().
|
||||
*
|
||||
* @param int $blocking
|
||||
* @param int $time
|
||||
* @param string $handler
|
||||
* @param ?object $args
|
||||
* @param ?object $options
|
||||
* @return int
|
||||
*/
|
||||
public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
|
||||
{
|
||||
if (!config('queue.enabled'))
|
||||
{
|
||||
throw new Exceptions\FeatureDisabled('Queue not configured');
|
||||
}
|
||||
|
||||
// This feature always uses the DB driver.
|
||||
$driver = self::getDbDriver();
|
||||
return $driver->addTaskAt($time, $handler, $args, $options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a task to be executed at an interval.
|
||||
*
|
||||
* The queued task will be executed repeatedly at the scheduled interval.
|
||||
* The synax for specifying the interval is the same as crontab.
|
||||
* The rest is identical to addTask().
|
||||
*
|
||||
* @param string $interval
|
||||
* @param string $handler
|
||||
* @param ?object $args
|
||||
* @param ?object $options
|
||||
* @return int
|
||||
*/
|
||||
public static function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
|
||||
{
|
||||
if (!config('queue.enabled'))
|
||||
{
|
||||
throw new Exceptions\FeatureDisabled('Queue not configured');
|
||||
}
|
||||
|
||||
// Validate the interval syntax.
|
||||
if (!self::checkIntervalSyntax($interval))
|
||||
{
|
||||
throw new Exceptions\InvalidRequest('Invalid interval syntax: ' . $interval);
|
||||
}
|
||||
|
||||
// This feature always uses the DB driver.
|
||||
$driver = self::getDbDriver();
|
||||
return $driver->addTaskAtInterval($interval, $handler, $args, $options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get information about a scheduled task if it exists.
|
||||
*
|
||||
* @param int $task_srl
|
||||
* @return ?object
|
||||
*/
|
||||
public static function getTask(int $blocking = 0): ?object
|
||||
public static function getScheduledTask(int $task_srl): ?object
|
||||
{
|
||||
$driver_name = config('queue.driver');
|
||||
if (!$driver_name)
|
||||
{
|
||||
throw new Exceptions\FeatureDisabled('Queue not configured');
|
||||
}
|
||||
$driver = self::getDbDriver();
|
||||
return $driver->getScheduledTask($task_srl);
|
||||
}
|
||||
|
||||
$driver = self::getDriver($driver_name);
|
||||
if (!$driver)
|
||||
{
|
||||
throw new Exceptions\FeatureDisabled('Queue not configured');
|
||||
}
|
||||
|
||||
return $driver->getTask($blocking);
|
||||
/**
|
||||
* Cancel a scheduled task.
|
||||
*
|
||||
* @param int $task_srl
|
||||
* @return bool
|
||||
*/
|
||||
public static function cancelScheduledTask(int $task_srl): bool
|
||||
{
|
||||
$driver = self::getDbDriver();
|
||||
return $driver->cancelScheduledTask($task_srl);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -165,7 +230,80 @@ class Queue
|
|||
}
|
||||
|
||||
/**
|
||||
* Process the queue.
|
||||
* Check the interval syntax.
|
||||
*
|
||||
* This method returns true if the interval string is well-formed,
|
||||
* and false otherwise. However, it does not check that all the numbers
|
||||
* are in the correct range (e.g. 0-59 for minutes).
|
||||
*
|
||||
* @param string $interval
|
||||
* @return bool
|
||||
*/
|
||||
public static function checkIntervalSyntax(string $interval): bool
|
||||
{
|
||||
$parts = preg_split('/\s+/', $interval);
|
||||
if (!$parts || count($parts) !== 5)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
foreach ($parts as $part)
|
||||
{
|
||||
if (!preg_match('!^(?:\\*(?:/\d+)?|\d+(?:-\d+)?(?:,\d+(?:-\d+)?)*)$!', $part))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse an interval string check it against a timestamp.
|
||||
*
|
||||
* This method returns true if the interval covers the given timestamp,
|
||||
* and false otherwise.
|
||||
*
|
||||
* @param string $interval
|
||||
* @param ?int $time
|
||||
* @return bool
|
||||
*/
|
||||
public static function parseInterval(string $interval, ?int $time): bool
|
||||
{
|
||||
$parts = preg_split('/\s+/', $interval);
|
||||
if (!$parts || count($parts) !== 5)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
$current_time = explode(' ', date('i G j n w', $time ?? time()));
|
||||
foreach ($parts as $i => $part)
|
||||
{
|
||||
$subparts = explode(',', $part);
|
||||
foreach ($subparts as $subpart)
|
||||
{
|
||||
if ($subpart === '*' || ltrim($subpart, '0') === ltrim($current_time[$i], '0'))
|
||||
{
|
||||
continue 2;
|
||||
}
|
||||
if ($subpart === '7' && $i === 4 && intval($current_time[$i], 10) === 0)
|
||||
{
|
||||
continue 2;
|
||||
}
|
||||
if (preg_match('!^\\*/(\d+)?$!', $subpart, $matches) && ($div = intval($matches[1], 10)) && (intval($current_time[$i], 10) % $div === 0))
|
||||
{
|
||||
continue 2;
|
||||
}
|
||||
if (preg_match('!^(\d+)-(\d+)$!', $subpart, $matches) && intval($current_time[$i], 10) >= intval($matches[1], 10) && intval($current_time[$i], 10) <= intval($matches[2], 10))
|
||||
{
|
||||
continue 2;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process queued and scheduled tasks.
|
||||
*
|
||||
* This will usually be called by a separate script, run every minute
|
||||
* through an external scheduler such as crontab or systemd.
|
||||
|
|
@ -173,115 +311,54 @@ class Queue
|
|||
* If you are on a shared hosting service, you may also call a URL
|
||||
* using a "web cron" service provider.
|
||||
*
|
||||
* @param int $index
|
||||
* @param int $count
|
||||
* @param int $timeout
|
||||
* @return void
|
||||
*/
|
||||
public static function process(int $timeout): void
|
||||
public static function process(int $index, int $count, int $timeout): void
|
||||
{
|
||||
// This part will run in a loop until timeout.
|
||||
$process_start_time = microtime(true);
|
||||
|
||||
// Get default driver instance.
|
||||
$driver_name = config('queue.driver');
|
||||
$driver = self::getDriver($driver_name);
|
||||
if (!$driver_name || !$driver)
|
||||
{
|
||||
throw new Exceptions\FeatureDisabled('Queue not configured');
|
||||
}
|
||||
|
||||
// Process scheduled tasks.
|
||||
if ($index === 0)
|
||||
{
|
||||
$db_driver = self::getDbDriver();
|
||||
$tasks = $db_driver->getScheduledTasks('once');
|
||||
foreach ($tasks as $task)
|
||||
{
|
||||
self::_executeTask($task);
|
||||
}
|
||||
}
|
||||
if ($index === 1 || $count < 2)
|
||||
{
|
||||
$db_driver = self::getDbDriver();
|
||||
$tasks = $db_driver->getScheduledTasks('interval');
|
||||
foreach ($tasks as $task)
|
||||
{
|
||||
$db_driver->updateLastRunTimestamp($task);
|
||||
self::_executeTask($task);
|
||||
}
|
||||
}
|
||||
|
||||
// Process queued tasks.
|
||||
while (true)
|
||||
{
|
||||
// Get a task from the driver.
|
||||
// Get a task from the driver, with a 1 second delay at maximum.
|
||||
$loop_start_time = microtime(true);
|
||||
$task = self::getTask(1);
|
||||
|
||||
// Wait 1 second and loop back.
|
||||
$task = $driver->getNextTask(1);
|
||||
if ($task)
|
||||
{
|
||||
// Find the handler for the task.
|
||||
$task->handler = trim($task->handler, '\\()');
|
||||
$handler = null;
|
||||
try
|
||||
{
|
||||
if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches))
|
||||
{
|
||||
$class_name = '\\' . $matches[1];
|
||||
$method_name = $matches[2];
|
||||
if (class_exists($class_name) && method_exists($class_name, $method_name))
|
||||
{
|
||||
$handler = [$class_name, $method_name];
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
|
||||
{
|
||||
$class_name = '\\' . $matches[1];
|
||||
$initializer_name = $matches[2];
|
||||
$method_name = $matches[3];
|
||||
if (class_exists($class_name) && method_exists($class_name, $initializer_name))
|
||||
{
|
||||
$obj = $class_name::$initializer_name();
|
||||
if (method_exists($obj, $method_name))
|
||||
{
|
||||
$handler = [$obj, $method_name];
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
|
||||
{
|
||||
$class_name = '\\' . $matches[1];
|
||||
$method_name = $matches[2];
|
||||
if (class_exists($class_name) && method_exists($class_name, $method_name))
|
||||
{
|
||||
$obj = new $class_name();
|
||||
$handler = [$obj, $method_name];
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (function_exists('\\' . $task->handler))
|
||||
{
|
||||
$handler = '\\' . $task->handler;
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (\Throwable $th)
|
||||
{
|
||||
error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [
|
||||
$task->handler,
|
||||
get_class($th),
|
||||
$th->getFile(),
|
||||
$th->getLine(),
|
||||
]));
|
||||
}
|
||||
|
||||
// Call the handler.
|
||||
try
|
||||
{
|
||||
if ($handler)
|
||||
{
|
||||
call_user_func($handler, $task->args, $task->options);
|
||||
}
|
||||
}
|
||||
catch (\Throwable $th)
|
||||
{
|
||||
error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [
|
||||
$task->handler,
|
||||
get_class($th),
|
||||
$th->getFile(),
|
||||
$th->getLine(),
|
||||
]));
|
||||
}
|
||||
self::_executeTask($task);
|
||||
}
|
||||
|
||||
// If the timeout is imminent, break the loop.
|
||||
|
|
@ -299,4 +376,107 @@ class Queue
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a task.
|
||||
*
|
||||
* @param object $task
|
||||
* @return void
|
||||
*/
|
||||
protected static function _executeTask(object $task): void
|
||||
{
|
||||
// Find the handler for the task.
|
||||
$task->handler = trim($task->handler, '\\()');
|
||||
$handler = null;
|
||||
try
|
||||
{
|
||||
if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches))
|
||||
{
|
||||
$class_name = '\\' . $matches[1];
|
||||
$method_name = $matches[2];
|
||||
if (class_exists($class_name) && method_exists($class_name, $method_name))
|
||||
{
|
||||
$handler = [$class_name, $method_name];
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
|
||||
{
|
||||
$class_name = '\\' . $matches[1];
|
||||
$initializer_name = $matches[2];
|
||||
$method_name = $matches[3];
|
||||
if (class_exists($class_name) && method_exists($class_name, $initializer_name))
|
||||
{
|
||||
$obj = $class_name::$initializer_name();
|
||||
if (method_exists($obj, $method_name))
|
||||
{
|
||||
$handler = [$obj, $method_name];
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
|
||||
{
|
||||
$class_name = '\\' . $matches[1];
|
||||
$method_name = $matches[2];
|
||||
if (class_exists($class_name) && method_exists($class_name, $method_name))
|
||||
{
|
||||
$obj = new $class_name();
|
||||
$handler = [$obj, $method_name];
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (function_exists('\\' . $task->handler))
|
||||
{
|
||||
$handler = '\\' . $task->handler;
|
||||
}
|
||||
else
|
||||
{
|
||||
error_log('RxQueue: task handler not found: ' . $task->handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (\Throwable $th)
|
||||
{
|
||||
error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [
|
||||
$task->handler,
|
||||
get_class($th),
|
||||
$th->getFile(),
|
||||
$th->getLine(),
|
||||
]));
|
||||
}
|
||||
|
||||
// Call the handler.
|
||||
try
|
||||
{
|
||||
if ($handler)
|
||||
{
|
||||
call_user_func($handler, $task->args, $task->options);
|
||||
}
|
||||
}
|
||||
catch (\Throwable $th)
|
||||
{
|
||||
error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [
|
||||
$task->handler,
|
||||
get_class($th),
|
||||
$th->getFile(),
|
||||
$th->getLine(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,10 +62,10 @@ interface QueueInterface
|
|||
public function addTask(string $handler, ?object $args = null, ?object $options = null): int;
|
||||
|
||||
/**
|
||||
* Get the first task.
|
||||
* Get the next task from the queue.
|
||||
*
|
||||
* @param int $blocking
|
||||
* @return ?object
|
||||
*/
|
||||
public function getTask(int $blocking = 0): ?object;
|
||||
public function getNextTask(int $blocking = 0): ?object;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ namespace Rhymix\Framework\Drivers\Queue;
|
|||
|
||||
use Rhymix\Framework\DB as RFDB;
|
||||
use Rhymix\Framework\Drivers\QueueInterface;
|
||||
use Rhymix\Framework\Queue;
|
||||
|
||||
/**
|
||||
* The DB queue driver.
|
||||
|
|
@ -99,12 +100,72 @@ class DB implements QueueInterface
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the first task.
|
||||
* Add a task to be executed at a specific time.
|
||||
*
|
||||
* @param int $time
|
||||
* @param string $handler
|
||||
* @param ?object $args
|
||||
* @param ?object $options
|
||||
* @return int
|
||||
*/
|
||||
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
$task_srl = getNextSequence();
|
||||
$stmt = $oDB->prepare(trim(<<<END
|
||||
INSERT INTO task_schedule
|
||||
(task_srl, task_type, first_run, handler, args, options, regdate)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
END));
|
||||
$result = $stmt->execute([
|
||||
$task_srl,
|
||||
'once',
|
||||
date('Y-m-d H:i:s', $time),
|
||||
$handler,
|
||||
serialize($args),
|
||||
serialize($options),
|
||||
date('Y-m-d H:i:s'),
|
||||
]);
|
||||
return $result ? $task_srl : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a task to be executed at an interval.
|
||||
*
|
||||
* @param string $interval
|
||||
* @param string $handler
|
||||
* @param ?object $args
|
||||
* @param ?object $options
|
||||
* @return int
|
||||
*/
|
||||
public function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
$task_srl = getNextSequence();
|
||||
$stmt = $oDB->prepare(trim(<<<END
|
||||
INSERT INTO task_schedule
|
||||
(task_srl, task_type, run_interval, handler, args, options, regdate)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
END));
|
||||
$result = $stmt->execute([
|
||||
$task_srl,
|
||||
'interval',
|
||||
$interval,
|
||||
$handler,
|
||||
serialize($args),
|
||||
serialize($options),
|
||||
date('Y-m-d H:i:s'),
|
||||
]);
|
||||
return $result ? $task_srl : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next task from the queue.
|
||||
*
|
||||
* @param int $blocking
|
||||
* @return ?object
|
||||
*/
|
||||
public function getTask(int $blocking = 0): ?object
|
||||
public function getNextTask(int $blocking = 0): ?object
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
$oDB->beginTransaction();
|
||||
|
|
@ -128,4 +189,123 @@ class DB implements QueueInterface
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a scheduled task by its task_srl.
|
||||
*
|
||||
* @param int $task_srl
|
||||
* @return ?object
|
||||
*/
|
||||
public function getScheduledTask(int $task_srl): ?object
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
$stmt = $oDB->query('SELECT * FROM task_schedule WHERE task_srl = ?', [$task_srl]);
|
||||
$task = $stmt->fetchObject();
|
||||
$stmt->closeCursor();
|
||||
|
||||
if ($task)
|
||||
{
|
||||
$task->args = unserialize($task->args);
|
||||
$task->options = unserialize($task->options);
|
||||
return $task;
|
||||
}
|
||||
else
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get scheduled tasks.
|
||||
*
|
||||
* @param string $type
|
||||
* @return array
|
||||
*/
|
||||
public function getScheduledTasks(string $type): array
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
$tasks = [];
|
||||
$task_srls = [];
|
||||
|
||||
// Get tasks to be executed once at the current time.
|
||||
if ($type === 'once')
|
||||
{
|
||||
$oDB->beginTransaction();
|
||||
$timestamp = date('Y-m-d H:i:s');
|
||||
$stmt = $oDB->query("SELECT * FROM task_schedule WHERE task_type = 'once' AND first_run <= ? ORDER BY first_run FOR UPDATE", [$timestamp]);
|
||||
while ($task = $stmt->fetchObject())
|
||||
{
|
||||
$task->args = unserialize($task->args);
|
||||
$task->options = unserialize($task->options);
|
||||
$tasks[] = $task;
|
||||
$task_srls[] = $task->task_srl;
|
||||
}
|
||||
if (count($task_srls))
|
||||
{
|
||||
$stmt = $oDB->prepare('DELETE FROM task_schedule WHERE task_srl IN (' . implode(', ', array_fill(0, count($task_srls), '?')) . ')');
|
||||
$stmt->execute($task_srls);
|
||||
}
|
||||
$oDB->commit();
|
||||
}
|
||||
|
||||
// Get tasks to be executed at an interval.
|
||||
if ($type === 'interval')
|
||||
{
|
||||
$stmt = $oDB->query("SELECT task_srl, run_interval FROM task_schedule WHERE task_type = 'interval' ORDER BY task_srl");
|
||||
while ($task = $stmt->fetchObject())
|
||||
{
|
||||
if (Queue::parseInterval($task->run_interval, time()))
|
||||
{
|
||||
$task_srls[] = $task->task_srl;
|
||||
}
|
||||
}
|
||||
if (count($task_srls))
|
||||
{
|
||||
$stmt = $oDB->prepare('SELECT * FROM task_schedule WHERE task_srl IN (' . implode(', ', array_fill(0, count($task_srls), '?')) . ')');
|
||||
$stmt->execute($task_srls);
|
||||
while ($task = $stmt->fetchObject())
|
||||
{
|
||||
$task->args = unserialize($task->args);
|
||||
$task->options = unserialize($task->options);
|
||||
$tasks[] = $task;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return $tasks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the last executed timestamp of a scheduled task.
|
||||
*
|
||||
* @param object $task
|
||||
* @return void
|
||||
*/
|
||||
public function updateLastRunTimestamp(object $task): void
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
if ($task->first_run)
|
||||
{
|
||||
$stmt = $oDB->prepare('UPDATE task_schedule SET last_run = ?, run_count = run_count + 1 WHERE task_srl = ?');
|
||||
$stmt->execute([date('Y-m-d H:i:s'), $task->task_srl]);
|
||||
}
|
||||
else
|
||||
{
|
||||
$stmt = $oDB->prepare('UPDATE task_schedule SET first_run = ?, last_run = ?, run_count = run_count + 1 WHERE task_srl = ?');
|
||||
$stmt->execute([date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), $task->task_srl]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel a scheduled task.
|
||||
*
|
||||
* @param int $task_srl
|
||||
* @return bool
|
||||
*/
|
||||
public function cancelScheduledTask(int $task_srl): bool
|
||||
{
|
||||
$oDB = RFDB::getInstance();
|
||||
$stmt = $oDB->query('DELETE FROM task_schedule WHERE task_srl = ?', [$task_srl]);
|
||||
return ($stmt && $stmt->rowCount()) ? true : false;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -105,12 +105,12 @@ class Dummy implements QueueInterface
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the first task.
|
||||
* Get the next task from the queue.
|
||||
*
|
||||
* @param int $blocking
|
||||
* @return ?object
|
||||
*/
|
||||
public function getTask(int $blocking = 0): ?object
|
||||
public function getNextTask(int $blocking = 0): ?object
|
||||
{
|
||||
$result = $this->_dummy_queue;
|
||||
$this->_dummy_queue = null;
|
||||
|
|
|
|||
|
|
@ -155,12 +155,12 @@ class Redis implements QueueInterface
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the first task.
|
||||
* Get the next task from the queue.
|
||||
*
|
||||
* @param int $blocking
|
||||
* @return ?object
|
||||
*/
|
||||
public function getTask(int $blocking = 0): ?object
|
||||
public function getNextTask(int $blocking = 0): ?object
|
||||
{
|
||||
if ($this->_conn)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') &&
|
|||
}
|
||||
elseif ($pid == 0)
|
||||
{
|
||||
Rhymix\Framework\Queue::process($timeout);
|
||||
Rhymix\Framework\Queue::process($i, $process_count, $timeout);
|
||||
exit;
|
||||
}
|
||||
else
|
||||
|
|
@ -96,7 +96,7 @@ if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') &&
|
|||
}
|
||||
else
|
||||
{
|
||||
Rhymix\Framework\Queue::process($timeout);
|
||||
Rhymix\Framework\Queue::process(0, 1, $timeout);
|
||||
}
|
||||
|
||||
// If called over the network, display a simple OK message to indicate success.
|
||||
|
|
|
|||
|
|
@ -111,6 +111,10 @@ class EditorAdminView extends Editor
|
|||
|
||||
// Get a group list to set a group
|
||||
$group_list = MemberModel::getGroups(0);
|
||||
foreach ($group_list ?: [] as $group)
|
||||
{
|
||||
$group->title = Context::replaceUserLang($group->title, true);
|
||||
}
|
||||
Context::set('group_list', $group_list);
|
||||
|
||||
// Get a mid list
|
||||
|
|
|
|||
|
|
@ -165,6 +165,10 @@ class EditorView extends Editor
|
|||
// Get a group list
|
||||
$group_list = MemberModel::getGroups();
|
||||
Context::set('group_list', $group_list);
|
||||
foreach ($group_list ?: [] as $group)
|
||||
{
|
||||
$group->title = Context::replaceUserLang($group->title, true);
|
||||
}
|
||||
|
||||
//Security
|
||||
$security = new Security();
|
||||
|
|
|
|||
|
|
@ -350,7 +350,9 @@
|
|||
<h1>{$lang->menu_img_btn}</h1>
|
||||
<div class="cnt">
|
||||
<p style="border-bottom:1px solid #ddd;width:220px;min-width:100%;padding-bottom:10px">{$lang->about_imgbtn}</p>
|
||||
<form action="?module=menu&act=procMenuAdminButtonUpload" class="_btn_normal" target="submitTarget" method="post" enctype="multipart/form-data">
|
||||
<form action="{\RX_BASEURL}" class="_btn_normal" target="submitTarget" method="post" enctype="multipart/form-data">
|
||||
<input name="module" type="hidden" value="menu" />
|
||||
<input name="act" type="hidden" value="procMenuAdminButtonUpload" />
|
||||
<input name="menu_item_srl" type="hidden" value=""/>
|
||||
<input name="xe_js_callback" type="hidden" value="top.onBtnImgUploaded"/>
|
||||
<input name="isNormalDelete" type="hidden" value="Y"/>
|
||||
|
|
@ -368,7 +370,9 @@
|
|||
</span>
|
||||
</div>
|
||||
</form>
|
||||
<form action="?module=menu&act=procMenuAdminButtonUpload" class="_btn_hover" target="submitTarget" method="post" enctype="multipart/form-data">
|
||||
<form action="{\RX_BASEURL}" class="_btn_hover" target="submitTarget" method="post" enctype="multipart/form-data">
|
||||
<input name="module" type="hidden" value="menu" />
|
||||
<input name="act" type="hidden" value="procMenuAdminButtonUpload" />
|
||||
<input name="menu_item_srl" type="hidden" value=""/>
|
||||
<input name="xe_js_callback" type="hidden" value="top.onBtnImgUploaded"/>
|
||||
<input name="isHoverDelete" type="hidden" value="Y"/>
|
||||
|
|
@ -386,7 +390,9 @@
|
|||
</span>
|
||||
</div>
|
||||
</form>
|
||||
<form action="?module=menu&act=procMenuAdminButtonUpload" class="_btn_selected" target="submitTarget" method="post" enctype="multipart/form-data">
|
||||
<form action="{\RX_BASEURL}" class="_btn_selected" target="submitTarget" method="post" enctype="multipart/form-data">
|
||||
<input name="module" type="hidden" value="menu" />
|
||||
<input name="act" type="hidden" value="procMenuAdminButtonUpload" />
|
||||
<input name="menu_item_srl" type="hidden" value=""/>
|
||||
<input name="xe_js_callback" type="hidden" value="top.onBtnImgUploaded"/>
|
||||
<input name="isActiveDelete" type="hidden" value="Y"/>
|
||||
|
|
|
|||
12
modules/module/schemas/task_schedule.xml
Normal file
12
modules/module/schemas/task_schedule.xml
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
<table name="task_schedule">
|
||||
<column name="task_srl" type="bigint" notnull="notnull" primary_key="primary_key" />
|
||||
<column name="task_type" type="varchar" size="40" index="idx_task_type" />
|
||||
<column name="run_interval" type="varchar" size="191" index="idx_run_interval" />
|
||||
<column name="run_count" type="bigint" notnull="notnull" default="0" index="idx_run_count" />
|
||||
<column name="first_run" type="datetime" index="idx_first_run" />
|
||||
<column name="last_run" type="datetime" index="idx_last_run" />
|
||||
<column name="handler" type="varchar" size="191" notnull="notnull" />
|
||||
<column name="args" type="longtext" notnull="notnull" />
|
||||
<column name="options" type="longtext" notnull="notnull" />
|
||||
<column name="regdate" type="datetime" notnull="notnull" index="idx_regdate" />
|
||||
</table>
|
||||
|
|
@ -21,7 +21,7 @@
|
|||
<div id="zone_{$grant_name}" hidden style="margin-top:8px">
|
||||
<label loop="$group_list => $group_srl, $group_item" for="grant_{$grant_name}_{$group_srl}">
|
||||
<input type="checkbox" name="{$grant_name}[]" value="{$group_item->group_srl}" id="grant_{$grant_name}_{$group_srl}" />
|
||||
{$group_item->title}
|
||||
{Context::replaceUserLang($group_item->title, true)}
|
||||
</label>
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@
|
|||
<div id="zone_{$grant_name}" hidden style="margin:8px 0 0 0">
|
||||
<label loop="$group_list => $group_srl, $group_item" for="grant_{$grant_name}_{$group_srl}">
|
||||
<input type="checkbox" class="checkbox" name="{$grant_name}" value="{$group_item->group_srl}" id="grant_{$grant_name}_{$group_srl}" />
|
||||
{$group_item->title}
|
||||
{Context::replaceUserLang($group_item->title, true)}
|
||||
</label>
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -65,7 +65,10 @@
|
|||
</select>
|
||||
<div id="zone_{$grant_name}" hidden style="margin:8px 0 0 0">
|
||||
<!--@foreach($group_list as $group_srl => $group_item)-->
|
||||
<label for="grant_{$grant_name}_{$group_srl}" class="x_inline"><input type="checkbox" class="checkbox" name="{$grant_name}" value="{$group_item->group_srl}" id="grant_{$grant_name}_{$group_srl}" checked="checked"|cond="is_array($selected_group[$grant_name])&&in_array($group_srl,$selected_group[$grant_name])" /> {$group_item->title}</label>
|
||||
<label for="grant_{$grant_name}_{$group_srl}" class="x_inline">
|
||||
<input type="checkbox" class="checkbox" name="{$grant_name}" value="{$group_item->group_srl}" id="grant_{$grant_name}_{$group_srl}" checked="checked"|cond="is_array($selected_group[$grant_name])&&in_array($group_srl,$selected_group[$grant_name])" />
|
||||
{Context::replaceUserLang($group_item->title, true)}
|
||||
</label>
|
||||
<!--@end-->
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -2,6 +2,19 @@
|
|||
|
||||
class MailTest extends \Codeception\Test\Unit
|
||||
{
|
||||
protected $_prev_queue_config;
|
||||
|
||||
public function _before()
|
||||
{
|
||||
$this->_prev_queue_config = config('queue');
|
||||
config('queue.enabled', false);
|
||||
}
|
||||
|
||||
public function _after()
|
||||
{
|
||||
config('queue', $this->_prev_queue_config);
|
||||
}
|
||||
|
||||
public function testGetSetDefaultDriver()
|
||||
{
|
||||
$driver = Rhymix\Framework\Mail::getDefaultDriver();
|
||||
|
|
|
|||
|
|
@ -2,22 +2,115 @@
|
|||
|
||||
class QueueTest extends \Codeception\Test\Unit
|
||||
{
|
||||
protected $_prev_queue_config;
|
||||
|
||||
public function _before()
|
||||
{
|
||||
$this->_prev_queue_config = config('queue');
|
||||
config('queue.enabled', true);
|
||||
config('queue.driver', 'dummy');
|
||||
}
|
||||
|
||||
public function _after()
|
||||
{
|
||||
config('queue', $this->_prev_queue_config);
|
||||
}
|
||||
|
||||
public function testDummyQueue()
|
||||
{
|
||||
config('queue.driver', 'dummy');
|
||||
|
||||
$handler = 'myfunc';
|
||||
$args = (object)['foo' => 'bar'];
|
||||
$options = (object)['key' => 'val'];
|
||||
|
||||
Rhymix\Framework\Queue::addTask($handler, $args, $options);
|
||||
|
||||
$output = Rhymix\Framework\Queue::getTask();
|
||||
$output = Rhymix\Framework\Queue::getDriver('dummy')->getNextTask();
|
||||
$this->assertEquals('myfunc', $output->handler);
|
||||
$this->assertEquals('bar', $output->args->foo);
|
||||
$this->assertEquals('val', $output->options->key);
|
||||
|
||||
$output = Rhymix\Framework\Queue::getTask();
|
||||
$output = Rhymix\Framework\Queue::getDriver('dummy')->getNextTask();
|
||||
$this->assertNull($output);
|
||||
}
|
||||
|
||||
public function testScheduledTaskAt()
|
||||
{
|
||||
$timestamp = time() + 43200;
|
||||
$handler = 'MyClass::myFunc';
|
||||
$args = (object)['foo' => 'bar'];
|
||||
$options = null;
|
||||
|
||||
$task_srl = Rhymix\Framework\Queue::addTaskAt($timestamp, $handler, $args, $options);
|
||||
$this->assertGreaterThan(0, $task_srl);
|
||||
|
||||
$output = Rhymix\Framework\Queue::getScheduledTask($task_srl);
|
||||
$this->assertTrue(is_object($output));
|
||||
$this->assertEquals('once', $output->task_type);
|
||||
$this->assertEquals(date('Y-m-d H:i:s', $timestamp), $output->first_run);
|
||||
$this->assertEquals('MyClass::myFunc', $output->handler);
|
||||
$this->assertEquals('bar', $output->args->foo);
|
||||
$this->assertNull($output->options);
|
||||
|
||||
$output = Rhymix\Framework\Queue::cancelScheduledTask($task_srl);
|
||||
$this->assertTrue($output);
|
||||
}
|
||||
|
||||
public function testScheduledTaskAtInterval()
|
||||
{
|
||||
$interval = '30 9 1-15 */2 *';
|
||||
$handler = 'MyClass::getInstance()->myMethod';
|
||||
$args = (object)['foo' => 'bar'];
|
||||
$options = null;
|
||||
|
||||
$task_srl = Rhymix\Framework\Queue::addTaskAtInterval($interval, $handler, $args, $options);
|
||||
$this->assertGreaterThan(0, $task_srl);
|
||||
|
||||
$output = Rhymix\Framework\Queue::getScheduledTask($task_srl);
|
||||
$this->assertTrue(is_object($output));
|
||||
$this->assertEquals('interval', $output->task_type);
|
||||
$this->assertEquals($interval, $output->run_interval);
|
||||
$this->assertEquals('MyClass::getInstance()->myMethod', $output->handler);
|
||||
$this->assertEquals('bar', $output->args->foo);
|
||||
$this->assertNull($output->options);
|
||||
|
||||
$output = Rhymix\Framework\Queue::cancelScheduledTask($task_srl);
|
||||
$this->assertTrue($output);
|
||||
}
|
||||
|
||||
public function testCheckIntervalSyntax()
|
||||
{
|
||||
$this->assertTrue(Rhymix\Framework\Queue::checkIntervalSyntax('* * * * *'));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::checkIntervalSyntax('*/2 15 * * 0,3'));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::checkIntervalSyntax('10-19,40-49 * 1-12 * *'));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::checkIntervalSyntax('*/10 */4 * * *'));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::checkIntervalSyntax('* * * *'));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::checkIntervalSyntax('1 2 3 4 5,6 */7'));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::checkIntervalSyntax('@hourly'));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::checkIntervalSyntax('5/* 12h * * *'));
|
||||
}
|
||||
|
||||
public function testParseInterval()
|
||||
{
|
||||
$interval = '* * * * *';
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-01-01 00:00:00')));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-31 23:59:59')));
|
||||
|
||||
$interval = '*/2 15 * * 0,3';
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 15:00:00')));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-04 15:30:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-02 15:00:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 03:00:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-04 15:31:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-11-30 15:44:00')));
|
||||
|
||||
$interval = '3-5,14-21,*/10 * * 12 *';
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 12:05:00')));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 14:19:00')));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 16:30:00')));
|
||||
$this->assertTrue(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 18:00:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 09:01:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-12-01 11:13:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-01-02 06:20:00')));
|
||||
$this->assertFalse(Rhymix\Framework\Queue::parseInterval($interval, strtotime('2024-11-30 18:50:00')));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,19 @@
|
|||
|
||||
class SMSTest extends \Codeception\Test\Unit
|
||||
{
|
||||
protected $_prev_queue_config;
|
||||
|
||||
public function _before()
|
||||
{
|
||||
$this->_prev_queue_config = config('queue');
|
||||
config('queue.enabled', false);
|
||||
}
|
||||
|
||||
public function _after()
|
||||
{
|
||||
config('queue', $this->_prev_queue_config);
|
||||
}
|
||||
|
||||
public function testGetSetDefaultDriver()
|
||||
{
|
||||
$driver = Rhymix\Framework\SMS::getDefaultDriver();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue