403Webshell
Server IP : 80.87.202.40  /  Your IP : 216.73.216.169
Web Server : Apache
System : Linux rospirotorg.ru 5.14.0-539.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Dec 5 22:26:13 UTC 2024 x86_64
User : bitrix ( 600)
PHP Version : 8.2.27
Disable Function : NONE
MySQL : OFF |  cURL : ON |  WGET : ON |  Perl : ON |  Python : OFF |  Sudo : ON |  Pkexec : ON
Directory :  /home/bitrix/ext_www/rospirotorg.ru/bitrix/js/pull/connector/src/

Upload File :
current_dir [ Writeable] document_root [ Writeable]

 

Command :


[ Back ]     

Current File : /home/bitrix/ext_www/rospirotorg.ru/bitrix/js/pull/connector/src/connector.js
/* eslint-disable no-param-reassign */
/* eslint-disable @bitrix24/bitrix24-rules/no-typeof */
/* eslint-disable @bitrix24/bitrix24-rules/no-native-events-binding */
/* eslint-disable @bitrix24/bitrix24-rules/no-pseudo-private */
/* eslint-disable no-underscore-dangle */
// noinspection ES6PreferShortImport

import {
	CloseReasons,
	ConnectionType,
	PullStatus,
	RpcMethod,
	ServerMode,
	SystemCommands,
	REVISION,
} from '../../client/src/consts';
import {
	buildQueryString,
	getDateForLog,
	isPlainObject,
	isNotEmptyString,
	isWebSocketSupported,
	isArray, browser, getTimestamp,
} from '../../util/src/util';
import { JsonRpc } from '../../jsonrpc/src/jsonrpc';
import type { RestCaller } from '../../minirest/src/restcaller';

import { ChannelManager } from './codec/channelmanager';
import { WebSocketConnector } from './transport/websocket';
import { LongPollingConnector } from './transport/longpolling';
import { ProtobufCodec } from './codec/protobuf';
import { LegacyCodec } from './codec/legacy';
import type { MessageCodec } from './codec/messagecodec';
import type { PullConfig } from '../../configholder/src/configholder';
import type { JsonRpcResponse } from '../../jsonrpc/src/jsonrpc';
import type { StorageManager } from '../../client/src/storage';

const RESTORE_WEBSOCKET_TIMEOUT = 30 * 60;

const MAX_IDS_TO_STORE = 10;

const PING_TIMEOUT = 10;
const JSON_RPC_PING = 'ping';
const JSON_RPC_PONG = 'pong';

const LS_SESSION = 'bx-pull-session';

// const LS_SESSION_CACHE_TIME = 20;

export interface Logger
{
	log(message: string, ...params): void,

	logForce(message: string, ...params): void,
}

type ConnectorOptions = {
	config: ?PullConfig,
	storage: ?StorageManager,
	events: { [key: $Values<typeof ConnectorEvents>]: (e: CustomEvent) => void },
	restoreSession: boolean,
	getPublicListMethod: string,
	restClient: RestCaller,
	logger: Logger,
}

export const ConnectorEvents = {
	Message: 'message',
	RevisionChanged: 'revisionChanged',
	ChannelReplaced: 'channelReplaced',
	ConfigExpired: 'configExpired',
	ConnectionStatus: 'connectionStatus',
	ConnectionError: 'connectionError',
};

type PromiseResolver = {
	resolve: () => {},
	reject: () => {},
}

export class Connector extends EventTarget
{
	config: ?PullConfig;
	codec: ?MessageCodec;
	logger: ?Logger;

	connectors = {
		webSocket: null,
		longPolling: null,
	};

	connectPromises: PromiseResolver[] = [];
	pingWaitTimeout: number | null = null;
	reconnectTimeout: number | null = null;
	isWebsocketBlocked = false;
	isLongPollingBlocked = false;
	isManualDisconnect = false;

	_status = PullStatus.Offline;

	connectionAttempt = 0;

