diff --git a/common/framework/Queue.php b/common/framework/Queue.php index ec58a6aeb..361e4bfeb 100644 --- a/common/framework/Queue.php +++ b/common/framework/Queue.php @@ -12,6 +12,13 @@ class Queue */ protected static $_drivers = []; + /** + * Priority constants. + */ + public const PRIORITY_HIGH = 'high'; + public const PRIORITY_NORMAL = 'normal'; + public const PRIORITY_LOW = 'low'; + /** * Add a custom Queue driver. * @@ -120,9 +127,10 @@ class Queue * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public static function addTask(string $handler, ?object $args = null, ?object $options = null): int + public static function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int { $driver_name = config('queue.driver'); if (!$driver_name) @@ -136,7 +144,7 @@ class Queue throw new Exceptions\FeatureDisabled('Queue not configured'); } - return $driver->addTask($handler, $args, $options); + return $driver->addTask($handler, $args, $options, $priority); } /** @@ -149,9 +157,10 @@ class Queue * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int + public static function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int { if (!config('queue.enabled')) { @@ -160,7 +169,7 @@ class Queue // This feature always uses the DB driver. $driver = self::getDbDriver(); - return $driver->addTaskAt($time, $handler, $args, $options); + return $driver->addTaskAt($time, $handler, $args, $options, $priority); } /** diff --git a/common/framework/drivers/QueueInterface.php b/common/framework/drivers/QueueInterface.php index ebcbafacb..242ef2812 100644 --- a/common/framework/drivers/QueueInterface.php +++ b/common/framework/drivers/QueueInterface.php @@ -57,9 +57,10 @@ interface QueueInterface * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public function addTask(string $handler, ?object $args = null, ?object $options = null): int; + public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int; /** * Get the next task from the queue. diff --git a/common/framework/drivers/queue/db.php b/common/framework/drivers/queue/db.php index 68632704d..c6ced5583 100644 --- a/common/framework/drivers/queue/db.php +++ b/common/framework/drivers/queue/db.php @@ -89,14 +89,43 @@ class DB implements QueueInterface * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public function addTask(string $handler, ?object $args = null, ?object $options = null): int + public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int { $oDB = RFDB::getInstance(); - $stmt = $oDB->prepare('INSERT INTO task_queue (handler, args, options, regdate) VALUES (?, ?, ?, ?)'); - $result = $stmt->execute([$handler, serialize($args), serialize($options), date('Y-m-d H:i:s')]); - return $result ? $oDB->getInsertID() : 0; + + if ($priority === \Rhymix\Framework\Queue::PRIORITY_HIGH) + { + $stmt = $oDB->query('SELECT MIN(id) AS min_id FROM task_queue'); + $min_id = intval($stmt->fetchColumn()); + $stmt->closeCursor(); + $id = $min_id ? ($min_id - rand(1, 10)) : null; + } + else + { + $min_id = null; + $id = null; + } + + $stmt = $oDB->prepare('INSERT INTO task_queue (id, handler, args, options, regdate) VALUES (?, ?, ?, ?, ?)'); + $result = $stmt->execute([ + $id, + $handler, + serialize($args), + serialize($options), + date('Y-m-d H:i:s'), + ]); + + if ($result) + { + return $id ?? $oDB->getInsertID(); + } + else + { + return 0; + } } /** @@ -106,10 +135,20 @@ class DB implements QueueInterface * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int + public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int { + if ($priority === \Rhymix\Framework\Queue::PRIORITY_HIGH) + { + $time = $time - 1; + } + elseif ($priority === \Rhymix\Framework\Queue::PRIORITY_LOW) + { + $time = $time + 1; + } + $oDB = RFDB::getInstance(); $task_srl = getNextSequence(); $stmt = $oDB->prepare(trim(<<execute([ $task_srl, 'once', @@ -126,6 +166,7 @@ class DB implements QueueInterface serialize($options), date('Y-m-d H:i:s'), ]); + return $result ? $task_srl : 0; } diff --git a/common/framework/drivers/queue/dummy.php b/common/framework/drivers/queue/dummy.php index 76d39dc63..156f00673 100644 --- a/common/framework/drivers/queue/dummy.php +++ b/common/framework/drivers/queue/dummy.php @@ -92,9 +92,10 @@ class Dummy implements QueueInterface * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public function addTask(string $handler, ?object $args = null, ?object $options = null): int + public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int { $this->_dummy_queue = (object)[ 'handler' => $handler, diff --git a/common/framework/drivers/queue/redis.php b/common/framework/drivers/queue/redis.php index cd4b8256e..963dc845f 100644 --- a/common/framework/drivers/queue/redis.php +++ b/common/framework/drivers/queue/redis.php @@ -133,9 +133,10 @@ class Redis implements QueueInterface * @param string $handler * @param ?object $args * @param ?object $options + * @param ?string $priority * @return int */ - public function addTask(string $handler, ?object $args = null, ?object $options = null): int + public function addTask(string $handler, ?object $args = null, ?object $options = null, ?string $priority = null): int { $value = serialize((object)[ 'handler' => $handler, @@ -145,7 +146,14 @@ class Redis implements QueueInterface if ($this->_conn) { - $result = $this->_conn->rPush($this->_key, $value); + if ($priority === \Rhymix\Framework\Queue::PRIORITY_HIGH) + { + $result = $this->_conn->lPush($this->_key, $value); + } + else + { + $result = $this->_conn->rPush($this->_key, $value); + } return intval($result); } else