403Webshell
Server IP : 80.87.202.40  /  Your IP : 216.73.216.169
Web Server : Apache
System : Linux rospirotorg.ru 5.14.0-539.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Dec 5 22:26:13 UTC 2024 x86_64
User : bitrix ( 600)
PHP Version : 8.2.27
Disable Function : NONE
MySQL : OFF |  cURL : ON |  WGET : ON |  Perl : ON |  Python : OFF |  Sudo : ON |  Pkexec : ON
Directory :  /home/bitrix/ext_www/ilovecveti.ru/bitrix/modules/calendar/lib/core/queue/rule/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /home/bitrix/ext_www/ilovecveti.ru/bitrix/modules/calendar/lib/core/queue/rule/rulemaster.php
<?php

namespace Bitrix\Calendar\Core\Queue\Rule;

use Bitrix\Calendar\Core\Base\EntityMap;
use Bitrix\Calendar\Core\Base\Map;
use Bitrix\Calendar\Core\Queue\Message\HandledMessage;
use Bitrix\Calendar\Core\Queue\Message\HandledMessageMapper;
use Bitrix\Calendar\Core\Queue\Message\Message;
use Bitrix\Calendar\Core\Queue\Message\MessageMapper;
use Bitrix\Calendar\Core\Queue\QueueListener;
use Bitrix\Calendar\Core\Base\Mutex;
use Bitrix\Calendar\Internals\Log\Logger;
use Bitrix\Main\ArgumentException;
use Bitrix\Main\Event;
use Bitrix\Main\EventManager;
use Bitrix\Main\SystemException;
use Generator;
use Throwable;

class RuleMaster
{
	public const ON_QUEUE_PUSHED_EVENT_NAME = 'OnPushToTargetQueue';

	private const PACK_SIZE = 100;

	private const LAST_PROCESSED_OPTION_NAME = 'queue_last_processed_id';

	private Logger $logger;

	private MessageMapper $messageMapper;

	private HandledMessageMapper $handledMessageMapper;

	private Map $routedQueues;

	private Mutex $mutex;

	public function __construct(Logger $logger = null)
	{
		$this->logger = $logger ?? new Logger();
	}

	/**
	 * @return void
	 */
	public function run()
	{
		if ($this->getMutex()->lock())
		{
			try
			{
				$this->handleMessages();

				$this->sendSystemEvents();
			}
			catch(Throwable $exception)
			{
				$this->logger->log($exception);
			}
			finally
			{
				$this->getMutex()->unlock();
			}
		}
	}

	/**
	 * @return Generator
	 *
	 * @throws ArgumentException
	 * @throws SystemException
	 */
	private function getMessages(): Generator
	{
		do
		{
			$messages = $this->getMessageMapper()->getMap(
				[
					'>ID' => $this->getLastProcessedId(),
				],
				self::PACK_SIZE,
				[
					'ID' => 'ASC',
				],
			);
			/** @var Message $message */
			foreach ($messages as $message)
			{
				yield $message;
				$this->setLastProcessedId($message->getId());
			}
		}

		while($messages->count());
	}

	/**
	 * @param Message $message
	 *
	 * @return bool
	 */
	private function routeMessage(Message $message): bool
	{
		$rules = Registry::getInstance()->getRules();
		$isRouted = false;
		foreach ($rules as $rule)
		{
			try
			{
				if ($handledMessage = $rule->route($message))
				{
					/** @var HandledMessage $handledMessage */
					$handledMessage = $this->getHandledMessageMapper()->create($handledMessage);
					$this->getRoutedQueues()->add($handledMessage->getQueue());
					$isRouted = true;
				}
			}
			catch(Throwable $exception)
			{
				$this->logger->log($exception);
			}
		}

		return $isRouted;
	}

	/**
	 * @return int
	 */
	private function getLastProcessedId(): int
	{
		return \COption::GetOptionInt("calendar", self::LAST_PROCESSED_OPTION_NAME, 0);
	}

	/**
	 * @param int $id
	 *
	 * @return void
	 */
	private function setLastProcessedId(int $id = 0)
	{
		\COption::SetOptionInt("calendar", self::LAST_PROCESSED_OPTION_NAME, $id);
	}

	/**
	 * @return void
	 */
	public function sendSystemEvents(): void
	{
		// TODO: move it to right place
		QueueListener\Dispatcher::register();

		foreach ($this->getRoutedQueues() as $queue)
		{
			$event = new Event(
				"calendar",
				self::ON_QUEUE_PUSHED_EVENT_NAME,
				[
					'queue' => $queue,
				],
			);
			EventManager::getInstance()->send($event);
		}
	}

	/**
	 * @return MessageMapper
	 */
	private function getMessageMapper(): MessageMapper
	{
		if (empty($this->messageMapper))
		{
			$this->messageMapper = new MessageMapper();
		}

		return $this->messageMapper;
	}

	/**
	 * @return HandledMessageMapper
	 */
	private function getHandledMessageMapper(): HandledMessageMapper
	{
		if (empty($this->handledMessageMapper))
		{
			$this->handledMessageMapper = new HandledMessageMapper();
		}

		return $this->handledMessageMapper;
	}

	/**
	 * @return Map
	 */
	public function getRoutedQueues(): Map
	{
		if (empty($this->routedQueues))
		{
			$this->routedQueues = new EntityMap();
		}

		return $this->routedQueues;
	}

	/**
	 * @return Mutex
	 */
	private function getMutex(): Mutex
	{
		if (empty($this->mutex))
		{
			$this->mutex = new Mutex(self::class);
		}

		return $this->mutex;
	}

	/**
	 * @return void
	 *
	 * @throws ArgumentException
	 * @throws SystemException
	 */
	public function handleMessages(): void
	{
		/** @var Message $message */
		foreach ($this->getMessages() as $message)
		{
			$isRouted = $this->routeMessage($message);
			if (!$isRouted)
			{
				$this->getMessageMapper()->delete($message);
			}
		}
	}
}

Youez - 2016 - github.com/yon3zu
LinuXploit