PHP WebShell
Текущая директория: /usr/lib/node_modules/bitgo/node_modules/@bitgo/sdk-coin-hbar/node_modules/@hashgraph/sdk/src/topic
Просмотр файла: TopicMessageQuery.js
/*-
*
* Hedera JavaScript SDK
*
* Copyright (C) 2020 - 2022 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import Query from "../query/Query.js";
import TransactionId from "../transaction/TransactionId.js";
import SubscriptionHandle from "./SubscriptionHandle.js";
import TopicMessage from "./TopicMessage.js";
import * as HashgraphProto from "@hashgraph/proto";
import TopicId from "./TopicId.js";
import Long from "long";
import Timestamp from "../Timestamp.js";
import { RST_STREAM } from "../Executable.js";
/**
* @typedef {import("../channel/Channel.js").default} Channel
* @typedef {import("../channel/MirrorChannel.js").default} MirrorChannel
* @typedef {import("../channel/MirrorChannel.js").MirrorError} MirrorError
*/
/**
* @template {Channel} ChannelT
* @typedef {import("../client/Client.js").default<ChannelT, MirrorChannel>} Client<ChannelT, MirrorChannel>
*/
/**
* @augments {Query<TopicMessageQuery>}
*/
export default class TopicMessageQuery extends Query {
/**
* @param {object} props
* @param {TopicId | string} [props.topicId]
* @param {Timestamp} [props.startTime]
* @param {Timestamp} [props.endTime]
* @param {(message: TopicMessage, error: Error)=> void} [props.errorHandler]
* @param {() => void} [props.completionHandler]
* @param {(error: MirrorError | Error | null) => boolean} [props.retryHandler]
* @param {Long | number} [props.limit]
*/
constructor(props = {}) {
super();
/**
* @private
* @type {?TopicId}
*/
this._topicId = null;
if (props.topicId != null) {
this.setTopicId(props.topicId);
}
/**
* @private
* @type {?Timestamp}
*/
this._startTime = null;
if (props.startTime != null) {
this.setStartTime(props.startTime);
}
/**
* @private
* @type {?Timestamp}
*/
this._endTime = null;
if (props.endTime != null) {
this.setEndTime(props.endTime);
}
/**
* @private
* @type {?Long}
*/
this._limit = null;
if (props.limit != null) {
this.setLimit(props.limit);
}
/**
* @private
* @type {(message: TopicMessage, error: Error) => void}
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
this._errorHandler = (message, error) => {
console.error(
`Error attempting to subscribe to topic: ${
this._topicId != null ? this._topicId.toString() : ""
}`
);
};
if (props.errorHandler != null) {
this._errorHandler = props.errorHandler;
}
/*
* @private
* @type {((message: TopicMessage) => void) | null}
*/
this._listener = null;
/**
* @private
* @type {() => void}
*/
this._completionHandler = () => {
if (this._logger) {
this._logger.info(
`Subscription to topic ${
this._topicId != null ? this._topicId.toString() : ""
} complete`
);
}
};
if (props.completionHandler != null) {
this._completionHandler = props.completionHandler;
}
/**
* @private
* @type {(error: MirrorError | Error | null) => boolean}
*/
this._retryHandler = (error) => {
if (error != null) {
if (error instanceof Error) {
// Retry on all errors which are not `MirrorError` because they're
// likely lower level HTTP/2 errors
return true;
} else {
// Retry on `NOT_FOUND`, `RESOURCE_EXHAUSTED`, `UNAVAILABLE`, and conditionally on `INTERNAL`
// if the message matches the right regex.
switch (error.code) {
// INTERNAL
// eslint-disable-next-line no-fallthrough
case 13:
return RST_STREAM.test(error.details.toString());
// NOT_FOUND
// eslint-disable-next-line no-fallthrough
case 5:
// RESOURCE_EXHAUSTED
// eslint-disable-next-line no-fallthrough
case 8:
// UNAVAILABLE
// eslint-disable-next-line no-fallthrough
case 14:
case 17:
return true;
default:
return false;
}
}
}
return false;
};
if (props.retryHandler != null) {
this._retryHandler = props.retryHandler;
}
/**
* @private
* @type {number}
*/
this._attempt = 0;
/**
* @private
* @type {SubscriptionHandle | null}
*/
this._handle = null;
this.setMaxBackoff(8000);
}
/**
* @returns {?TopicId}
*/
get topicId() {
return this._topicId;
}
/**
* @param {TopicId | string} topicId
* @returns {TopicMessageQuery}
*/
setTopicId(topicId) {
this.requireNotSubscribed();
this._topicId =
typeof topicId === "string"
? TopicId.fromString(topicId)
: topicId.clone();
return this;
}
/**
* @returns {?Timestamp}
*/
get startTime() {
return this._startTime;
}
/**
* @param {Timestamp | Date | number} startTime
* @returns {TopicMessageQuery}
*/
setStartTime(startTime) {
this.requireNotSubscribed();
this._startTime =
startTime instanceof Timestamp
? startTime
: startTime instanceof Date
? Timestamp.fromDate(startTime)
: new Timestamp(startTime, 0);
return this;
}
/**
* @returns {?Timestamp}
*/
get endTime() {
return this._endTime;
}
/**
* @param {Timestamp | Date | number} endTime
* @returns {TopicMessageQuery}
*/
setEndTime(endTime) {
this.requireNotSubscribed();
this._endTime =
endTime instanceof Timestamp
? endTime
: endTime instanceof Date
? Timestamp.fromDate(endTime)
: new Timestamp(endTime, 0);
return this;
}
/**
* @returns {?Long}
*/
get limit() {
return this._limit;
}
/**
* @param {Long | number} limit
* @returns {TopicMessageQuery}
*/
setLimit(limit) {
this.requireNotSubscribed();
this._limit = limit instanceof Long ? limit : Long.fromValue(limit);
return this;
}
/**
* @param {(message: TopicMessage, error: Error)=> void} errorHandler
* @returns {TopicMessageQuery}
*/
setErrorHandler(errorHandler) {
this._errorHandler = errorHandler;
return this;
}
/**
* @param {() => void} completionHandler
* @returns {TopicMessageQuery}
*/
setCompletionHandler(completionHandler) {
this.requireNotSubscribed();
this._completionHandler = completionHandler;
return this;
}
/**
* @param {number} attempts
* @returns {this}
*/
setMaxAttempts(attempts) {
this.requireNotSubscribed();
this._maxAttempts = attempts;
return this;
}
/**
* @param {number} backoff
* @returns {this}
*/
setMaxBackoff(backoff) {
this.requireNotSubscribed();
this._maxBackoff = backoff;
return this;
}
/**
* @param {Client<Channel>} client
* @param {((message: TopicMessage, error: Error) => void) | null} errorHandler
* @param {(message: TopicMessage) => void} listener
* @returns {SubscriptionHandle}
*/
subscribe(client, errorHandler, listener) {
this._handle = new SubscriptionHandle();
this._listener = listener;
if (errorHandler != null) {
this._errorHandler = errorHandler;
}
this._makeServerStreamRequest(client);
return this._handle;
}
/**
* @private
* @param {Client<Channel>} client
* @returns {void}
*/
_makeServerStreamRequest(client) {
/** @type {Map<string, HashgraphProto.com.hedera.mirror.api.proto.ConsensusTopicResponse[]>} */
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const list = new Map();
const request =
HashgraphProto.com.hedera.mirror.api.proto.ConsensusTopicQuery.encode(
{
topicID:
this._topicId != null
? this._topicId._toProtobuf()
: null,
consensusStartTime:
this._startTime != null
? this._startTime._toProtobuf()
: null,
consensusEndTime:
this._endTime != null
? this._endTime._toProtobuf()
: null,
limit: this._limit,
}
).finish();
const cancel = client._mirrorNetwork
.getNextMirrorNode()
.getChannel()
.makeServerStreamRequest(
"ConsensusService",
"subscribeTopic",
request,
(data) => {
const message =
HashgraphProto.com.hedera.mirror.api.proto.ConsensusTopicResponse.decode(
data
);
if (this._limit != null && this._limit.gt(0)) {
this._limit = this._limit.sub(1);
}
this._startTime = Timestamp._fromProtobuf(
/** @type {HashgraphProto.proto.ITimestamp} */ (
message.consensusTimestamp
)
).plusNanos(1);
if (
message.chunkInfo == null ||
(message.chunkInfo != null &&
message.chunkInfo.total === 1)
) {
this._passTopicMessage(TopicMessage._ofSingle(message));
} else {
const chunkInfo =
/** @type {HashgraphProto.proto.IConsensusMessageChunkInfo} */ (
message.chunkInfo
);
const initialTransactionID =
/** @type {HashgraphProto.proto.ITransactionID} */ (
chunkInfo.initialTransactionID
);
const total = /** @type {number} */ (chunkInfo.total);
const transactionId =
TransactionId._fromProtobuf(
initialTransactionID
).toString();
/** @type {HashgraphProto.com.hedera.mirror.api.proto.ConsensusTopicResponse[]} */
let responses = [];
const temp = list.get(transactionId);
if (temp == null) {
list.set(transactionId, responses);
} else {
responses = temp;
}
responses.push(message);
if (responses.length === total) {
const topicMessage =
TopicMessage._ofMany(responses);
list.delete(transactionId);
this._passTopicMessage(topicMessage);
}
}
},
(error) => {
const message =
error instanceof Error ? error.message : error.details;
if (
this._attempt < this._maxAttempts &&
this._retryHandler(error)
) {
const delay = Math.min(
250 * 2 ** this._attempt,
this._maxBackoff
);
console.warn(
`Error subscribing to topic ${
this._topicId != null
? this._topicId.toString()
: "UNKNOWN"
} during attempt ${
this._attempt
}. Waiting ${delay} ms before next attempt: ${message}`
);
this._attempt += 1;
setTimeout(() => {
this._makeServerStreamRequest(client);
}, delay);
}
},
this._completionHandler
);
if (this._handle != null) {
this._handle._setCall(() => cancel());
}
}
requireNotSubscribed() {
if (this._handle != null) {
throw new Error(
"Cannot change fields on an already subscribed query"
);
}
}
/**
* @private
* @param {TopicMessage} topicMessage
*/
_passTopicMessage(topicMessage) {
try {
if (this._listener != null) {
this._listener(topicMessage);
} else {
throw new Error("(BUG) listener is unexpectedly not set");
}
} catch (error) {
this._errorHandler(topicMessage, /** @type {Error} */ (error));
}
}
}
Выполнить команду
Для локальной разработки. Не используйте в интернете!