Merge pull request #2414 from kijin/pr/queue

비동기 처리를 위한 간단한 Queue 기능 #2402
This commit is contained in:
Kijin Sung 2024-10-12 01:39:00 +09:00 committed by GitHub
commit e39c3bedd1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 1450 additions and 0 deletions

View file

@ -560,6 +560,13 @@ class Mail
*/
public function send(): bool
{
// If queue is enabled, send asynchronously.
if (config('queue.enabled') && !defined('RXQUEUE_CRON'))
{
Queue::addTask(self::class . '::' . 'sendAsync', $this);
return true;
}
// Get caller information.
$backtrace = debug_backtrace(0);
if(count($backtrace) && isset($backtrace[0]['file']))
@ -600,6 +607,17 @@ class Mail
return $this->sent;
}
/**
* Send an email asynchronously (for Queue integration).
*
* @param self $mail
* @return void
*/
public static function sendAsync(self $mail): void
{
$mail->send();
}
/**
* Check if the message was sent.
*

View file

@ -402,6 +402,13 @@ class Push
*/
public function send(): bool
{
// If queue is enabled, send asynchronously.
if (config('queue.enabled') && !defined('RXQUEUE_CRON'))
{
Queue::addTask(self::class . '::' . 'sendAsync', $this);
return true;
}
// Get caller information.
$backtrace = debug_backtrace(0);
if(count($backtrace) && isset($backtrace[0]['file']))
@ -464,6 +471,17 @@ class Push
return $this->sent > 0 ? true : false;
}
/**
* Send asynchronously (for Queue integration).
*
* @param self $sms
* @return void
*/
public static function sendAsync(self $push): void
{
$push->send();
}
/**
* Get the device token
*

302
common/framework/Queue.php Normal file
View file

@ -0,0 +1,302 @@
<?php
namespace Rhymix\Framework;
/**
* The Queue class.
*/
class Queue
{
/**
* Static properties.
*/
protected static $_drivers = [];
/**
* Add a custom Queue driver.
*
* @param string $name
* @param object $driver
* @return void
*/
public static function addDriver(string $name, Drivers\QueueInterface $driver): void
{
self::$_drivers[$name] = $driver;
}
/**
* Get the default driver.
*
* @param string $name
* @return ?Drivers\QueueInterface
*/
public static function getDriver(string $name): ?Drivers\QueueInterface
{
if (isset(self::$_drivers[$name]))
{
return self::$_drivers[$name];
}
$driver_class = '\Rhymix\Framework\Drivers\Queue\\' . $name;
if (class_exists($driver_class))
{
$driver_config = config('queue.' . $name) ?: [];
return self::$_drivers[$name] = $driver_class::getInstance($driver_config);
}
else
{
return null;
}
}
/**
* Get the list of supported Queue drivers.
*
* @return array
*/
public static function getSupportedDrivers(): array
{
$result = [];
foreach (Storage::readDirectory(__DIR__ . '/drivers/queue', false) as $filename)
{
$driver_name = substr($filename, 0, -4);
$class_name = '\Rhymix\Framework\Drivers\Queue\\' . $driver_name;
if ($class_name::isSupported())
{
$result[$driver_name] = [
'name' => $class_name::getName(),
'required' => $class_name::getRequiredConfig(),
'optional' => $class_name::getOptionalConfig(),
];
}
}
foreach (self::$_drivers as $driver_name => $driver)
{
if ($driver->isSupported())
{
$result[$driver_name] = [
'name' => $driver->getName(),
'required' => $driver->getRequiredConfig(),
'optional' => $driver->getOptionalConfig(),
];
}
}
ksort($result);
return $result;
}
/**
* Add a task.
*
* The handler can be in one of the following formats:
* - Global function, e.g. myHandler
* - ClassName::staticMethodName
* - ClassName::getInstance()->methodName
* - new ClassName()->methodName
*
* Once identified and/or instantiated, the handler will be passed $args
* and $options, in that order. Each of them must be a single object.
*
* It is strongly recommended that you write a dedicated method to handle
* queued tasks, rather than reusing an existing method with a potentially
* incompatible structure. If you must to call an existing method,
* you should consider writing a wrapper.
*
* Any value returned by the handler will be discarded. If you throw an
* exception, it may be logged, but it will not cause a fatal error.
*
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public static function addTask(string $handler, ?object $args = null, ?object $options = null): int
{
$driver_name = config('queue.driver');
if (!$driver_name)
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
$driver = self::getDriver($driver_name);
if (!$driver)
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
return $driver->addTask($handler, $args, $options);
}
/**
* Get the first task to execute immediately.
*
* If no tasks are pending, this method will return null.
* Detailed scheduling of tasks will be handled by each driver.
*
* @param int $blocking
* @return ?object
*/
public static function getTask(int $blocking = 0): ?object
{
$driver_name = config('queue.driver');
if (!$driver_name)
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
$driver = self::getDriver($driver_name);
if (!$driver)
{
throw new Exceptions\FeatureDisabled('Queue not configured');
}
return $driver->getTask($blocking);
}
/**
* Check the process key.
*
* @param string $key
* @return bool
*/
public static function checkKey(string $key): bool
{
return config('queue.key') === $key;
}
/**
* Process the queue.
*
* This will usually be called by a separate script, run every minute
* through an external scheduler such as crontab or systemd.
*
* If you are on a shared hosting service, you may also call a URL
* using a "web cron" service provider.
*
* @param int $timeout
* @return void
*/
public static function process(int $timeout): void
{
// This part will run in a loop until timeout.
$process_start_time = microtime(true);
while (true)
{
// Get a task from the driver.
$loop_start_time = microtime(true);
$task = self::getTask(1);
// Wait 1 second and loop back.
if ($task)
{
// Find the handler for the task.
$task->handler = trim($task->handler, '\\()');
$handler = null;
try
{
if (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$method_name = $matches[2];
if (class_exists($class_name) && method_exists($class_name, $method_name))
{
$handler = [$class_name, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
elseif (preg_match('/^(?:\\\\)?([\\\\\\w]+)::(\\w+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$initializer_name = $matches[2];
$method_name = $matches[3];
if (class_exists($class_name) && method_exists($class_name, $initializer_name))
{
$obj = $class_name::$initializer_name();
if (method_exists($obj, $method_name))
{
$handler = [$obj, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
elseif (preg_match('/^new (?:\\\\)?([\\\\\\w]+)(?:\(\))?->(\\w+)$/', $task->handler, $matches))
{
$class_name = '\\' . $matches[1];
$method_name = $matches[2];
if (class_exists($class_name) && method_exists($class_name, $method_name))
{
$obj = new $class_name();
$handler = [$obj, $method_name];
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
else
{
if (function_exists('\\' . $task->handler))
{
$handler = '\\' . $task->handler;
}
else
{
error_log('RxQueue: task handler not found: ' . $task->handler);
}
}
}
catch (\Throwable $th)
{
error_log(vsprintf('RxQueue: task handler %s could not be accessed: %s in %s:%d', [
$task->handler,
get_class($th),
$th->getFile(),
$th->getLine(),
]));
}
// Call the handler.
try
{
if ($handler)
{
call_user_func($handler, $task->args, $task->options);
}
}
catch (\Throwable $th)
{
error_log(vsprintf('RxQueue: task handler %s threw %s in %s:%d', [
$task->handler,
get_class($th),
$th->getFile(),
$th->getLine(),
]));
}
}
// If the timeout is imminent, break the loop.
$process_elapsed_time = microtime(true) - $process_start_time;
if ($process_elapsed_time > $timeout - 2)
{
break;
}
// If there was no task, wait 1 second to make sure that the loop isn't too tight.
$loop_elapsed_time = microtime(true) - $loop_start_time;
if (!$task && $loop_elapsed_time < 1)
{
usleep(intval((1 - $loop_elapsed_time) * 1000000));
}
}
}
}

View file

@ -511,6 +511,13 @@ class SMS
*/
public function send(): bool
{
// If queue is enabled, send asynchronously.
if (config('queue.enabled') && !defined('RXQUEUE_CRON'))
{
Queue::addTask(self::class . '::' . 'sendAsync', $this);
return true;
}
// Get caller information.
$backtrace = debug_backtrace(0);
if(count($backtrace) && isset($backtrace[0]['file']))
@ -566,6 +573,17 @@ class SMS
return $this->sent;
}
/**
* Send an SMS asynchronously (for Queue integration).
*
* @param self $sms
* @return void
*/
public static function sendAsync(self $sms): void
{
$sms->send();
}
/**
* Check if the message was sent.
*

View file

@ -0,0 +1,71 @@
<?php
namespace Rhymix\Framework\Drivers;
/**
* The Queue driver interface.
*/
interface QueueInterface
{
/**
* Create a new instance of the current Queue driver, using the given settings.
*
* @param array $config
* @return QueueInterface
*/
public static function getInstance(array $config): QueueInterface;
/**
* Get the human-readable name of this Queue driver.
*
* @return string
*/
public static function getName(): string;
/**
* Get the list of configuration fields required by this Queue driver.
*
* @return array
*/
public static function getRequiredConfig(): array;
/**
* Get the list of configuration fields optionally used by this Queue driver.
*
* @return array
*/
public static function getOptionalConfig(): array;
/**
* Check if this driver is supported on this server.
*
* @return bool
*/
public static function isSupported(): bool;
/**
* Validate driver configuration.
*
* @param mixed $config
* @return bool
*/
public static function validateConfig($config): bool;
/**
* Add a task.
*
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int;
/**
* Get the first task.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object;
}

View file

@ -0,0 +1,131 @@
<?php
namespace Rhymix\Framework\Drivers\Queue;
use Rhymix\Framework\DB as RFDB;
use Rhymix\Framework\Drivers\QueueInterface;
/**
* The DB queue driver.
*/
class DB implements QueueInterface
{
/**
* Create a new instance of the current Queue driver, using the given settings.
*
* @param array $config
* @return QueueInterface
*/
public static function getInstance(array $config): QueueInterface
{
return new self($config);
}
/**
* Get the human-readable name of this Queue driver.
*
* @return string
*/
public static function getName(): string
{
return 'DB';
}
/**
* Get the list of configuration fields required by this Queue driver.
*
* @return array
*/
public static function getRequiredConfig(): array
{
return [];
}
/**
* Get the list of configuration fields optionally used by this Queue driver.
*
* @return array
*/
public static function getOptionalConfig(): array
{
return [];
}
/**
* Check if this driver is supported on this server.
*
* @return bool
*/
public static function isSupported(): bool
{
return true;
}
/**
* Validate driver configuration.
*
* @param mixed $config
* @return bool
*/
public static function validateConfig($config): bool
{
return true;
}
/**
* Constructor.
*
* @param array $config
*/
public function __construct(array $config)
{
}
/**
* Add a task.
*
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int
{
$oDB = RFDB::getInstance();
$stmt = $oDB->prepare('INSERT INTO task_queue (handler, args, options) VALUES (?, ?, ?)');
$result = $stmt->execute([$handler, serialize($args), serialize($options)]);
return $result ? $oDB->getInsertID() : 0;
}
/**
* Get the first task.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
{
$oDB = RFDB::getInstance();
$oDB->beginTransaction();
$stmt = $oDB->query('SELECT * FROM task_queue ORDER BY id LIMIT 1 FOR UPDATE');
$result = $stmt->fetchObject();
$stmt->closeCursor();
if ($result)
{
$stmt = $oDB->prepare('DELETE FROM task_queue WHERE id = ?');
$stmt->execute([$result->id]);
$oDB->commit();
$result->args = unserialize($result->args);
$result->options = unserialize($result->options);
return $result;
}
else
{
$oDB->commit();
return null;
}
}
}

View file

@ -0,0 +1,119 @@
<?php
namespace Rhymix\Framework\Drivers\Queue;
use Rhymix\Framework\Drivers\QueueInterface;
/**
* The Dummy queue driver.
*/
class Dummy implements QueueInterface
{
/**
* Dummy queue for testing.
*/
protected $_dummy_queue;
/**
* Create a new instance of the current Queue driver, using the given settings.
*
* @param array $config
* @return QueueInterface
*/
public static function getInstance(array $config): QueueInterface
{
return new self($config);
}
/**
* Get the human-readable name of this Queue driver.
*
* @return string
*/
public static function getName(): string
{
return 'Dummy';
}
/**
* Get the list of configuration fields required by this Queue driver.
*
* @return array
*/
public static function getRequiredConfig(): array
{
return [];
}
/**
* Get the list of configuration fields optionally used by this Queue driver.
*
* @return array
*/
public static function getOptionalConfig(): array
{
return [];
}
/**
* Check if this driver is supported on this server.
*
* @return bool
*/
public static function isSupported(): bool
{
return true;
}
/**
* Validate driver configuration.
*
* @param mixed $config
* @return bool
*/
public static function validateConfig($config): bool
{
return true;
}
/**
* Constructor.
*
* @param array $config
*/
public function __construct(array $config)
{
}
/**
* Add a task.
*
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int
{
$this->_dummy_queue = (object)[
'handler' => $handler,
'args' => $args,
'options' => $options,
];
return 0;
}
/**
* Get the first task.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
{
$result = $this->_dummy_queue;
$this->_dummy_queue = null;
return $result;
}
}

View file

@ -0,0 +1,197 @@
<?php
namespace Rhymix\Framework\Drivers\Queue;
use Rhymix\Framework\Drivers\QueueInterface;
/**
* The Redis queue driver.
*/
class Redis implements QueueInterface
{
/**
* The Redis connection is stored here.
*/
protected $_conn;
protected $_key;
/**
* Create a new instance of the current Queue driver, using the given settings.
*
* @param array $config
* @return QueueInterface
*/
public static function getInstance(array $config): QueueInterface
{
return new self($config);
}
/**
* Get the human-readable name of this Queue driver.
*
* @return string
*/
public static function getName(): string
{
return 'Redis';
}
/**
* Get the list of configuration fields required by this Queue driver.
*
* @return array
*/
public static function getRequiredConfig(): array
{
return ['host', 'port'];
}
/**
* Get the list of configuration fields optionally used by this Queue driver.
*
* @return array
*/
public static function getOptionalConfig(): array
{
return ['dbnum', 'user', 'pass'];
}
/**
* Check if this driver is supported on this server.
*
* @return bool
*/
public static function isSupported(): bool
{
return class_exists('\\Redis');
}
/**
* Validate driver configuration.
*
* @param mixed $config
* @return bool
*/
public static function validateConfig($config): bool
{
try
{
$test = new \Redis;
$test->connect($config['host'], $config['port'] ?? 6379);
if (isset($config['user']) || isset($config['pass']))
{
$auth = [];
if (isset($config['user']) && $config['user']) $auth[] = $config['user'];
if (isset($config['pass']) && $config['pass']) $auth[] = $config['pass'];
$test->auth(count($auth) > 1 ? $auth : $auth[0]);
}
if (isset($config['dbnum']))
{
$test->select(intval($config['dbnum']));
}
return true;
}
catch (\Throwable $th)
{
return false;
}
}
/**
* Constructor.
*
* @param array $config
*/
public function __construct(array $config)
{
try
{
$this->_conn = new \Redis;
$this->_conn->connect($config['host'], $config['port'] ?? 6379);
if (isset($config['user']) || isset($config['pass']))
{
$auth = [];
if (isset($config['user']) && $config['user']) $auth[] = $config['user'];
if (isset($config['pass']) && $config['pass']) $auth[] = $config['pass'];
$this->_conn->auth(count($auth) > 1 ? $auth : $auth[0]);
}
if (isset($config['dbnum']))
{
$this->_conn->select(intval($config['dbnum']));
}
$this->_key = 'rxQueue_' . substr(hash_hmac('sha1', 'rxQueue_', config('crypto.authentication_key')), 0, 24);
}
catch (\RedisException $e)
{
$this->_conn = null;
}
}
/**
* Add a task.
*
* @param string $handler
* @param ?object $args
* @param ?object $options
* @return int
*/
public function addTask(string $handler, ?object $args = null, ?object $options = null): int
{
$value = serialize((object)[
'handler' => $handler,
'args' => $args,
'options' => $options,
]);
if ($this->_conn)
{
$result = $this->_conn->rPush($this->_key, $value);
return intval($result);
}
else
{
return 0;
}
}
/**
* Get the first task.
*
* @param int $blocking
* @return ?object
*/
public function getTask(int $blocking = 0): ?object
{
if ($this->_conn)
{
if ($blocking > 0)
{
$result = $this->_conn->blpop($this->_key, $blocking);
if (is_array($result) && isset($result[1]))
{
return unserialize($result[1]);
}
else
{
return null;
}
}
else
{
$result = $this->_conn->lpop($this->_key);
if ($result)
{
return unserialize($result);
}
else
{
return null;
}
}
}
else
{
return null;
}
}
}

91
common/scripts/cron.php Normal file
View file

@ -0,0 +1,91 @@
<?php
/**
* This script runs the task queue.
*
* Unlike other scripts provided with Rhymix, it can be called
* both on the command line and over the network.
*/
define('RXQUEUE_CRON', true);
// If called on the CLI, run additional checks.
if (PHP_SAPI === 'cli')
{
require_once __DIR__ . '/common.php';
}
else
{
// If called over the network, load Rhymix directly.
chdir(dirname(dirname(__DIR__)));
require_once dirname(__DIR__) . '/autoload.php';
Context::init();
// On the other hand, we should check the key.
$key = (string)Context::get('key');
if (!Rhymix\Framework\Queue::checkKey($key))
{
Context::setCacheControl(0);
header('HTTP/1.1 403 Forbidden');
echo "Invalid key\n";
Context::close();
exit;
}
}
// Get queue configuration set by the administrator.
$timeout = (config('queue.interval') ?? 1) * 60;
$process_count = config('queue.process_count') ?? 1;
// If called over the network, try to increase the timeout.
if (PHP_SAPI !== 'cli')
{
ignore_user_abort(true);
set_time_limit(max(60, $timeout));
}
// Create multiple processes if configured.
if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') && function_exists('pcntl_waitpid'))
{
// This array will keep a dictionary of subprocesses.
$pids = [];
// The database connection must be closed before forking.
Rhymix\Framework\DB::getInstance()->disconnect();
Rhymix\Framework\Debug::disable();
// Create the required number of subprocesses.
for ($i = 0; $i < $process_count; $i++)
{
$pid = pcntl_fork();
if ($pid > 0)
{
$pids[$pid] = true;
usleep(200000);
}
elseif ($pid == 0)
{
Rhymix\Framework\Queue::process($timeout);
exit;
}
else
{
error_log('RxQueue: could not fork!');
exit;
}
}
// The parent process waits for its children to finish.
while (count($pids))
{
$pid = pcntl_waitpid(-1, $status, \WNOHANG);
if ($pid)
{
unset($pids[$pid]);
}
usleep(200000);
}
}
else
{
Rhymix\Framework\Queue::process($timeout);
}

View file

@ -34,6 +34,8 @@
<action name="procAdminUpdateDebug" class="Controllers\SystemConfig\Debug" />
<action name="dispAdminConfigSEO" class="Controllers\SystemConfig\SEO" menu_name="adminConfigurationGeneral" />
<action name="procAdminUpdateSEO" class="Controllers\SystemConfig\SEO" />
<action name="dispAdminConfigQueue" class="Controllers\SystemConfig\Queue" menu_name="adminConfigurationGeneral" />
<action name="procAdminUpdateQueue" class="Controllers\SystemConfig\Queue" />
<action name="dispAdminConfigSitelock" class="Controllers\SystemConfig\SiteLock" menu_name="adminConfigurationGeneral" />
<action name="procAdminUpdateSitelock" class="Controllers\SystemConfig\SiteLock" />
<!-- Admin Interface Config -->

View file

@ -0,0 +1,135 @@
<?php
namespace Rhymix\Modules\Admin\Controllers\SystemConfig;
use Context;
use Rhymix\Framework\Config;
use Rhymix\Framework\Exception;
use Rhymix\Framework\Queue as RFQueue;
use Rhymix\Framework\Security;
use Rhymix\Modules\Admin\Controllers\Base;
class Queue extends Base
{
/**
* Display Notification Settings page
*/
public function dispAdminConfigQueue()
{
// Load queue drivers.
$queue_drivers = RFQueue::getSupportedDrivers();
uasort($queue_drivers, function($a, $b) {
if ($a['name'] === 'Dummy') return -1;
if ($b['name'] === 'Dummy') return 1;
return strnatcasecmp($a['name'], $b['name']);
});
Context::set('queue_drivers', $queue_drivers);
Context::set('queue_driver', config('queue.driver') ?: 'dummy');
// Set the default auth key.
if (!config('queue.key'))
{
config('queue.key', Security::getRandom(32));
}
// Set defaults for Redis.
if (!config('queue.redis'))
{
config('queue.redis', [
'host' => '127.0.0.1',
'port' => '6379',
'dbnum' => 0,
]);
}
$this->setTemplateFile('config_queue');
}
/**
* Update notification configuration.
*/
public function procAdminUpdateQueue()
{
$vars = Context::getRequestVars();
// Enabled?
$enabled = $vars->queue_enabled === 'Y';
// Validate the driver.
$drivers = RFQueue::getSupportedDrivers();
$driver = trim($vars->queue_driver);
if (!array_key_exists($driver, $drivers))
{
throw new Exception('msg_queue_driver_not_found');
}
if ($enabled && (!$driver || $driver === 'dummy'))
{
throw new Exception('msg_queue_driver_cannot_be_dummy');
}
// Validate required and optional driver settings.
$driver_config = [];
foreach ($drivers[$driver]['required'] as $conf_name)
{
$conf_value = trim($vars->{'queue_' . $driver . '_' . $conf_name} ?? '');
if ($conf_value === '')
{
throw new Exception('msg_queue_invalid_config');
}
$driver_config[$conf_name] = $conf_value === '' ? null : $conf_value;
}
foreach ($drivers[$driver]['optional'] as $conf_name)
{
$conf_value = trim($vars->{'queue_' . $driver . '_' . $conf_name} ?? '');
$driver_config[$conf_name] = $conf_value === '' ? null : $conf_value;
}
// Validate the interval.
$interval = intval($vars->queue_interval ?? 1);
if ($interval < 1 || $interval > 10)
{
throw new Exception('msg_queue_invalid_interval');
}
// Validate the process count.
$process_count = intval($vars->queue_process_count ?? 1);
if ($process_count < 1 || $process_count > 10)
{
throw new Exception('msg_queue_invalid_process_count');
}
// Validate the key.
$key = trim($vars->queue_key ?? '');
if (strlen($key) < 16 || !ctype_alnum($key))
{
throw new Exception('msg_queue_invalid_key');
}
// Validate actual operation of the driver.
$driver_class = '\\Rhymix\\Framework\\Drivers\\Queue\\' . $driver;
if (!class_exists($driver_class) || !$driver_class::isSupported())
{
throw new Exception('msg_queue_driver_not_found');
}
if (!$driver_class::validateConfig($driver_config))
{
throw new Exception('msg_queue_driver_not_usable');
}
// Save system config.
Config::set("queue.enabled", $enabled);
Config::set("queue.driver", $driver);
Config::set("queue.interval", $interval);
Config::set("queue.process_count", $process_count);
Config::set("queue.key", $key);
Config::set("queue.$driver", $driver_config);
if (!Config::save())
{
throw new Exception('msg_failed_to_save_config');
}
$this->setMessage('success_updated');
$this->setRedirectUrl(Context::get('success_return_url') ?: getNotEncodedUrl('', 'module', 'admin', 'act', 'dispAdminConfigQueue'));
}
}

View file

@ -8,6 +8,7 @@ $lang->subtitle_security = 'Security';
$lang->subtitle_advanced = 'Advanced';
$lang->subtitle_domains = 'Domains';
$lang->subtitle_debug = 'Debug';
$lang->subtitle_queue = 'Async Queue';
$lang->subtitle_seo = 'SEO Settings';
$lang->subtitle_etc = 'Other Settings';
$lang->current_state = 'Current state';
@ -282,6 +283,36 @@ $lang->og_extract_images_fallback = 'Use site default image only';
$lang->og_extract_hashtags = 'Extract Hashtags from Document';
$lang->og_use_nick_name = 'Include Author Name';
$lang->og_use_timestamps = 'Include Timestamps';
$lang->cmd_queue_description = 'Improve response times by processing time-consuming tasks, such as sending notifications, asynchronously.<br>This is an experimental feature. It may not be stable depending on your hosting environment.';
$lang->cmd_queue_enabled = 'Use Task Queue';
$lang->cmd_queue_enabled_help = 'The task queue will stop accepting new tasks if you uncheck the above.';
$lang->cmd_queue_driver = 'Queue Driver';
$lang->cmd_queue_driver_help = 'Select the driver for the task queue that suits your hosting environment and website needs.<br>Some drivers such as Redis will need the corresponding program to be installed on the server.';
$lang->cmd_queue_interval = 'Calling Interval';
$lang->cmd_queue_interval_help = 'Use a scheduler such as crontab or systemd timer to call the script on a set interval.<br>All tasks are processed as soon as possible regardless of the interval, but a short interval means quick recovery from any error.<br>For web-based cron, this should not exceed the max_execution_time setting in php.ini.<br>The max_execution_time on this server is %d seconds.';
$lang->cmd_queue_process_count = 'Process Count';
$lang->cmd_queue_process_count_help = 'Use multiple processes to increase throughput. This may increase server load significantly.<br>Keep a value of 1 unless you have a high-performance dedicated server.<br>Multiprocessing is not supported when using web-based cron.';
$lang->cmd_queue_call_script = 'Processing Script';
$lang->cmd_queue_webcron_key = 'Webcron Auth Key';
$lang->cmd_queue_config_keys['host'] = 'Host';
$lang->cmd_queue_config_keys['port'] = 'Port';
$lang->cmd_queue_config_keys['user'] = 'User';
$lang->cmd_queue_config_keys['pass'] = 'Password';
$lang->cmd_queue_config_keys['dbnum'] = 'DB Number';
$lang->msg_queue_instructions['same_as_php'] = '(same as PHP)';
$lang->msg_queue_instructions['crontab1'] = 'Log into the server as the <code>%s</code> account and run <code>crontab -e</code> to paste the following content into your crontab. (DO NOT run it as root!)<br>The <code>%s</code> directory in the example should be replaced with a path where logs can be recorded.';
$lang->msg_queue_instructions['crontab2'] = 'If you change the calling interval below, the crontab interval must be adjusted accordingly.';
$lang->msg_queue_instructions['webcron'] = 'Configure an external cron service to make a GET request to the following URL every minute, or following the interval set below.<br>Check your logs to make sure that the cron service is reaching your website.';
$lang->msg_queue_instructions['systemd1'] = 'Put the following content in <code>/etc/systemd/system/rhymix-queue.service</code>';
$lang->msg_queue_instructions['systemd2'] = 'Put the following content in <code>/etc/systemd/system/rhymix-queue.timer</code>';
$lang->msg_queue_instructions['systemd3'] = 'Execute the following commands to enable the timer, and monitor your journal to make sure that it is operating as scheduled.';
$lang->msg_queue_driver_not_found = 'The selected task queue driver is not supported on this server.';
$lang->msg_queue_driver_not_usable = 'The selected task queue driver failed to initialize using the configuration values you entered.';
$lang->msg_queue_driver_cannot_be_dummy = 'In otder to use the task queue, you must select a driver other than "Not use"';
$lang->msg_queue_invalid_config = 'Missing or invalid configuration for the selected queue driver.';
$lang->msg_queue_invalid_interval = 'The calling interval must be between 1 and 10 minutes.';
$lang->msg_queue_invalid_process_count = 'The process count must be between 1 and 10.';
$lang->msg_queue_invalid_key = 'The webcron auth key must be at least 16 characters long, and only consist of alphanumeric characters.';
$lang->autoinstall = 'EasyInstall';
$lang->last_week = 'Last Week';
$lang->this_week = 'This Week';

View file

@ -7,6 +7,7 @@ $lang->subtitle_notification = '알림 설정';
$lang->subtitle_security = '보안 설정';
$lang->subtitle_advanced = '고급 설정';
$lang->subtitle_debug = '디버그 설정';
$lang->subtitle_queue = '비동기 작업';
$lang->subtitle_seo = 'SEO 설정';
$lang->subtitle_etc = '기타';
$lang->current_state = '현황';
@ -278,6 +279,36 @@ $lang->og_extract_images_fallback = '사이트 대표 이미지 사용';
$lang->og_extract_hashtags = '본문에서 해시태그 추출';
$lang->og_use_nick_name = '글 작성자 이름 표시';
$lang->og_use_timestamps = '글 작성/수정 시각 표시';
$lang->cmd_queue_description = '메일 발송, 푸시알림 등 시간이 오래 걸리거나 외부 서비스와 연동하는 작업을 비동기 처리하여 응답 속도를 개선합니다.<br>실험적인 기능입니다. 호스팅 환경에 따라서는 안정적으로 작동하지 않을 수도 있습니다.';
$lang->cmd_queue_enabled = '비동기 작업 사용';
$lang->cmd_queue_enabled_help = '체크를 해제하면 더이상 작업을 접수하지 않습니다.';
$lang->cmd_queue_driver = '비동기 드라이버';
$lang->cmd_queue_driver_help = '비동기 작업을 관리할 방법을 설정합니다. 호스팅 환경과 사이트의 필요에 맞추어 선택하세요.<br>Redis 등 일부 드라이버는 서버에 해당 기능이 설치되어 있어야 사용할 수 있습니다.';
$lang->cmd_queue_interval = '호출 간격';
$lang->cmd_queue_interval_help = 'crontab, systemd timer, 웹크론 등을 사용하여 일정한 주기로 스크립트를 호출해 주십시오.<br>모든 비동기 작업은 호출 간격과 무관하게 실시간으로 처리되나, 호출 간격이 짧으면 장애 발생시 신속하게 복구됩니다.<br>웹크론 사용시에는 php.ini의 실행 시간 제한을 초과하지 않는 것이 좋습니다.<br>이 서버의 max_execution_time은 %d초로 설정되어 있습니다.';
$lang->cmd_queue_process_count = '프로세스 갯수';
$lang->cmd_queue_process_count_help = '여러 개의 프로세스를 동시에 실행하여 처리 용량을 늘립니다. 서버 부하가 증가할 수 있습니다.<br>고성능 단독서버가 아니라면 1을 유지하시기 바랍니다.<br>웹크론으로 호출한 경우에는 멀티프로세싱을 지원하지 않습니다.';
$lang->cmd_queue_call_script = '작업 처리 스크립트';
$lang->cmd_queue_webcron_key = '웹크론 인증키';
$lang->cmd_queue_config_keys['host'] = '호스트';
$lang->cmd_queue_config_keys['port'] = '포트';
$lang->cmd_queue_config_keys['user'] = '아이디';
$lang->cmd_queue_config_keys['pass'] = '암호';
$lang->cmd_queue_config_keys['dbnum'] = 'DB 번호';
$lang->msg_queue_instructions['same_as_php'] = 'PHP를 실행하는 계정과 동일한';
$lang->msg_queue_instructions['crontab1'] = '%s 계정으로 서버에 로그인하여 <code>crontab -e</code> 명령을 실행한 후, 아래의 내용을 붙여넣으십시오. (root 권한으로 실행하지 마십시오.)<br>예제의 <code>%s</code> 디렉토리는 로그를 기록할 수 있는 경로로 변경하여 사용하십시오.';
$lang->msg_queue_instructions['crontab2'] = '스크립트 호출 간격을 변경할 경우, 설정에 맞추어 crontab 실행 간격도 조절하여야 합니다.';
$lang->msg_queue_instructions['webcron'] = '아래의 URL을 1분 간격 또는 아래에서 설정한 호출 간격에 맞추어 GET으로 호출하도록 합니다.<br>웹크론 서비스가 방화벽이나 CDN 등에 의해 차단되지 않도록 주의하고, 정상적으로 호출되는지 서버 로그를 확인하십시오.';
$lang->msg_queue_instructions['systemd1'] = '<code>/etc/systemd/system/rhymix-queue.service</code> 파일에 아래와 같은 내용을 넣습니다.';
$lang->msg_queue_instructions['systemd2'] = '<code>/etc/systemd/system/rhymix-queue.timer</code> 파일에 아래와 같은 내용을 넣습니다.';
$lang->msg_queue_instructions['systemd3'] = '아래의 명령을 실행하여 타이머를 활성화하고, 정상 작동하는지 모니터링하십시오.';
$lang->msg_queue_driver_not_found = '이 서버에서 지원하지 않는 비동기 드라이버입니다.';
$lang->msg_queue_driver_not_usable = '입력하신 정보로 비동기 드라이버와 연동하는 데 실패했습니다. 드라이버 설정을 확인해 주십시오.';
$lang->msg_queue_driver_cannot_be_dummy = '비동기 작업을 사용하려면 "미사용" 이외의 드라이버를 선택해야 합니다.';
$lang->msg_queue_invalid_config = '비동기 드라이버의 필수 설정이 누락되었습니다.';
$lang->msg_queue_invalid_interval = '호출 간격은 1~10분 이내여야 합니다.';
$lang->msg_queue_invalid_process_count = '프로세스 갯수는 1~10개 이내여야 합니다.';
$lang->msg_queue_invalid_key = '웹크론 인증키는 16자 이상으로, 알파벳 대소문자와 숫자만으로 이루어져야 합니다.';
$lang->autoinstall = '쉬운 설치';
$lang->last_week = '지난주';
$lang->this_week = '이번주';

View file

@ -12,5 +12,6 @@
<li class="x_active"|cond="$act == 'dispAdminConfigAdvanced'"><a href="{getUrl('', 'module', 'admin', 'act', 'dispAdminConfigAdvanced')}">{$lang->subtitle_advanced}</a></li>
<li class="x_active"|cond="$act == 'dispAdminConfigDebug'"><a href="{getUrl('', 'module', 'admin', 'act', 'dispAdminConfigDebug')}">{$lang->subtitle_debug}</a></li>
<li class="x_active"|cond="$act == 'dispAdminConfigSEO'"><a href="{getUrl('', 'module', 'admin', 'act', 'dispAdminConfigSEO')}">{$lang->subtitle_seo}</a></li>
<li class="x_active"|cond="$act == 'dispAdminConfigQueue'"><a href="{getUrl('', 'module', 'admin', 'act', 'dispAdminConfigQueue')}">{$lang->subtitle_queue}</a></li>
<li class="x_active"|cond="$act == 'dispAdminConfigSitelock'"><a href="{getUrl('', 'module', 'admin', 'act', 'dispAdminConfigSitelock')}">{$lang->subtitle_sitelock}</a></li>
</ul>

View file

@ -0,0 +1,195 @@
<config autoescape="on" />
<include target="config_header.html" />
<load target="css/queue_config.scss" />
<load target="js/queue_config.js" />
<div class="message">
<p>{$lang->cmd_queue_description}</p>
</div>
<div cond="!empty($XE_VALIDATOR_MESSAGE) && $XE_VALIDATOR_ID == 'modules/admin/tpl/config_queue/1'" class="message {$XE_VALIDATOR_MESSAGE_TYPE}">
<p>{$XE_VALIDATOR_MESSAGE}</p>
</div>
<script type="text/javascript">
var queue_drivers = {json_encode($queue_drivers)|noescape};
</script>
<form action="./" method="post" class="x_form-horizontal">
<input type="hidden" name="module" value="admin" />
<input type="hidden" name="act" value="procAdminUpdateQueue" />
<input type="hidden" name="xe_validator_id" value="modules/admin/tpl/config_queue/1" />
<section class="section">
<h2>{$lang->subtitle_queue}</h2>
<div class="x_control-group">
<label class="x_control-label">{$lang->cmd_queue_enabled}</label>
<div class="x_controls">
<label for="queue_enabled" class="x_inline">
<input type="checkbox" name="queue_enabled" id="queue_enabled" value="Y" checked="checked"|cond="config('queue.enabled')" />
{$lang->cmd_queue_enabled}
</label>
<br>
<p class="x_help-block">{$lang->cmd_queue_enabled_help}</p>
</div>
</div>
<div class="x_control-group">
<label class="x_control-label" for="queue_driver">{$lang->cmd_queue_driver}</label>
<div class="x_controls">
<select name="queue_driver" id="queue_driver">
<!--@foreach($queue_drivers as $driver_name => $driver_definition)-->
<option value="{$driver_name}" selected="selected"|cond="$queue_driver === $driver_name">{$driver_name === 'dummy' ? $lang->notuse : $driver_definition['name']}</option>
<!--@end-->
</select>
<p class="x_help-block">{$lang->cmd_queue_driver_help}</p>
</div>
</div>
<!--@foreach($queue_drivers as $driver_name => $driver_definition)-->
{@ $conf_names = array_merge($driver_definition['required'], $driver_definition['optional'])}
<!--@foreach($conf_names as $conf_name)-->
{@ $conf_value = escape(config("queue.$driver_name.$conf_name"))}
{@ $text_keys = ['host', 'user']}
{@ $number_keys = ['port', 'dbnum']}
{@ $password_keys = ['pass']}
<!--@if(in_array($conf_name, $text_keys))-->
<div class="x_control-group hidden-by-default show-for-{$driver_name}">
<label class="x_control-label" for="queue_{$driver_name}_{$conf_name}">{$lang->cmd_queue_config_keys[$conf_name]}</label>
<div class="x_controls">
<input type="text" name="queue_{$driver_name}_{$conf_name}" id="queue_{$driver_name}_{$conf_name}" value="{$conf_value}" />
</div>
</div>
<!--@end-->
<!--@if(in_array($conf_name, $number_keys))-->
<div class="x_control-group hidden-by-default show-for-{$driver_name}">
<label class="x_control-label" for="queue_{$driver_name}_{$conf_name}">{$lang->cmd_queue_config_keys[$conf_name]}</label>
<div class="x_controls">
<input type="number" name="queue_{$driver_name}_{$conf_name}" id="queue_{$driver_name}_{$conf_name}" value="{$conf_value}" />
</div>
</div>
<!--@end-->
<!--@if(in_array($conf_name, $password_keys))-->
<div class="x_control-group hidden-by-default show-for-{$driver_name}">
<label class="x_control-label" for="queue_{$driver_name}_{$conf_name}">{$lang->cmd_queue_config_keys[$conf_name]}</label>
<div class="x_controls">
<input type="password" name="queue_{$driver_name}_{$conf_name}" id="queue_{$driver_name}_{$conf_name}" value="{$conf_value}" autocomplete="new-password" />
</div>
</div>
<!--@end-->
<!--@end-->
<!--@end-->
</section>
<section>
<h2>{$lang->cmd_queue_call_script}</h2>
<div class="queue-script-setup">
<ul class="qss-tabs x_nav x_nav-tabs">
<li class="x_active"><a href="#" data-value="crontab">crontab</a></li>
<li><a href="#" data-value="webcron">webcron</a></li>
<li><a href="#" data-value="systemd-timer">systemd timer</a></li>
</ul>
<div class="qss-content crontab active">
{@
if (function_exists('posix_getpwuid') && function_exists('posix_getuid')):
$user_info = posix_getpwuid(posix_getuid());
if (!empty($user_info['dir'])):
$user_info['dir'] .= DIRECTORY_SEPARATOR;
endif;
else:
$user_info = [];
$user_info['name'] = $lang->msg_queue_instructions['same_as_php'];
endif;
}
<p class="qss-instruction">
{sprintf($lang->msg_queue_instructions['crontab1'], $user_info['name'] ?? 'PHP', $user_info['dir'] . 'logs')|noescape}
</p>
<pre><code>* * * * * php {\RX_BASEDIR}index.php common.cron &gt;&gt; {$user_info['dir']}logs{\DIRECTORY_SEPARATOR}queue.log 2&gt;&amp;1</code></pre>
<p class="qss-instruction">
{$lang->msg_queue_instructions['crontab2']|noescape}
</p>
</div>
<div class="qss-content webcron">
<p class="qss-instruction">
{$lang->msg_queue_instructions['webcron']|noescape}
</p>
<pre><code class="webcron-url">{getFullUrl('')}common/scripts/cron.php?key={config('queue.key')}</code></pre>
</div>
<div class="qss-content systemd-timer">
<p class="qss-instruction">
{$lang->msg_queue_instructions['systemd1']|noescape}
</p>
<pre><code>[Unit]
Description=Rhymix Queue Service
[Service]
ExecStart=php {\RX_BASEDIR}index.php common.cron
User={$user_info['name']}</code></pre>
<p class="qss-instruction">
{$lang->msg_queue_instructions['systemd2']|noescape}
</p>
<pre><code>[Unit]
Description=Rhymix Queue Timer
[Timer]
OnCalendar=*-*-* *:*:00
Unit=rhymix-queue.service
[Install]
WantedBy=multi-user.target</code></pre>
<p class="qss-instruction">
{$lang->msg_queue_instructions['systemd3']|noescape}
</p>
<pre><code>systemctl daemon-reload
systemctl start rhymix-queue.timer
systemctl enable rhymix-queue.timer</code></pre>
</div>
</div>
<div class="x_control-group">
<label class="x_control-label" for="queue_key">{$lang->cmd_queue_webcron_key}</label>
<div class="x_controls">
<input type="text" class="x_full-width" name="queue_key" id="queue_key" value="{config('queue.key')}" />
</div>
</div>
<div class="x_control-group">
<label class="x_control-label" for="queue_interval">{$lang->cmd_queue_interval}</label>
<div class="x_controls">
<input type="number" name="queue_interval" id="queue_interval" min="1" max="10" value="{config('queue.interval') ?: 1}" />
<span class="x_inline">{$lang->unit_min}</span>
<br>
<p class="x_help-block" style="margin-top:10px">{sprintf($lang->cmd_queue_interval_help, ini_get('max_execution_time'))|noescape}</p>
</div>
</div>
<div class="x_control-group">
<label class="x_control-label" for="queue_process_count">{$lang->cmd_queue_process_count}</label>
<div class="x_controls">
<input type="number" name="queue_process_count" id="queue_process_count" min="1" max="10" value="{config('queue.process_count') ?: 1}" />
<p class="x_help-block">{$lang->cmd_queue_process_count_help}</p>
</div>
</div>
</section>
<div class="x_clearfix btnArea">
<div class="x_pull-right">
<button type="submit" class="x_btn x_btn-primary">{$lang->cmd_save}</button>
</div>
</div>
</form>

View file

@ -0,0 +1,27 @@
.queue-script-setup {
.qss-content {
display: none;
&.active {
display: block;
border: 1px solid #ddd;
border-top: 0;
margin-top: -20px;
padding: 20px 12px 10px 12px;
border-bottom-left-radius: 4px;
border-bottom-right-radius: 4px;
margin-bottom: 20px;
}
.qss-instruction {
margin-bottom: 10px;
code {
color: #333;
border: 0;
background-color: transparent;
padding: 0 1px;
}
}
.pre {
margin-bottom: 10px;
}
}
}

View file

@ -0,0 +1,33 @@
(function($) {
$(function() {
$("#queue_driver").on("change", function() {
const selected_driver = $(this).val();
$(this).parents("section").find("div.x_control-group.hidden-by-default, p.x_help-block.hidden-by-default").each(function() {
if ($(this).hasClass("show-for-" + selected_driver)) {
$(this).show();
} else {
$(this).hide();
}
});
}).triggerHandler("change");
$("#queue_key").on('change keyup paste', function() {
const key = encodeURIComponent(String($(this).val()));
$('.webcron-url').text($('.webcron-url').text().replace(/\?key=[a-zA-Z0-9]+/g, '?key=' + key));
});
const qss = $('.queue-script-setup');
const qss_tabs = qss.find('.qss-tabs');
const qss_content = qss.find('.qss-content');
qss_tabs.on('click', 'a', function(event) {
const selected_tab = $(this).data('value');
qss_tabs.find('li').removeClass('x_active');
$(this).parent().addClass('x_active');
qss_content.removeClass('active');
qss_content.filter('.' + selected_tab).addClass('active');
event.preventDefault();
});
});
})(jQuery);

View file

@ -0,0 +1,7 @@
<table name="task_queue">
<column name="id" type="bigint" notnull="notnull" primary_key="primary_key" auto_increment="auto_increment" />
<column name="handler" type="varchar" size="191" notnull="notnull" />
<column name="args" type="longtext" notnull="notnull" />
<column name="options" type="longtext" notnull="notnull" />
<column name="regdate" type="datetime" notnull="notnull" default="current_timestamp()" index="idx_regdate" />
</table>

View file

@ -0,0 +1,23 @@
<?php
class QueueTest extends \Codeception\Test\Unit
{
public function testDummyQueue()
{
config('queue.driver', 'dummy');
$handler = 'myfunc';
$args = (object)['foo' => 'bar'];
$options = (object)['key' => 'val'];
Rhymix\Framework\Queue::addTask($handler, $args, $options);
$output = Rhymix\Framework\Queue::getTask();
$this->assertEquals('myfunc', $output->handler);
$this->assertEquals('bar', $output->args->foo);
$this->assertEquals('val', $output->options->key);
$output = Rhymix\Framework\Queue::getTask();
$this->assertNull($output);
}
}