PHP WebShell

Текущая директория: /usr/lib/node_modules/bitgo/node_modules/wonka/src

Просмотр файла: Wonka_operators.test.ts

import * as deriving from './helpers/Wonka_deriving';
import * as sources from './Wonka_sources.gen';
import * as sinks from './Wonka_sinks.gen';
import * as operators from './Wonka_operators.gen';
import * as web from './web/WonkaJs.gen';
import * as types from './Wonka_types.gen';

/* Spec: passive Pull talkback handling for an operator acting as a noop.
  The sink sends one Pull through the operator. The test source answers
  the first Pull it receives with a single Push of `0`, and the sink is
  expected to have collected exactly `output` once timers are flushed. */
const passesPassivePull = (
  operator: types.operatorT<any, any>,
  output: any = 0
) =>
  it('responds to Pull talkback signals (spec)', () => {
    const received = [];
    let sinkTalkback = null;
    let hasPushed = false;

    // Source: sends Start right away and answers only the first Pull
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        if (hasPushed || tb !== deriving.pull) return;
        hasPushed = true;
        downstream(deriving.push(0));
      }));
    };

    // Sink: must never observe End; records pushes and captures the talkback
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isPush(signal)) {
        received.push(deriving.unboxPush(signal));
        return;
      }
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
      }
    };

    operator(source)(sink);
    // Start has to arrive synchronously on subscription
    expect(sinkTalkback).not.toBe(null);
    // Nothing may be pushed before the sink asks for it
    expect(received).toEqual([]);

    // A Pull from the sink should result in exactly one value
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(received).toEqual([output]);
  });

/* Spec: active Push handling for an operator acting as a noop.
  The test source pushes a value on its own initiative; the value has to
  reach the sink through the operator, and the Pull the sink sends in
  response to that Push has to travel back up to the source. */
const passesActivePush = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('responds to eager Push signals (spec)', () => {
    const received = [];
    let sinkTalkback = null;
    let emit = null;
    let pullCount = 0;

    // Source: exposes `emit` for manual pushes and counts incoming Pulls
    const source: types.sourceT<any> = downstream => {
      emit = (value: any) => downstream(deriving.push(value));
      downstream(deriving.start(tb => {
        if (tb === deriving.pull) pullCount += 1;
      }));
    };

    // Sink: must never observe End; answers every Push with a Pull
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      if (deriving.isPush(signal)) {
        received.push(deriving.unboxPush(signal));
        sinkTalkback(deriving.pull);
      }
    };

    operator(source)(sink);
    // Subscribing alone must not cause a Pull to reach the source
    expect(pullCount).toBe(0);

    // An eager Push has to show up at the sink
    emit(0);
    jest.runAllTimers();
    expect(received).toEqual([result]);
    // ...and the sink's follow-up Pull has to have reached the source
    expect(pullCount).toBe(1);
  });

/* Spec: Close talkback handling for an operator acting as a noop.
  The sink replies to the first Push with a Close signal, which has to be
  forwarded up to the source. The sink must never receive an End signal
  during this exchange. */
const passesSinkClose = (operator: types.operatorT<any, any>) =>
  it('responds to Close signals from sink (spec)', () => {
    let sinkTalkback = null;
    let closeCount = 0;

    // Source: pushes on Pull while not yet closed and counts Close signals
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        if (tb === deriving.pull && closeCount === 0) {
          downstream(deriving.push(0));
          return;
        }
        if (tb === deriving.close) {
          closeCount += 1;
        }
      }));
    };

    // Sink: captures the talkback and closes as soon as a value arrives
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      if (deriving.isPush(signal)) {
        sinkTalkback(deriving.close);
      }
    };

    operator(source)(sink);

    // Pulling triggers a Push, to which the sink replies with Close
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(closeCount).toBe(1);
  });

/* Spec: End handling for an operator acting as a noop.
  On the first Pull the test source emits a Push immediately followed by
  an End. Both have to arrive at the sink in that order, the source must
  never be sent a Close, and no further Pull may reach the source. */
const passesSourceEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on immediate Push then End signals from source (spec)', () => {
    const observed = [];
    let sinkTalkback = null;
    let pullCount = 0;
    let endCount = 0;

    // Source: rejects Close and emits Push + End on the very first Pull only
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        expect(tb).not.toBe(deriving.close);
        if (tb !== deriving.pull) return;
        pullCount += 1;
        if (pullCount !== 1) return;
        downstream(deriving.push(0));
        downstream(deriving.end());
      }));
    };

    // Sink: captures the talkback, logs all other signals and counts Ends
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      observed.push(signal);
      if (deriving.isEnd(signal)) endCount += 1;
    };

    operator(source)(sink);

    // A single Pull should yield one Push followed by one End
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(endCount).toBe(1);
    expect(observed).toEqual([deriving.push(result), deriving.end()]);
    // The operator must not have issued any Pull of its own
    expect(pullCount).toBe(1);
  });

/* Spec: End handling where the End is a reply to a later Pull.
  The test source answers the first two Pulls with a Push each and the
  third Pull with an End. The sink pulls again after every Push, so the
  End arrives as the answer to a Pull rather than right after a Push.
  Per the original note this variant is used because
  mergeMap/switchMap/concatMap are eager operators. */
const passesSourcePushThenEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on End signals from source (spec)', () => {
    const observed = [];
    let sinkTalkback = null;
    let pullCount = 0;
    let endCount = 0;

    // Source: rejects Close; Pull #1 and #2 yield a Push, later Pulls an End
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        expect(tb).not.toBe(deriving.close);
        if (tb !== deriving.pull) return;
        pullCount += 1;
        downstream(pullCount <= 2 ? deriving.push(0) : deriving.end());
      }));
    };

    // Sink: logs every non-Start signal and pulls again after each Push
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      observed.push(signal);
      if (deriving.isPush(signal)) sinkTalkback(deriving.pull);
      if (deriving.isEnd(signal)) endCount += 1;
    };

    operator(source)(sink);

    // One initial Pull starts the Push, Push, End exchange
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(endCount).toBe(1);
    expect(pullCount).toBe(3);
    expect(observed).toEqual([
      deriving.push(result),
      deriving.push(result),
      deriving.end()
    ]);
  });

/* Spec: Start forwarding for an operator acting as a noop.
  The test source emits a single Start signal; the sink behind the
  operator has to observe exactly one Start as a result. */
const passesSingleStart = (operator: types.operatorT<any, any>) =>
  it('sends a single Start event to the incoming sink (spec)', () => {
    let startCount = 0;

    // Source: only ever sends Start, with a talkback that ignores everything
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(() => {}));
    };

    // Sink: counts Start signals and ignores all others
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) return;
      startCount += 1;
    };

    // Subscribing through the operator must produce exactly one Start
    operator(source)(sink);
    expect(startCount).toBe(1);
  });

/* Spec: silence after termination for an operator acting as a noop.
  Registers two tests with misbehaving sources:
  - one that pushes a value right after sending End, and
  - one that pushes a value in response to a Close talkback.
  In both cases the stray Push must not reach the sink.
  As the original note says, this isn't a strict requirement for every
  operator, but matters for operators that Close sources themselves or
  operate on multiple sources. */
const passesStrictEnd = (operator: types.operatorT<any, any>) => {
  it('stops all signals after End has been received (spec: strict end)', () => {
    const observed = [];
    let pullCount = 0;

    // Source: on Pull sends End and then (incorrectly) a Push
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        if (tb !== deriving.pull) return;
        pullCount += 1;
        downstream(deriving.end());
        downstream(deriving.push(123));
      }));
    };

    // Sink: pulls immediately on Start and logs everything else
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        observed.push(signal);
        return;
      }
      deriving.unboxStart(signal)(deriving.pull);
    };

    operator(source)(sink);

    // Only the End may have come through; the late Push is dropped
    jest.runAllTimers();
    expect(observed).toEqual([deriving.end()]);
    expect(pullCount).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const observed = [];

    // Source: (incorrectly) pushes a value when it is told to Close
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        if (tb !== deriving.close) return;
        downstream(deriving.push(123));
      }));
    };

    // Sink: closes immediately on Start and logs everything else
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        observed.push(signal);
        return;
      }
      deriving.unboxStart(signal)(deriving.close);
    };

    operator(source)(sink);

    // The Push sent after Close must not have reached the sink
    jest.runAllTimers();
    expect(observed).toEqual([]);
  });
};

