mirror of
https://github.com/Lastorder-DC/rhymix.git
synced 2026-04-02 01:52:10 +09:00
Assign task_srl to scheduled tasks
This commit is contained in:
parent
53cd6e807d
commit
f87429687a
3 changed files with 31 additions and 26 deletions
|
|
@ -321,7 +321,7 @@ class Queue
|
||||||
$tasks = $db_driver->getScheduledTasks('interval');
|
$tasks = $db_driver->getScheduledTasks('interval');
|
||||||
foreach ($tasks as $task)
|
foreach ($tasks as $task)
|
||||||
{
|
{
|
||||||
$db_driver->updateLastRunTimestamp($task->id);
|
$db_driver->updateLastRunTimestamp($task->task_srl);
|
||||||
self::_executeTask($task);
|
self::_executeTask($task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -111,12 +111,14 @@ class DB implements QueueInterface
|
||||||
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
|
public function addTaskAt(int $time, string $handler, ?object $args = null, ?object $options = null): int
|
||||||
{
|
{
|
||||||
$oDB = RFDB::getInstance();
|
$oDB = RFDB::getInstance();
|
||||||
|
$task_srl = getNextSequence();
|
||||||
$stmt = $oDB->prepare(trim(<<<END
|
$stmt = $oDB->prepare(trim(<<<END
|
||||||
INSERT INTO task_schedule
|
INSERT INTO task_schedule
|
||||||
(type, first_run, handler, args, options, regdate)
|
(task_srl, task_type, first_run, handler, args, options, regdate)
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
END));
|
END));
|
||||||
$result = $stmt->execute([
|
$result = $stmt->execute([
|
||||||
|
$task_srl,
|
||||||
'once',
|
'once',
|
||||||
date('Y-m-d H:i:s', $time),
|
date('Y-m-d H:i:s', $time),
|
||||||
$handler,
|
$handler,
|
||||||
|
|
@ -124,7 +126,7 @@ class DB implements QueueInterface
|
||||||
serialize($options),
|
serialize($options),
|
||||||
date('Y-m-d H:i:s'),
|
date('Y-m-d H:i:s'),
|
||||||
]);
|
]);
|
||||||
return $result ? $oDB->getInsertID() : 0;
|
return $result ? $task_srl : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -139,12 +141,14 @@ class DB implements QueueInterface
|
||||||
public function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
|
public function addTaskAtInterval(string $interval, string $handler, ?object $args = null, ?object $options = null): int
|
||||||
{
|
{
|
||||||
$oDB = RFDB::getInstance();
|
$oDB = RFDB::getInstance();
|
||||||
|
$task_srl = getNextSequence();
|
||||||
$stmt = $oDB->prepare(trim(<<<END
|
$stmt = $oDB->prepare(trim(<<<END
|
||||||
INSERT INTO task_schedule
|
INSERT INTO task_schedule
|
||||||
(type, `interval`, handler, args, options, regdate)
|
(task_srl, task_type, run_interval, handler, args, options, regdate)
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
END));
|
END));
|
||||||
$result = $stmt->execute([
|
$result = $stmt->execute([
|
||||||
|
$task_srl,
|
||||||
'interval',
|
'interval',
|
||||||
$interval,
|
$interval,
|
||||||
$handler,
|
$handler,
|
||||||
|
|
@ -152,7 +156,7 @@ class DB implements QueueInterface
|
||||||
serialize($options),
|
serialize($options),
|
||||||
date('Y-m-d H:i:s'),
|
date('Y-m-d H:i:s'),
|
||||||
]);
|
]);
|
||||||
return $result ? $oDB->getInsertID() : 0;
|
return $result ? $task_srl : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -196,24 +200,25 @@ class DB implements QueueInterface
|
||||||
{
|
{
|
||||||
$oDB = RFDB::getInstance();
|
$oDB = RFDB::getInstance();
|
||||||
$tasks = [];
|
$tasks = [];
|
||||||
$ids = [];
|
$task_srls = [];
|
||||||
|
|
||||||
// Get tasks to be executed once at the current time.
|
// Get tasks to be executed once at the current time.
|
||||||
if ($type === 'once')
|
if ($type === 'once')
|
||||||
{
|
{
|
||||||
$oDB->beginTransaction();
|
$oDB->beginTransaction();
|
||||||
$stmt = $oDB->query("SELECT * FROM task_schedule WHERE `type` = 'once' AND `first_run` <= ? ORDER BY id FOR UPDATE", [$timestamp]);
|
$timestamp = date('Y-m-d H:i:s');
|
||||||
|
$stmt = $oDB->query("SELECT * FROM task_schedule WHERE task_type = 'once' AND first_run <= ? ORDER BY first_run FOR UPDATE", [$timestamp]);
|
||||||
while ($task = $stmt->fetchObject())
|
while ($task = $stmt->fetchObject())
|
||||||
{
|
{
|
||||||
$task->args = unserialize($task->args);
|
$task->args = unserialize($task->args);
|
||||||
$task->options = unserialize($task->options);
|
$task->options = unserialize($task->options);
|
||||||
$tasks[] = $task;
|
$tasks[] = $task;
|
||||||
$ids[] = $task->id;
|
$task_srls[] = $task->task_srl;
|
||||||
}
|
}
|
||||||
if (count($ids))
|
if (count($task_srls))
|
||||||
{
|
{
|
||||||
$stmt = $oDB->prepare('DELETE FROM task_schedule WHERE id IN (' . implode(', ', array_fill(0, count($ids), '?')) . ')');
|
$stmt = $oDB->prepare('DELETE FROM task_schedule WHERE task_srl IN (' . implode(', ', array_fill(0, count($task_srls), '?')) . ')');
|
||||||
$stmt->execute($ids);
|
$stmt->execute($task_srls);
|
||||||
}
|
}
|
||||||
$oDB->commit();
|
$oDB->commit();
|
||||||
}
|
}
|
||||||
|
|
@ -221,18 +226,18 @@ class DB implements QueueInterface
|
||||||
// Get tasks to be executed at an interval.
|
// Get tasks to be executed at an interval.
|
||||||
if ($type === 'interval')
|
if ($type === 'interval')
|
||||||
{
|
{
|
||||||
$stmt = $oDB->query("SELECT id, `interval` FROM task_schedule WHERE `type` = 'interval' ORDER BY id");
|
$stmt = $oDB->query("SELECT task_srl, run_interval FROM task_schedule WHERE task_type = 'interval' ORDER BY task_srl");
|
||||||
while ($task = $stmt->fetchObject())
|
while ($task = $stmt->fetchObject())
|
||||||
{
|
{
|
||||||
if (Queue::parseInterval($task->interval, \RX_TIME))
|
if (Queue::parseInterval($task->run_interval, time()))
|
||||||
{
|
{
|
||||||
$ids[] = $task->id;
|
$task_srls[] = $task->task_srl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (count($ids))
|
if (count($task_srls))
|
||||||
{
|
{
|
||||||
$stmt = $oDB->prepare('SELECT * FROM task_schedule WHERE id IN (' . implode(', ', array_fill(0, count($ids), '?')) . ')');
|
$stmt = $oDB->prepare('SELECT * FROM task_schedule WHERE task_srl IN (' . implode(', ', array_fill(0, count($task_srls), '?')) . ')');
|
||||||
$stmt->execute($ids);
|
$stmt->execute($task_srls);
|
||||||
while ($task = $stmt->fetchObject())
|
while ($task = $stmt->fetchObject())
|
||||||
{
|
{
|
||||||
$task->args = unserialize($task->args);
|
$task->args = unserialize($task->args);
|
||||||
|
|
@ -251,10 +256,10 @@ class DB implements QueueInterface
|
||||||
* @param int $id
|
* @param int $id
|
||||||
* @return void
|
* @return void
|
||||||
*/
|
*/
|
||||||
public function updateLastRunTimestamp(int $id): void
|
public function updateLastRunTimestamp(int $task_srl): void
|
||||||
{
|
{
|
||||||
$oDB = RFDB::getInstance();
|
$oDB = RFDB::getInstance();
|
||||||
$stmt = $oDB->prepare('UPDATE task_schedule SET last_run = ?, run_count = run_count + 1 WHERE id = ?');
|
$stmt = $oDB->prepare('UPDATE task_schedule SET last_run = ?, run_count = run_count + 1 WHERE task_srl = ?');
|
||||||
$stmt->execute([date('Y-m-d H:i:s'), $id]);
|
$stmt->execute([date('Y-m-d H:i:s'), $task_srl]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
<table name="task_schedule">
|
<table name="task_schedule">
|
||||||
<column name="id" type="bigint" notnull="notnull" primary_key="primary_key" auto_increment="auto_increment" />
|
<column name="task_srl" type="bigint" notnull="notnull" primary_key="primary_key" />
|
||||||
<column name="type" type="varchar" size="40" index="idx_type" />
|
<column name="task_type" type="varchar" size="40" index="idx_task_type" />
|
||||||
<column name="interval" type="varchar" size="191" index="idx_interval" />
|
<column name="run_interval" type="varchar" size="191" index="idx_run_interval" />
|
||||||
|
<column name="run_count" type="bigint" notnull="notnull" default="0" index="idx_run_count" />
|
||||||
<column name="first_run" type="datetime" index="idx_first_run" />
|
<column name="first_run" type="datetime" index="idx_first_run" />
|
||||||
<column name="last_run" type="datetime" index="idx_last_run" />
|
<column name="last_run" type="datetime" index="idx_last_run" />
|
||||||
<column name="run_count" type="bigint" notnull="notnull" default="0" index="idx_run_count" />
|
|
||||||
<column name="handler" type="varchar" size="191" notnull="notnull" />
|
<column name="handler" type="varchar" size="191" notnull="notnull" />
|
||||||
<column name="args" type="longtext" notnull="notnull" />
|
<column name="args" type="longtext" notnull="notnull" />
|
||||||
<column name="options" type="longtext" notnull="notnull" />
|
<column name="options" type="longtext" notnull="notnull" />
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue