diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 5d2258309..fa173daac 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -26,10 +26,11 @@ Observable, abc, compose, + of, typing, ) from reactivex.internal.basic import identity -from reactivex.internal.utils import NotSet +from reactivex.internal.utils import NotSet, infinite from reactivex.subject import Subject from reactivex.typing import ( Accumulator, @@ -3325,6 +3326,212 @@ def switch_latest() -> Callable[ return switch_latest_() +def switch_map( + mapper: Optional[Mapper[_T1, Observable[_T2]]] = None +) -> Callable[[Observable[_T1]], Observable[_T2]]: + """ + The switch_map operator. + + Project each element of an observable sequence into a new observable. + + .. marble:: + :alt: switch_map + + ---1---2---3---> + [ switch_map(i: of(i, i ** 2, i ** 3)) ] + ---1---1---1---2---4---8---3---9---27---> + + Example: + >>> switch_map(lambda value: of(value, value // 2)) + + Args: + mapper: A transform function to apply to each source element. + + Returns: + A partially applied operator function that takes an observable + source and returns an observable sequence whose elements are + each element of the result of invoking the transform function + on each element of the source. + """ + mapper_: Mapper[_T1, Union["Future[_T2]", Observable[_T2]]] = mapper or cast( + Mapper[_T1, Union["Future[_T2]", Observable[_T2]]], of + ) + + return compose( + map(mapper_), + switch_latest(), + ) + + +def switch_map_indexed( + mapper_indexed: Optional[MapperIndexed[_T1, Observable[_T2]]] = None +) -> Callable[[Observable[_T1]], Observable[_T2]]: + """ + The switch_map_indexed operator. + + Project each element of an observable sequence into a new observable + by incorporating the element's index. + + .. marble:: + :alt: switch_map_indexed + + ---1-----------2-----------3-----------> + [ switch_map_indexed(i,id: of(i, i ** 2, i + id)) ] + ---1---1---1---2---4---3---3---9---5---> + + Example: + >>> switch_map_indexed(lambda value, index: of(value, value // 2)) + + Args: + mapper_indexed: A transform function to apply to each source + element. The second parameter of the function represents + the index of the source element. + + Returns: + A partially applied operator function that takes an observable + source and returns an observable sequence whose elements are + each element of the result of invoking the transform function + on each element of the source. + """ + + def _of(value: _T1, _: int) -> Observable[_T2]: + return of(cast(_T2, value)) + + _mapper_indexed = mapper_indexed or cast(MapperIndexed[_T1, Observable[_T2]], _of) + + return compose( + zip_with_iterable(infinite()), + switch_starmap_indexed(_mapper_indexed), + ) + + +@overload +def switch_starmap( + mapper: Callable[[_A, _B], Observable[_T]] +) -> Callable[[Observable[Tuple[_A, _B]]], Observable[_T]]: + ... + + +@overload +def switch_starmap( + mapper: Callable[[_A, _B, _C], Observable[_T]] +) -> Callable[[Observable[Tuple[_A, _B, _C]]], Observable[_T]]: + ... + + +@overload +def switch_starmap( + mapper: Callable[[_A, _B, _C, _D], Observable[_T]] +) -> Callable[[Observable[Tuple[_A, _B, _C, _D]]], Observable[_T]]: + ... + + +def switch_starmap( + mapper: Optional[Callable[..., Observable[Any]]] = None +) -> Callable[[Observable[Any]], Observable[Any]]: + """The switch_starmap operator. + + Unpack arguments grouped as tuple elements of an observable sequence + and return an observable sequence whose values are each element of + the observable returned by invoking the mapper function with star + applied on unpacked elements as positional arguments. + + Use instead of `switch_map()` when the the arguments to the mapper is + grouped as tuples and the mapper function takes multiple arguments. + + .. marble:: + :alt: switch_starmap + + ----1,2-------3,4---------| + [ switch_starmap(of) ] + ----1----2----3----4------| + + Example: + >>> switch_starmap(lambda x, y: of(x + y, x * y)) + + Args: + mapper: A transform function to invoke with unpacked elements + as arguments. + + Returns: + An operator function that takes an observable source and returns + an observable sequence whose values are each element of the + observable returned by invoking the mapper function with the + unpacked elements of the source. + """ + + if mapper is None: + mapper = of + + def starred(values: Tuple[Any, ...]) -> Observable[Any]: + return mapper(*values) + + return compose(switch_map(starred)) + + +@overload +def switch_starmap_indexed( + mapper: Callable[[_A, int], Observable[_T]] +) -> Callable[[Observable[_A]], Observable[_T]]: + ... + + +@overload +def switch_starmap_indexed( + mapper: Callable[[_A, _B, int], Observable[_T]] +) -> Callable[[Observable[Tuple[_A, _B]]], Observable[_T]]: + ... + + +@overload +def switch_starmap_indexed( + mapper: Callable[[_A, _B, _C, int], Observable[_T]] +) -> Callable[[Observable[Tuple[_A, _B, _C]]], Observable[_T]]: + ... + + +@overload +def switch_starmap_indexed( + mapper: Callable[[_A, _B, _C, _D, int], Observable[_T]] +) -> Callable[[Observable[Tuple[_A, _B, _C, _D]]], Observable[_T]]: + ... + + +def switch_starmap_indexed( + mapper: Optional[Callable[..., Observable[Any]]] = None +) -> Callable[[Observable[Any]], Observable[Any]]: + """Variant of :func:`switch_starmap` which accepts an indexed mapper. + + .. marble:: + :alt: switch_starmap_indexed + + ------1,2----------3,4-----------| + [ switch_starmap_indexed(of) ] + ------1---2---0----3---4---1-----| + + Example: + >>> switch_starmap_indexed(lambda x, y, i: of(x + y + i, x * y - i)) + + Args: + mapper: A transform function to invoke with unpacked elements + as arguments. + + Returns: + An operator function that takes an observable source and returns + an observable sequence whose values are each element of the + observable returned by invoking the mapper function with the + unpacked elements of the source. + """ + if mapper is None: + return compose(of) + + def starred(values: Tuple[Any, ...]) -> Observable[Any]: + assert mapper # mypy is paranoid + return mapper(*values) + + return compose(switch_map(starred)) + + def take(count: int) -> Callable[[Observable[_T]], Observable[_T]]: """Returns a specified number of contiguous elements from the start of an observable sequence. @@ -4272,6 +4479,10 @@ def zip_with_iterable( "subscribe_on", "sum", "switch_latest", + "switch_map", + "switch_map_indexed", + "switch_starmap", + "switch_starmap_indexed", "take", "take_last", "take_last_buffer", diff --git a/tests/test_observable/test_map.py b/tests/test_observable/test_map.py index 51466db62..90f6ebbbe 100644 --- a/tests/test_observable/test_map.py +++ b/tests/test_observable/test_map.py @@ -25,7 +25,7 @@ def _raise(ex): raise RxException(ex) -class TestSelect(unittest.TestCase): +class TestMap(unittest.TestCase): def test_map_throws(self): mapper = map(lambda x: x) with self.assertRaises(RxException): diff --git a/tests/test_observable/test_starmap.py b/tests/test_observable/test_starmap.py index a4ba460eb..6c6f32c2a 100644 --- a/tests/test_observable/test_starmap.py +++ b/tests/test_observable/test_starmap.py @@ -26,7 +26,7 @@ def _raise(ex): raise RxException(ex) -class TestSelect(unittest.TestCase): +class TestStarmap(unittest.TestCase): def test_starmap_never(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable() diff --git a/tests/test_observable/test_switch_map.py b/tests/test_observable/test_switch_map.py new file mode 100644 index 000000000..b7c4272ad --- /dev/null +++ b/tests/test_observable/test_switch_map.py @@ -0,0 +1,500 @@ +import unittest + +from reactivex import create, empty, of, return_value, throw +from reactivex.disposable import SerialDisposable +from reactivex.operators import switch_map, switch_map_indexed +from reactivex.testing import ReactiveTest, TestScheduler + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + +map = None + + +class RxException(Exception): + pass + + +# Helper function for raising exceptions within lambdas + + +def _raise(ex): + raise RxException(ex) + + +class TestSwitchMap(unittest.TestCase): + def test_map_throws(self): + mapper = switch_map(lambda x: of(x)) + with self.assertRaises(RxException): + return_value(1).pipe(mapper).subscribe(lambda x: _raise("ex")) + + with self.assertRaises(RxException): + throw("ex").pipe(mapper).subscribe(on_error=lambda ex: _raise(ex)) + + with self.assertRaises(RxException): + empty().pipe(mapper).subscribe( + lambda x: x, lambda ex: ex, lambda: _raise("ex") + ) + + def subscribe(observer, scheduler=None): + _raise("ex") + + with self.assertRaises(RxException): + create(subscribe).pipe(switch_map(lambda x: of(x))).subscribe() + + def test_map_disposeinsidemapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(100, 1), on_next(200, 2), on_next(500, 3), on_next(600, 4) + ) + results = scheduler.create_observer() + d = SerialDisposable() + invoked = [0] + + def projection(x, *args, **kw): + invoked[0] += 1 + + if scheduler.clock > 400: + d.dispose() + return of(x, x + 1) + + d.disposable = xs.pipe(switch_map(projection)).subscribe( + results, scheduler=scheduler + ) + + def action(scheduler, state): + return d.dispose() + + scheduler.schedule_absolute(ReactiveTest.disposed, action) + scheduler.start() + + assert results.messages == [ + on_next(100, 1), + on_next(100, 2), + on_next(200, 2), + on_next(200, 3), + ] + assert xs.subscriptions == [ReactiveTest.subscribe(0, 500)] + + assert invoked[0] == 3 + + def test_map_completed(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(180, 1), + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + invoked = [0] + + def factory(): + def projection(x): + invoked[0] += 1 + return of(x + 1) + + return xs.pipe(switch_map(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 3), + on_next(240, 4), + on_next(290, 5), + on_next(350, 6), + on_completed(400), + ] + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + assert invoked[0] == 4 + + def test_map_default_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(180, 1), + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + return xs.pipe(switch_map()) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + on_completed(400), + ] + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + + def test_map_completed_two(self): + for i in range(100): + scheduler = TestScheduler() + invoked = [0] + + xs = scheduler.create_hot_observable( + on_next(180, 1), + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x): + invoked[0] += 1 + return of(x + 1, x) + + return xs.pipe(switch_map(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 3), + on_next(210, 2), + on_next(240, 4), + on_next(240, 3), + on_next(290, 5), + on_next(290, 4), + on_next(350, 6), + on_next(350, 5), + on_completed(400), + ] + assert xs.subscriptions == [subscribe(200, 400)] + assert invoked[0] == 4 + + def test_map_not_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, 1), + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + ) + + def factory(): + def projection(x): + invoked[0] += 1 + return of(x + 1) + + return xs.pipe(switch_map(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 3), + on_next(240, 4), + on_next(290, 5), + on_next(350, 6), + ] + assert xs.subscriptions == [subscribe(200, 1000)] + assert invoked[0] == 4 + + def test_map_error(self): + scheduler = TestScheduler() + ex = "ex" + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, 1), + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + on_error(400, ex), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x): + invoked[0] += 1 + return of(x + 1) + + return xs.pipe(switch_map(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 3), + on_next(240, 4), + on_next(290, 5), + on_next(350, 6), + on_error(400, ex), + ] + assert xs.subscriptions == [subscribe(200, 400)] + assert invoked[0] == 4 + + def test_map_mapper_throws(self): + scheduler = TestScheduler() + invoked = [0] + ex = "ex" + xs = scheduler.create_hot_observable( + on_next(180, 1), + on_next(210, 2), + on_next(240, 3), + on_next(290, 4), + on_next(350, 5), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x): + invoked[0] += 1 + if invoked[0] == 3: + raise Exception(ex) + + return of(x + 1) + + return xs.pipe(switch_map(projection)) + + results = scheduler.start(factory) + assert results.messages == [on_next(210, 3), on_next(240, 4), on_error(290, ex)] + assert xs.subscriptions == [subscribe(200, 290)] + assert invoked[0] == 3 + + def test_map_with_index_throws(self): + with self.assertRaises(RxException): + mapper = switch_map_indexed(lambda x, index: of(x, index)) + + return return_value(1).pipe(mapper).subscribe(lambda x: _raise("ex")) + + with self.assertRaises(RxException): + return ( + throw("ex").pipe(mapper).subscribe(lambda x: x, lambda ex: _raise(ex)) + ) + + with self.assertRaises(RxException): + return ( + empty() + .pipe(mapper) + .subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex")) + ) + + with self.assertRaises(RxException): + return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe() + + def test_map_with_index_dispose_inside_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(100, 4), on_next(200, 3), on_next(500, 2), on_next(600, 1) + ) + invoked = [0] + results = scheduler.create_observer() + d = SerialDisposable() + + def projection(x, index): + invoked[0] += 1 + if scheduler.clock > 400: + d.dispose() + + return of(x + index * 10) + + d.disposable = xs.pipe(switch_map_indexed(projection)).subscribe(results) + + def action(scheduler, state): + return d.dispose() + + scheduler.schedule_absolute(disposed, action) + scheduler.start() + assert results.messages == [on_next(100, 4), on_next(200, 13)] + assert xs.subscriptions == [subscribe(0, 500)] + assert invoked[0] == 3 + + def test_map_with_index_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, 5), + on_next(210, 4), + on_next(240, 3), + on_next(290, 2), + on_next(350, 1), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, index): + invoked[0] += 1 + return of(x + 1, index * 10) + + return xs.pipe(switch_map_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 5), + on_next(210, 0), + on_next(240, 4), + on_next(240, 10), + on_next(290, 3), + on_next(290, 20), + on_next(350, 2), + on_next(350, 30), + on_completed(400), + ] + assert xs.subscriptions == [subscribe(200, 400)] + assert invoked[0] == 4 + + def test_map_with_index_default_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(180, 5), + on_next(210, 4), + on_next(240, 3), + on_next(290, 2), + on_next(350, 1), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + return xs.pipe(switch_map_indexed()) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 4), + on_next(240, 3), + on_next(290, 2), + on_next(350, 1), + on_completed(400), + ] + + assert xs.subscriptions == [subscribe(200, 400)] + + def test_map_with_index_not_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, 5), + on_next(210, 4), + on_next(240, 3), + on_next(290, 2), + on_next(350, 1), + ) + + def factory(): + def projection(x, index): + invoked[0] += 1 + return of(x + 1, index * 10) + + return xs.pipe(switch_map_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 5), + on_next(210, 0), + on_next(240, 4), + on_next(240, 10), + on_next(290, 3), + on_next(290, 20), + on_next(350, 2), + on_next(350, 30), + ] + assert xs.subscriptions == [subscribe(200, 1000)] + assert invoked[0] == 4 + + def test_map_with_index_error(self): + scheduler = TestScheduler() + ex = "ex" + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, 5), + on_next(210, 4), + on_next(240, 3), + on_next(290, 2), + on_next(350, 1), + on_error(400, ex), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, index): + invoked[0] += 1 + return of(x + 1, index * 10) + + return xs.pipe(switch_map_indexed(projection)) + + results = scheduler.start(factory) + + assert results.messages == [ + on_next(210, 5), + on_next(210, 0), + on_next(240, 4), + on_next(240, 10), + on_next(290, 3), + on_next(290, 20), + on_next(350, 2), + on_next(350, 30), + on_error(400, ex), + ] + assert xs.subscriptions == [subscribe(200, 400)] + assert invoked[0] == 4 + + def test_map_with_index_mapper_throws(self): + scheduler = TestScheduler() + invoked = [0] + ex = "ex" + xs = scheduler.create_hot_observable( + on_next(180, 5), + on_next(210, 4), + on_next(240, 3), + on_next(290, 2), + on_next(350, 1), + on_completed(400), + on_next(410, -1), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, index): + invoked[0] += 1 + if invoked[0] == 3: + raise Exception(ex) + return of(x + 1, index * 10) + + return xs.pipe(switch_map_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 5), + on_next(210, 0), + on_next(240, 4), + on_next(240, 10), + on_error(290, ex), + ] + assert xs.subscriptions == [subscribe(200, 290)] + assert invoked[0] == 3 + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_observable/test_switch_starmap.py b/tests/test_observable/test_switch_starmap.py new file mode 100644 index 000000000..f0893110c --- /dev/null +++ b/tests/test_observable/test_switch_starmap.py @@ -0,0 +1,437 @@ +import unittest + +from reactivex import create, empty, of +from reactivex import operators as ops +from reactivex import return_value, throw +from reactivex.disposable import SerialDisposable +from reactivex.testing import ReactiveTest, TestScheduler + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + + +class RxException(Exception): + pass + + +# Helper function for raising exceptions within lambdas + + +def _raise(ex): + raise RxException(ex) + + +class TestSwitchStarmap(unittest.TestCase): + def test_starmap_never(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable() + + invoked = [0] + + def factory(): + def mapper(x, y): + invoked[0] += 1 + return of(x + y, x * y) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [] + assert xs.subscriptions == [ReactiveTest.subscribe(200, 1000)] + assert invoked[0] == 0 + + def test_starmap_empty(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + # 200 subscribe + on_completed(300), + ) + + invoked = [0] + + def factory(): + def mapper(x, y): + invoked[0] += 1 + return of(x + y, x * y) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [on_completed(300)] + assert xs.subscriptions == [ReactiveTest.subscribe(200, 300)] + assert invoked[0] == 0 + + def test_starmap_subscription_error(self): + mapper = ops.switch_starmap(lambda x, y: of(x, x + y)) + + with self.assertRaises(RxException): + return_value((1, 10)).pipe(mapper).subscribe(lambda x: _raise("ex")) + + with self.assertRaises(RxException): + throw("ex").pipe(mapper).subscribe(on_error=lambda ex: _raise(ex)) + + with self.assertRaises(RxException): + empty().pipe(mapper).subscribe( + lambda x: x, lambda ex: ex, lambda: _raise("ex") + ) + + def subscribe(observer, scheduler=None): + _raise("ex") + + with self.assertRaises(RxException): + create(subscribe).pipe(mapper).subscribe() + + def test_starmap_dispose_inside_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + on_next(110, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(310, (3, 30)), + on_next(410, (4, 40)), + ) + + results = scheduler.create_observer() + d = SerialDisposable() + invoked = [0] + + def mapper(x, y): + invoked[0] += 1 + if scheduler._clock > 250: + d.dispose() + return of(x + y, x * y) + + d.disposable = xs.pipe(ops.switch_starmap(mapper)).subscribe( + results, scheduler=scheduler + ) + + def action(scheduler, state): + return d.dispose() + + scheduler.schedule_absolute(ReactiveTest.disposed, action) + scheduler.start() + + assert results.messages == [ + on_next(110, 11), + on_next(110, 10), + on_next(210, 22), + on_next(210, 40), + ] + + assert xs.subscriptions == [ReactiveTest.subscribe(0, 310)] + assert invoked[0] == 3 + + def test_starmap_completed(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(240, (3, 30)), + on_next(290, (4, 40)), + on_next(350, (5, 50)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + invoked = [0] + + def factory(): + def mapper(x, y): + invoked[0] += 1 + return of(x + y, y - x) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 22), + on_next(210, 18), + on_next(240, 33), + on_next(240, 27), + on_next(290, 44), + on_next(290, 36), + on_next(350, 55), + on_next(350, 45), + on_completed(400), + ] + + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + assert invoked[0] == 4 + + def test_starmap_not_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(240, (3, 30)), + on_next(290, (4, 40)), + on_next(350, (5, 50)), + ) + + def factory(): + def mapper(x, y): + invoked[0] += 1 + return of(x + y, y // x) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 22), + on_next(210, 10), + on_next(240, 33), + on_next(240, 10), + on_next(290, 44), + on_next(290, 10), + on_next(350, 55), + on_next(350, 10), + ] + + assert xs.subscriptions == [subscribe(200, 1000)] + assert invoked[0] == 4 + + def test_starmap_no_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(240, (3, 30)), + on_next(290, (4, 40)), + on_next(350, (5, 50)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + return xs.pipe(ops.switch_starmap()) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 2), + on_next(210, 20), + on_next(240, 3), + on_next(240, 30), + on_next(290, 4), + on_next(290, 40), + on_next(350, 5), + on_next(350, 50), + on_completed(400), + ] + + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + + def test_starmap_mapper_with_one_element(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1,)), + # 200 subscribe + on_next(210, (2,)), + on_next(240, (3,)), + on_next(290, (4,)), + on_next(350, (5,)), + on_completed(400), + on_next(410, (-1,)), + on_completed(420), + on_error(430, "ex"), + ) + + invoked = [0] + + def factory(): + def mapper(x): + invoked[0] += 1 + return of(x * 10) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 20), + on_next(240, 30), + on_next(290, 40), + on_next(350, 50), + on_completed(400), + ] + + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + assert invoked[0] == 4 + + def test_starmap_mapper_with_three_elements(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10, 100)), + # 200 subscribe + on_next(210, (2, 20, 200)), + on_next(240, (3, 30, 300)), + on_next(290, (4, 40, 400)), + on_next(350, (5, 50, 500)), + on_completed(400), + on_next(410, (-1, -10, -100)), + on_completed(420), + on_error(430, "ex"), + ) + + invoked = [0] + + def factory(): + def mapper(x, y, z): + invoked[0] += 1 + return of(x + y, y + z) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 22), + on_next(210, 220), + on_next(240, 33), + on_next(240, 330), + on_next(290, 44), + on_next(290, 440), + on_next(350, 55), + on_next(350, 550), + on_completed(400), + ] + + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + assert invoked[0] == 4 + + def test_starmap_mapper_with_args(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(240, (3, 30)), + on_next(290, (4, 40)), + on_next(350, (5, 50)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + invoked = [0] + + def factory(): + def mapper(*args): + invoked[0] += 1 + return of(sum(args), max(args)) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 22), + on_next(210, 20), + on_next(240, 33), + on_next(240, 30), + on_next(290, 44), + on_next(290, 40), + on_next(350, 55), + on_next(350, 50), + on_completed(400), + ] + + assert xs.subscriptions == [ReactiveTest.subscribe(200, 400)] + assert invoked[0] == 4 + + def test_starmap_error(self): + scheduler = TestScheduler() + ex = "ex" + invoked = [0] + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(240, (3, 30)), + on_next(290, (4, 40)), + on_next(350, (5, 50)), + on_error(400, ex), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, ex), + ) + + def factory(): + def mapper(x, y): + invoked[0] += 1 + return of(x + y) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 22), + on_next(240, 33), + on_next(290, 44), + on_next(350, 55), + on_error(400, ex), + ] + + assert xs.subscriptions == [subscribe(200, 400)] + assert invoked[0] == 4 + + def test_starmap_mapper_error(self): + scheduler = TestScheduler() + invoked = [0] + ex = "ex" + xs = scheduler.create_hot_observable( + # 100 create + on_next(180, (1, 10)), + # 200 subscribe + on_next(210, (2, 20)), + on_next(240, (3, 30)), + on_next(290, (4, 40)), + on_next(350, (5, 50)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, ex), + ) + + def factory(): + def mapper(x, y): + invoked[0] += 1 + if invoked[0] == 3: + raise Exception(ex) + + return of(x + y) + + return xs.pipe(ops.switch_starmap(mapper)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 22), + on_next(240, 33), + on_error(290, ex), + ] + + assert xs.subscriptions == [subscribe(200, 290)] + assert invoked[0] == 3 + + +if __name__ == "__main__": + unittest.main()