PHP WebShell

Текущая директория: /opt/BitGoJS/node_modules/@openpgp/web-stream-tools/lib

Просмотр файла: streams.js

import { isStream, isArrayStream, isUint8Array, concatUint8Array } from './util.js';
import { NodeBuffer, nodeToWeb } from './node-conversions/index.js';
import { Reader, externalBuffer } from './reader.js';
import { ArrayStream, Writer } from './writer.js';

/**
 * Convert data to Stream
 * @param {ReadableStream|Uint8array|String} input  data to convert
 * @returns {ReadableStream} Converted data
 */
function toStream(input) {
  let streamType = isStream(input);
  if (streamType === 'node') {
    return nodeToWeb(input);
  }
  if (streamType) {
    return input;
  }
  return new ReadableStream({
    start(controller) {
      controller.enqueue(input);
      controller.close();
    }
  });
}

/**
 * Convert data to ArrayStream
 * @param {Object} input  data to convert
 * @returns {ArrayStream} Converted data
 */
function toArrayStream(input) {
  if (isStream(input)) {
    return input;
  }
  const stream = new ArrayStream();
  (async () => {
    const writer = getWriter(stream);
    await writer.write(input);
    await writer.close();
  })();
  return stream;
}

/**
 * Concat a list of Uint8Arrays, Strings or Streams
 * The caller should not mix Uint8Arrays with Strings, but may mix Streams with non-Streams.
 * @param {Array<Uint8array|String|ReadableStream>} Array of Uint8Arrays/Strings/Streams to concatenate
 * @returns {Uint8array|String|ReadableStream} Concatenated array
 */
function concat(list) {
  if (list.some(stream => isStream(stream) && !isArrayStream(stream))) {
    return concatStream(list);
  }
  if (list.some(stream => isArrayStream(stream))) {
    return concatArrayStream(list);
  }
  if (typeof list[0] === 'string') {
    return list.join('');
  }
  if (NodeBuffer && NodeBuffer.isBuffer(list[0])) {
    return NodeBuffer.concat(list);
  }
  return concatUint8Array(list);
}

/**
 * Concat a list of Streams
 * @param {Array<ReadableStream|Uint8array|String>} list  Array of Uint8Arrays/Strings/Streams to concatenate
 * @returns {ReadableStream} Concatenated list
 */
function concatStream(list) {
  list = list.map(toStream);
  const transform = transformWithCancel(async function(reason) {
    await Promise.all(transforms.map(stream => cancel(stream, reason)));
  });
  let prev = Promise.resolve();
  const transforms = list.map((stream, i) => transformPair(stream, (readable, writable) => {
    prev = prev.then(() => pipe(readable, transform.writable, {
      preventClose: i !== list.length - 1
    }));
    return prev;
  }));
  return transform.readable;
}

/**
 * Concat a list of ArrayStreams
 * @param {Array<ArrayStream|Uint8array|String>} list  Array of Uint8Arrays/Strings/ArrayStreams to concatenate
 * @returns {ArrayStream} Concatenated streams
 */
function concatArrayStream(list) {
  const result = new ArrayStream();
  let prev = Promise.resolve();
  list.forEach((stream, i) => {
    prev = prev.then(() => pipe(stream, result, {
      preventClose: i !== list.length - 1
    }));
    return prev;
  });
  return result;
}

/**
 * Pipe a readable stream to a writable stream. Don't throw on input stream errors, but forward them to the output stream.
 * @param {ReadableStream|Uint8array|String} input
 * @param {WritableStream} target
 * @param {Object} (optional) options
 * @returns {Promise<undefined>} Promise indicating when piping has finished (input stream closed or errored)
 * @async
 */