/* Spec: behaviour of an operator that terminates the chain right away.
  After the sink's initial Pull, the source has to have received exactly
  one Close and the sink exactly one End, without any timers being run. */
const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closeCount = 0;
    let endCount = 0;

    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        // Some operators only terminate once they have seen a value
        if (tb === deriving.pull) {
          downstream(deriving.push(null));
        }
        if (tb === deriving.close) {
          closeCount += 1;
        }
      }));
    };

    // Sink: pulls once on Start and counts End signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      }
      if (deriving.isEnd(signal)) {
        endCount += 1;
      }
    };

    // Both the Close and the End are expected synchronously
    closingOperator(source)(sink);
    expect(closeCount).toBe(1);
    expect(endCount).toBe(1);
  });

/* Spec: asynchronous Push and End for an operator acting as a noop.
  The sink pulls 5ms after Start. On the first Pull the source schedules
  a Push after 10ms and an End after 20ms. Once all timers have run, the
  sink is expected to have seen exactly that Push followed by that End. */
const passesAsyncSequence = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes an async push with an async end (spec)', () => {
    const observed = [];
    let scheduled = false;

    // Source: schedules its Push and End once, on the first Pull
    const source: types.sourceT<any> = downstream => {
      downstream(deriving.start(tb => {
        if (tb !== deriving.pull || scheduled) return;
        scheduled = true;
        setTimeout(() => downstream(deriving.push(0)), 10);
        setTimeout(() => downstream(deriving.end()), 20);
      }));
    };

    // Sink: delays its Pull by 5ms and logs every non-Start signal
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        observed.push(signal);
        return;
      }
      setTimeout(() => {
        deriving.unboxStart(signal)(deriving.pull);
      }, 5);
    };

    // Nothing may arrive synchronously on subscription
    operator(source)(sink);
    expect(observed.length).toBe(0);
    // After 5ms the delayed Pull has reached the source
    jest.advanceTimersByTime(5);
    expect(scheduled).toBeTruthy();
    // Flushing the remaining timers delivers the Push and then the End
    jest.runAllTimers();

    expect(observed).toEqual([
      deriving.push(result),
      deriving.end()
    ]);
  });

// Every test in this file runs against Jest's fake timers
beforeEach(function installFakeTimers() {
  jest.useFakeTimers();
});

describe('combine', () => {
  // Combines a constant `0` source with the source under test
  const combineWithZero = (source: types.sourceT<any>) =>
    operators.combine(sources.fromValue(0), source);

  passesPassivePull(combineWithZero, [0, 0]);
  passesActivePush(combineWithZero, [0, 0]);
  passesSinkClose(combineWithZero);
  passesSourceEnd(combineWithZero, [0, 0]);
  passesSingleStart(combineWithZero);
  passesStrictEnd(combineWithZero);

  it('emits the zipped values of two sources', () => {
    const { source: left$, next: emitLeft } = sources.makeSubject();
    const { source: right$, next: emitRight } = sources.makeSubject();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.combine(left$, right$));

    // No output while only one side has produced a value
    emitLeft(1);
    expect(spy).not.toHaveBeenCalled();
    // Once both sides have a value the pair is emitted
    emitRight(2);
    expect(spy).toHaveBeenCalledWith([1, 2]);
  });
});

describe('buffer', () => {
  // Notifier that answers every Pull with a Push of `null` and never sends End
  const valueThenNever: types.sourceT<any> = downstream =>
    downstream(deriving.start(tb => {
      if (tb !== deriving.pull) return;
      downstream(deriving.push(null));
    }));

  const bufferOnNotify = operators.buffer(valueThenNever);

  passesPassivePull(bufferOnNotify, [0]);
  passesActivePush(bufferOnNotify, [0]);
  passesSinkClose(bufferOnNotify);
  passesSourcePushThenEnd(bufferOnNotify, [0]);
  passesSingleStart(bufferOnNotify);
  passesStrictEnd(bufferOnNotify);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.buffer(notifier$)(input$));

    // Values are held back until the notifier fires
    emit(1);
    emit(2);
    expect(spy).not.toHaveBeenCalled();

    notify(null);
    expect(spy).toHaveBeenCalledWith([1, 2]);

    // The next batch only contains what arrived since the last notification
    emit(3);
    notify(null);
    expect(spy).toHaveBeenCalledWith([3]);
  });
});