	constructor(options: ConnectorOptions = {})
	{
		super();

		this.config = options.config;
		this.logger = options.logger;
		this.storage = options.storage;
		this.restClient = options.restClient;

		this.isSecure = globalThis.location.protocol === 'https:';

		this.connectors.webSocket = new WebSocketConnector({
			pathGetter: () => this.getConnectionPathByType(ConnectionType.WebSocket),
			onOpen: this.onWebSocketOpen.bind(this),
			onMessage: this.onIncomingMessage.bind(this),
			onDisconnect: this.onWebSocketDisconnect.bind(this),
			onError: this.onWebSocketError.bind(this),
		});

		this.connectors.longPolling = new LongPollingConnector({
			pathGetter: () => this.getConnectionPathByType(ConnectionType.LongPolling),
			isBinary: this.isProtobufSupported() && !this.isJsonRpc(),
			onOpen: this.onLongPollingOpen.bind(this),
			onMessage: this.onIncomingMessage.bind(this),
			onDisconnect: this.onLongPollingDisconnect.bind(this),
			onError: this.onLongPollingError.bind(this),
		});

		this.connectionType = this.isWebSocketAllowed() ? ConnectionType.WebSocket : ConnectionType.LongPolling;

		for (const eventName of Object.keys(options.events || {}))
		{
			this.addEventListener(eventName, options.events[eventName]);
		}

		this.channelManager = new ChannelManager({
			restClient: options.restClient,
			getPublicListMethod: options.getPublicListMethod,
		});

		this.jsonRpcAdapter = this.createRpcAdapter();
		this.codec = this.createCodec();

		this.session = {
			mid: null,
			tag: null,
			time: null,
			history: {},
			lastMessageIds: [],
			messageCount: 0,
		};

		if (options.restoreSession && this.storage)
		{
			const oldSession = this.storage.get(LS_SESSION);
			const now = new Date();
			if (isPlainObject(oldSession) && 'ttl' in oldSession && oldSession.ttl >= now)
			{
				this.session.mid = oldSession.mid;
			}
		}
	}

	get status(): string
	{
		return this._status;
	}

	set status(status)
	{
		if (this._status === status)
		{
			return;
		}

		this._status = status;
		this.dispatchEvent(new CustomEvent(ConnectorEvents.ConnectionStatus, {
			detail: {
				status,
				connectionType: this.connector.connectionType,
			},
		}));
	}

	createRpcAdapter(): JsonRpc
	{
		return new JsonRpc({
			sender: this.connectors.webSocket,
			handlers: {
				'incoming.message': this.handleRpcIncomingMessage.bind(this),
			},
			events: {
				error: this.onRpcError.bind(this),
			},
		});
	}

	createCodec(): MessageCodec
	{
		if (this.isProtobufSupported())
		{
			return new ProtobufCodec({
				channelManager: this.channelManager,
			});
		}

		return new LegacyCodec();
	}

	get connector(): WebSocketConnector | LongPollingConnector
	{
		return this.connectors[this.connectionType];
	}

	disconnect(disconnectCode, disconnectReason)
	{
		if (this.connector)
		{
			this.isManualDisconnect = true;
			this.connector.disconnect(disconnectCode, disconnectReason);
		}
	}

	stop(disconnectCode, disconnectReason)
	{
		this.disconnect(disconnectCode, disconnectReason);
		this.stopCheckConfig();
	}

	resetSession()
	{
		this.session.mid = null;
		this.session.tag = null;
		this.session.time = null;
	}

	setConfig(config)
	{
		const wasConnected = this.isConnected();
		if (wasConnected)
		{
			this.disconnect(CloseReasons.CONFIG_REPLACED, 'config was replaced');
		}

		this.config = config;

		if (config.publicChannels)
		{
			this.channelManager.setPublicIds(Object.values(config.publicChannels));
		}

		if (wasConnected)
		{
			this.connect();
		}
	}

	connect(): Promise<void>
	{
		if (this.connector.connected)
		{
			return Promise.resolve();
		}

		if (this.reconnectTimeout)
		{
			clearTimeout(this.reconnectTimeout);
		}

		this.isManualDisconnect = false;
		this.status = PullStatus.Connecting;
		this.connectionAttempt++;

		return new Promise((resolve, reject) => {
			this.connectPromises.push({ resolve, reject });
			this.connector.connect();
		});
	}

	reconnect(disconnectCode, disconnectReason, delay = 1)
	{
		this.disconnect(disconnectCode, disconnectReason);

		this.scheduleReconnect(delay);
	}

