Server IP : 80.87.202.40 / Your IP : 216.73.216.169 Web Server : Apache System : Linux rospirotorg.ru 5.14.0-539.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Dec 5 22:26:13 UTC 2024 x86_64 User : bitrix ( 600) PHP Version : 8.2.27 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /home/bitrix/ext_www/rospirotorg.ru/bitrix/modules/pull/lib/ |
Upload File : |
<?php namespace Bitrix\Pull; use Bitrix\Main; use Bitrix\Main\Localization\Loc; use Bitrix\Main\Config\Option; use Bitrix\Pull\DTO\Message; class Event { const SHARED_CHANNEL = 0; private static bool $isSendingScheduled = false; private static bool $backgroundContext = false; private static array $messages = []; private static array $deferredMessages = []; private static array $push = []; private static $error = false; public static function add($recipient, array $parameters, $channelType = \CPullChannel::TYPE_PRIVATE) { if (!isset($parameters['module_id'])) { self::$error = new Error(__METHOD__, 'EVENT_PARAMETERS_FORMAT', Loc::getMessage('PULL_EVENT_PARAMETERS_FORMAT_ERROR'), $parameters); return false; } $badUnicodeSymbolsPath = Common::findInvalidUnicodeSymbols($parameters); if ($badUnicodeSymbolsPath) { $warning = 'Parameters array contains invalid UTF-8 characters by the path ' . $badUnicodeSymbolsPath; self::$error = new Error(__METHOD__, 'EVENT_BAD_ENCODING', $warning, $parameters); return false; } if (isset($parameters['command']) && !empty($parameters['command'])) { if (!Config::isJsonRpcUsed() && (isset($parameters['user_params']) || isset($parameters['dictionary']))) { self::generateEventsForUsers($recipient, $parameters, $channelType); } else { $result = self::addEvent($recipient, $parameters, $channelType); } } else if (isset($parameters['push']) || isset($parameters['pushParamsCallback'])) { $result = self::addPush($recipient, $parameters); } else { self::$error = new Error(__METHOD__, 'EVENT_PARAMETERS_FORMAT', Loc::getMessage('PULL_EVENT_PARAMETERS_FORMAT_ERROR'), $parameters); return false; } return $result; } private static function addEvent($recipient, $parameters, $channelType = \CPullChannel::TYPE_PRIVATE) { if (!is_array($recipient)) { $recipient = [$recipient]; } $entities = self::getEntitiesByType($recipient); if ($entities === null) { self::$error = new Error(__METHOD__, 'RECIPIENT_FORMAT', Loc::getMessage('PULL_EVENT_RECIPIENT_FORMAT_ERROR'), [ 'recipient' => $recipient, 'eventParameters' => $parameters, ]); return false; } $parameters = self::prepareParameters($parameters); if (!$parameters) { return false; } $parameters['channel_type'] = $channelType; if (empty($entities['users']) && empty($entities['channels'])) { return true; } if (isset($parameters['push'])) { $pushParameters = $parameters['push']; unset($parameters['push']); } else { $pushParameters = null; } if (isset($parameters['pushParamsCallback'])) { $pushParametersCallback = $parameters['pushParamsCallback']; unset($parameters['pushParamsCallback']); } else { $pushParametersCallback = null; } if (isset($parameters['hasCallback']) && $parameters['hasCallback']) { self::addMessage(self::$deferredMessages, $entities['channels'], $entities['users'], $parameters); } else { self::addMessage(self::$messages, $entities['channels'], $entities['users'], $parameters); } if (defined('BX_CHECK_AGENT_START') && !defined('BX_WITH_ON_AFTER_EPILOG')) { self::send(); } else { self::scheduleSending(); } if ($pushParameters || $pushParametersCallback) { if ($pushParameters) { $parameters['push'] = $pushParameters; } if ($pushParametersCallback) { $parameters['pushParamsCallback'] = $pushParametersCallback; } unset($parameters['channel_type']); self::addPush($entities['users'], $parameters); } return true; } private static function addMessage(array &$destination, array $channels, array $users, array $parameters) { $eventCode = self::getParamsCode($parameters); unset($parameters['hasCallback']); if (isset($destination[$eventCode])) { $waitingToReceiveUserList = $destination[$eventCode]['users'] ?? []; $newUserList = $users ?? []; $destination[$eventCode]['users'] = array_unique(array_merge($waitingToReceiveUserList, $newUserList)); $waitingToReceiveChannelList = $destination[$eventCode]['channels'] ?? []; $newChannelList = $channels ?? []; $destination[$eventCode]['channels'] = array_unique(array_merge($waitingToReceiveChannelList, $newChannelList)); } else { $destination[$eventCode] = [ 'event' => $parameters, 'users' => array_unique($users), 'channels' => array_unique($channels), ]; } } private static function generateEventsForUsers($recipients, $parameters, $channelType = \CPullChannel::TYPE_PRIVATE) { if (!is_array($recipients)) { $recipients = [$recipients]; } if (is_array($parameters['dictionary'])) { $dictionary = $parameters['dictionary']; unset($parameters['dictionary']); $parameters['params'] = array_merge($parameters['params'], $dictionary); } $processed = []; if (is_array($parameters['user_params'])) { $params = $parameters['params']; $paramsByUser = $parameters['user_params']; unset($parameters['user_params']); foreach ($recipients as $recipient) { if (isset($paramsByUser[$recipient]) && is_array($paramsByUser[$recipient])) { $userParams = $parameters; $userParams['params'] = array_merge($params, $paramsByUser[$recipient]); self::addEvent($recipient, $userParams, $channelType); $processed[] = $recipient; } } } $left = array_diff($recipients, $processed); if (!empty($left)) { self::addEvent($left, $parameters, $channelType); } } private static function addPush($users, $parameters) { if (!\CPullOptions::GetPushStatus()) { self::$error = new Error(__METHOD__, 'PUSH_DISABLED', Loc::getMessage('PULL_EVENT_PUSH_DISABLED_ERROR'), [ 'recipient' => $users, 'eventParameters' => $parameters, ]); return false; } if (!is_array($users)) { $users = [$users]; } foreach ($users as $id => $entity) { $entity = intval($entity); if ($entity <= 0) { unset($users[$id]); } } if (empty($users)) { self::$error = new Error(__METHOD__, 'RECIPIENT_FORMAT', Loc::getMessage('PULL_EVENT_RECIPIENT_FORMAT_ERROR'), [ 'recipient' => $users, 'eventParameters' => $parameters, ]); return false; } if (isset($parameters['skip_users'])) { if (!isset($parameters['push']['skip_users'])) { $parameters['push']['skip_users'] = []; } $parameters['push']['skip_users'] = array_merge($parameters['skip_users'], $parameters['push']['skip_users']); } if (!empty($parameters['push']['type'])) { foreach ($users as $userId) { if (!\Bitrix\Pull\Push::getConfigTypeStatus($parameters['module_id'], $parameters['push']['type'], $userId)) { $parameters['push']['skip_users'][] = $userId; } } } $parameters = self::preparePushParameters($parameters); if (!$parameters) { return false; } $pushCode = self::getParamsCode($parameters['push']); if (isset(self::$push[$pushCode])) { self::$push[$pushCode]['users'] = array_unique(array_merge(self::$push[$pushCode]['users'], array_values($users))); } else { $hasPushCallback = $parameters['hasPushCallback']; unset($parameters['hasPushCallback']); self::$push[$pushCode]['push'] = $parameters['push']; self::$push[$pushCode]['extra'] = $parameters['extra']; self::$push[$pushCode]['hasPushCallback'] = $hasPushCallback; self::$push[$pushCode]['users'] = array_unique(array_values($users)); } if (defined('BX_CHECK_AGENT_START') && !defined('BX_WITH_ON_AFTER_EPILOG')) { self::send(); } else { self::scheduleSending(); } return true; } private static function processDeferredMessages() { foreach (self::$deferredMessages as $eventCode => $message) { $callback = $message['event']['paramsCallback']; if (Main\Loader::includeModule($callback['module_id']) && method_exists($callback['class'], $callback['method'])) { $messageParameters = call_user_func_array([$callback['class'], $callback['method']], [$callback['params']]); self::addMessage(self::$messages, $message['users'], $message['channels'], $messageParameters); } } self::$deferredMessages = []; } private static function executePushEvent($parameters) { if (!self::$backgroundContext && $parameters['hasPushCallback']) { return null; } $data = []; if ($parameters['hasPushCallback']) { $callback = $parameters['push']['pushParamsCallback']; Main\Loader::includeModule($callback['module_id']); if (method_exists($callback['class'], $callback['method'])) { $data = call_user_func_array( [ $callback['class'], $callback['method'], ], [ $callback['params'], ] ); } } else { $data = $parameters['push']; } $data['message'] = str_replace("\n", " ", trim($data['message'] ?? '')); $data['params'] = $data['params'] ?? []; $data['advanced_params'] = $data['advanced_params'] ?? []; $data['advanced_params']['extra'] = $parameters['extra'] ?? []; $data['badge'] = isset($data['badge']) ? (int)$data['badge'] : ''; $data['sound'] = $data['sound'] ?? ''; $data['tag'] = $data['tag'] ?? ''; $data['sub_tag'] = $data['sub_tag'] ?? ''; $data['app_id'] = $data['app_id'] ?? ''; $data['send_immediately'] = isset($data['send_immediately']) && $data['send_immediately'] == 'Y' ? 'Y' : 'N'; $data['important'] = isset($data['important']) && $data['important'] == 'Y' ? 'Y' : 'N'; $users = []; foreach ($parameters['users'] as $userId) { $users[] = $userId; } if (empty($users)) { return true; } $manager = new \CPushManager(); $manager->AddQueue([ 'USER_ID' => $users, 'SKIP_USERS' => isset($data['skip_users']) && is_array($data['skip_users']) ? $data['skip_users'] : [], 'MESSAGE' => $data['message'], 'EXPIRY' => $data['expiry'], 'PARAMS' => $data['params'], 'ADVANCED_PARAMS' => $data['advanced_params'], 'BADGE' => $data['badge'], 'SOUND' => $data['sound'], 'TAG' => $data['tag'], 'SUB_TAG' => $data['sub_tag'], 'APP_ID' => $data['app_id'], 'SEND_IMMEDIATELY' => $data['send_immediately'], 'IMPORTANT' => $data['important'], ]); return true; } public static function send() { if (self::$backgroundContext) { self::processDeferredMessages(); } $executeResult = static::executeEvents(); if (!$executeResult->isSuccess()) { foreach ($executeResult->getErrors() as $error) { $message = $error->getCode() ? $error->getCode() . ": " . $error->getMessage() : $error->getMessage(); trigger_error("Pull send error; {$message}; remote endpoint: {$executeResult->getRemoteAddress()}", E_USER_WARNING); } } static::executePushEvents(); return true; } public static function executeEvents(): TransportResult { $result = new TransportResult(); if (empty(self::$messages)) { return $result; } if (!\CPullOptions::GetQueueServerStatus()) { self::$messages = []; return $result; } self::fillChannels(self::$messages); if (Config::isJsonRpcUsed()) { $messageList = self::convertEventsToMessages(self::$messages); $sendResult = (new JsonRpcTransport())->sendMessages($messageList); if ($sendResult->isSuccess()) { self::$messages = []; } else { $result->withRemoteAddress($sendResult->getRemoteAddress()); $result->addErrors($sendResult->getErrors()); } } else { if (Config::isProtobufUsed()) { $sendResult = ProtobufTransport::sendMessages(self::$messages); if (!$sendResult->isSuccess()) { $result->withRemoteAddress($sendResult->getRemoteAddress()); $result->addErrors($sendResult->getErrors()); } } else { self::sendEventsLegacy(); } self::$messages = []; } return $result; } public static function executePushEvents() { foreach (self::$push as $pushCode => $event) { $result = self::executePushEvent($event); if (!is_null($result)) { unset(self::$push[$pushCode]); } } } private static function sendEventsLegacy() { foreach (self::$messages as $eventCode => $event) { if (\Bitrix\Pull\Log::isEnabled()) { // TODO change code after release - $parameters['hasCallback'] $currentHits = ceil(count($event['channels']) / \CPullOptions::GetCommandPerHit()); $hitCount += $currentHits; $currentChannelCount = count($event['channels']); $channelCount += $currentChannelCount; $currentMessagesBytes = self::getBytes($event['event']) + self::getBytes($event['channels']); $messagesBytes += $currentMessagesBytes; $logs[] = 'Command: ' . $event['event']['module_id'] . '/' . $event['event']['command'] . '; Hits: ' . $currentHits . '; Channel: ' . $currentChannelCount . '; Bytes: ' . $currentMessagesBytes . ''; } if (empty($event['channels'])) { continue; } $data = [ 'module_id' => $event['event']['module_id'], 'command' => $event['event']['command'], 'params' => is_array($event['event']['params']) ? $event['event']['params'] : [], 'extra' => $event['event']['extra'], ]; $options = ['expiry' => $event['event']['expiry']]; if (\CPullChannel::Send($event['channels'], \Bitrix\Pull\Common::jsonEncode($data), $options)) { unset(self::$messages[$eventCode]); } } if ($logs && \Bitrix\Pull\Log::isEnabled()) { if (count($logs) > 1) { $logs[] = 'Total - Hits: ' . $hitCount . '; Channel: ' . $channelCount . '; Messages: ' . $messagesCount . '; Bytes: ' . $messagesBytes . ''; } if (count($logs) > 1 || $hitCount > 1 || $channelCount > 1 || $messagesBytes > 1000) { $logTitle = '!! Pull messages stats - important !!'; } else { $logTitle = '-- Pull messages stats --'; } \Bitrix\Pull\Log::write(implode("\n", $logs), $logTitle); } } public static function onAfterEpilog() { self::scheduleSending(); return true; } protected static function scheduleSending(): void { if (self::$isSendingScheduled) { return; } self::$isSendingScheduled = true; Main\Application::getInstance()->addBackgroundJob([__CLASS__, "sendInBackground"], [], Main\Application::JOB_PRIORITY_LOW); } public static function sendInBackground() { self::$backgroundContext = true; self::send(); self::$isSendingScheduled = false; } public static function fillChannels(array &$messages) { foreach ($messages as $key => &$message) { $users = $message['users'] ?? []; if (!empty($messages[$key]['channels']) && is_array($messages[$key]['channels'])) { $messages[$key]['channels'] = array_merge($messages[$key]['channels'], self::getChannelIds($users, $message['event']['channel_type'])); } else { $messages[$key]['channels'] = self::getChannelIds($users, $message['event']['channel_type']); } unset($message['event']['channel_type']); } } public static function getChannelIds(array $users, $type = \CPullChannel::TYPE_PRIVATE) { $result = []; foreach ($users as $userId) { $data = \CPullChannel::Get($userId, true, false, $type); if ($data) { $result[] = $data['CHANNEL_ID']; } } return $result; } public static function getUserIds(array $channels) { $result = array_fill_keys($channels, null); $orm = \Bitrix\Pull\Model\ChannelTable::getList([ 'select' => ['USER_ID', 'CHANNEL_ID', 'USER_ACTIVE' => 'USER.ACTIVE'], 'filter' => [ '=CHANNEL_ID' => $channels, ], ]); while ($row = $orm->fetch()) { if ($row['USER_ID'] > 0 && $row['USER_ACTIVE'] !== 'N') { $result[$row['CHANNEL_ID']] = $row['USER_ID']; } else { unset($result[$row['CHANNEL_ID']]); } } return $result; } private static function prepareParameters(array $parameters) { if (empty($parameters['command'])) { self::$error = new Error(__METHOD__, 'EVENT_PARAMETERS_FORMAT', Loc::getMessage('PULL_EVENT_PARAMETERS_FORMAT_ERROR'), $parameters); return false; } $parameters['module_id'] = mb_strtolower($parameters['module_id']); $parameters['expiry'] = (int)($parameters['expiry'] ?? 86400); if (isset($parameters['paramsCallback'])) { if (empty($parameters['paramsCallback']['class']) || empty($parameters['paramsCallback']['method'])) { self::$error = new Error(__METHOD__, 'EVENT_CALLBACK_FORMAT', Loc::getMessage('PULL_EVENT_CALLBACK_FORMAT_ERROR'), $parameters); return false; } if (empty($parameters['paramsCallback']['module_id'])) { $parameters['paramsCallback']['module_id'] = $parameters['module_id']; } Main\Loader::includeModule($parameters['paramsCallback']['module_id']); if (!method_exists($parameters['paramsCallback']['class'], $parameters['paramsCallback']['method'])) { self::$error = new Error(__METHOD__, 'EVENT_CALLBACK_NOT_FOUND', Loc::getMessage('PULL_EVENT_CALLBACK_FORMAT_ERROR'), $parameters); return false; } if (!isset($parameters['paramsCallback']['params'])) { $parameters['paramsCallback']['params'] = []; } $parameters['params'] = []; $parameters['hasCallback'] = true; } else { if (!isset($parameters['params']) || !is_array($parameters['params'])) { $parameters['params'] = []; } } $parameters['extra']['server_time'] ??= date('c'); $parameters['extra']['server_time_unix'] ??= microtime(true); $parameters['extra']['server_name'] = Option::get('main', 'server_name', $_SERVER['SERVER_NAME']); $parameters['extra']['revision_web'] = PULL_REVISION_WEB; $parameters['extra']['revision_mobile'] = PULL_REVISION_MOBILE; return $parameters; } private static function preparePushParameters(array $parameters) { $parameters['module_id'] = mb_strtolower($parameters['module_id']); if (isset($parameters['pushParamsCallback'])) { if ( empty($parameters['pushParamsCallback']['class']) || empty($parameters['pushParamsCallback']['method']) ) { self::$error = new Error(__METHOD__, 'EVENT_PUSH_CALLBACK_FORMAT', Loc::getMessage('PULL_EVENT_PUSH_CALLBACK_FORMAT_ERROR'), $parameters); return false; } if (empty($parameters['pushParamsCallback']['module_id'])) { $parameters['pushParamsCallback']['module_id'] = $parameters['module_id']; } Main\Loader::includeModule($parameters['pushParamsCallback']['module_id']); if (!method_exists($parameters['pushParamsCallback']['class'], $parameters['pushParamsCallback']['method'])) { self::$error = new Error(__METHOD__, 'EVENT_PUSH_CALLBACK_NOT_FOUND', Loc::getMessage('PULL_EVENT_PUSH_CALLBACK_FORMAT_ERROR'), $parameters); return false; } if (!isset($parameters['pushParamsCallback']['params'])) { $parameters['pushParamsCallback']['params'] = []; } $parameters['push']['pushParamsCallback'] = $parameters['pushParamsCallback']; $parameters['hasPushCallback'] = true; } else { $parameters['hasPushCallback'] = false; $parameters['pushParamsCallback'] = []; if (isset($parameters['badge']) && $parameters['badge'] == 'Y') { $parameters['send_immediately'] = 'Y'; unset($parameters['badge']); } if (empty($parameters['push'])) { self::$error = new Error(__METHOD__, 'EVENT_PUSH_PARAMETERS_FORMAT', Loc::getMessage('PULL_EVENT_PUSH_PARAMETERS_FORMAT_ERROR'), $parameters); return false; } } if (!isset($parameters['extra']['server_time'])) { $parameters['extra']['server_time'] = date('c'); } if (!$parameters['extra']['server_time_unix']) { $parameters['extra']['server_time_unix'] = microtime(true); } return $parameters; } public static function getParamsCode($params) { if (isset($params['groupId']) && !empty($params['groupId'])) { return md5($params['groupId']); } else { $paramsWithoutTime = $params; unset($paramsWithoutTime['extra']['server_time']); unset($paramsWithoutTime['extra']['server_time_unix']); unset($paramsWithoutTime['advanced_params']['filterCallback']); return serialize($paramsWithoutTime); } } private static function getEntitiesByType(array $recipientList): ?array { $result = [ 'users' => [], 'channels' => [], 'count' => 0, ]; foreach ($recipientList as $entity) { if ($entity instanceof \Bitrix\Pull\Model\Channel) { $result['channels'][] = $entity->getPrivateId(); $result['count']++; } else if (self::isChannelEntity($entity)) { $result['channels'][] = $entity; $result['count']++; } else { $result['users'][] = intval($entity); $result['count']++; } } return $result['count'] > 0 ? $result : null; } private static function getBytes($variable) { $bytes = 0; if (is_string($variable)) { $bytes += mb_strlen($variable); } else if (is_array($variable)) { foreach ($variable as $value) { $bytes += self::getBytes($value); } } else { $bytes += mb_strlen((string)$variable); } return $bytes; } private static function isChannelEntity($entity) { return is_string($entity) && mb_strlen($entity) == 32; } /** * @param array $events * @return \Bitrix\Pull\DTO\Message[] */ private static function convertEventsToMessages(array $events): array { return array_map( function ($event) { return Message::fromEvent($event); }, $events ); } public static function getLastError() { return self::$error; } }