PHP WebShell
Текущая директория: /usr/lib/node_modules/bitgo/node_modules/@polkadot/rpc-provider/ws
Просмотр файла: index.js
import { EventEmitter } from 'eventemitter3';
import { isChildClass, isNull, isUndefined, logger, noop, objectSpread, stringify } from '@polkadot/util';
import { xglobal } from '@polkadot/x-global';
import { WebSocket } from '@polkadot/x-ws';
import { RpcCoder } from '../coder/index.js';
import defaults from '../defaults.js';
import { DEFAULT_CAPACITY, LRUCache } from '../lru.js';
import { getWSErrorString } from './errors.js';
/** Interval (ms) at which pending request handlers are checked for expiry */
const TIMEOUT_INTERVAL = 5_000;
/** Per-request timeout (ms) applied when the constructor is not given one */
const DEFAULT_TIMEOUT_MS = 60_000;
/** Default delay (ms) between automatic (re)connection attempts */
const RETRY_DELAY = 2_500;
/**
 * Maps the `finalised` spelling of chain method names onto the `finalized`
 * spelling; applied to the `method` of incoming subscription notifications.
 */
const ALIASES = {
  chain_finalisedHead: 'chain_finalizedHead',
  chain_subscribeFinalisedHeads: 'chain_subscribeFinalizedHeads',
  chain_unsubscribeFinalisedHeads: 'chain_unsubscribeFinalizedHeads'
};
// Module-wide logger used by every WsProvider in this file, created with the 'api-ws' label
const l = logger('api-ws');
/**
 * @internal
 * Removes every own enumerable key from `record`, in place. When `cb` is
 * supplied it is invoked with each value just before that key is deleted.
 */
function eraseRecord(record, cb) {
  for (const name of Object.keys(record)) {
    if (cb) {
      cb(record[name]);
    }
    delete record[name];
  }
}
/**
 * @internal
 * Builds a fresh statistics object with every counter set to zero. A new
 * object is returned on each call, so callers never share counters.
 */
function defaultEndpointStats() {
  const zeroed = {
    bytesRecv: 0,
    bytesSent: 0,
    cached: 0,
    errors: 0,
    requests: 0,
    subscriptions: 0,
    timeout: 0
  };
  return zeroed;
}
/**
 * # @polkadot/rpc-provider/ws
 *
 * @name WsProvider
 *
 * @description The WebSocket Provider allows sending requests using WebSocket to a WebSocket RPC server TCP port. Unlike the [[HttpProvider]], it does support subscriptions and allows listening to events such as new blocks or balance changes.
 *
 * @example
 * <BR>
 *
 * ```javascript
 * import Api from '@polkadot/api/promise';
 * import { WsProvider } from '@polkadot/rpc-provider/ws';
 *
 * const provider = new WsProvider('ws://127.0.0.1:9944');
 * const api = new Api(provider);
 * ```
 *
 * @see [[HttpProvider]]
 */
export class WsProvider {
  // Cache of result promises for cacheable calls, keyed by `method::params`
  __internal__callCache;
  // JSON-RPC encoder/decoder
  __internal__coder;
  // All configured endpoint URLs
  __internal__endpoints;
  // Headers handed to the underlying WebSocket (where it accepts options)
  __internal__headers;
  // Emitter for the 'connected' / 'disconnected' / 'error' events
  __internal__eventemitter;
  // In-flight requests awaiting a response, keyed by request id
  __internal__handlers = {};
  // Resolves (with this provider) on the first 'connected' event
  __internal__isReadyPromise;
  // Lifetime statistics, never reset
  __internal__stats;
  // Subscription notifications that arrived before their subscription was registered
  __internal__waitingForId = {};
  // Requested cache capacity; exactly 0 means "bypass the call cache"
  __internal__cacheCapacity;
  // Delay between automatic (re)connects; 0 disables auto-connect
  __internal__autoConnectMs;
  // Index into __internal__endpoints of the endpoint currently in use (-1 before first connect)
  __internal__endpointIndex;
  // Statistics for the current connection, reset on every socket close
  __internal__endpointStats;
  __internal__isConnected = false;
  // Active subscriptions, keyed by `type::subscriptionId`
  __internal__subscriptions = {};
  // Handle of the interval that expires timed-out requests
  __internal__timeoutId = null;
  __internal__websocket;
  // Per-request timeout in milliseconds
  __internal__timeout;