	restoreWebSocketConnection()
	{
		if (this.connectionType === ConnectionType.WebSocket)
		{
			return;
		}

		this.connectors.webSocket.connect();
	}

	scheduleReconnect(connectionDelay)
	{
		const delay = connectionDelay ?? this.getConnectionAttemptDelay(this.connectionAttempt);

		if (this.reconnectTimeout)
		{
			clearTimeout(this.reconnectTimeout);
		}

		this.logger?.log(`Pull: scheduling reconnection in ${delay} seconds; attempt # ${this.connectionAttempt}`);

		this.reconnectTimeout = setTimeout(
			() => {
				this.connect().catch((error) => {
					console.error(error);
				});
			},
			delay * 1000,
		);
	}

	scheduleRestoreWebSocketConnection()
	{
		this.logger?.log(`Pull: scheduling restoration of websocket connection in ${RESTORE_WEBSOCKET_TIMEOUT} seconds`);

		if (this.restoreWebSocketTimeout)
		{
			return;
		}

		this.restoreWebSocketTimeout = setTimeout(() => {
			this.restoreWebSocketTimeout = 0;
			this.restoreWebSocketConnection();
		}, RESTORE_WEBSOCKET_TIMEOUT * 1000);
	}

	handleInternalPullEvent(command, message)
	{
		switch (command.toUpperCase())
		{
			case SystemCommands.CHANNEL_EXPIRE:
			{
				if (message.params.action === 'reconnect' && 'new_channel' in message.params)
				{
					this.dispatchEvent(new CustomEvent(ConnectorEvents.ChannelReplaced), {
						detail: {
							type: message.params.channel.type,
							newChannel: message.params.new_channel,
						},
					});
				}
				else
				{
					this.dispatchEvent(new CustomEvent(ConnectorEvents.ConfigExpired));
				}
				break;
			}

			case SystemCommands.CONFIG_EXPIRE:
			{
				this.dispatchEvent(new CustomEvent(ConnectorEvents.ConfigExpired));
				break;
			}

			case SystemCommands.SERVER_RESTART:
			{
				this.reconnect(CloseReasons.SERVER_RESTARTED, 'server was restarted', 15);
				break;
			}
			default://
		}
	}

	getConnectionBasePath(connectionType: string): string
	{
		switch (connectionType)
		{
			case ConnectionType.WebSocket:
				return this.isSecure ? this.config.server.websocket_secure : this.config.server.websocket;
			case ConnectionType.LongPolling:
				return this.isSecure ? this.config.server.long_pooling_secure : this.config.server.long_polling;
			default:
				throw new Error(`Unknown connection type ${connectionType}`);
		}
	}

	getConnectionChannels(): string
	{
		const channels = [];
		for (const channelType of ['private', 'shared'])
		{
			if (channelType in this.config.channels)
			{
				channels.push(this.config.channels[channelType].id);
			}
		}

		if (channels.length === 0)
		{
			throw new Error('Empty channel list');
		}

		return channels.join('/');
	}

	getConnectionPath(): string
	{
		return this.getConnectionPathByType(this.connectionType);
	}

	getConnectionPathByType(connectionType): string
	{
		const params = {};
		const path = this.getConnectionBasePath(connectionType);

		if (isNotEmptyString(this.config.jwt))
		{
			params.token = this.config.jwt;
		}
		else
		{
			params.CHANNEL_ID = this.getConnectionChannels();
		}

		if (this.isJsonRpc())
		{
			params.jsonRpc = 'true';
		}
		else if (this.isProtobufSupported())
		{
			params.binaryMode = 'true';
		}

		if (this.isSharedMode())
		{
			if (!this.config.clientId)
			{
				throw new Error('Push-server is in shared mode, but clientId is not set');
			}
			params.clientId = this.config.clientId;
		}

		if (this.config.server && this.config.server.hostname)
		{
			params.hostname = this.config.server.hostname;
		}

		if (this.session.mid)
		{
			params.mid = this.session.mid;
		}

		if (this.session.tag)
		{
			params.tag = this.session.tag;
		}

		if (this.session.time)
		{
			params.time = this.session.time;
		}
		params.revision = REVISION;

		return `${path}?${buildQueryString(params)}`;
	}

