Server IP : 80.87.202.40 / Your IP : 216.73.216.169 Web Server : Apache System : Linux rospirotorg.ru 5.14.0-539.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Dec 5 22:26:13 UTC 2024 x86_64 User : bitrix ( 600) PHP Version : 8.2.27 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /home/bitrix/ext_www/ilovecveti.ru/bitrix/modules/calendar/lib/core/queue/rule/ |
Upload File : |
<?php namespace Bitrix\Calendar\Core\Queue\Rule; use Bitrix\Calendar\Core\Base\EntityMap; use Bitrix\Calendar\Core\Base\Map; use Bitrix\Calendar\Core\Queue\Message\HandledMessage; use Bitrix\Calendar\Core\Queue\Message\HandledMessageMapper; use Bitrix\Calendar\Core\Queue\Message\Message; use Bitrix\Calendar\Core\Queue\Message\MessageMapper; use Bitrix\Calendar\Core\Queue\QueueListener; use Bitrix\Calendar\Core\Base\Mutex; use Bitrix\Calendar\Internals\Log\Logger; use Bitrix\Main\ArgumentException; use Bitrix\Main\Event; use Bitrix\Main\EventManager; use Bitrix\Main\SystemException; use Generator; use Throwable; class RuleMaster { public const ON_QUEUE_PUSHED_EVENT_NAME = 'OnPushToTargetQueue'; private const PACK_SIZE = 100; private const LAST_PROCESSED_OPTION_NAME = 'queue_last_processed_id'; private Logger $logger; private MessageMapper $messageMapper; private HandledMessageMapper $handledMessageMapper; private Map $routedQueues; private Mutex $mutex; public function __construct(Logger $logger = null) { $this->logger = $logger ?? new Logger(); } /** * @return void */ public function run() { if ($this->getMutex()->lock()) { try { $this->handleMessages(); $this->sendSystemEvents(); } catch(Throwable $exception) { $this->logger->log($exception); } finally { $this->getMutex()->unlock(); } } } /** * @return Generator * * @throws ArgumentException * @throws SystemException */ private function getMessages(): Generator { do { $messages = $this->getMessageMapper()->getMap( [ '>ID' => $this->getLastProcessedId(), ], self::PACK_SIZE, [ 'ID' => 'ASC', ], ); /** @var Message $message */ foreach ($messages as $message) { yield $message; $this->setLastProcessedId($message->getId()); } } while($messages->count()); } /** * @param Message $message * * @return bool */ private function routeMessage(Message $message): bool { $rules = Registry::getInstance()->getRules(); $isRouted = false; foreach ($rules as $rule) { try { if ($handledMessage = $rule->route($message)) { /** @var HandledMessage $handledMessage */ $handledMessage = $this->getHandledMessageMapper()->create($handledMessage); $this->getRoutedQueues()->add($handledMessage->getQueue()); $isRouted = true; } } catch(Throwable $exception) { $this->logger->log($exception); } } return $isRouted; } /** * @return int */ private function getLastProcessedId(): int { return \COption::GetOptionInt("calendar", self::LAST_PROCESSED_OPTION_NAME, 0); } /** * @param int $id * * @return void */ private function setLastProcessedId(int $id = 0) { \COption::SetOptionInt("calendar", self::LAST_PROCESSED_OPTION_NAME, $id); } /** * @return void */ public function sendSystemEvents(): void { // TODO: move it to right place QueueListener\Dispatcher::register(); foreach ($this->getRoutedQueues() as $queue) { $event = new Event( "calendar", self::ON_QUEUE_PUSHED_EVENT_NAME, [ 'queue' => $queue, ], ); EventManager::getInstance()->send($event); } } /** * @return MessageMapper */ private function getMessageMapper(): MessageMapper { if (empty($this->messageMapper)) { $this->messageMapper = new MessageMapper(); } return $this->messageMapper; } /** * @return HandledMessageMapper */ private function getHandledMessageMapper(): HandledMessageMapper { if (empty($this->handledMessageMapper)) { $this->handledMessageMapper = new HandledMessageMapper(); } return $this->handledMessageMapper; } /** * @return Map */ public function getRoutedQueues(): Map { if (empty($this->routedQueues)) { $this->routedQueues = new EntityMap(); } return $this->routedQueues; } /** * @return Mutex */ private function getMutex(): Mutex { if (empty($this->mutex)) { $this->mutex = new Mutex(self::class); } return $this->mutex; } /** * @return void * * @throws ArgumentException * @throws SystemException */ public function handleMessages(): void { /** @var Message $message */ foreach ($this->getMessages() as $message) { $isRouted = $this->routeMessage($message); if (!$isRouted) { $this->getMessageMapper()->delete($message); } } } }