describe('concatMap', () => {
  // Maps every value to a single-value source, so the operator acts as a noop
  const passthrough = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Synchronous case: the same expectations are used for mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const spy = jest.fn();

    const flattened$ = operators.concatMap(
      (x: number) => sources.fromArray([x, x + 1])
    )(source);
    flattened$(spy);

    next(1);
    next(3);
    complete();

    // One Start, four Pushes and one End
    expect(spy).toHaveBeenCalledTimes(6);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Synchronous case: the same expectations are used for mergeMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const observed = [];
    const teardown = jest.fn();

    // Sink that pulls once and then closes as soon as it is started
    const sink = (signal: types.signalT<any>) => {
      observed.push(signal);
      if (!deriving.isStart(signal)) return;
      deriving.unboxStart(signal)(deriving.pull);
      deriving.unboxStart(signal)(deriving.close);
    };

    const flattened$ = operators.concatMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null));
    flattened$(sink);

    // The inner source's teardown ran and only Start reached the sink
    expect(teardown).toHaveBeenCalled();
    expect(observed).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // Asynchronous case: this is where concatMap differs from mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = web.delay<number>(4)(sources.fromArray([1, 10]));
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(
      operators.concatMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(outer$)
    );

    // After 14ms only the first inner source's values have been emitted
    jest.advanceTimersByTime(14);
    expect(spy.mock.calls).toEqual([
      [1],
      [2],
    ]);

    // The second inner source's values follow afterwards
    jest.runAllTimers();
    expect(spy.mock.calls).toEqual([
      [1],
      [2],
      [10],
      [20],
    ]);
  });

  it('works for fully asynchronous sources', () => {
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    // The inner source emits from a timer callback instead of synchronously
    const flattened$ = operators.concatMap(() => {
      return sources.make(observer => {
        setTimeout(() => observer.next(1));
        return () => {};
      })
    })(sources.fromValue(null));
    subscribe(flattened$);

    jest.runAllTimers();
    expect(spy).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const collected = [];
    const subscribe = sinks.forEach(x => collected.push(x));

    // `concat` over two synchronous arrays
    subscribe(
      operators.concat([
        sources.fromArray([1, 2]),
        sources.fromArray([3, 4])
      ])
    );

    expect(collected).toEqual([ 1, 2, 3, 4 ]);
  });
});

describe('debounce', () => {
  // A debounce duration of 0 is used as the noop variant for the specs
  const zeroDebounce = web.debounce(() => 0);
  passesPassivePull(zeroDebounce);
  passesActivePush(zeroDebounce);
  passesSinkClose(zeroDebounce);
  passesSourceEnd(zeroDebounce);
  passesSingleStart(zeroDebounce);
  passesStrictEnd(zeroDebounce);
  passesAsyncSequence(zeroDebounce);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(web.debounce(() => 100)(source));

    // 50ms after the first value nothing has been emitted
    emit(1);
    jest.advanceTimersByTime(50);
    expect(spy).not.toHaveBeenCalled();

    // 99ms after the second value still nothing has been emitted
    emit(2);
    jest.advanceTimersByTime(99);
    expect(spy).not.toHaveBeenCalled();

    // One more millisecond and the latest value comes through
    jest.advanceTimersByTime(1);
    expect(spy).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next: emit, complete } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(web.debounce(() => 100)(source));

    // The source completes while a value is still pending
    emit(1);
    complete();
    jest.advanceTimersByTime(100);
    expect(spy).toHaveBeenCalled();
  });
});

describe('delay', () => {
  // A delay of 0 is used as the noop variant for the specs
  const zeroDelay = web.delay(0);
  passesPassivePull(zeroDelay);
  passesActivePush(zeroDelay);
  passesSinkClose(zeroDelay);
  passesSourceEnd(zeroDelay);
  passesSingleStart(zeroDelay);
  passesAsyncSequence(zeroDelay);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next: emit } = sources.makeSubject();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(web.delay(100)(source));

    // The value is not emitted synchronously
    emit(1);
    expect(spy).not.toHaveBeenCalled();

    // It is emitted once 100ms have passed
    jest.advanceTimersByTime(100);
    expect(spy).toHaveBeenCalledWith(1);
  });
});

