Implement multiprocessing, and try to make webcron more resilient

This commit is contained in:
Kijin Sung 2024-10-10 23:22:42 +09:00
parent 097cecece8
commit 61b9f57196
2 changed files with 57 additions and 5 deletions

View file

@ -178,9 +178,6 @@ class Queue
*/
public static function process(int $timeout): void
{
// Increase the time limit. This may or may not work.
set_time_limit(min(60, $timeout));
// This part will run in a loop until timeout.
$process_start_time = microtime(true);
while (true)

View file

@ -22,6 +22,7 @@ else
$key = (string)Context::get('key');
if (!Rhymix\Framework\Queue::checkKey($key))
{
Context::setCacheControl(0);
header('HTTP/1.1 403 Forbidden');
echo "Invalid key\n";
Context::close();
@ -29,6 +30,60 @@ else
}
}
// The rest of the work will be done by the Queue class.
// Get queue configuration set by the administrator.
$timeout = (config('queue.interval') ?? 1) * 60;
Rhymix\Framework\Queue::process($timeout);
$process_count = config('queue.process_count') ?? 1;
// If called over the network, try to increase the timeout.
if (PHP_SAPI !== 'cli')
{
ignore_user_abort(true);
set_time_limit(max(60, $timeout));
}
// Create multiple processes if configured.
if (PHP_SAPI === 'cli' && $process_count > 1 && function_exists('pcntl_fork') && function_exists('pcntl_waitpid'))
{
// This array will keep a dictionary of subprocesses.
$pids = [];
// The database connection must be closed before forking.
Rhymix\Framework\DB::getInstance()->disconnect();
Rhymix\Framework\Debug::disable();
// Create the required number of subprocesses.
for ($i = 0; $i < $process_count; $i++)
{
$pid = pcntl_fork();
if ($pid > 0)
{
$pids[$pid] = true;
usleep(200000);
}
elseif ($pid == 0)
{
Rhymix\Framework\Queue::process($timeout);
exit;
}
else
{
error_log('RxQueue: could not fork!');
exit;
}
}
// The parent process waits for its children to finish.
while (count($pids))
{
$pid = pcntl_waitpid(-1, $status, \WNOHANG);
if ($pid)
{
unset($pids[$pid]);
}
usleep(200000);
}
}
else
{
Rhymix\Framework\Queue::process($timeout);
}