	getPublicationPath(): string
	{
		const path = this.isSecure ? this.config.server.publish_secure : this.config.server.publish;
		if (!path)
		{
			return '';
		}

		const channels = [];
		for (const type of Object.keys(this.config.channels))
		{
			channels.push(this.config.channels[type].id);
		}

		const params = {
			CHANNEL_ID: channels.join('/'),
		};

		return `${path}?${buildQueryString(params)}`;
	}

	emitMessage(message)
	{
		if (!isPlainObject(message.extra))
		{
			message.extra = {};
		}

		if (message.extra.server_time_unix)
		{
			const timeShift = this.config.server.timeShift ?? 0;
			const timeAgo = ((getTimestamp() - (message.extra.server_time_unix * 1000)) / 1000) - timeShift;
			message.extra.server_time_ago = timeAgo > 0 ? timeAgo : 0;
		}

		this.dispatchEvent(new CustomEvent(ConnectorEvents.Message, { detail: message }));
	}

	/**
	 * Returns reconnect delay in seconds
	 * @param attemptNumber
	 * @return {number}
	 */
	getConnectionAttemptDelay(attemptNumber): number
	{
		let result = 60;
		if (attemptNumber < 1)
		{
			result = 0.5;
		}
		else if (attemptNumber < 3)
		{
			result = 5;
		}
		else if (attemptNumber < 5)
		{
			result = 25;
		}
		else if (attemptNumber < 10)
		{
			result = 45;
		}

		return result + (result * Math.random() * 0.2);
	}

	onLongPollingOpen()
	{
		this.unloading = false;
		this.starting = false;
		this.connectionAttempt = 0;
		this.isManualDisconnect = false;
		this.status = PullStatus.Online;

		this.logger?.log('Pull: Long polling connection with push-server opened');
		if (this.isWebSocketEnabled())
		{
			this.scheduleRestoreWebSocketConnection();
		}

		this.connectPromises.forEach((resolver) => {
			resolver.resolve();
		});
		this.connectPromises = [];
	}

	onWebSocketOpen()
	{
		this.status = PullStatus.Online;
		this.isWebsocketBlocked = false;
		this.connectionAttempt = 0;

		// to prevent fallback to long polling in case of networking problems
		this.isLongPollingBlocked = true;

		if (this.connectionType === ConnectionType.LongPolling)
		{
			this.connectionType = ConnectionType.WebSocket;
			this.connectors.longPolling.disconnect();
		}

		if (this.restoreWebSocketTimeout)
		{
			clearTimeout(this.restoreWebSocketTimeout);
			this.restoreWebSocketTimeout = null;
		}
		this.logger?.log('Pull: Websocket connection with push-server opened');
		this.connectPromises.forEach((resolver) => {
			resolver.resolve();
		});
		this.connectPromises = [];
	}

	onWebSocketDisconnect(e = {})
	{
		if (this.connectionType === ConnectionType.WebSocket)
		{
			this.status = PullStatus.Offline;
		}

		if (this.isManualDisconnect)
		{
			this.logger?.logForce('Pull: Websocket connection with push-server manually closed');
		}
		else
		{
			this.logger?.logForce(`Pull: Websocket connection with push-server closed. Code: ${e.code}, reason: ${e.reason}`);
			if (e.code === CloseReasons.WRONG_CHANNEL_ID)
			{
				this.dispatchEvent(new CustomEvent(ConnectorEvents.ConnectionError, {
					detail: {
						code: e.code,
						reason: 'wrong channel signature',
					},
				}));
			}
			else
			{
				this.scheduleReconnect();
			}
		}

		// to prevent fallback to long polling in case of networking problems
		this.isLongPollingBlocked = true;
		this.isManualDisconnect = false;

		this.clearPingWaitTimeout();
	}

	onWebSocketError(e)
	{
		this.starting = false;
		if (this.connectionType === ConnectionType.WebSocket)
		{
			this.status = PullStatus.Offline;
		}

		console.error(`${getDateForLog()}: Pull: WebSocket connection error`, e);
		this.scheduleReconnect();
		this.connectPromises.forEach((resolver) => {
			resolver.reject();
		});
		this.connectPromises = [];

		this.clearPingWaitTimeout();
	}