describe('filter', () => {
  // An always-true predicate is used as the noop variant for the specs
  const acceptAll = operators.filter(() => true);
  passesPassivePull(acceptAll);
  passesActivePush(acceptAll);
  passesSinkClose(acceptAll);
  passesSourceEnd(acceptAll);
  passesSingleStart(acceptAll);
  passesAsyncSequence(acceptAll);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next: emit } = sources.makeSubject();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.filter(value => Boolean(value))(source));

    // A falsy value is rejected by the predicate
    emit(false);
    expect(spy).not.toHaveBeenCalled();

    // A truthy value is let through
    emit(true);
    expect(spy).toHaveBeenCalledWith(true);
  });
});

describe('map', () => {
  // The identity mapping is used as the noop variant for the specs
  const identity = operators.map(value => value);
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourceEnd(identity);
  passesSingleStart(identity);
  passesAsyncSequence(identity);

  it('maps over values given a transform function', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.map((value: number) => value + 1)(source));

    // The sink sees the transformed value
    emit(1);
    expect(spy).toHaveBeenCalledWith(2);
  });
});

describe('mergeMap', () => {
  // Maps every value to a single-value source, so the operator acts as a noop
  const passthrough = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Synchronous case: the same expectations are used for concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const spy = jest.fn();

    const flattened$ = operators.mergeMap(
      (x: number) => sources.fromArray([x, x + 1])
    )(source);
    flattened$(spy);

    next(1);
    next(3);
    complete();

    // One Start, four Pushes and one End
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Synchronous case: the same expectations are used for concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const observed = [];
    const teardown = jest.fn();

    // Sink that pulls once and then closes as soon as it is started
    const sink = (signal: types.signalT<any>) => {
      observed.push(signal);
      if (!deriving.isStart(signal)) return;
      deriving.unboxStart(signal)(deriving.pull);
      deriving.unboxStart(signal)(deriving.close);
    };

    const flattened$ = operators.mergeMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null));
    flattened$(sink);

    // The inner source's teardown ran and only Start reached the sink
    expect(teardown).toHaveBeenCalled();
    expect(observed).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // Asynchronous case: this is where mergeMap differs from concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const outer$ = web.delay<number>(4)(sources.fromArray([1, 10]));
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(
      operators.mergeMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(outer$)
    );

    // Values of both inner sources are interleaved
    jest.runAllTimers();
    expect(spy.mock.calls).toEqual([
      [1],
      [10],
      [2],
      [20],
    ]);
  });

  it('emits synchronous values in order', () => {
    const collected = [];
    const subscribe = sinks.forEach(x => collected.push(x));

    // `merge` over two synchronous arrays
    subscribe(
      operators.merge([
        sources.fromArray([1, 2]),
        sources.fromArray([3, 4])
      ])
    );

    expect(collected).toEqual([ 1, 2, 3, 4 ]);
  });
});

describe('onEnd', () => {
  // An empty callback is used as the noop variant for the specs
  const silentOnEnd = operators.onEnd(() => {});
  passesPassivePull(silentOnEnd);
  passesActivePush(silentOnEnd);
  passesSinkClose(silentOnEnd);
  passesSourceEnd(silentOnEnd);
  passesStrictEnd(silentOnEnd);
  passesSingleStart(silentOnEnd);
  passesAsyncSequence(silentOnEnd);

  it('calls a callback when the source ends', () => {
    const { source, next: emit, complete } = sources.makeSubject<number>();
    const onEndSpy = jest.fn();
    const subscribe = sinks.forEach(() => {});

    subscribe(operators.onEnd(onEndSpy)(source));

    // A regular value does not trigger the callback
    emit(null);
    expect(onEndSpy).not.toHaveBeenCalled();

    // Completing the subject does
    complete();
    expect(onEndSpy).toHaveBeenCalled();
  });
});

