PHP WebShell

Текущая директория: /opt/BitGoJS/node_modules/@aptos-labs/ts-sdk/src/transactions/management

Просмотр файла: transactionWorker.ts

/* eslint-disable no-await-in-loop */

import EventEmitter from "eventemitter3";
import { AptosConfig } from "../../api/aptosConfig";
import { Account } from "../../account";
import { waitForTransaction } from "../../internal/transaction";
import { generateTransaction, signAndSubmitTransaction } from "../../internal/transactionSubmission";
import { PendingTransactionResponse, TransactionResponse } from "../../types";
import { InputGenerateTransactionOptions, InputGenerateTransactionPayloadData } from "../types";
import { AccountSequenceNumber } from "./accountSequenceNumber";
import { AsyncQueue, AsyncQueueCancelledError } from "./asyncQueue";
import { SimpleTransaction } from "../instances/simpleTransaction";

export const promiseFulfilledStatus = "fulfilled";

/**
 * Events emitted by the transaction worker during its operation, allowing the dapp to respond to various transaction states.
 */
export enum TransactionWorkerEventsEnum {
  // fired after a transaction gets sent to the chain
  TransactionSent = "transactionSent",
  // fired if there is an error sending the transaction to the chain
  TransactionSendFailed = "transactionSendFailed",
  // fired when a single transaction has executed successfully
  TransactionExecuted = "transactionExecuted",
  // fired if a single transaction fails in execution
  TransactionExecutionFailed = "transactionExecutionFailed",
  // fired when the worker has finished its job / when the queue has been emptied
  ExecutionFinish = "executionFinish",
}

/**
 * Defines the events emitted by the transaction worker during various stages of transaction processing. *
 * @event transactionSent - Emitted when a transaction is successfully sent.
 * @event transactionSendFailed - Emitted when sending a transaction fails.
 * @event transactionExecuted - Emitted when a transaction is successfully executed.
 * @event transactionExecutionFailed - Emitted when executing a transaction fails.
 * @event executionFinish - Emitted when the execution process is finished.
 */
export interface TransactionWorkerEvents {
  transactionSent: (data: SuccessEventData) => void;
  transactionSendFailed: (data: FailureEventData) => void;
  transactionExecuted: (data: SuccessEventData) => void;
  transactionExecutionFailed: (data: FailureEventData) => void;
  executionFinish: (data: ExecutionFinishEventData) => void;
}

/**
 * The payload for when the worker has finished its job.
 */
export type ExecutionFinishEventData = {
  message: string;
};

/**
 * The payload for a success event.
 */
export type SuccessEventData = {
  message: string;
  transactionHash: string;
};

/**
 * The payload for a failure event.
 */
export type FailureEventData = {
  message: string;
  error: string;
};

/**
 * TransactionWorker provides a simple framework for receiving payloads to be processed.
 *
 * Once one `start()` the process and pushes a new transaction, the worker acquires
 * the current account's next sequence number (by using the AccountSequenceNumber class),
 * generates a signed transaction and pushes an async submission process into the `outstandingTransactions` queue.
 * At the same time, the worker processes transactions by reading the `outstandingTransactions` queue
 * and submits the next transaction to chain, it
 * 1) waits for resolution of the submission process or get pre-execution validation error
 * and 2) waits for the resolution of the execution process or get an execution error.
 * The worker fires events for any submission and/or execution success and/or failure.
 */