	onWebSocketBlockChanged(e)
	{
		const isWebSocketBlocked = e.isWebSocketBlocked;

		if (isWebSocketBlocked && this.connectionType === ConnectionType.WebSocket && !this.isConnected())
		{
			clearTimeout(this.reconnectTimeout);

			this.connectionAttempt = 0;
			this.connectionType = ConnectionType.LongPolling;
			this.scheduleReconnect(1);
		}
		else if (!isWebSocketBlocked && this.connectionType === ConnectionType.LongPolling)
		{
			clearTimeout(this.reconnectTimeout);
			clearTimeout(this.restoreWebSocketTimeout);

			this.connectionAttempt = 0;
			this.connectionType = ConnectionType.WebSocket;
			this.scheduleReconnect(1);
		}
	}

	onLongPollingDisconnect(e = {})
	{
		if (this.connectionType === ConnectionType.LongPolling)
		{
			this.status = PullStatus.Offline;
		}

		this.logger?.log(`Pull: Long polling connection with push-server closed. Code: ${e.code}, reason: ${e.reason}`);
		if (!this.isManualDisconnect)
		{
			this.scheduleReconnect();
		}
		this.isManualDisconnect = false;
		this.clearPingWaitTimeout();
	}

	onLongPollingError(e)
	{
		this.starting = false;
		if (this.connectionType === ConnectionType.LongPolling)
		{
			this.status = PullStatus.Offline;
		}
		console.error(`${getDateForLog()}: Pull: Long polling connection error`, e);
		this.scheduleReconnect();
		this.connectPromises.forEach((resolver) => {
			resolver.reject();
		});
		this.connectPromises = [];
		this.clearPingWaitTimeout();
	}

	onIncomingMessage(message)
	{
		if (this.isJsonRpc())
		{
			if (message === JSON_RPC_PING)
			{
				this.onJsonRpcPing();
			}
			else
			{
				this.jsonRpcAdapter.handleIncomingMessage(message);
			}
		}
		else
		{
			const events = this.codec.extractMessages(message);
			this.handleIncomingEvents(events);
		}
	}

	handleRpcIncomingMessage(messageFields): {}
	{
		this.session.mid = messageFields.mid;
		const body = messageFields.body;

		if (!messageFields.body.extra)
		{
			body.extra = {};
		}
		body.extra.sender = messageFields.sender;

		if ('user_params' in messageFields && isPlainObject(messageFields.user_params))
		{
			Object.assign(body.params, messageFields.user_params);
		}

		if ('dictionary' in messageFields && isPlainObject(messageFields.dictionary))
		{
			Object.assign(body.params, messageFields.dictionary);
		}

		if (this.checkDuplicate(messageFields.mid))
		{
			this.addMessageToStat(body);
			this.trimDuplicates();
			if (body.module_id === 'pull')
			{
				this.handleInternalPullEvent(body.command, body);
			}
			else
			{
				this.emitMessage(body);
			}

			if (body.extra && body.extra.revision_web)
			{
				this.checkRevision(body.extra.revision_web);
			}
		}

		this.connector.send(`mack:${messageFields.mid}`);

		return {};
	}

	onRpcError(event)
	{
		// probably, fire event
	}

	onJsonRpcPing()
	{
		this.updatePingWaitTimeout();
		this.connector.send(JSON_RPC_PONG);
	}

	handleIncomingEvents(events)
	{
		const messages = [];
		if (events.length === 0)
		{
			this.session.mid = null;

			return;
		}

		for (const event of events)
		{
			this.updateSessionFromEvent(event);
			if (event.mid && !this.checkDuplicate(event.mid))
			{
				continue;
			}

			this.addMessageToStat(event.text);
			messages.push(event.text);
		}
		this.trimDuplicates();
		messages.forEach((message) => {
			if (message.module_id === 'pull')
			{
				this.handleInternalPullEvent(message.command, message);
			}
			else
			{
				this.emitMessage(message);
			}

			if (message.extra && message.extra.revision_web)
			{
				this.checkRevision(message.extra.revision_web);
			}
		});
	}

	checkRevision(serverRevision: number)
	{
		if (serverRevision > 0 && serverRevision !== REVISION)
		{
			this.logger?.log(`Pull revision changed from ${REVISION} to ${serverRevision}. Reload required`);

			this.dispatchEvent(new CustomEvent(ConnectorEvents.RevisionChanged, { detail: { revision: serverRevision } }));
		}
	}

