Implement high and low priority for queued tasks #2453

This commit is contained in:
Kijin Sung 2025-05-21 18:33:19 +09:00
parent 9ca2f79dce
commit 8014413163
5 changed files with 73 additions and 13 deletions

View file

@ -12,6 +12,13 @@ class Queue
*/
protected static $_drivers = [];
/**
* Priority constants.
*/
public const PRIORITY_HIGH = 'high';
public const PRIORITY_NORMAL = 'normal';
public const PRIORITY_LOW = 'low';
/**
* Add a custom Queue driver.
*
@ -120,9 +127,10 @@ class Queue
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public static function addTask(string $handler, ?object $args = null, ?object $options = null): int
public static function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int
{
$driver_name = config('queue.driver');
if (!$driver_name)
@ -136,7 +144,7 @@ class Queue
throw new Exceptions\FeatureDisabled('Queue not configured');
}
return $driver->addTask($handler, $args, $options);
return $driver->addTask($handler, $args, $options, $priority);
}
/**
@ -149,9 +157,10 @@ class Queue
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int
{
if (!config('queue.enabled'))
{
@ -160,7 +169,7 @@ class Queue
// This feature always uses the DB driver.
$driver = self::getDbDriver();
return $driver->addTaskAt($time, $handler, $args, $options);
return $driver->addTaskAt($time, $handler, $args, $options, $priority);
}
/**

View file

@ -57,9 +57,10 @@ interface QueueInterface
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int;
public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int;
/**
* Get the next task from the queue.

View file

@ -89,14 +89,43 @@ class DB implements QueueInterface
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int
public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int
{
$oDB = RFDB::getInstance();
$stmt = $oDB->prepare('INSERT INTO task_queue (handler, args, options, regdate) VALUES (?, ?, ?, ?)');
$result = $stmt->execute([$handler, serialize($args), serialize($options), date('Y-m-d H:i:s')]);
return $result ? $oDB->getInsertID() : 0;
if ($priority === \Rhymix\Framework\Queue::PRIORITY_HIGH)
{
$stmt = $oDB->query('SELECT MIN(id) AS min_id FROM task_queue');
$min_id = intval($stmt->fetchColumn());
$stmt->closeCursor();
$id = $min_id ? ($min_id - rand(1, 10)) : null;
}
else
{
$min_id = null;
$id = null;
}
$stmt = $oDB->prepare('INSERT INTO task_queue (id, handler, args, options, regdate) VALUES (?, ?, ?, ?, ?)');
$result = $stmt->execute([
$id,
$handler,
serialize($args),
serialize($options),
date('Y-m-d H:i:s'),
]);
if ($result)
{
return $id ?? $oDB->getInsertID();
}
else
{
return 0;
}
}
/**
@ -106,10 +135,20 @@ class DB implements QueueInterface
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int
{
if ($priority === \Rhymix\Framework\Queue::PRIORITY_HIGH)
{
$time = $time - 1;
}
elseif ($priority === \Rhymix\Framework\Queue::PRIORITY_LOW)
{
$time = $time + 1;
}
$oDB = RFDB::getInstance();
$task_srl = getNextSequence();
$stmt = $oDB->prepare(trim(<<<END
@ -117,6 +156,7 @@ class DB implements QueueInterface
(task_srl, task_type, first_run, handler, args, options, regdate)
VALUES (?, ?, ?, ?, ?, ?, ?)
END));
$result = $stmt->execute([
$task_srl,
'once',
@ -126,6 +166,7 @@ class DB implements QueueInterface
serialize($options),
date('Y-m-d H:i:s'),
]);
return $result ? $task_srl : 0;
}

View file

@ -92,9 +92,10 @@ class Dummy implements QueueInterface
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int
public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int
{
$this->_dummy_queue = (object)[
'handler' => $handler,

View file

@ -133,9 +133,10 @@ class Redis implements QueueInterface
* @param string $handler
* @param ?object $args
* @param ?object $options
* @param ?string $priority
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int
public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int
{
$value = serialize((object)[
'handler' => $handler,
@ -145,7 +146,14 @@ class Redis implements QueueInterface
if ($this->_conn)
{
$result = $this->_conn->rPush($this->_key, $value);
if ($priority === \Rhymix\Framework\Queue::PRIORITY_HIGH)
{
$result = $this->_conn->lPush($this->_key, $value);
}
else
{
$result = $this->_conn->rPush($this->_key, $value);
}
return intval($result);
}
else