Cron.php
См. документацию.
1 <?php
2 
3 /**
4  * @file
5  * @brief выполение крон задач
6  *
7  */
8 
9 namespace Wrong\Task;
10 
12 use Wrong\Start\Env;
13 use Wrong\Curl\API;
14 use Cron\CronExpression;
16 
17 /**
18  * @brief Cron класс управляющий выполнением крон задач
19  *
20  */
21 
22 class Cron
23 {
24  /** настройки потоков по умолчанию */
25  const DEFAULT_THERADS_SET = ['curr' => 0, 'min' => 1, 'max' => 1, 'load' => 300, 'fixed' => 0];
26 
27 
28  /**
29  * проверяет, включено ли выполнение cron, и если это так, получает все задания cron из базы данных,
30  * которые должны быть запущены, и запускает их.
31  */
32  public static function load()
33  {
34 
35  if (!Env::$e->CRON_ACT) return;
36  $dbh = Connect::getInstance()->dbh;
37  $sth = $dbh->query("SELECT * FROM `crontabs` WHERE `run_at` BETWEEN NOW() - INTERVAL 1 YEAR AND NOW() AND `act` = 1");
39  while ($row = $sth->fetch()) {
40  $threads = json_decode($row->threads, true) ?: self::DEFAULT_THERADS_SET;
41  self::run_stack($row->id, self::available_threads($row->id, $threads), $threads);
42  }
43  $sth = null;
44  Connect::getInstance()->close();
45  }
46 
47 
48  /**
49  * выполняет cron задачу
50  *
51  * @param object $row объект строки задачи из базы
52  *
53  */
54  public static function execute($row)
55  {
56  if (!Env::$e->CRON_ACT) return;
57  if ((!$row->request && $row->method != 'CLI') || (!$row->cli && $row->method == 'CLI')) return;
58  $threads = json_decode($row->threads, true) ?: self::DEFAULT_THERADS_SET;
59  if (intval(shell_exec("echo $(nproc) $(cat /proc/loadavg | awk '{print $1}') | awk '$2>$1/100*" . $threads['load'] . " {print 1}'"))) exit;
60 
61  if ($row->method == 'CLI' && $row->cli && Env::$e->CRON_CLI) {
62  Connect::getInstance()->close();
63  exec($row->cli);
64  exit;
65  }
66 
67  if (!$row->request) exit;
68  $x_auth_token = false;
69  if ($row->user_id && !($x_auth_token = self::get_token($row->user_id))) return;
70  $headers = json_decode($row->headers, true);
71  Connect::getInstance()->close();
72  if ($x_auth_token) {
73  $headers[] = 'X-Auth-Token: ' . $x_auth_token;
74  }
75 
76  if (!in_array('Content-Type: application/json; charset=utf-8', $headers)) {
77  $data = json_decode($row->data, true);
78  } else {
79  $data = $row->data;
80  }
81 
82  API::req($row->request, $row->method, $data, $headers);
83  }
84 
85 
86  /**
87  * возвращает x_auth_token пользователя, если пользователь имеет активную учетную
88  * запись с включенным API и имеет x_auth_token.
89  *
90  * @param int $user_id Идентификатор пользователя в базе данных.
91  *
92  * @return string x_auth_token из пользовательской таблицы.
93  */
94  private static function get_token($user_id)
95  {
96  if (!Env::$e->API) return;
97  $dbh = Connect::getInstance()->dbh;
98  $user = $dbh->query("SELECT * FROM `users` WHERE `id` = $user_id")->fetch();
99  if (!$user->act || !$user->api_act || !$user->x_auth_token) return;
100  return $user->x_auth_token;
101  }
102 
103 
104  /**
105  * устанавливает время следующих выполнений для всех крон задач из бд соответственно расписанию для каждой записи
106  */
107  public static function set_run_at()
108  {
109 
110  $dbh = Connect::getInstance()->dbh;
111  $sth = $dbh->query("SELECT * FROM `crontabs`");
112  while ($row = $sth->fetch()) {
113  $cron = CronExpression::factory($row->shedule);
114  $run_at = $cron->getNextRunDate(null, 0)->format('Y-m-d H:i:s');
115  $dbh->query("UPDATE `crontabs` SET `run_at` = '$run_at' WHERE `id` = $row->id");
116  }
117  }
118 
119 
120  /**
121  * Возвращает текущее количество запущенных потоков с id задачи
122  *
123  * @param int $id - идентификатор задачи
124  */
125  private static function get_curr_therads($id)
126  {
127  return intval(shell_exec("ps aux | grep '" . addcslashes('php -f ' . dirname(__DIR__, 3) . '/public_html/cron.php ' . $id, '.') . "' | wc -l"));
128  }
129 
130 
131  /**
132  * Возвращает количество требуемых для запуска потоков
133  *
134  * @param int $id - идентификатор задачи
135  * @param array $threads - настройки потоков
136  */
137  private static function available_threads($id, $threads)
138  {
139  if (intval(shell_exec("echo $(nproc) $(cat /proc/loadavg | awk '{print $1}') | awk '$2>$1/100*" . $threads['load'] . " {print 1}'"))) return 0;
140  $threads['curr'] = self::get_curr_therads($id);
141  if ($threads['curr'] > $threads['max']) return 0;
142  return $threads['min'] - $threads['curr'];
143  }
144 
145 
146  /**
147  * Запускает указанное количество потоков
148  *
149  * @param int $id - идентификатор задачи
150  * @param int $n - количество потоков
151  * @param array $threads - настройки потоков
152  *
153  */
154  private static function run_stack($id, $n, $threads)
155  {
156  if (!Locker::lock("cron-stack-$id", 10)) return;
157  for ($i = 0; $i < $n; $i++) {
158  self::run_thread($id);
159  if ($i != $n) {
160  usleep(10000);
161  }
162  }
163 
164  if ($threads['fixed']) {
165  exec('(sleep 0.4 && php -f ' . dirname(__DIR__, 3) . '/public_html/cron.php fork ' . $id . ' &) > /dev/null 2>&1');
166  }
167 
168  Locker::unlock("cron-stack-$id");
169  }
170 
171  /**
172  * Запускает поток по идентификатору задачи
173  *
174  * @param int $id - идентификатор задачи
175  */
176  private static function run_thread($id)
177  {
178  exec('(php -f ' . dirname(__DIR__, 3) . '/public_html/cron.php ' . $id . ' &) > /dev/null 2>&1');
179  }
180 
181  /**
182  * запуск стека из форка
183  *
184  * @param object $row объект задачи из строки бд
185  */
186  public static function fork($row)
187  {
188  if (!$row->act || !Env::$e->CRON_ACT) {
189  exit;
190  }
191  Connect::getInstance()->close();
192  $threads = json_decode($row->threads, true) ?: self::DEFAULT_THERADS_SET;
193  self::run_stack($row->id, self::available_threads($row->id, $threads), $threads);
194  }
195 }
if(!($row=Wrong\Models\Crontabs::find($_POST['id']))) if(! $user->access() ->write($row)) $sth
Definition: edit-cli.php:20
if(!($row=Wrong\Models\Crontabs::find($_POST['id']))) if(! $user->access() ->write($row)) if($_POST['min']< 1|| $_POST['min'] > 100000|| $_POST['max']< 1|| $_POST['max'] > 100000|| $_POST['load']< 1|| $_POST['load'] > 1000) $threads
if(empty($_POST['name'])) if(empty($_POST['type'])||!in_array($_POST['type'], ['page', 'modal', 'incode', 'select', 'action'])) $i
if(! $response->_meta) $data
Definition: anycomment.php:18
API отвечает за http запросы к серверу
Definition: API.php:19
static req($request, $method='GET', $data='', $headers=[], $timeout=0)
Definition: API.php:31
Connect создает подключение к базе данных
Definition: Connect.php:19
static getInstance($ignore_error=false)
Definition: Connect.php:50
Locker блокировщик файлов
Definition: Locker.php:25
static unlock($id, $forse=false)
Definition: Locker.php:104
static lock($id, $max_time=self::CLEAN_TIME)
Definition: Locker.php:55
Env класс управляющий, добавляющий или записывающий переменные среды
Definition: Env.php:17
static $e
Definition: Env.php:22
Cron класс управляющий выполнением крон задач
Definition: Cron.php:23
static execute($row)
Definition: Cron.php:54
static available_threads($id, $threads)
Definition: Cron.php:137
static get_curr_therads($id)
Definition: Cron.php:125
static fork($row)
Definition: Cron.php:186
static set_run_at()
Definition: Cron.php:107
static get_token($user_id)
Definition: Cron.php:94
static run_thread($id)
Definition: Cron.php:176
const DEFAULT_THERADS_SET
Definition: Cron.php:25
static load()
Definition: Cron.php:32
static run_stack($id, $n, $threads)
Definition: Cron.php:154
$user
Definition: from-user.php:38
if(($dbh=Connect::getInstance(true) ->dbh) && $dbh->query("SHOW TABLES") ->fetchAll() && $dbh->query("SELECT COUNT(*) FROM `users`") ->fetchColumn()) if(!empty($_POST)) exit
Definition: install.php:198
if(!($row=Wrong\Models\Crontabs::find($_GET['id']))) if($row->method=='CLI') $headers
$dbh
Definition: session.php:19