diff --git a/common/framework/Queue.php b/common/framework/Queue.php new file mode 100644 index 000000000..810d4513f --- /dev/null +++ b/common/framework/Queue.php @@ -0,0 +1,305 @@ + $class_name::getName(), + 'required' => $class_name::getRequiredConfig(), + 'optional' => $class_name::getOptionalConfig(), + ]; + } + } + foreach (self::$_drivers as $driver_name => $driver) + { + if ($driver->isSupported()) + { + $result[$driver_name] = [ + 'name' => $driver->getName(), + 'required' => $driver->getRequiredConfig(), + 'optional' => $driver->getOptionalConfig(), + ]; + } + } + ksort($result); + return $result; + } + + /** + * Add a task. + * + * The handler can be in one of the following formats: + * - Global function, e.g. myHandler + * - ClassName::staticMethodName + * - ClassName::getInstance()->methodName + * - new ClassName()->methodName + * + * Once identified and/or instantiated, the handler will be passed $args + * and $options, in that order. Each of them must be a single object. + * + * It is strongly recommended that you write a dedicated method to handle + * queued tasks, rather than reusing an existing method with a potentially + * incompatible structure. If you must to call an existing method, + * you should consider writing a wrapper. + * + * Any value returned by the handler will be discarded. If you throw an + * exception, it may be logged, but it will not cause a fatal error. + * + * @param string $handler + * @param ?object $args + * @param ?object $options + * @return int + */ + public static function addTask(string $handler, ?object $args = null, ?object $options = null): int + { + $driver_name = config('queue.driver'); + if (!$driver_name) + { + throw new Exceptions\FeatureDisabled('Queue not configured'); + } + + $driver = self::getDriver($driver_name); + if (!$driver) + { + throw new Exceptions\FeatureDisabled('Queue not configured'); + } + + return $driver->addTask($handler, $args, $options); + } + + /** + * Get the first task to execute immediately. + * + * If no tasks are pending, this method will return null. + * Detailed scheduling of tasks will be handled by each driver. + * + * @param int $blocking + * @return ?object + */ + public static function getTask(int $blocking = 0): ?object + { + $driver_name = config('queue.driver'); + if (!$driver_name) + { + throw new Exceptions\FeatureDisabled('Queue not configured'); + } + + $driver = self::getDriver($driver_name); + if (!$driver) + { + throw new Exceptions\FeatureDisabled('Queue not configured'); + } + + return $driver->getTask($blocking); + } + + /** + * Check the process key. + * + * @param string $key + * @return bool + */ + public static function checkKey(string $key): bool + { + return config('queue.key') === $key; + } + + /** + * Process the queue. + * + * This will usually be called by a separate script, run every minute + * through an external scheduler such as crontab or systemd. + * + * If you are on a shared hosting service, you may also call a URL + * using a "web cron" service provider. + * + * @param int $timeout + * @return void + */ + public static function process(int $timeout): void + { + // Increase the time limit. This may or may not work. + set_time_limit(min(60, $timeout)); + + // This part will run in a loop until timeout. + $process_start_time = microtime(true); + while (true) + { + // Get a task from the driver. + $loop_start_time = microtime(true); + $task = self::getTask(1); + + // Wait 1 second and loop back. + if ($task) + { + // Find the handler for the task. + $task->handler = trim($task->handler, '\\()'); + $handler = null; + try + { + if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches)) + { + $class_name = '\\' . $matches[1]; + $method_name = $matches[2]; + if (class_exists($class_name) && method_exists($class_name, $method_name)) + { + $handler = [$class_name, $method_name]; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches)) + { + $class_name = '\\' . $matches[1]; + $initializer_name = $matches[2]; + $method_name = $matches[3]; + if (class_exists($class_name) && method_exists($class_name, $initializer_name)) + { + $obj = $class_name::$initializer_name(); + if (method_exists($obj, $method_name)) + { + $handler = [$obj, $method_name]; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches)) + { + $class_name = '\\' . $matches[1]; + $method_name = $matches[2]; + if (class_exists($class_name) && method_exists($class_name, $method_name)) + { + $obj = new $class_name(); + $handler = [$obj, $method_name]; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + else + { + if (function_exists('\\' . $task->handler)) + { + $handler = '\\' . $task->handler; + } + else + { + error_log('RxQueue: task handler not found: ' . $task->handler); + } + } + } + catch (\Throwable $th) + { + error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [ + $task->handler, + get_class($th), + $th->getFile(), + $th->getLine(), + ])); + } + + // Call the handler. + try + { + if ($handler) + { + call_user_func($handler, $task->args, $task->options); + } + } + catch (\Throwable $th) + { + error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [ + $task->handler, + get_class($th), + $th->getFile(), + $th->getLine(), + ])); + } + } + + // If the timeout is imminent, break the loop. + $process_elapsed_time = microtime(true) - $process_start_time; + if ($process_elapsed_time > $timeout - 2) + { + break; + } + + // If there was no task, wait 1 second to make sure that the loop isn't too tight. + $loop_elapsed_time = microtime(true) - $loop_start_time; + if (!$task && $loop_elapsed_time < 1) + { + usleep(intval((1 - $loop_elapsed_time) * 1000000)); + } + } + } +} diff --git a/common/framework/drivers/QueueInterface.php b/common/framework/drivers/QueueInterface.php new file mode 100644 index 000000000..f3c5697f5 --- /dev/null +++ b/common/framework/drivers/QueueInterface.php @@ -0,0 +1,63 @@ +_conn = new \Redis; + $this->_conn->connect($config['host'], $config['port'] ?? 6379); + if(isset($config['user']) || isset($config['pass'])) + { + $auth = []; + if (isset($config['user']) && $config['user']) $auth[] = $config['user']; + if (isset($config['pass']) && $config['pass']) $auth[] = $config['pass']; + $this->_conn->auth(count($auth) > 1 ? $auth : $auth[0]); + } + if(isset($config['dbnum'])) + { + $this->_conn->select(intval($config['dbnum'])); + } + + $this->_key = 'RXQUEUE_' . substr(sha1(\RX_BASEDIR), 0, 16); + } + catch (\RedisException $e) + { + $this->_conn = null; + } + } + + /** + * Add a task. + * + * @param string $handler + * @param ?object $args + * @param ?object $options + * @return int + */ + public function addTask(string $handler, ?object $args = null, ?object $options = null): int + { + $value = serialize((object)[ + 'handler' => $handler, + 'args' => $args, + 'options' => $options, + ]); + + if ($this->_conn) + { + $result = $this->_conn->rPush($this->_key, $value); + return intval($result); + } + else + { + return 0; + } + } + + /** + * Get the first task. + * + * @param int $blocking + * @return ?object + */ + public function getTask(int $blocking = 0): ?object + { + if ($this->_conn) + { + if ($blocking > 0) + { + $result = $this->_conn->blpop($this->_key, $blocking); + if (is_array($result) && isset($result[1])) + { + return unserialize($result[1]); + } + else + { + return null; + } + } + else + { + $result = $this->_conn->lpop($this->_key); + if ($result) + { + return unserialize($result); + } + else + { + return null; + } + } + } + else + { + return null; + } + } +} diff --git a/common/scripts/cron.php b/common/scripts/cron.php new file mode 100644 index 000000000..ece9ee93f --- /dev/null +++ b/common/scripts/cron.php @@ -0,0 +1,34 @@ +