Цепочка двух зависимых наблюдаемых в rxjs

avatar
cipak
1 июля 2021 в 20:26
120
1
4

У меня есть наблюдаемый объект, который выдает серию сообщений, скажем, obs1. Затем второй наблюдаемый, obs2, которому нужны некоторые данные из последнего сообщения, отправленного obs1, и который выдает еще одну серию сообщений. Я хотел бы «связать» эти 2 наблюдаемых объекта для создания наблюдаемого объекта obs3, который последовательно отправляет ВСЕ сообщения из obs1 и obs2.

На данный момент я придумал следующее решение:

obs3 = concat(
  obs1,
  obs1.pipe(
    last(),
    concatMap(lastMessage => obs2(lastMessage)),
);

Но у этого есть недостаток, заключающийся в том, что obs1 выполняется (подписывается) 2 раза.

Есть ли более прямой способ добиться этого? Что-то вроде оператора concatMapWithSelf(), который будет работать так:

obs3 = obs1.pipe(
  concatMapWithSelf(lastMessage => obs2(lastMessage)),
);

Спасибо!

Источник

Ответы (1)

avatar
martin
1 июля 2021 в 22:41
3

Похоже, вы могли бы использовать ConnectableObservable. Я полагаю, что в RxJS 7 это было бы еще проще и лучше читабельно с multicast(), но это будет объявлено устаревшим в RxJS 8, поэтому единственным вариантом, вероятно, является обертка исходного Observable с connectable(), а затем вызов вручную connect().

const obs1 = connectable(
  defer(() => {
    console.log('new subscription');
    return of('v1', 'v2', 'v3', 'v4');
  })
);

const obs2 = msg => of(msg);

const obs3 = merge(
  obs1,
  obs1.pipe(
    last(),
    concatMap(lastMessage => obs2(lastMessage))
  )
);

obs3.subscribe(console.log);

obs1.connect();

Текущая демонстрация: https://stackblitz.com/edit/rxjs-2uheg4?devtoolsheight=60

Если obs1 всегда асинхронный, то, возможно, вы могли бы использовать share(), но это будет вести себя иначе с синхронными источниками, поэтому использование ConnectableObservable должно быть более безопасным.

customcommander
12 июля 2021 в 22:56
0

Фантастический ответ. Просто интересно, зачем нужен defer?

martin
13 июля 2021 в 07:25
1

defer используется только для регистрации сообщения для каждой новой подписки. Вам не нужно использовать его в реальном приложении.