  /**
   * @param {string | string[]} endpoint The endpoint url. Usually `ws://ip:9944` or `wss://ip:9944`, may provide an array of endpoint strings.
   * @param {number | false} autoConnectMs Whether to connect automatically or not (default). Provided value is used as a delay between retries.
   * @param {Record<string, string>} headers The headers provided to the underlying WebSocket
   * @param {number} [timeout] Custom timeout value used per request. Defaults to `DEFAULT_TIMEOUT_MS`
   * @param {number} [cacheCapacity] Capacity of the call cache. Defaults to `DEFAULT_CAPACITY`; pass `0` to disable caching of calls.
   * @throws {Error} When no endpoint is supplied, or an endpoint does not start with `ws://` or `wss://`
   */
  constructor(endpoint = defaults.WS_URL, autoConnectMs = RETRY_DELAY, headers = {}, timeout, cacheCapacity) {
    const endpoints = Array.isArray(endpoint)
      ? endpoint
      : [endpoint];

    if (endpoints.length === 0) {
      throw new Error('WsProvider requires at least one Endpoint');
    }

    endpoints.forEach((endpoint) => {
      if (!/^(wss|ws):\/\//.test(endpoint)) {
        throw new Error(`Endpoint should start with 'ws://', received '${endpoint}'`);
      }
    });

    // The cache object itself is always created with a usable capacity ...
    this.__internal__callCache = new LRUCache(cacheCapacity || DEFAULT_CAPACITY);
    // ... but the stored capacity must keep an explicit 0 (hence `??`, not `||`),
    // otherwise the `=== 0` bypass in send() could never trigger
    this.__internal__cacheCapacity = cacheCapacity ?? DEFAULT_CAPACITY;
    this.__internal__eventemitter = new EventEmitter();
    this.__internal__autoConnectMs = autoConnectMs || 0;
    this.__internal__coder = new RpcCoder();
    this.__internal__endpointIndex = -1;
    this.__internal__endpoints = endpoints;
    this.__internal__headers = headers;
    this.__internal__websocket = null;
    this.__internal__stats = {
      active: { requests: 0, subscriptions: 0 },
      total: defaultEndpointStats()
    };
    this.__internal__endpointStats = defaultEndpointStats();
    this.__internal__timeout = timeout || DEFAULT_TIMEOUT_MS;

    if (autoConnectMs && autoConnectMs > 0) {
      this.connectWithRetry().catch(noop);
    }

    this.__internal__isReadyPromise = new Promise((resolve) => {
      this.__internal__eventemitter.once('connected', () => {
        resolve(this);
      });
    });
  }

  /**
   * @summary `true` when this provider supports subscriptions
   */
  get hasSubscriptions() {
    return true;
  }

  /**
   * @summary `true` when this provider supports clone()
   */
  get isClonable() {
    return true;
  }

  /**
   * @summary Whether the node is connected or not.
   * @return {boolean} true if connected
   */
  get isConnected() {
    return this.__internal__isConnected;
  }

  /**
   * @description Promise that resolves the first time we are connected and loaded
   */
  get isReady() {
    return this.__internal__isReadyPromise;
  }

  /**
   * @description The endpoint url currently selected (`undefined` before the first connection attempt)
   */
  get endpoint() {
    return this.__internal__endpoints[this.__internal__endpointIndex];
  }

  /**
   * @description Returns a new provider created from this provider's endpoint list.
   * Only the endpoints are carried over; every other constructor argument uses its default.
   */
  clone() {
    return new WsProvider(this.__internal__endpoints);
  }