describe('onPush', () => {
  // An empty callback is used as the noop variant for the specs
  const silentOnPush = operators.onPush(() => {});
  passesPassivePull(silentOnPush);
  passesActivePush(silentOnPush);
  passesSinkClose(silentOnPush);
  passesSourceEnd(silentOnPush);
  passesStrictEnd(silentOnPush);
  passesSingleStart(silentOnPush);
  passesAsyncSequence(silentOnPush);

  it('calls a callback when the source emits', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const onPushSpy = jest.fn();
    const subscribe = sinks.forEach(() => {});

    subscribe(operators.onPush(onPushSpy)(source));

    // The callback is invoked with each emitted value
    emit(1);
    expect(onPushSpy).toHaveBeenCalledWith(1);
    emit(2);
    expect(onPushSpy).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    // Both exports have to be the very same function reference
    expect(operators.onPush).toBe(operators.tap);
  });
});

describe('onStart', () => {
  // An empty callback is used as the noop variant for the specs
  const silentOnStart = operators.onStart(() => {});
  passesPassivePull(silentOnStart);
  passesActivePush(silentOnStart);
  passesSinkClose(silentOnStart);
  passesSourceEnd(silentOnStart);
  passesSingleStart(silentOnStart);
  passesAsyncSequence(silentOnStart);

  it('is called when the source starts', () => {
    let capturedSink: types.sinkT<any>;

    const onStartSpy = jest.fn();
    // Source that only stores its sink so Start can be sent manually later
    const source: types.sourceT<any> = incoming => { capturedSink = incoming; };
    const subscribe = sinks.forEach(() => {});

    subscribe(operators.onStart(onStartSpy)(source));

    // Subscribing alone does not trigger the callback
    expect(onStartSpy).not.toHaveBeenCalled();

    // Sending Start through the captured sink does
    capturedSink(deriving.start(() => {}));
    expect(onStartSpy).toHaveBeenCalled();
  });
});

describe('sample', () => {
  // Notifier that answers every Pull with a Push of `null` and never sends End
  const valueThenNever: types.sourceT<any> = downstream =>
    downstream(deriving.start(tb => {
      if (tb !== deriving.pull) return;
      downstream(deriving.push(null));
    }));

  const sampleOnNotify = operators.sample(valueThenNever);

  passesPassivePull(sampleOnNotify);
  passesActivePush(sampleOnNotify);
  passesSinkClose(sampleOnNotify);
  passesSourcePushThenEnd(sampleOnNotify);
  passesSingleStart(sampleOnNotify);
  passesStrictEnd(sampleOnNotify);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.sample(notifier$)(input$));

    // Input values alone do not produce output
    emit(1);
    emit(2);
    expect(spy).not.toHaveBeenCalled();

    // The notifier releases the most recent input value
    notify(null);
    expect(spy).toHaveBeenCalledWith(2);
  });
});

describe('scan', () => {
  // A reducer that returns the incoming value is the noop variant for the specs
  const keepLatest = operators.scan((_previous, latest) => latest, null);
  passesPassivePull(keepLatest);
  passesActivePush(keepLatest);
  passesSinkClose(keepLatest);
  passesSourceEnd(keepLatest);
  passesSingleStart(keepLatest);
  passesAsyncSequence(keepLatest);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    const sum = (total: number, value: number) => total + value;
    subscribe(operators.scan(sum, 0)(input$));

    // Each emission is the running total so far
    emit(1);
    expect(spy).toHaveBeenCalledWith(1);
    emit(2);
    expect(spy).toHaveBeenCalledWith(3);
  });
});

describe('share', () => {
  // `share` takes no configuration, so it is its own noop variant
  const shared = operators.share;
  passesPassivePull(shared);
  passesActivePush(shared);
  passesSinkClose(shared);
  passesSourceEnd(shared);
  passesSingleStart(shared);
  passesStrictEnd(shared);
  passesAsyncSequence(shared);

  it('shares output values between sinks', () => {
    let flush = () => {};

    // The wrapped source sends Start and exposes `flush` to emit Push + End
    const source: types.sourceT<any> = operators.share(downstream => {
      downstream(deriving.start(() => {}));
      flush = () => {
        downstream(deriving.push([0]));
        downstream(deriving.end());
      };
    });

    const firstSpy = jest.fn();
    const secondSpy = jest.fn();

    sinks.forEach(firstSpy)(source);
    sinks.forEach(secondSpy)(source);
    flush();

    expect(firstSpy).toHaveBeenCalledWith([0]);
    expect(secondSpy).toHaveBeenCalledWith([0]);
    // Both sinks received the identical array instance
    expect(firstSpy.mock.calls[0][0]).toBe(secondSpy.mock.calls[0][0]);
  });
});

