
Rxjs Nested Subscribe With Multiple Inner Subscriptions

Original promise based code I'm trying to rewrite: parentPromise .then((parentResult) => { childPromise1 .then(child1Result => child1Handler(parentRes

Solution 1:

That seems pretty strange to me. You're creating a new subscription for each child every time parentResult arrives. Even though those subscriptions will eventually be destroyed (assuming the onDestroy$ implementation is correct), it still seems wrong.

You probably want withLatestFrom(parent$) and three separate pipes, one for each child.

It might look something like: child1$.pipe(takeUntil(globalDeath$), withLatestFrom(parent$)).subscribe(([childResult, parentResult]) => ...). Not sure if my JS is correct, can't test it at the moment; but the point is: you're getting the latest result from the parent$ every time child1$ fires. Note that you can reverse the direction if necessary (withLatestFrom(child1$)).

Solution 2:

You can: 1) pass parent$ through share, and 2) use flatMap three times, something like:

const sharedParent$ = parent$.pipe(share());

sharedParent$.pipe(
    flatMap(parentResult => forkJoin([of(parentResult), child1$])),
    takeUntil(onDestroy$)
).subscribe((results) => child1Handler(...results)); // repeat for all children

(If there's more than 2 children, extracting that into a function with child stream and handler as parameters is a good idea).

That follows the original behavior of waiting to subscribe to the children until parent$ emits. If you don't need that, you can skip flatMap and just forkJoin sharedParent$ and the children.

Solution 3:

How about using higher order observables? Something like this:

const parentReplay$ = parent$.pipe(shareReplay(1));

of(
  [child1$, child1Handler],
  [child2$, child2Handler],
  [child3$, child3Handler]
).pipe(
  mergeMap(([child$, handler]) => parentReplay$.pipe(
    mergeMap(parentResult => child$.pipe(
      tap(childResult => handler(parentResult, childResult))
    ))
  ))
).subscribe();

Solution 4:

If you were using Promises then the corresponding Observables emit only once and then complete.

If this is the case, you can use forkJoin to execute the child Observables in parallel.

So the code could look like this:

parent$.pipe(
   takeUntil(onDestroy$),
   // wait for parent$ to emit and then move on
   // the following forkJoin executes the child observables in parallel and emits when all children complete;
   // the value emitted is an array with the 3 notifications coming from the child observables
   concatMap(parentResult => forkJoin([child1$, child2$, child3$]).pipe(
      // map returns both the parent and the children notifications
      map(childrenResults => ({parentResult, childrenResults}))
   ))
).subscribe(
  ({parentResult, childrenResults}) => {
      child1Handler(parentResult, childrenResults[0]);
      child2Handler(parentResult, childrenResults[1]);
      child3Handler(parentResult, childrenResults[2]);
  }
)