  /**
   * @description Picks the index of the next endpoint to connect to, cycling round-robin through `endpoints`
   * @param {string[]} endpoints The list of available endpoints
   * @return {number} The index to use for the next connection
   */
  selectEndpointIndex(endpoints) {
    return (this.__internal__endpointIndex + 1) % endpoints.length;
  }

  /**
   * @summary Manually connect
   * @description The [[WsProvider]] connects automatically by default, however if you decided otherwise, you may
   * connect manually using this method.
   * @throws {Error} When a socket already exists, or when creating the socket fails (the error is also emitted as `'error'`)
   */
  // eslint-disable-next-line @typescript-eslint/require-await
  async connect() {
    if (this.__internal__websocket) {
      throw new Error('WebSocket is already connected');
    }

    try {
      this.__internal__endpointIndex = this.selectEndpointIndex(this.__internal__endpoints);
      // the as here is Deno-specific - not available on the globalThis
      this.__internal__websocket = typeof xglobal.WebSocket !== 'undefined' && isChildClass(xglobal.WebSocket, WebSocket)
        ? new WebSocket(this.endpoint)
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore - WS may be an instance of ws, which supports options
        : new WebSocket(this.endpoint, undefined, {
          headers: this.__internal__headers
        });

      if (this.__internal__websocket) {
        this.__internal__websocket.onclose = this.__internal__onSocketClose;
        this.__internal__websocket.onerror = this.__internal__onSocketError;
        this.__internal__websocket.onmessage = this.__internal__onSocketMessage;
        this.__internal__websocket.onopen = this.__internal__onSocketOpen;
      }

      // timeout any handlers that have not had a response
      this.__internal__timeoutId = setInterval(() => this.__internal__timeoutHandlers(), TIMEOUT_INTERVAL);
    } catch (error) {
      l.error(error);
      this.__internal__emit('error', error);
      throw error;
    }
  }

  /**
   * @description Connect, never throwing an error, but rather forcing a retry
   * after the configured auto-connect delay. Does nothing when auto-connect is disabled.
   */
  async connectWithRetry() {
    if (this.__internal__autoConnectMs > 0) {
      try {
        await this.connect();
      } catch {
        setTimeout(() => {
          this.connectWithRetry().catch(noop);
        }, this.__internal__autoConnectMs);
      }
    }
  }

  /**
   * @description Manually disconnect from the connection, clearing auto-connect logic
   * @throws {Error} When closing the socket fails (the error is also emitted as `'error'`)
   */
  // eslint-disable-next-line @typescript-eslint/require-await
  async disconnect() {
    // switch off autoConnect, we are in manual mode now
    this.__internal__autoConnectMs = 0;

    try {
      if (this.__internal__websocket) {
        // 1000 - Normal closure; the connection successfully completed
        this.__internal__websocket.close(1000);
      }
    } catch (error) {
      l.error(error);
      this.__internal__emit('error', error);
      throw error;
    }
  }

  /**
   * @description Returns the connection stats: live counts of in-flight requests and
   * active subscriptions, plus the lifetime totals
   */
  get stats() {
    return {
      active: {
        requests: Object.keys(this.__internal__handlers).length,
        subscriptions: Object.keys(this.__internal__subscriptions).length
      },
      total: this.__internal__stats.total
    };
  }

  /**
   * @description Returns the stats for the current connection (reset whenever the socket closes)
   */
  get endpointStats() {
    return this.__internal__endpointStats;
  }

  /**
   * @summary Registers a listener for one of the provider events.
   * @param {ProviderInterfaceEmitted} type Event
   * @param {ProviderInterfaceEmitCb} sub Callback
   * @return unsubscribe function
   */
  on(type, sub) {
    this.__internal__eventemitter.on(type, sub);

    return () => {
      this.__internal__eventemitter.removeListener(type, sub);
    };
  }

