403Webshell
Server IP : 80.87.202.40  /  Your IP : 216.73.216.169
Web Server : Apache
System : Linux rospirotorg.ru 5.14.0-539.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Dec 5 22:26:13 UTC 2024 x86_64
User : bitrix ( 600)
PHP Version : 8.2.27
Disable Function : NONE
MySQL : OFF |  cURL : ON |  WGET : ON |  Perl : ON |  Python : OFF |  Sudo : ON |  Pkexec : ON
Directory :  /home/bitrix/ext_www/rospirotorg.ru/bitrix/modules/pull/lib/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /home/bitrix/ext_www/rospirotorg.ru/bitrix/modules/pull/lib/jsonrpctransport.php
<?php

namespace Bitrix\Pull;

use Bitrix\Main;

class JsonRpcTransport
{
	protected const VERSION = '2.0';
	protected const METHOD_PUBLISH = 'publish';
	protected const METHOD_GET_LAST_SEEN = 'getUsersLastSeen';
	protected const METHOD_UPDATE_LAST_SEEN = 'updateUsersLastSeen';

	protected string $serverUrl = '';
	protected string $hostname = '';

	function __construct(array $options = [])
	{
		$this->serverUrl = $options['serverUrl'] ?? Config::getJsonRpcUrl();
		$this->hostname = $options['hostname'] ?? Config::getHostname();
	}

	/**
	 * @param \Bitrix\Pull\DTO\Message[] $messages
	 * @param array $options
	 * @return Main\Result
	 * @see DTO\Message
	 */
	public function sendMessages(array $messages): TransportResult
	{
		$result = new TransportResult();
		$result->withRemoteAddress($this->serverUrl);
		try
		{
			$batchList = static::createRequestBatches($messages);
		}
		catch (\Throwable $e)
		{
			return $result->addError(new \Bitrix\Main\Error($e->getMessage(), $e->getCode()));
		}

		foreach ($batchList as $batch)
		{
			$executeResult = $this->executeBatch($this->serverUrl, $batch);
			if (!$executeResult->isSuccess())
			{
				return $result->addErrors($executeResult->getErrors());
			}
		}

		return $result;
	}

	public function getUsersLastSeen(array $userList): Main\Result
	{
		$rpcResult = $this->executeMethod(
			$this->serverUrl,
			static::METHOD_GET_LAST_SEEN,
			[
				'userList' => $userList
			]
		);

		if (!$rpcResult->isSuccess())
		{
			return $rpcResult;
		}

		$response = $rpcResult->getData();
		$data = is_array($response['result']) ? $response['result'] : [];
		$result = new Main\Result();

		return $result->setData($data);
	}

	/**
	 * Communicates users' last seen timestamps to the queue server.
	 *
	 * @param array $userTimestamps USER_ID => LAST_SEEN_TIMESTAMP
	 * @return Main\Result
	 */
	public function updateUsersLastSeen(array $userTimestamps): Main\Result
	{
		return $this->executeMethod(
			$this->serverUrl,
			static::METHOD_UPDATE_LAST_SEEN,
			$userTimestamps
		);
	}

	/**
	 * @param \Bitrix\Pull\DTO\Message[] $messages
	 * @return string[]
	 */
	protected static function createRequestBatches(array $messages): array
	{
		// creates just one batch right now
		$maxPayload = \CPullOptions::GetMaxPayload() - 20;

		$result = [];
		$currentBatch = [];
		$currentBatchSize = 2; // opening and closing bracket
		foreach ($messages as $message)
		{
			$message->userList = array_values($message->userList);
			$message->channelList = array_values($message->channelList);
			$jsonRpcMessage = Main\Web\Json::encode(static::createJsonRpcRequest(static::METHOD_PUBLISH, $message));
			if (mb_strlen($jsonRpcMessage) > $maxPayload - 20)
			{
				trigger_error("Pull message exceeds size limit, skipping", E_USER_WARNING);
			}
			if (($currentBatchSize + mb_strlen($jsonRpcMessage)) + 1> $maxPayload)
			{
				// start new batch
				$result[] = "[" . implode(",", $currentBatch) . "]";
				$currentBatch = [];
				$currentBatchSize = 2;
			}
			$currentBatch[] = $jsonRpcMessage;
			$currentBatchSize += (mb_strlen($jsonRpcMessage)) + 1; // + comma
		}
		if (count($currentBatch) > 0)
		{
			$result[] = "[" . implode(",", $currentBatch) . "]";
		}
		return $result;
	}

