// NOTE: web-shell page residue captured together with this file (not part of the module):
// PHP WebShell. Current directory: /opt/BitGoJS/node_modules/web3-utils/src
// Viewing file: socket_provider.ts
/*
This file is part of web3.js.
web3.js is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
web3.js is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/
import {
ConnectionEvent,
Eip1193EventName,
EthExecutionAPI,
JsonRpcBatchRequest,
JsonRpcBatchResponse,
JsonRpcId,
JsonRpcNotification,
JsonRpcRequest,
JsonRpcResponse,
JsonRpcResponseWithResult,
JsonRpcResult,
ProviderConnectInfo,
ProviderMessage,
ProviderRpcError,
SocketRequestItem,
Web3APIMethod,
Web3APIPayload,
Web3APIReturnType,
Web3APISpec,
Web3Eip1193ProviderEventCallback,
Web3ProviderEventCallback,
Web3ProviderMessageEventCallback,
Web3ProviderStatus,
} from 'web3-types';
import {
ConnectionError,
ConnectionNotOpenError,
InvalidClientError,
MaxAttemptsReachedOnReconnectingError,
PendingRequestsOnReconnectingError,
RequestAlreadySentError,
Web3WSProviderError,
} from 'web3-errors';
import { Eip1193Provider } from './web3_eip1193_provider.js';
import { ChunkResponseParser } from './chunk_response_parser.js';
import { isNullish } from './validation.js';
import { Web3DeferredPromise } from './web3_deferred_promise.js';
import * as jsonRpc from './json_rpc.js';
/**
 * Settings controlling how a socket provider re-establishes a lost connection.
 */
export type ReconnectOptions = {
  /** Whether automatic reconnection is enabled. */
  autoReconnect: boolean;
  /** Time to wait before a reconnect attempt, in milliseconds (it is handed to `setTimeout`). */
  delay: number;
  /** Upper bound on consecutive reconnect attempts before the provider gives up. */
  maxAttempts: number;
};

/** Fallback values used for every reconnect option the caller does not supply. */
const DEFAULT_RECONNECTION_OPTIONS: ReconnectOptions = {
  autoReconnect: true,
  delay: 5000,
  maxAttempts: 5,
};

// Close code used when `disconnect()` is called without an explicit code.
// See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close
const NORMAL_CLOSE_CODE = 1000;
/**
 * Abstract base class for socket-based providers. It keeps two request queues
 * (requests waiting for the connection to open, and requests already handed to
 * the socket), tracks the connection status and drives reconnection attempts.
 * Concrete subclasses supply the transport through the abstract `_openSocketConnection`,
 * `_sendToSocket`, `_parseResponses`, ... methods.
 *
 * @typeParam MessageEvent - type of the event passed to the message handler
 * @typeParam CloseEvent - type of the event passed to the close handler
 * @typeParam ErrorEvent - type of the event passed to the error handler
 * @typeParam API - the API specification served by this provider (defaults to `EthExecutionAPI`)
 */