  /**
   * @summary Encodes a JSON-RPC request and sends it over the WebSocket, optionally re-using a cached result promise.
   * @param method The RPC methods to execute
   * @param params Encoded parameters as applicable for the method
   * @param isCacheable When truthy, the result promise is stored in (and served from) the call cache, keyed by method and params
   * @param subscription Subscription details (internally used)
   * @return A promise for the decoded result; rejects when not connected, on an RPC error, on timeout or on disconnect
   */
  send(method, params, isCacheable, subscription) {
    this.__internal__endpointStats.requests++;
    this.__internal__stats.total.requests++;

    const [id, body] = this.__internal__coder.encodeJson(method, params);

    // a capacity of exactly 0 means the caller opted out of caching altogether
    if (this.__internal__cacheCapacity === 0) {
      return this.__internal__send(id, body, method, params, subscription);
    }

    const cacheKey = isCacheable ? `${method}::${stringify(params)}` : '';
    let resultPromise = isCacheable
      ? this.__internal__callCache.get(cacheKey)
      : null;

    if (!resultPromise) {
      resultPromise = this.__internal__send(id, body, method, params, subscription);

      if (isCacheable) {
        this.__internal__callCache.set(cacheKey, resultPromise);
      }
    } else {
      this.__internal__endpointStats.cached++;
      this.__internal__stats.total.cached++;
    }

    return resultPromise;
  }

  /**
   * @internal Registers a response handler for `id` and writes `body` to the socket.
   * The returned promise is settled later by the handler's callback (response, timeout
   * or disconnect), or rejected immediately when the socket is unavailable or the write throws.
   */
  async __internal__send(id, body, method, params, subscription) {
    return new Promise((resolve, reject) => {
      try {
        if (!this.isConnected || this.__internal__websocket === null) {
          throw new Error('WebSocket is not connected');
        }

        const callback = (error, result) => {
          error
            ? reject(error)
            : resolve(result);
        };

        l.debug(() => ['calling', method, body]);

        this.__internal__handlers[id] = {
          callback,
          method,
          params,
          start: Date.now(),
          subscription
        };

        const bytesSent = body.length;

        this.__internal__endpointStats.bytesSent += bytesSent;
        this.__internal__stats.total.bytesSent += bytesSent;
        this.__internal__websocket.send(body);
      } catch (error) {
        this.__internal__endpointStats.errors++;
        this.__internal__stats.total.errors++;
        reject(error);
      }
    });
  }

  /**
   * @name subscribe
   * @summary Allows subscribing to a specific event.
   *
   * @example
   * <BR>
   *
   * ```javascript
   * const provider = new WsProvider('ws://127.0.0.1:9944');
   * const rpc = new Rpc(provider);
   *
   * rpc.state.subscribeStorage([[storage.system.account, <Address>]], (_, values) => {
   *   console.log(values)
   * }).then((subscriptionId) => {
   *   console.log('balance changes subscription id: ', subscriptionId)
   * })
   * ```
   */
  subscribe(type, method, params, callback) {
    this.__internal__endpointStats.subscriptions++;
    this.__internal__stats.total.subscriptions++;

    // subscriptions are not cached, LRU applies to .at(<blockHash>) only
    return this.send(method, params, false, { callback, type });
  }

  /**
   * @summary Allows unsubscribing to subscriptions made with [[subscribe]].
   * @param type The subscription type, as passed to [[subscribe]]
   * @param method The RPC method that performs the unsubscribe
   * @param id The subscription id
   * @return `false` when the subscription is unknown or the unsubscribe request fails,
   * `true` when not connected (nothing to send), otherwise the result of the unsubscribe request
   */
  async unsubscribe(type, method, id) {
    const subscription = `${type}::${id}`;

    // FIXME This now could happen with re-subscriptions. The issue is that with a re-sub
    // the assigned id now does not match what the API user originally received. It has
    // a slight complication in solving - since we cannot rely on the send id, but rather
    // need to find the actual subscription id to map it
    if (isUndefined(this.__internal__subscriptions[subscription])) {
      l.debug(() => `Unable to find active subscription=${subscription}`);

      return false;
    }

    delete this.__internal__subscriptions[subscription];

    try {
      // `await` is required here: returning the bare promise would let a rejected
      // send bypass the catch below and reject unsubscribe() instead of yielding false
      return this.isConnected && !isNull(this.__internal__websocket)
        ? await this.send(method, [id])
        : true;
    } catch {
      return false;
    }
  }