describe('skip', () => {
  // Skipping zero values is the noop variant for the specs
  const skipNone = operators.skip(0);
  passesPassivePull(skipNone);
  passesActivePush(skipNone);
  passesSinkClose(skipNone);
  passesSourceEnd(skipNone);
  passesSingleStart(skipNone);
  passesAsyncSequence(skipNone);

  it('skips a number of values before emitting normally', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.skip(1)(source));

    // The first value is dropped
    emit(1);
    expect(spy).not.toHaveBeenCalled();
    // The second one comes through
    emit(2);
    expect(spy).toHaveBeenCalledWith(2);
  });
});

describe('skipUntil', () => {
  // A `fromValue(null)` notifier is used as the noop variant for the specs
  const skipUntilValue = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(skipUntilValue);
  passesActivePush(skipUntilValue);
  passesSinkClose(skipUntilValue);
  passesSourceEnd(skipUntilValue);
  passesSingleStart(skipUntilValue);
  passesAsyncSequence(skipUntilValue);
  passesStrictEnd(skipUntilValue);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.skipUntil(notifier$)(input$));

    // Values before the notification are dropped
    emit(1);
    expect(spy).not.toHaveBeenCalled();
    // Values after the notification come through
    notify(null);
    emit(2);
    expect(spy).toHaveBeenCalledWith(2);
  });
});

describe('skipWhile', () => {
  // An always-false predicate is used as the noop variant for the specs
  const skipNothing = operators.skipWhile(() => false);
  passesPassivePull(skipNothing);
  passesActivePush(skipNothing);
  passesSinkClose(skipNothing);
  passesSourceEnd(skipNothing);
  passesSingleStart(skipNothing);
  passesAsyncSequence(skipNothing);

  it('skips values until one fails a predicate', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(operators.skipWhile(value => value <= 1)(source));

    // 1 satisfies the predicate and is skipped
    emit(1);
    expect(spy).not.toHaveBeenCalled();
    // 2 fails the predicate and is emitted
    emit(2);
    expect(spy).toHaveBeenCalledWith(2);
  });
});

describe('switchMap', () => {
  // Maps every value to a single-value source, so the operator acts as a noop
  const passthrough = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Synchronous case: the same expectations are used for concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const spy = jest.fn();

    const flattened$ = operators.switchMap(
      (x: number) => sources.fromArray([x, x + 1])
    )(source);
    flattened$(spy);

    next(1);
    next(3);
    complete();

    // One Start, four Pushes and one End
    expect(spy).toHaveBeenCalledTimes(6);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Synchronous case: the same expectations are used for concatMap & mergeMap
  it('lets inner sources finish when outer source ends', () => {
    const observed = [];
    const teardown = jest.fn();

    // Sink that pulls once and then closes as soon as it is started
    const sink = (signal: types.signalT<any>) => {
      observed.push(signal);
      if (!deriving.isStart(signal)) return;
      deriving.unboxStart(signal)(deriving.pull);
      deriving.unboxStart(signal)(deriving.close);
    };

    const flattened$ = operators.switchMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null));
    flattened$(sink);

    // The inner source's teardown ran and only Start reached the sink
    expect(teardown).toHaveBeenCalled();
    expect(observed).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // Asynchronous case: this is where switchMap differs from concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = web.delay<number>(4)(sources.fromArray([1, 10]));
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(
      operators.switchMap((x: number) => {
        // Inner source: the first two interval ticks, scaled by the outer value
        const firstTwo = operators.take(2);
        const scaled$ = operators.map((y: number) => x * (y + 1))(web.interval(5));
        return firstTwo(scaled$);
      })(outer$)
    );

    // Only one value of the first inner source is seen before the switch
    jest.runAllTimers();
    expect(spy.mock.calls).toEqual([
      [1],
      [10],
      [20],
    ]);
  });
});