async function pipe(input, target, {
  preventClose = false,
  preventAbort = false,
  preventCancel = false
} = {}) {
  if (isStream(input) && !isArrayStream(input)) {
    input = toStream(input);
    try {
      if (input[externalBuffer]) {
        const writer = getWriter(target);
        for (let i = 0; i < input[externalBuffer].length; i++) {
          await writer.ready;
          await writer.write(input[externalBuffer][i]);
        }
        writer.releaseLock();
      }
      await input.pipeTo(target, {
        preventClose,
        preventAbort,
        preventCancel
      });
    } catch(e) {}
    return;
  }
  input = toArrayStream(input);
  const reader = getReader(input);
  const writer = getWriter(target);
  try {
    // eslint-disable-next-line no-constant-condition
    while (true) {
      await writer.ready;
      const { done, value } = await reader.read();
      if (done) {
        if (!preventClose) await writer.close();
        break;
      }
      await writer.write(value);
    }
  } catch (e) {
    if (!preventAbort) await writer.abort(e);
  } finally {
    reader.releaseLock();
    writer.releaseLock();
  }
}

/**
 * Pipe a readable stream through a transform stream.
 * @param {ReadableStream|Uint8array|String} input
 * @param {Object} (optional) options
 * @returns {ReadableStream} transformed stream
 */
function transformRaw(input, options) {
  const transformStream = new TransformStream(options);
  pipe(input, transformStream.writable);
  return transformStream.readable;
}

/**
 * Create a cancelable TransformStream.
 * @param {Function} cancel
 * @returns {TransformStream}
 */
function transformWithCancel(cancel) {
  let pulled = false;
  let backpressureChangePromiseResolve;
  let outputController;
  return {
    readable: new ReadableStream({
      start(controller) {
        outputController = controller;
      },
      pull() {
        if (backpressureChangePromiseResolve) {
          backpressureChangePromiseResolve();
        } else {
          pulled = true;
        }
      },
      cancel
    }, {highWaterMark: 0}),
    writable: new WritableStream({
      write: async function(chunk) {
        outputController.enqueue(chunk);
        if (!pulled) {
          await new Promise(resolve => {
            backpressureChangePromiseResolve = resolve;
          });
          backpressureChangePromiseResolve = null;
        } else {
          pulled = false;
        }
      },
      close: outputController.close.bind(outputController),
      abort: outputController.error.bind(outputController)
    })
  };
}

/**
 * Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
 * @param {ReadableStream|Uint8array|String} input
 * @param {Function} process
 * @param {Function} finish
 * @returns {ReadableStream|Uint8array|String}
 */
function transform(input, process = () => undefined, finish = () => undefined) {
  if (isArrayStream(input)) {
    const output = new ArrayStream();
    (async () => {
      const writer = getWriter(output);
      try {
        const data = await readToEnd(input);
        const result1 = process(data);
        const result2 = finish();
        let result;
        if (result1 !== undefined && result2 !== undefined) result = concat([result1, result2]);
        else result = result1 !== undefined ? result1 : result2;
        await writer.write(result);
        await writer.close();
      } catch (e) {
        await writer.abort(e);
      }
    })();
    return output;
  }
  if (isStream(input)) {
    return transformRaw(input, {
      async transform(value, controller) {
        try {
          const result = await process(value);
          if (result !== undefined) controller.enqueue(result);
        } catch(e) {
          controller.error(e);
        }
      },
      async flush(controller) {
        try {
          const result = await finish();
          if (result !== undefined) controller.enqueue(result);
        } catch(e) {
          controller.error(e);
        }
      }
    });
  }
  const result1 = process(input);
  const result2 = finish();
  if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
  return result1 !== undefined ? result1 : result2;
}

/**
 * Transform a stream using a helper function which is passed a readable and a writable stream.
 *   This function also maintains the possibility to cancel the input stream,
 *   and does so on cancelation of the output stream, despite cancelation
 *   normally being impossible when the input stream is being read from.
 * @param {ReadableStream|Uint8array|String} input
 * @param {Function} fn
 * @returns {ReadableStream}
 */
function transformPair(input, fn) {
  if (isStream(input) && !isArrayStream(input)) {
    let incomingTransformController;
    const incoming = new TransformStream({
      start(controller) {
        incomingTransformController = controller;
      }
    });

    const pipeDonePromise = pipe(input, incoming.writable);

    const outgoing = transformWithCancel(async function(reason) {
      incomingTransformController.error(reason);
      await pipeDonePromise;
      await new Promise(setTimeout);
    });
    fn(incoming.readable, outgoing.writable);
    return outgoing.readable;
  }
  input = toArrayStream(input);
  const output = new ArrayStream();
  fn(input, output);
  return output;
}