  /** @internal Emits `type` with `args` on the provider's event emitter */
  __internal__emit = (type, ...args) => {
    this.__internal__eventemitter.emit(type, ...args);
  };

  /**
   * @internal Socket `close` handler: tears down the socket and the timeout interval,
   * rejects every in-flight request, drops queued notifications, resets the
   * per-connection stats, emits `'disconnected'` and schedules a reconnect when
   * auto-connect is enabled.
   */
  __internal__onSocketClose = (event) => {
    const error = new Error(`disconnected from ${this.endpoint}: ${event.code}:: ${event.reason || getWSErrorString(event.code)}`);

    // only log when the close was not requested via disconnect()
    if (this.__internal__autoConnectMs > 0) {
      l.error(error.message);
    }

    this.__internal__isConnected = false;

    if (this.__internal__websocket) {
      this.__internal__websocket.onclose = null;
      this.__internal__websocket.onerror = null;
      this.__internal__websocket.onmessage = null;
      this.__internal__websocket.onopen = null;
      this.__internal__websocket = null;
    }

    if (this.__internal__timeoutId) {
      clearInterval(this.__internal__timeoutId);
      this.__internal__timeoutId = null;
    }

    // reject all hanging requests
    eraseRecord(this.__internal__handlers, (h) => {
      try {
        h.callback(error, undefined);
      } catch (err) {
        // does not throw
        l.error(err);
      }
    });
    eraseRecord(this.__internal__waitingForId);

    // Reset stats for active endpoint
    this.__internal__endpointStats = defaultEndpointStats();
    this.__internal__emit('disconnected');

    if (this.__internal__autoConnectMs > 0) {
      setTimeout(() => {
        this.connectWithRetry().catch(noop);
      }, this.__internal__autoConnectMs);
    }
  };

  /** @internal Socket `error` handler: logs at debug level and re-emits as `'error'` */
  __internal__onSocketError = (error) => {
    l.debug(() => ['socket error', error]);
    this.__internal__emit('error', error);
  };

  /**
   * @internal Socket `message` handler: accounts the received bytes, parses the frame
   * and dispatches it as either a request result (no `method`) or a subscription
   * notification (has `method`). A frame that is not valid JSON is counted, logged
   * and emitted as `'error'` instead of throwing out of the event handler.
   */
  __internal__onSocketMessage = (message) => {
    l.debug(() => ['received', message.data]);

    const bytesRecv = message.data.length;

    this.__internal__endpointStats.bytesRecv += bytesRecv;
    this.__internal__stats.total.bytesRecv += bytesRecv;

    let response;

    try {
      response = JSON.parse(message.data);
    } catch (error) {
      // no request id can be recovered from an unparseable frame, so there is no
      // handler to reject - surface it to listeners and carry on
      this.__internal__endpointStats.errors++;
      this.__internal__stats.total.errors++;
      l.error(error);
      this.__internal__emit('error', error);

      return;
    }

    return isUndefined(response.method)
      ? this.__internal__onSocketMessageResult(response)
      : this.__internal__onSocketMessageSubscribe(response);
  };