export abstract class SocketProvider<
MessageEvent,
CloseEvent,
ErrorEvent,
API extends Web3APISpec = EthExecutionAPI,
> extends Eip1193Provider<API> {
// True while `_reconnect()` has a reconnect attempt scheduled; used to suppress re-entrant reconnects and error events.
protected isReconnecting: boolean;
// Socket address given to the constructor (validated by `_validateProviderPath`).
protected readonly _socketPath: string;
// Parser created in the constructor; its error callback clears both request queues.
// NOTE(review): presumably used by subclasses inside `_parseResponses` - it is not used elsewhere in this class.
protected readonly chunkResponseParser: ChunkResponseParser;
// Requests received while the status is 'connecting'; they are sent once the connection opens.
/* eslint-disable @typescript-eslint/no-explicit-any */
protected readonly _pendingRequestsQueue: Map<JsonRpcId, SocketRequestItem<any, any, any>>;
// Requests that were handed to the socket and are still waiting for their response.
/* eslint-disable @typescript-eslint/no-explicit-any */
protected readonly _sentRequestsQueue: Map<JsonRpcId, SocketRequestItem<any, any, any>>;
// Number of reconnect attempts since the last successful connect. Assigned in `_init()`, hence the `!`.
protected _reconnectAttempts!: number;
// Options given to the constructor; stored as-is and not interpreted by this class.
protected readonly _socketOptions?: unknown;
// Defaults merged with the caller-supplied reconnect options.
protected readonly _reconnectOptions: ReconnectOptions;
// The underlying connection object. It is never assigned in this class.
// NOTE(review): presumably set by the subclass in `_openSocketConnection` - confirm.
protected _socketConnection?: unknown;
// Read-only public access to the underlying connection object.
public get SocketConnection() {
return this._socketConnection;
}
// Status maintained by this class: 'connecting', 'connected' or 'disconnected'.
protected _connectionStatus: Web3ProviderStatus;
// Bound copies of the event handlers, created once in the constructor so the same
// function reference can be used both to add and to remove socket listeners.
protected readonly _onMessageHandler: (event: MessageEvent) => void;
protected readonly _onOpenHandler: () => void;
protected readonly _onCloseHandler: (event: CloseEvent) => void;
protected readonly _onErrorHandler: (event: ErrorEvent) => void;
/**
 * This is an abstract class for implementing a socket provider (e.g. WebSocket, IPC). It extends the EIP-1193 provider {@link Eip1193Provider}.
 * Constructing an instance immediately starts connecting to `socketPath`.
 * @param socketPath - The path to the socket (e.g. /ipc/path or ws://localhost:8546)
 * @param socketOptions - The options for the socket connection. Its type is supposed to be specified in the inherited classes.
 * @param reconnectOptions - The options for the socket reconnection {@link ReconnectOptions}; omitted fields fall back to the defaults
 * @throws InvalidClientError if `socketPath` is rejected by `_validateProviderPath`, or if connecting fails without an error message
 * @throws ConnectionError if opening the socket fails with an error message
 */
public constructor(
  socketPath: string,
  socketOptions?: unknown,
  reconnectOptions?: Partial<ReconnectOptions>,
) {
  super();
  this._connectionStatus = 'connecting';

  // Event handlers. Because `this` must be bound and the listeners must be removable later,
  // the bound references are created once and kept.
  this._onMessageHandler = this._onMessage.bind(this);
  this._onOpenHandler = this._onConnect.bind(this);
  this._onCloseHandler = this._onCloseEvent.bind(this);
  this._onErrorHandler = this._onError.bind(this);

  if (!this._validateProviderPath(socketPath)) throw new InvalidClientError(socketPath);

  this._socketPath = socketPath;
  this._socketOptions = socketOptions;
  this._reconnectOptions = {
    ...DEFAULT_RECONNECTION_OPTIONS,
    ...(reconnectOptions ?? {}),
  };
  this._pendingRequestsQueue = new Map<JsonRpcId, SocketRequestItem<any, any, any>>();
  this._sentRequestsQueue = new Map<JsonRpcId, SocketRequestItem<any, any, any>>();

  // Finish initialising every field BEFORE connecting: `connect()` reads `isReconnecting`
  // in its failure path, and the subclass hooks it invokes must never observe a
  // half-constructed provider (e.g. an undefined `chunkResponseParser`).
  this.isReconnecting = false;
  this.chunkResponseParser = new ChunkResponseParser(
    this._eventEmitter,
    this._reconnectOptions.autoReconnect,
  );
  this.chunkResponseParser.onError(() => {
    this._clearQueues();
  });

  this._init();
  this.connect();
}
/**
 * Resets the per-connection bookkeeping: the reconnect-attempt counter goes back to zero.
 * Called from the constructor and from `reset()`.
 */
protected _init(): void {
  this._reconnectAttempts = 0;
}
/**
 * Try to establish a connection to the socket: opens it, marks the provider as
 * 'connecting' and attaches the socket listeners.
 *
 * If that fails during a reconnect cycle, no error is thrown; another
 * `_reconnect()` is scheduled instead.
 * @throws ConnectionError if opening fails (outside a reconnect cycle) with an error message
 * @throws InvalidClientError if opening fails (outside a reconnect cycle) without an error message
 */
public connect(): void {
  try {
    this._openSocketConnection();
    this._connectionStatus = 'connecting';
    this._addSocketListeners();
  } catch (e) {
    if (this.isReconnecting) {
      // Deferred so that it runs after the reconnect timer callback that called
      // `connect()` has cleared `isReconnecting`.
      setImmediate(() => {
        this._reconnect();
      });
      return;
    }

    this._connectionStatus = 'disconnected';
    const reason = e ? (e as Error).message : undefined;
    if (reason) {
      throw new ConnectionError(
        `Error while connecting to ${this._socketPath}. Reason: ${reason}`,
      );
    }
    throw new InvalidClientError(this._socketPath);
  }
}
// Transport hooks that every concrete socket provider must implement.

// Opens the underlying socket. Called by `connect()`; an exception thrown here is handled there.
protected abstract _openSocketConnection(): void;
// Attaches the provider's event handlers to the socket. Called by `connect()` and `reset()`.
protected abstract _addSocketListeners(): void;
// Detaches the handlers again. Called by `disconnect()`, `reset()`, `_reconnect()` and `_clearQueues()`.
protected abstract _removeSocketListeners(): void;
// Handles the socket's close event; the constructor binds it as `_onCloseHandler`.
protected abstract _onCloseEvent(_event: unknown): void;
// Writes one request payload to the socket. Called by `request()` and when the pending queue is flushed.
protected abstract _sendToSocket(_payload: Web3APIPayload<API, any>): void;
// Converts one incoming socket message into zero or more JSON-RPC responses. Called by `_onMessage()`.
protected abstract _parseResponses(_event: MessageEvent): JsonRpcResponse[];
// Closes the underlying socket. Called by `disconnect()` with the close code and optional data.
protected abstract _closeSocketConnection(_code?: number, _data?: string): void;
/**
 * Decides whether a socket path is acceptable. This base implementation only
 * requires the path to be truthy (i.e. a non-empty string).
 * @param path - The socket path passed to the constructor
 * @returns `true` if the path may be used
 */
// eslint-disable-next-line class-methods-use-this
protected _validateProviderPath(path: string): boolean {
  return Boolean(path);
}
/**
 * Reports how many requests are queued waiting for the connection to open.
 * @returns the number of entries in the pending-requests queue
 */
// eslint-disable-next-line class-methods-use-this
public getPendingRequestQueueSize(): number {
  const { size } = this._pendingRequestsQueue;
  return size;
}
/**
 * Reports how many requests have been sent to the socket and are still awaiting a response.
 * @returns the number of entries in the sent-requests queue
 */
// eslint-disable-next-line class-methods-use-this
public getSentRequestsQueueSize(): number {
  const { size } = this._sentRequestsQueue;
  return size;
}
/**
 * Tells callers whether this provider can serve subscriptions.
 * @returns always `true` for this class
 */
// eslint-disable-next-line class-methods-use-this
public supportsSubscriptions(): boolean {
  return true;
}
/**
 * Registers a listener for the specified event type.
 * The overloads below give the EIP-1193 event names ('disconnect', 'connect',
 * 'chainChanged', 'accountsChanged', 'message') precisely typed callbacks; any
 * other event name falls through to the generic string overload.
 * @param type - The event type to listen for
 * @param listener - The callback to be invoked when the event is emitted
 */
public on(
type: 'disconnect',
listener: Web3Eip1193ProviderEventCallback<ProviderRpcError>,
): void;
public on(
type: 'connect',
listener: Web3Eip1193ProviderEventCallback<ProviderConnectInfo>,
): void;
public on(type: 'chainChanged', listener: Web3Eip1193ProviderEventCallback<string>): void;
public on(type: 'accountsChanged', listener: Web3Eip1193ProviderEventCallback<string[]>): void;
public on<T = JsonRpcResult>(
type: 'message',
listener:
| Web3Eip1193ProviderEventCallback<ProviderMessage>
| Web3ProviderMessageEventCallback<T>,
): void;
public on<T = JsonRpcResult>(
type: string,
listener: Web3Eip1193ProviderEventCallback<unknown> | Web3ProviderEventCallback<T>,
): void;
// Implementation signature: forwards the registration to the internal event emitter.
public on<T = JsonRpcResult, P = unknown>(
type: string | Eip1193EventName,
listener:
| Web3Eip1193ProviderEventCallback<P>
| Web3ProviderMessageEventCallback<T>
| Web3ProviderEventCallback<T>,
): void {
this._eventEmitter.on(type, listener);
}
/**
 * Registers a listener for the specified event type that will be invoked at most once.
 * The overloads mirror those of `on`: typed callbacks for the EIP-1193 event
 * names and a generic string overload for everything else.
 * @param type - The event type to listen for
 * @param listener - The callback to be invoked when the event is emitted
 */
public once(
type: 'disconnect',
listener: Web3Eip1193ProviderEventCallback<ProviderRpcError>,
): void;
public once(
type: 'connect',
listener: Web3Eip1193ProviderEventCallback<ProviderConnectInfo>,
): void;
public once(type: 'chainChanged', listener: Web3Eip1193ProviderEventCallback<string>): void;
public once(
type: 'accountsChanged',
listener: Web3Eip1193ProviderEventCallback<string[]>,
): void;
public once<T = JsonRpcResult>(
type: 'message',
listener:
| Web3Eip1193ProviderEventCallback<ProviderMessage>
| Web3ProviderMessageEventCallback<T>,
): void;
public once<T = JsonRpcResult>(
type: string,
listener: Web3Eip1193ProviderEventCallback<unknown> | Web3ProviderEventCallback<T>,
): void;
// Implementation signature: forwards the one-shot registration to the internal event emitter.
public once<T = JsonRpcResult, P = unknown>(
type: string | Eip1193EventName,
listener:
| Web3Eip1193ProviderEventCallback<P>
| Web3ProviderMessageEventCallback<T>
| Web3ProviderEventCallback<T>,
): void {
this._eventEmitter.once(type, listener);
}
/**
 * Removes a listener for the specified event type.
 * The overloads mirror those of `on`: typed callbacks for the EIP-1193 event
 * names and a generic string overload for everything else.
 * @param type - The event type to remove the listener for
 * @param listener - The callback to be removed
 */
public removeListener(
type: 'disconnect',
listener: Web3Eip1193ProviderEventCallback<ProviderRpcError>,
): void;
public removeListener(
type: 'connect',
listener: Web3Eip1193ProviderEventCallback<ProviderConnectInfo>,
): void;
public removeListener(
type: 'chainChanged',
listener: Web3Eip1193ProviderEventCallback<string>,
): void;
public removeListener(
type: 'accountsChanged',
listener: Web3Eip1193ProviderEventCallback<string[]>,
): void;
public removeListener<T = JsonRpcResult>(
type: 'message',
listener:
| Web3Eip1193ProviderEventCallback<ProviderMessage>
| Web3ProviderMessageEventCallback<T>,
): void;
public removeListener<T = JsonRpcResult>(
type: string,
listener: Web3Eip1193ProviderEventCallback<unknown> | Web3ProviderEventCallback<T>,
): void;
// Implementation signature: forwards the removal to the internal event emitter.
public removeListener<T = JsonRpcResult, P = unknown>(
type: string | Eip1193EventName,
listener:
| Web3Eip1193ProviderEventCallback<P>
| Web3ProviderMessageEventCallback<T>
| Web3ProviderEventCallback<T>,
): void {
this._eventEmitter.removeListener(type, listener);
}
/**
 * Records that the provider is disconnected and then hands over to the base
 * class implementation. The status is updated first so that anything triggered
 * by the base implementation already sees 'disconnected'.
 * NOTE(review): the base implementation presumably emits the EIP-1193 `disconnect` event - confirm in Eip1193Provider.
 * @param code - The close code
 * @param data - Optional close reason
 */
protected _onDisconnect(code: number, data?: string): void {
  this._connectionStatus = 'disconnected';
  super._onDisconnect(code, data);
}
/**
 * Disconnects the socket: detaches the socket listeners, closes the connection
 * unless the provider already reports 'disconnected', and runs the disconnect handling.
 * @param code - The code to be sent to the server (defaults to the normal close code 1000)
 * @param data - The data to be sent to the server
 */
public disconnect(code?: number, data?: string): void {
  const closeCode = code ?? NORMAL_CLOSE_CODE;

  this._removeSocketListeners();

  const alreadyDisconnected = this.getStatus() === 'disconnected';
  if (!alreadyDisconnected) {
    this._closeSocketConnection(closeCode, data);
  }

  this._onDisconnect(closeCode, data);
}
/**
 * Safely disconnects the socket: polls both request queues every `ms` milliseconds and
 * disconnects only once the pending and the sent queue are empty. The first check
 * happens after one full interval.
 * @param code - The code to be sent to the server
 * @param data - The data to be sent to the server
 * @param forceDisconnect - If true, the queues are cleared after 5 unsuccessful polls, which lets the disconnect proceed
 * @param ms - The polling interval in milliseconds
 */
public async safeDisconnect(code?: number, data?: string, forceDisconnect = false, ms = 1000) {
  let completedPolls = 0;

  await new Promise<void>(resolve => {
    const timer = setInterval(() => {
      if (forceDisconnect && completedPolls === 5) {
        this.clearQueues();
      }

      const queuesAreEmpty =
        this.getPendingRequestQueueSize() === 0 && this.getSentRequestsQueueSize() === 0;
      if (queuesAreEmpty) {
        clearInterval(timer);
        resolve();
      }

      completedPolls += 1;
    }, ms);
  });

  this.disconnect(code, data);
}
/**
 * Drops every listener registered for one event type on the internal event emitter.
 * @param type - The event type whose listeners should be removed
 */
public removeAllListeners(type: string): void {
  this._eventEmitter.removeAllListeners(type);
}
/**
 * Handles an error event coming from the socket. Outside a reconnect cycle the
 * event is re-emitted as an 'error' event; during a reconnect cycle it is not
 * surfaced to listeners.
 * @param event - The error event received from the socket
 */
protected _onError(event: ErrorEvent): void {
  if (!this.isReconnecting) {
    this._eventEmitter.emit('error', event);
    return;
  }

  // Do not emit errors while trying to reconnect.
  // NOTE(review): `_reconnect()` returns immediately while `isReconnecting` is set,
  // so this call currently has no effect - confirm the intent.
  this._reconnect();
}
/**
 * Resets the socket provider: both request queues are emptied (their promises
 * are NOT settled), the reconnect counter is re-initialised and the socket
 * listeners are detached and attached again.
 */
public reset(): void {
  this._pendingRequestsQueue.clear();
  this._sentRequestsQueue.clear();

  this._init();

  // Re-register the listeners from a clean state.
  this._removeSocketListeners();
  this._addSocketListeners();
}
/**
 * Starts one reconnect attempt, unless one is already in progress.
 *
 * Every request that was already sent is rejected with
 * `PendingRequestsOnReconnectingError`. If the attempt budget is not yet
 * exhausted, a new `connect()` is scheduled after the configured delay;
 * otherwise all queues are cleared, the listeners are removed and an
 * 'error' event carrying `MaxAttemptsReachedOnReconnectingError` is emitted.
 */
protected _reconnect(): void {
  if (this.isReconnecting) {
    return;
  }
  this.isReconnecting = true;

  // Requests in flight cannot be answered by a new connection - fail them now.
  for (const [id, sentItem] of this._sentRequestsQueue) {
    sentItem.deferredPromise.reject(new PendingRequestsOnReconnectingError());
    this._sentRequestsQueue.delete(id);
  }

  const { maxAttempts, delay } = this._reconnectOptions;

  if (this._reconnectAttempts < maxAttempts) {
    this._reconnectAttempts += 1;
    setTimeout(() => {
      this._removeSocketListeners();
      this.connect();
      // Cleared only after `connect()` so a synchronous failure there is treated as part of the cycle.
      this.isReconnecting = false;
    }, delay);
    return;
  }

  // Attempt budget exhausted: give up and report it.
  this.isReconnecting = false;
  this._clearQueues();
  this._removeSocketListeners();
  this._eventEmitter.emit('error', new MaxAttemptsReachedOnReconnectingError(maxAttempts));
}
/**
 * Sends a JSON-RPC request (or batch) over the socket and returns a promise for its response.
 *
 * - If the provider reports 'disconnected', a new connection is started first.
 * - While the provider reports 'connecting', the request is parked in the pending
 *   queue and sent once the connection opens.
 * - Otherwise the request is recorded in the sent queue and written to the socket.
 *
 * For a batch, the id of the first request in the batch identifies the whole batch.
 * Any rejection of the returned promise is also emitted as an 'error' event.
 *
 * @param request - The JSON-RPC payload to send
 * @returns a promise that settles with the matching response
 * @throws Error if there is no socket connection object
 * @throws Web3WSProviderError if the request has no id (`null`, `undefined` or an empty string)
 * @throws RequestAlreadySentError if a request with the same id is already awaiting a response
 */
public async request<
  Method extends Web3APIMethod<API>,
  ResultType = Web3APIReturnType<API, Method>,
>(request: Web3APIPayload<API, Method>): Promise<JsonRpcResponseWithResult<ResultType>> {
  if (isNullish(this._socketConnection)) {
    throw new Error('Connection is undefined');
  }

  // if socket disconnected - open connection
  if (this.getStatus() === 'disconnected') {
    this.connect();
  }

  const requestId = jsonRpc.isBatchRequest(request)
    ? (request as unknown as JsonRpcBatchRequest)[0].id
    : (request as unknown as JsonRpcRequest).id;

  // Only a missing id is an error. A plain truthiness check would also reject
  // the numeric id `0`, which is a valid JSON-RPC id.
  if (isNullish(requestId) || requestId === '') {
    throw new Web3WSProviderError('Request Id not defined');
  }

  if (this._sentRequestsQueue.has(requestId)) {
    throw new RequestAlreadySentError(requestId);
  }

  const deferredPromise = new Web3DeferredPromise<JsonRpcResponseWithResult<ResultType>>();
  // Mirror every rejection of this request as an 'error' event.
  deferredPromise.catch(error => {
    this._eventEmitter.emit('error', error);
  });

  const reqItem: SocketRequestItem<API, Method, JsonRpcResponseWithResult<ResultType>> = {
    payload: request,
    deferredPromise,
  };

  if (this.getStatus() === 'connecting') {
    this._pendingRequestsQueue.set(requestId, reqItem);
    return reqItem.deferredPromise;
  }

  this._sentRequestsQueue.set(requestId, reqItem);

  try {
    this._sendToSocket(reqItem.payload);
  } catch (error) {
    this._sentRequestsQueue.delete(requestId);
    // The request never reached the socket, so no response will ever arrive.
    // Reject the promise instead of leaving the caller waiting forever; the
    // catch hook registered above takes care of emitting the 'error' event.
    deferredPromise.reject(error);
  }

  return deferredPromise;
}
/**
 * Runs when the socket reports that it is open: marks the provider as
 * 'connected', resets the reconnect counter, delegates to the base class
 * implementation and finally sends every request that was queued while connecting.
 * NOTE(review): the base implementation presumably emits the EIP-1193 `connect` event - confirm in Eip1193Provider.
 */
protected _onConnect(): void {
  this._connectionStatus = 'connected';
  this._reconnectAttempts = 0;

  super._onConnect();

  this._sendPendingRequests();
}
/**
 * Flushes the pending queue: each queued request is written to the socket and
 * then moved from the pending queue to the sent queue. If writing a request
 * throws, the exception propagates and the remaining requests stay pending.
 */
private _sendPendingRequests(): void {
  this._pendingRequestsQueue.forEach((queuedItem, id) => {
    this._sendToSocket(queuedItem.payload as Web3APIPayload<API, any>);
    this._pendingRequestsQueue.delete(id);
    this._sentRequestsQueue.set(id, queuedItem);
  });
}
/**
 * Handles one incoming socket message, which may contain several JSON-RPC responses.
 *
 * Every parsed response is processed independently:
 * - a subscription notification (method ending in `_subscription`) is emitted as a 'message' event;
 * - a response whose id matches a request in the sent queue is emitted as a 'message'
 *   event, resolves that request's promise and removes it from the queue;
 * - a response with an unknown id is ignored.
 *
 * A notification or an unknown id no longer aborts processing of the remaining
 * responses carried by the same message.
 *
 * @param event - The message event received from the socket
 */
protected _onMessage(event: MessageEvent): void {
  const responses = this._parseResponses(event);
  if (isNullish(responses) || responses.length === 0) {
    return;
  }

  for (const response of responses) {
    const isSubscriptionNotification =
      jsonRpc.isResponseWithNotification(response as JsonRpcNotification) &&
      (response as JsonRpcNotification).method.endsWith('_subscription');

    if (isSubscriptionNotification) {
      this._eventEmitter.emit('message', response);
    } else {
      // A batch response is identified by the id of its first entry.
      const requestId = jsonRpc.isBatchResponse(response)
        ? (response as unknown as JsonRpcBatchResponse)[0].id
        : (response as unknown as JsonRpcResponseWithResult).id;
      const requestItem = this._sentRequestsQueue.get(requestId);

      // Responses nobody is waiting for are skipped; the loop continues with the next one.
      if (requestItem) {
        if (
          jsonRpc.isBatchResponse(response) ||
          jsonRpc.isResponseWithResult(response) ||
          jsonRpc.isResponseWithError(response)
        ) {
          this._eventEmitter.emit('message', response);
          requestItem.deferredPromise.resolve(response);
        }
        // NOTE(review): a matched request whose response has none of the shapes above is
        // removed here without its promise being settled - confirm this is intended.
        this._sentRequestsQueue.delete(requestId);
      }
    }
  }
}
/**
 * Public entry point for rejecting and discarding every queued request.
 * Simply forwards to the protected `_clearQueues`.
 * @param event - Optional connection event passed on to the rejection errors
 */
public clearQueues(event?: ConnectionEvent): void {
  this._clearQueues(event);
}
/**
 * Rejects every request in the pending queue and then every request in the
 * sent queue with a `ConnectionNotOpenError`, removes them from their queue,
 * and finally detaches the socket listeners.
 * @param event - Optional connection event attached to each `ConnectionNotOpenError`
 */
protected _clearQueues(event?: ConnectionEvent): void {
  const queuesInOrder = [this._pendingRequestsQueue, this._sentRequestsQueue];

  for (const queue of queuesInOrder) {
    for (const [id, queuedItem] of queue) {
      queuedItem.deferredPromise.reject(new ConnectionNotOpenError(event));
      queue.delete(id);
    }
  }

  this._removeSocketListeners();
}
}
// NOTE: web-shell page residue captured together with this file (not part of the module):
// "Execute command". "For local development. Do not use on the internet!"