describe('take', () => {
  // A limit of 10 is never reached by the specs, so it serves as the noop variant
  const takeTen = operators.take(10);
  passesPassivePull(takeTen);
  passesActivePush(takeTen);
  passesSinkClose(takeTen);
  passesSourceEnd(takeTen);
  passesSingleStart(takeTen);
  passesStrictEnd(takeTen);
  passesAsyncSequence(takeTen);

  // A limit of 0 is used as the immediately-ending variant
  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();

    const limited$ = operators.take(1)(source);
    limited$(spy);
    emit(1);

    // Start, the single allowed Push, then End
    expect(spy).toHaveBeenCalledTimes(3);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});

describe('takeUntil', () => {
  // A `never` notifier is used as the noop variant for the specs
  const takeForever = operators.takeUntil(sources.never);
  passesPassivePull(takeForever);
  passesActivePush(takeForever);
  passesSinkClose(takeForever);
  passesSourcePushThenEnd(takeForever);
  passesSingleStart(takeForever);
  passesStrictEnd(takeForever);
  passesAsyncSequence(takeForever);

  // A `fromValue(null)` notifier is used as the immediately-ending variant
  const endsImmediately = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(endsImmediately);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<number>();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();

    const limited$ = operators.takeUntil(notifier$)(input$);
    limited$(spy);
    emit(1);

    // Before the notification: Start and the Push
    expect(spy).toHaveBeenCalledTimes(2);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
    ]);

    // The notification produces the third signal, an End
    notify(null);
    expect(spy).toHaveBeenCalledTimes(3);
    expect(spy.mock.calls[2][0]).toEqual(deriving.end());
  });
});

describe('takeWhile', () => {
  // An always-true predicate is used as the noop variant for the specs
  const takeAll = operators.takeWhile(() => true);
  passesPassivePull(takeAll);
  passesActivePush(takeAll);
  passesSinkClose(takeAll);
  passesSourceEnd(takeAll);
  passesSingleStart(takeAll);
  passesAsyncSequence(takeAll);

  // An always-false predicate is used as the immediately-ending variant
  const takeNone = operators.takeWhile(() => false);
  passesCloseAndEnd(takeNone);

  it('emits values while a predicate passes for all values', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();

    const limited$ = operators.takeWhile(value => value < 2)(source);
    limited$(spy);
    emit(1);
    emit(2);

    // 1 passes the predicate; 2 fails it and results in End instead of a Push
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});

describe('takeLast', () => {
  // A limit of 0 is used as the immediately-ending variant
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source, next: emit, complete } = sources.makeSubject<number>();
    const observed = [];

    let sinkTalkback;
    const lastOne$ = operators.takeLast(1)(source);
    // Sink: logs every signal and pulls after anything that is not End
    lastOne$(signal => {
      observed.push(signal);
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
      }
      if (!deriving.isEnd(signal)) {
        sinkTalkback(deriving.pull);
      }
    });

    emit(1);
    emit(2);

    // Nothing, not even Start, is seen before the source has completed
    expect(observed.length).toBe(0);
    complete();

    // Afterwards the sink gets Start, the final value and End
    expect(observed).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(2),
      deriving.end(),
    ]);
  });
});

describe('throttle', () => {
  // A throttle duration of 0 is used as the noop variant for the specs
  const zeroThrottle = web.throttle(() => 0);
  passesPassivePull(zeroThrottle);
  passesActivePush(zeroThrottle);
  passesSinkClose(zeroThrottle);
  passesSourceEnd(zeroThrottle);
  passesSingleStart(zeroThrottle);
  passesAsyncSequence(zeroThrottle);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();
    const subscribe = sinks.forEach(spy);

    subscribe(web.throttle(() => 100)(source));

    // The first value is emitted immediately
    emit(1);
    expect(spy).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    // 50ms later a second value is ignored
    emit(2);
    expect(spy).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    // After 100ms in total values are let through again
    emit(3);
    expect(spy).toHaveBeenCalledWith(3);
  });
});

Выполнить команду


Для локальной разработки. Не используйте в интернете!