  /**
   * @internal Handles the response to a request: settles the matching handler and,
   * for subscription requests, registers the subscription under `type::result`
   * and replays a notification that arrived before the registration.
   */
  __internal__onSocketMessageResult = (response) => {
    const handler = this.__internal__handlers[response.id];

    if (!handler) {
      l.debug(() => `Unable to find handler for id=${response.id}`);

      return;
    }

    try {
      const { method, params, subscription } = handler;
      const result = this.__internal__coder.decodeResponse(response);

      // first send the result - in case of subs, we may have an update
      // immediately if we have some queued results already
      handler.callback(null, result);

      if (subscription) {
        const subId = `${subscription.type}::${result}`;

        this.__internal__subscriptions[subId] = objectSpread({}, subscription, {
          method,
          params
        });

        // if we have a result waiting for this subscription already
        if (this.__internal__waitingForId[subId]) {
          this.__internal__onSocketMessageSubscribe(this.__internal__waitingForId[subId]);
        }
      }
    } catch (error) {
      this.__internal__endpointStats.errors++;
      this.__internal__stats.total.errors++;
      handler.callback(error, undefined);
    }

    delete this.__internal__handlers[response.id];
  };

  /**
   * @internal Handles a subscription notification: looks up the subscription by
   * `method::subscriptionId` (after alias mapping) and invokes its callback. A
   * notification for a not-yet-registered subscription is parked until it is.
   * @throws {Error} When the response carries no `method`
   */
  __internal__onSocketMessageSubscribe = (response) => {
    if (!response.method) {
      throw new Error('No method found in JSONRPC response');
    }

    const method = ALIASES[response.method] || response.method;
    const subId = `${method}::${response.params.subscription}`;
    const handler = this.__internal__subscriptions[subId];

    if (!handler) {
      // store the JSON, we could have out-of-order subid coming in
      this.__internal__waitingForId[subId] = response;
      l.debug(() => `Unable to find handler for subscription=${subId}`);

      return;
    }

    // housekeeping
    delete this.__internal__waitingForId[subId];

    try {
      const result = this.__internal__coder.decodeResponse(response);

      handler.callback(null, result);
    } catch (error) {
      this.__internal__endpointStats.errors++;
      this.__internal__stats.total.errors++;
      handler.callback(error, undefined);
    }
  };

  /**
   * @internal Socket `open` handler: marks the provider connected, re-creates
   * previous subscriptions and emits `'connected'`
   * @throws {Error} When no socket is set
   */
  __internal__onSocketOpen = () => {
    if (this.__internal__websocket === null) {
      throw new Error('WebSocket cannot be null in onOpen');
    }

    l.debug(() => ['connected to', this.endpoint]);

    this.__internal__isConnected = true;
    this.__internal__resubscribe();
    this.__internal__emit('connected');

    return true;
  };

  /**
   * @internal Re-issues every subscription that was active before the socket
   * (re)opened. The old map is swapped out first, since the new subscription ids
   * are registered as the responses arrive. Failures are logged, not thrown.
   */
  __internal__resubscribe = () => {
    const subscriptions = this.__internal__subscriptions;

    this.__internal__subscriptions = {};

    Promise.all(Object.keys(subscriptions).map(async (id) => {
      const { callback, method, params, type } = subscriptions[id];

      // only re-create subscriptions which are not in author (only area where
      // transactions are created, i.e. submissions such as 'author_submitAndWatchExtrinsic'
      // are not included (and will not be re-broadcast)
      if (type.startsWith('author_')) {
        return;
      }

      try {
        await this.subscribe(type, method, params, callback);
      } catch (error) {
        l.error(error);
      }
    })).catch(l.error);
  };

  /**
   * @internal Interval callback: fails (and removes) every in-flight request that
   * has been waiting longer than the configured timeout
   */
  __internal__timeoutHandlers = () => {
    const now = Date.now();
    const ids = Object.keys(this.__internal__handlers);

    for (let i = 0, count = ids.length; i < count; i++) {
      const handler = this.__internal__handlers[ids[i]];

      if ((now - handler.start) > this.__internal__timeout) {
        try {
          handler.callback(new Error(`No response received from RPC endpoint in ${this.__internal__timeout / 1000}s`), undefined);
        } catch {
          // ignore
        }

        this.__internal__endpointStats.timeout++;
        this.__internal__stats.total.timeout++;
        delete this.__internal__handlers[ids[i]];
      }
    }
  };
}
Выполнить команду
Для локальной разработки. Не используйте в интернете!