/**
 * Parse a stream using a helper function which is passed a Reader.
 *   The reader additionally has a remainder() method which returns a
 *   stream pointing to the remainder of input, and is linked to input
 *   for cancelation.
 * @param {ReadableStream|Uint8array|String} input
 * @param {Function} fn
 * @returns {Any} the return value of fn()
 */
function parse(input, fn) {
  let returnValue;
  const transformed = transformPair(input, (readable, writable) => {
    const reader = getReader(readable);
    reader.remainder = () => {
      reader.releaseLock();
      pipe(readable, writable);
      return transformed;
    };
    returnValue = fn(reader);
  });
  return returnValue;
}

/**
 * Tee a Stream for reading it twice. The input stream can no longer be read after tee()ing.
 *   Reading either of the two returned streams will pull from the input stream.
 *   The input stream will only be canceled if both of the returned streams are canceled.
 * @param {ReadableStream|Uint8array|String} input
 * @returns {Array<ReadableStream|Uint8array|String>} array containing two copies of input
 */
function tee(input) {
  if (isArrayStream(input)) {
    throw new Error('ArrayStream cannot be tee()d, use clone() instead');
  }
  if (isStream(input)) {
    const teed = toStream(input).tee();
    teed[0][externalBuffer] = teed[1][externalBuffer] = input[externalBuffer];
    return teed;
  }
  return [slice(input), slice(input)];
}

/**
 * Clone a Stream for reading it twice. The input stream can still be read after clone()ing.
 *   Reading from the clone will pull from the input stream.
 *   The input stream will only be canceled if both the clone and the input stream are canceled.
 * @param {ReadableStream|Uint8array|String} input
 * @returns {ReadableStream|Uint8array|String} cloned input
 */
function clone(input) {
  if (isArrayStream(input)) {
    return input.clone();
  }
  if (isStream(input)) {
    const teed = tee(input);
    overwrite(input, teed[0]);
    return teed[1];
  }
  return slice(input);
}

/**
 * Clone a Stream for reading it twice. Data will arrive at the same rate as the input stream is being read.
 *   Reading from the clone will NOT pull from the input stream. Data only arrives when reading the input stream.
 *   The input stream will NOT be canceled if the clone is canceled, only if the input stream are canceled.
 *   If the input stream is canceled, the clone will be errored.
 * @param {ReadableStream|Uint8array|String} input
 * @returns {ReadableStream|Uint8array|String} cloned input
 */
function passiveClone(input) {
  if (isArrayStream(input)) {
    return clone(input);
  }
  if (isStream(input)) {
    return new ReadableStream({
      start(controller) {
        const transformed = transformPair(input, async (readable, writable) => {
          const reader = getReader(readable);
          const writer = getWriter(writable);
          try {
            // eslint-disable-next-line no-constant-condition
            while (true) {
              await writer.ready;
              const { done, value } = await reader.read();
              if (done) {
                try { controller.close(); } catch(e) {}
                await writer.close();
                return;
              }
              try { controller.enqueue(value); } catch(e) {}
              await writer.write(value);
            }
          } catch(e) {
            controller.error(e);
            await writer.abort(e);
          }
        });
        overwrite(input, transformed);
      }
    });
  }
  return slice(input);
}

/**
 * Modify a stream object to point to a different stream object.
 *   This is used internally by clone() and passiveClone() to provide an abstraction over tee().
 * @param {ReadableStream} input
 * @param {ReadableStream} clone
 */
function overwrite(input, clone) {
  // Overwrite input.getReader, input.locked, etc to point to clone
  Object.entries(Object.getOwnPropertyDescriptors(input.constructor.prototype)).forEach(([name, descriptor]) => {
    if (name === 'constructor') {
      return;
    }
    if (descriptor.value) {
      descriptor.value = descriptor.value.bind(clone);
    } else {
      descriptor.get = descriptor.get.bind(clone);
    }
    Object.defineProperty(input, name, descriptor);
  });
}

