Merge branch 'rhymix:master' into master

This commit is contained in:
Lastorder 2025-03-03 10:47:47 +09:00 committed by GitHub
commit e5b729f8e9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
152 changed files with 2348 additions and 792 deletions

View file

@ -62,10 +62,10 @@ interface QueueInterface
public function addTask(string $handler, ?object $args = null, ?object $options = null): int;
/**
* Get the first task.
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object;
public function getNextTask(int $blocking = 0): ?object;
}

View file

@ -4,6 +4,7 @@ namespace Rhymix\Framework\Drivers\Queue;
use Rhymix\Framework\DB as RFDB;
use Rhymix\Framework\Drivers\QueueInterface;
use Rhymix\Framework\Queue;
/**
* The DB queue driver.
@ -99,12 +100,72 @@ class DB implements QueueInterface
}
/**
* Get the first task.
* Add a task to be executed at a specific time.
*
* @param int $time
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
{
$oDB = RFDB::getInstance();
$task_srl = getNextSequence();
$stmt = $oDB->prepare(trim(<<<END
INSERT INTO task_schedule
(task_srl, task_type, first_run, handler, args, options, regdate)
VALUES (?, ?, ?, ?, ?, ?, ?)
END));
$result = $stmt->execute([
$task_srl,
'once',
date('Y-m-d H:i:s', $time),
$handler,
serialize($args),
serialize($options),
date('Y-m-d H:i:s'),
]);
return $result ? $task_srl : 0;
}
/**
* Add a task to be executed at an interval.
*
* @param string $interval
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
{
$oDB = RFDB::getInstance();
$task_srl = getNextSequence();
$stmt = $oDB->prepare(trim(<<<END
INSERT INTO task_schedule
(task_srl, task_type, run_interval, handler, args, options, regdate)
VALUES (?, ?, ?, ?, ?, ?, ?)
END));
$result = $stmt->execute([
$task_srl,
'interval',
$interval,
$handler,
serialize($args),
serialize($options),
date('Y-m-d H:i:s'),
]);
return $result ? $task_srl : 0;
}
/**
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
public function getNextTask(int $blocking = 0): ?object
{
$oDB = RFDB::getInstance();
$oDB->beginTransaction();
@ -128,4 +189,123 @@ class DB implements QueueInterface
return null;
}
}
/**
* Get a scheduled task by its task_srl.
*
* @param int $task_srl
* @return ?object
*/
public function getScheduledTask(int $task_srl): ?object
{
$oDB = RFDB::getInstance();
$stmt = $oDB->query('SELECT * FROM task_schedule WHERE task_srl = ?', [$task_srl]);
$task = $stmt->fetchObject();
$stmt->closeCursor();
if ($task)
{
$task->args = unserialize($task->args);
$task->options = unserialize($task->options);
return $task;
}
else
{
return null;
}
}
/**
* Get scheduled tasks.
*
* @param string $type
* @return array
*/
public function getScheduledTasks(string $type): array
{
$oDB = RFDB::getInstance();
$tasks = [];
$task_srls = [];
// Get tasks to be executed once at the current time.
if ($type === 'once')
{
$oDB->beginTransaction();
$timestamp = date('Y-m-d H:i:s');
$stmt = $oDB->query("SELECT * FROM task_schedule WHERE task_type = 'once' AND first_run <= ? ORDER BY first_run FOR UPDATE", [$timestamp]);
while ($task = $stmt->fetchObject())
{
$task->args = unserialize($task->args);
$task->options = unserialize($task->options);
$tasks[] = $task;
$task_srls[] = $task->task_srl;
}
if (count($task_srls))
{
$stmt = $oDB->prepare('DELETE FROM task_schedule WHERE task_srl IN (' . implode(', ', array_fill(0, count($task_srls), '?')) . ')');
$stmt->execute($task_srls);
}
$oDB->commit();
}
// Get tasks to be executed at an interval.
if ($type === 'interval')
{
$stmt = $oDB->query("SELECT task_srl, run_interval FROM task_schedule WHERE task_type = 'interval' ORDER BY task_srl");
while ($task = $stmt->fetchObject())
{
if (Queue::parseInterval($task->run_interval, time()))
{
$task_srls[] = $task->task_srl;
}
}
if (count($task_srls))
{
$stmt = $oDB->prepare('SELECT * FROM task_schedule WHERE task_srl IN (' . implode(', ', array_fill(0, count($task_srls), '?')) . ')');
$stmt->execute($task_srls);
while ($task = $stmt->fetchObject())
{
$task->args = unserialize($task->args);
$task->options = unserialize($task->options);
$tasks[] = $task;
}
}
}
return $tasks;
}
/**
* Update the last executed timestamp of a scheduled task.
*
* @param object $task
* @return void
*/
public function updateLastRunTimestamp(object $task): void
{
$oDB = RFDB::getInstance();
if ($task->first_run)
{
$stmt = $oDB->prepare('UPDATE task_schedule SET last_run = ?, run_count = run_count + 1 WHERE task_srl = ?');
$stmt->execute([date('Y-m-d H:i:s'), $task->task_srl]);
}
else
{
$stmt = $oDB->prepare('UPDATE task_schedule SET first_run = ?, last_run = ?, run_count = run_count + 1 WHERE task_srl = ?');
$stmt->execute([date('Y-m-d H:i:s'), date('Y-m-d H:i:s'), $task->task_srl]);
}
}
/**
* Cancel a scheduled task.
*
* @param int $task_srl
* @return bool
*/
public function cancelScheduledTask(int $task_srl): bool
{
$oDB = RFDB::getInstance();
$stmt = $oDB->query('DELETE FROM task_schedule WHERE task_srl = ?', [$task_srl]);
return ($stmt && $stmt->rowCount()) ? true : false;
}
}

View file

@ -105,12 +105,12 @@ class Dummy implements QueueInterface
}
/**
* Get the first task.
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
public function getNextTask(int $blocking = 0): ?object
{
$result = $this->_dummy_queue;
$this->_dummy_queue = null;

View file

@ -155,12 +155,12 @@ class Redis implements QueueInterface
}
/**
* Get the first task.
* Get the next task from the queue.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
public function getNextTask(int $blocking = 0): ?object
{
if ($this->_conn)
{