export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
  readonly aptosConfig: AptosConfig;

  readonly account: Account;

  // current account sequence number
  // TODO: Rename Sequnce -> Sequence
  readonly accountSequnceNumber: AccountSequenceNumber;

  readonly taskQueue: AsyncQueue<() => Promise<void>> = new AsyncQueue<() => Promise<void>>();

  // process has started
  started: boolean;

  /**
   * transactions payloads waiting to be generated and signed
   *
   * TODO support entry function payload from ABI builder
   */
  transactionsQueue = new AsyncQueue<
    [InputGenerateTransactionPayloadData, InputGenerateTransactionOptions | undefined]
  >();

  /**
   * signed transactions waiting to be submitted
   */
  outstandingTransactions = new AsyncQueue<[Promise<PendingTransactionResponse>, bigint]>();

  /**
   * transactions that have been submitted to chain
   */
  sentTransactions: Array<[string, bigint, any]> = [];

  /**
   * transactions that have been committed to chain
   */
  executedTransactions: Array<[string, bigint, any]> = [];

  /**
   * Initializes a new instance of the class, providing a framework for receiving payloads to be processed.
   *
   * @param aptosConfig - A configuration object for Aptos.
   * @param account - The account that will be used for sending transactions.
   * @param maxWaitTime - The maximum wait time to wait before re-syncing the sequence number to the current on-chain state,
   * default is 30 seconds.
   * @param maximumInFlight - The maximum number of transactions that can be submitted per account, default is 100.
   * @param sleepTime - The time to wait in seconds before re-evaluating if the maximum number of transactions are in flight,
   * default is 10 seconds.
   */
  constructor(
    aptosConfig: AptosConfig,
    account: Account,
    maxWaitTime: number = 30,
    maximumInFlight: number = 100,
    sleepTime: number = 10,
  ) {
    super();
    this.aptosConfig = aptosConfig;
    this.account = account;
    this.started = false;
    this.accountSequnceNumber = new AccountSequenceNumber(
      aptosConfig,
      account,
      maxWaitTime,
      maximumInFlight,
      sleepTime,
    );
  }

  /**
   * Submits the next transaction for the account by generating it with the current sequence number
   * and adding it to the outstanding transaction queue for processing.
   * This function continues to submit transactions until there are no more to process.
   *
   * @throws {Error} Throws an error if the transaction submission fails.
   */
  async submitNextTransaction() {
    try {
      /* eslint-disable no-constant-condition */
      while (true) {
        const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber();
        if (sequenceNumber === null) return;
        const transaction = await this.generateNextTransaction(this.account, sequenceNumber);
        if (!transaction) return;
        const pendingTransaction = signAndSubmitTransaction({
          aptosConfig: this.aptosConfig,
          transaction,
          signer: this.account,
        });
        await this.outstandingTransactions.enqueue([pendingTransaction, sequenceNumber]);
      }
    } catch (error: any) {
      if (error instanceof AsyncQueueCancelledError) {
        return;
      }
      throw new Error(`Submit transaction failed for ${this.account.accountAddress.toString()} with error ${error}`);
    }
  }

  /**
   * Reads the outstanding transaction queue and submits the transactions to the chain.
   * This function processes each transaction, checking their status and emitting events based on whether they were successfully
   * sent or failed.
   *
   * @throws {Error} Throws an error if the process execution fails.
   * @event TransactionWorkerEventsEnum.TransactionSent - Emitted when a transaction has been successfully committed to the chain.
   * @event TransactionWorkerEventsEnum.TransactionSendFailed - Emitted when a transaction fails to commit, along with the error
   * reason.
   * @event TransactionWorkerEventsEnum.ExecutionFinish - Emitted when the execution of transactions is complete.
   */
  async processTransactions() {
    try {
      /* eslint-disable no-constant-condition */
      while (true) {
        const awaitingTransactions = [];
        const sequenceNumbers = [];
        let [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue();

        awaitingTransactions.push(pendingTransaction);
        sequenceNumbers.push(sequenceNumber);

        while (!this.outstandingTransactions.isEmpty()) {
          [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue();

          awaitingTransactions.push(pendingTransaction);
          sequenceNumbers.push(sequenceNumber);
        }
        // send awaiting transactions to chain
        const sentTransactions = await Promise.allSettled(awaitingTransactions);
        for (let i = 0; i < sentTransactions.length && i < sequenceNumbers.length; i += 1) {
          // check sent transaction status
          const sentTransaction = sentTransactions[i];
          sequenceNumber = sequenceNumbers[i];
          if (sentTransaction.status === promiseFulfilledStatus) {
            // transaction sent to chain
            this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]);
            // check sent transaction execution
            this.emit(TransactionWorkerEventsEnum.TransactionSent, {
              message: `transaction hash ${sentTransaction.value.hash} has been committed to chain`,
              transactionHash: sentTransaction.value.hash,
            });
            await this.checkTransaction(sentTransaction, sequenceNumber);
          } else {
            // send transaction failed
            this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]);
            this.emit(TransactionWorkerEventsEnum.TransactionSendFailed, {
              message: `failed to commit transaction ${this.sentTransactions.length} with error ${sentTransaction.reason}`,
              error: sentTransaction.reason,
            });
          }
        }
        this.emit(TransactionWorkerEventsEnum.ExecutionFinish, {
          message: `execute ${sentTransactions.length} transactions finished`,
        });
      }
    } catch (error: any) {
      if (error instanceof AsyncQueueCancelledError) {
        return;
      }
      throw new Error(`Process execution failed for ${this.account.accountAddress.toString()} with error ${error}`);
    }
  }

  /**
   * Once a transaction has been sent to the chain, this function checks for its execution status.
   * @param sentTransaction - The transaction that was sent to the chain and is now waiting to be executed.
   * @param sequenceNumber - The account's sequence number that was sent with the transaction.
   */
  async checkTransaction(sentTransaction: PromiseFulfilledResult<PendingTransactionResponse>, sequenceNumber: bigint) {
    try {
      const waitFor: Array<Promise<TransactionResponse>> = [];
      waitFor.push(waitForTransaction({ aptosConfig: this.aptosConfig, transactionHash: sentTransaction.value.hash }));
      const sentTransactions = await Promise.allSettled(waitFor);

      for (let i = 0; i < sentTransactions.length; i += 1) {
        const executedTransaction = sentTransactions[i];
        if (executedTransaction.status === promiseFulfilledStatus) {
          // transaction executed to chain
          this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]);
          this.emit(TransactionWorkerEventsEnum.TransactionExecuted, {
            message: `transaction hash ${executedTransaction.value.hash} has been executed on chain`,
            transactionHash: sentTransaction.value.hash,
          });
        } else {
          // transaction execution failed
          this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]);
          this.emit(TransactionWorkerEventsEnum.TransactionExecutionFailed, {
            message: `failed to execute transaction ${this.executedTransactions.length} with error ${executedTransaction.reason}`,
            error: executedTransaction.reason,
          });
        }
      }
    } catch (error: any) {
      throw new Error(`Check transaction failed for ${this.account.accountAddress.toString()} with error ${error}`);
    }
  }

  /**
   * Pushes a transaction to the transactions queue for processing.
   *
   * @param transactionData - The transaction payload containing necessary details.
   * @param transactionData.abi - For all entry function payloads, the ABI to skip remote ABI lookups.
   * @param options - Optional parameters for transaction configuration.
   * @param options.maxGasAmount - Maximum gas amount for the transaction.
   * @param options.gasUnitPrice - Gas unit price for the transaction.
   * @param options.expireTimestamp - Expiration timestamp on the transaction.
   * @param options.accountSequenceNumber - The sequence number for the transaction.
   */
  async push(
    transactionData: InputGenerateTransactionPayloadData,
    options?: InputGenerateTransactionOptions,
  ): Promise<void> {
    this.transactionsQueue.enqueue([transactionData, options]);
  }

  /**
   * Generates a signed transaction that can be submitted to the chain.
   *
   * @param account - An Aptos account used as the sender of the transaction.
   * @param sequenceNumber - A sequence number the transaction will be generated with.
   * @returns A signed transaction object or undefined if the transaction queue is empty.
   */
  async generateNextTransaction(account: Account, sequenceNumber: bigint): Promise<SimpleTransaction | undefined> {
    if (this.transactionsQueue.isEmpty()) return undefined;
    const [transactionData, options] = await this.transactionsQueue.dequeue();
    return generateTransaction({
      aptosConfig: this.aptosConfig,
      sender: account.accountAddress,
      data: transactionData,
      options: { ...options, accountSequenceNumber: sequenceNumber },
    });
  }

  /**
   * Starts transaction submission and processing by executing tasks from the queue until it is cancelled.
   *
   * @throws {Error} Throws an error if unable to start transaction batching.
   */
  async run() {
    try {
      while (!this.taskQueue.isCancelled()) {
        const task = await this.taskQueue.dequeue();
        await task();
      }
    } catch (error: any) {
      throw new Error(`Unable to start transaction batching: ${error}`);
    }
  }

  /**
   * Starts the transaction management process.
   *
   * @throws {Error} Throws an error if the worker has already started.
   */
  start() {
    if (this.started) {
      throw new Error("worker has already started");
    }
    this.started = true;
    this.taskQueue.enqueue(() => this.submitNextTransaction());
    this.taskQueue.enqueue(() => this.processTransactions());
    this.run();
  }

  /**
   * Stops the transaction management process.
   *
   * @throws {Error} Throws an error if the worker has already stopped.
   */
  stop() {
    if (this.taskQueue.isCancelled()) {
      throw new Error("worker has already stopped");
    }
    this.started = false;
    this.taskQueue.cancel();
  }
}

Выполнить команду


Для локальной разработки. Не используйте в интернете!