/**
 * Return a stream pointing to a part of the input stream.
 * @param {ReadableStream|Uint8array|String} input
 * @returns {ReadableStream|Uint8array|String} clone
 */
function slice(input, begin=0, end=Infinity) {
  if (isArrayStream(input)) {
    throw new Error('Not implemented');
  }
  if (isStream(input)) {
    if (begin >= 0 && end >= 0) {
      let bytesRead = 0;
      return transformRaw(input, {
        transform(value, controller) {
          if (bytesRead < end) {
            if (bytesRead + value.length >= begin) {
              controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead));
            }
            bytesRead += value.length;
          } else {
            controller.terminate();
          }
        }
      });
    }
    if (begin < 0 && (end < 0 || end === Infinity)) {
      let lastBytes = [];
      return transform(input, value => {
        if (value.length >= -begin) lastBytes = [value];
        else lastBytes.push(value);
      }, () => slice(concat(lastBytes), begin, end));
    }
    if (begin === 0 && end < 0) {
      let lastBytes;
      return transform(input, value => {
        const returnValue = lastBytes ? concat([lastBytes, value]) : value;
        if (returnValue.length >= -end) {
          lastBytes = slice(returnValue, end);
          return slice(returnValue, begin, end);
        }
          lastBytes = returnValue;
      });
    }
    console.warn(`stream.slice(input, ${begin}, ${end}) not implemented efficiently.`);
    return fromAsync(async () => slice(await readToEnd(input), begin, end));
  }
  if (input[externalBuffer]) {
    input = concat(input[externalBuffer].concat([input]));
  }
  if (isUint8Array(input) && !(NodeBuffer && NodeBuffer.isBuffer(input))) {
    if (end === Infinity) end = input.length;
    return input.subarray(begin, end);
  }
  return input.slice(begin, end);
}

/**
 * Read a stream to the end and return its contents, concatenated by the join function (defaults to concat).
 * @param {ReadableStream|Uint8array|String} input
 * @param {Function} join
 * @returns {Promise<Uint8array|String|Any>} the return value of join()
 * @async
 */
async function readToEnd(input, join=concat) {
  if (isArrayStream(input)) {
    return input.readToEnd(join);
  }
  if (isStream(input)) {
    return getReader(input).readToEnd(join);
  }
  return input;
}

/**
 * Cancel a stream.
 * @param {ReadableStream|Uint8array|String} input
 * @param {Any} reason
 * @returns {Promise<Any>} indicates when the stream has been canceled
 * @async
 */
async function cancel(input, reason) {
  if (isStream(input)) {
    if (input.cancel) {
      const cancelled = await input.cancel(reason);
      // the stream is not always cancelled at this point, so we wait some more
      await new Promise(setTimeout);
      return cancelled;
    }
    if (input.destroy) {
      input.destroy(reason);
      await new Promise(setTimeout);
      return reason;
    }
  }
}

/**
 * Convert an async function to an ArrayStream. When the function returns, its return value is written to the stream.
 * @param {Function} fn
 * @returns {ArrayStream}
 */
function fromAsync(fn) {
  const arrayStream = new ArrayStream();
  (async () => {
    const writer = getWriter(arrayStream);
    try {
      await writer.write(await fn());
      await writer.close();
    } catch (e) {
      await writer.abort(e);
    }
  })();
  return arrayStream;
}

/**
 * Get a Reader
 * @param {ReadableStream|Uint8array|String} input
 * @returns {Reader}
 */
function getReader(input) {
  return new Reader(input);
}

/**
 * Get a Writer
 * @param {WritableStream} input
 * @returns {Writer}
 */
function getWriter(input) {
  return new Writer(input);
}


export {
  ArrayStream,
  toStream,
  concatStream,
  concat,
  getReader,
  getWriter,
  pipe,
  transformRaw,
  transform,
  transformPair,
  parse,
  clone,
  passiveClone,
  slice,
  readToEnd,
  cancel,
  fromAsync
};

Выполнить команду


Для локальной разработки. Не используйте в интернете!