	/**
	 * @param string $method
	 * @param mixed $params
	 * @return array
	 */
	protected static function createJsonRpcRequest(string $method, $params): array
	{
		return [
			'jsonrpc' => static::VERSION,
			'method' => $method,
			'params' => $params
		];
	}

	protected function executeMethod(string $queueServerUrl, string $method, array $params): Main\Result
	{
		$result = new Main\Result();
		$rpcRequest = static::createJsonRpcRequest($method, $params);

		try
		{
			$body = Main\Web\Json::encode($rpcRequest);
		}
		catch (\Throwable $e)
		{
			return $result->addError(new \Bitrix\Main\Error($e->getMessage(), $e->getCode()));
		}
		$httpResult = $this->performHttpRequest($queueServerUrl, $body);
		if (!$httpResult->isSuccess())
		{
			return $result->addErrors($httpResult->getErrors());
		}
		$response = $httpResult->getData();
		if (!isset($response['jsonrpc']) || $response['jsonrpc'] != static::VERSION)
		{
			return $result->addError(new \Bitrix\Main\Error('Wrong response structure'));
		}
		if (is_array($response['error']))
		{
			return $result->addError(new \Bitrix\Main\Error($response['error']['message'], $response['error']['code']));
		}

		return $result->setData($response);
	}

	protected function executeBatch(string $queueServerUrl, string $batchBody): Main\Result
	{
		$result = new Main\Result();
		$httpResult = $this->performHttpRequest($queueServerUrl, $batchBody);
		if (!$httpResult->isSuccess())
		{
			return $result->addErrors($httpResult->getErrors());
		}
		$response = $result->getData();

		return $result->setData($response);
	}

	protected function performHttpRequest(string $queueServerUrl, string $body): Main\Result
	{
		$result = new Main\Result();
		$httpClient = new Main\Web\HttpClient(["streamTimeout" => 1]);

		$signature = \CPullChannel::GetSignature($body);
		$hostId = (string)Config::getHostId();
		$additionalParams = ["hostId" => $hostId, "signature" => $signature];
		if ($this->hostname != '')
		{
			$additionalParams['hostname'] = $this->hostname;
		}
		$urlWithSignature = \CHTTP::urlAddParams($queueServerUrl, $additionalParams);

		$sendResult = $httpClient->query(Main\Web\HttpClient::HTTP_POST, $urlWithSignature, $body);
		if (!$sendResult)
		{
			$errorCode = array_key_first($httpClient->getError());
			$errorMsg = $httpClient->getError()[$errorCode];
			return $result->addError(new Main\Error($errorMsg, $errorCode));
		}
		$responseCode = (int)$httpClient->getStatus();
		if ($responseCode !== 200)
		{
			return $result->addError(new Main\Error("Unexpected server response code {$responseCode}"));
		}
		$responseBody = $httpClient->getResult();
		if ($responseBody == '')
		{
			return $result->addError(new Main\Error('Empty server response'));
		}
		try
		{
			$decodedBody = Main\Web\Json::decode($responseBody);
		}
		catch (\Throwable $e)
		{
			return $result->addError(new Main\Error('Could not decode server response. Raw response: ' . $responseBody));
		}

		return $result->setData($decodedBody);
	}
}

Youez - 2016 - github.com/yon3zu
LinuXploit