	updateSessionFromEvent(event)
	{
		this.session.mid = event.mid || null;
		this.session.tag = event.tag || null;
		this.session.time = event.time || null;
	}

	checkDuplicate(mid): boolean
	{
		if (this.session.lastMessageIds.includes(mid))
		{
			// eslint-disable-next-line no-console
			console.warn(`Duplicate message ${mid} skipped`);

			return false;
		}

		this.session.lastMessageIds.push(mid);

		return true;
	}

	trimDuplicates()
	{
		if (this.session.lastMessageIds.length > MAX_IDS_TO_STORE)
		{
			this.session.lastMessageIds = this.session.lastMessageIds.slice(-MAX_IDS_TO_STORE);
		}
	}

	addMessageToStat(message)
	{
		if (!this.session.history[message.module_id])
		{
			this.session.history[message.module_id] = {};
		}

		if (!this.session.history[message.module_id][message.command])
		{
			this.session.history[message.module_id][message.command] = 0;
		}
		this.session.history[message.module_id][message.command]++;

		this.session.messageCount++;
	}

	getRevision(): number | null
	{
		return (this.config && this.config.api) ? this.config.api.revision_web : null;
	}

	getServerVersion(): number
	{
		return (this.config && this.config.server) ? this.config.server.version : 0;
	}

	getServerMode(): string | null
	{
		return (this.config && this.config.server) ? this.config.server.mode : null;
	}

	isConnected(): boolean
	{
		return this.connector.connected;
	}

	isWebSocketConnected(): boolean
	{
		return this.connector.connected && this.connector.connectionType === ConnectionType.WebSocket;
	}

	isWebSocketAllowed(): boolean
	{
		return !this.isWebsocketBlocked && this.isWebSocketEnabled();
	}

	isWebSocketEnabled(): boolean
	{
		if (!isWebSocketSupported())
		{
			return false;
		}

		return (this.config && this.config.server && this.config.server.websocket_enabled === true);
	}

	isPublishingSupported(): boolean
	{
		return this.getServerVersion() > 3;
	}

	isPublishingEnabled(): boolean
	{
		if (!this.isPublishingSupported())
		{
			return false;
		}

		return (this.config && this.config.server && this.config.server.publish_enabled === true);
	}

	isProtobufSupported(): boolean
	{
		return (this.getServerVersion() === 4 && !browser.IsIe());
	}

	isJsonRpc(): boolean
	{
		return (this.getServerVersion() >= 5);
	}

	isSharedMode(): boolean
	{
		return (this.getServerMode() === ServerMode.Shared);
	}

	setPublicIds(publicIds)
	{
		this.channelManager.setPublicIds(publicIds);
	}

	/**
	 * Sends batch of messages to the multiple public channels.
	 *
	 * @param {object[]} messageBatch Array of messages to send.
	 * @param  {int[]} messageBatch.userList User ids the message receivers.
	 * @param  {string[]|object[]} messageBatch.channelList Public ids of the channels to send messages.
	 * @param {string} messageBatch.moduleId Name of the module to receive message,
	 * @param {string} messageBatch.command Command name.
	 * @param {object} messageBatch.params Command parameters.
	 * @param {integer} [messageBatch.expiry] Message expiry time in seconds.
	 * @return void
	 */
	async sendMessageBatch(messageBatch)
	{
		if (!this.isPublishingEnabled())
		{
			throw new Error('Client publishing is not supported or is disabled');
		}

		try
		{
			const packet = await this.codec.encodeMessageBatch(messageBatch);
			this.connector.send(packet);
		}
		catch (e)
		{
			console.error('sendMessageBatch error:', e);
			throw e;
		}
	}

	/**
	 * Send single message to the specified users.
	 *
	 * @param {integer[]} users User ids of the message receivers.
	 * @param {string} moduleId Name of the module to receive message,
	 * @param {string} command Command name.
	 * @param {object} params Command parameters.
	 * @param {integer} [expiry] Message expiry time in seconds.
	 */
	async sendMessage(users, moduleId, command, params, expiry): Promise<JsonRpcResponse> | void
	{
		const message = {
			userList: users,
			body: {
				module_id: moduleId,
				command,
				params,
			},
			expiry,
		};

		if (this.isJsonRpc())
		{
			return this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.Publish, message);
		}

		return this.sendMessageBatch([message]);
	}

	/**
	 * Send single message to the specified public channels.
	 *
	 * @param {string[]} publicChannels Public ids of the channels to receive message.
	 * @param {string} moduleId Name of the module to receive message,
	 * @param {string} command Command name.
	 * @param {object} params Command parameters.
	 * @param {integer} [expiry] Message expiry time in seconds.
	 * @return {Promise}
	 */
	sendMessageToChannels(publicChannels, moduleId, command, params, expiry): Promise<JsonRpcResponse> | void
	{
		const message = {
			channelList: publicChannels,
			body: {
				module_id: moduleId,
				command,
				params,
			},
			expiry,
		};

		if (this.isJsonRpc())
		{
			return this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.Publish, message);
		}

		return this.sendMessageBatch([message]);
	}

	/**
	 * @param userId {number}
	 */
	async subscribeUserStatusChange(userId): Promise<void>
	{
		if (typeof (userId) !== 'number')
		{
			throw new TypeError('userId must be a number');
		}

		await this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.SubscribeStatusChange, { userId });
	}

	/**
	 * @param userId {number}
	 * @returns {Promise}
	 */
	async unsubscribeUserStatusChange(userId): Promise<void>
	{
		if (typeof (userId) !== 'number')
		{
			throw new TypeError('userId must be a number');
		}

		await this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.UnsubscribeStatusChange, { userId });
	}

	/**
	 * Returns "last seen" time in seconds for the users. Result format: Object{userId: int}
	 * If the user is currently connected - will return 0.
	 * If the user if offline - will return diff between current timestamp and last seen timestamp in seconds.
	 * If the user was never online - the record for user will be missing from the result object.
	 *
	 * @param {integer[]} userList List of user ids.
	 * @returns {Promise}
	 */
	async getUsersLastSeen(userList: number[]): Promise<{ [number]: number }>
	{
		if (!isArray(userList) || !userList.every((item) => typeof (item) === 'number'))
		{
			throw new Error('userList must be an array of numbers');
		}
		const result = {};

		const response = await this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.GetUsersLastSeen, {
			userList,
		});
		const unresolved = [];
		for (const userId of userList)
		{
			if (!(userId in response))
			{
				unresolved.push(userId);
			}
			result[userId] = response[userId];
		}

		if (unresolved.length === 0)
		{
			return result;
		}

		const params = {
			userIds: unresolved,
			sendToQueueSever: true,
		};
		const restResponse = await this.restClient.callMethod('pull.api.user.getLastSeen', params);
		const restData = restResponse.data();
		for (const userId of Object.keys(restData))
		{
			result[userId] = restData[userId];
		}

		return result;
	}

	/**
	 * Pings server. In case of success promise will be resolved, otherwise - rejected.
	 *
	 * @param {int} timeout Request timeout in seconds
	 * @returns {Promise}
	 */
	ping(timeout): Promise<JsonRpcResponse>
	{
		return this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.Ping, {}, timeout);
	}

	/**
	 * Returns list channels that the connection is subscribed to.
	 *
	 * @returns {Promise}
	 */
	listChannels(): Promise<JsonRpcResponse>
	{
		return this.jsonRpcAdapter.executeOutgoingRpcCommand(RpcMethod.ListChannels, {});
	}

	updatePingWaitTimeout()
	{
		clearTimeout(this.pingWaitTimeout);
		this.pingWaitTimeout = setTimeout(this.onPingTimeout.bind(this), PING_TIMEOUT * 2 * 1000);
	}

	clearPingWaitTimeout()
	{
		clearTimeout(this.pingWaitTimeout);
		this.pingWaitTimeout = null;
	}

	onPingTimeout()
	{
		this.pingWaitTimeout = null;
		if (!this.isConnected())
		{
			return;
		}

		// eslint-disable-next-line no-console
		console.warn(`No pings are received in ${PING_TIMEOUT * 2} seconds. Reconnecting`);
		this.disconnect(CloseReasons.STUCK, 'connection stuck');
		this.scheduleReconnect();
	}
}

Youez - 2016 - github.com/yon3zu
LinuXploit