【RxJS】concatMap, mergeMap, switchMap, exhaustMapの違い

JavaScript

xxxMapという名前のOperatorの処理フローは以下の通りです。

  1. Observableから受け取ったメッセージをもとに新たなObservableを作成
  2. メッセージごとに作成された複数のObservableを合成し、新たな1つのObservableを作成

たとえば以下のコードでは、mergeMapを利用して『1 → 2 → 3』というストリームを『1A → 1B → 1C → 2A → 2B → 2C → 3A → 3B → 3C』というストリームに変換しています。

const observable = of(1,2,3); // 『1 → 2 → 3』というストリーム
observable
  .pipe(

    // mergeMap内で作成された複数のストリーム(『1A → 1B → 1C』, 『2A → 2B → 2C』, 『3A → 3B → 3C』)を合成して新たなストリームを作成
    mergeMap(message =>

      // 『1』というメッセージを受け取って『1A → 1B → 1C』というストリームを作成
      // 『2』というメッセージを受け取って『2A → 2B → 2C』というストリームを作成
      // 『3』というメッセージを受け取って『3A → 3B → 3C』というストリームを作成
      of(`${message}A`, `${message}B`, `${message}C`)
    )

  // mergeMapによって作成された新たなストリーム『1A → 1B → 1C → 2A → 2B → 2C → 3A → 3B → 3C』のメッセージを取得
  .subscribe(message => console.log(message));

実行結果

1A
1B
1C
2A
2B
2C
3A
3B
3C

xxxMapで異なる点はOperator内で作成された複数のObservableを合成するときのルールです。
今回はconcatMap, mergeMap, switchMap, exhaustMapの違いや使い分けについて紹介します。

RxJSに関する基礎知識がある前提で話を進めます。RxJSの基本情報の詳細解説は【RxJS入門】Observable、Observer、subscribe、Operatorの概要・関係性で紹介しています。

Observableの合成ルールの違い

concatMap, mergeMap, switchMap, exhaustMapにおける、Observableの合成ルールの違いについて紹介します。

concatMap

受け取ったメッセージ順でObservableを合成します。

mergeMap

処理が終わったメッセージ順でObservableを合成します。

swithMap

処理が終わったメッセージのObservableを合成します。メッセージの処理中に次のメッセージを受け取った場合は処理中のメッセージを破棄し、次のメッセージを処理します。

exhaustMap

処理が終わったメッセージのObservableを合成します。メッセージの処理中に次のメッセージを受け取った場合は次のメッセージを破棄します。

合成されるObservableの具体例

xxxMapによって合成されるObservableの違いを具体例を利用して比較します。

intervalを使った例

intervalは『指定した時間間隔で連続した整数値を出現させるObservable』を作成するOperatorです。

たとえば、interval(5000)であれば5秒おきに、interval(1000)であれば1秒おきに『0, 1, 2, 3…』という数字が生成されます。

以下のコードを例にとり、それぞれのOperatorによる結果の違いを紹介します。

import { interval } from "rxjs";
import {
  concatMap,
  exhaustMap,
  mergeMap,
  switchMap,
  take
} from "rxjs/operators";

const observable = interval(5000);
observable.pipe(
  concatMap(() => interval(1000) // ここの『concatMap』を変える
    .pipe(
      take(8) // 先頭から8件取得する
    )
  )
)
.subscribe(value => console.log(value));

concatMapの場合

interval(1000).pipe(take(8))の処理が完了するまで次のメッセージは待ち状態になるため、以下のようになります。

実行結果

0
1
2
3
4
5
6
7
0
1
2
3
4
5
6
7
# (以下略)

図で表現すると以下のようになります。

mergeMapの場合

interval(1000).pipe(take(8))を処理中でも、次のメッセージが到着したら次のメッセージの処理も開始するため、『0, 1, 2, 3…』が同時並行で出力されます。

実行結果

0 # interval(5000)の『0』がトリガーとなって生成された0
1
2
3
4
0 # interval(5000)の『1』がトリガーとなって生成された0
5 # 0 と同じタイミング
1
6 # 1 と同じタイミング
2
7 # 2 と同じタイミング
3
# (以下略)

図で表現すると以下のようになります。

siwitchMapの場合

interval(1000).pipe(take(8))の処理中に次のメッセージを受け取ると処理が中断されるため整数値は3までしか出力されません。
次のメッセージを受け取った時点でinterval(1000).pipe(take(8))の処理が再開されるため『0, 1, 2, 3』のループになります。

実行結果

0
1
2
3
# 1秒あく
0
1
2
3
# (以下略)

図で表現すると以下のようになります。

exhaustMapの場合

interval(1000).pipe(take(8))の処理中に次のメッセージを受け取ると次のメッセージは破棄されます。
7の後に時間が空いているのは次のメッセージが到着するのを待っているためです。

実行結果

0 # interval(5000)の『0』がトリガーとなって生成された0
1
2
3
4
5
6
7
# 2秒あく
0 # interval(5000)の『2』がトリガーとなって生成された0
1
2
3
4
5
6
7
# 2秒あく
0 # interval(5000)の『4』がトリガーとなって生成された0
1
# (以下略)

図で表現すると以下のようになります。

処理時間の異なるメッセージを使った例

intervalの場合はメッセージを受け取る間隔および処理時間が均等でした。
次はメッセージの受信間隔と処理時間が均等でないケースを紹介します。

以下のコードは処理時間の異なる3つのメッセージを異なる間隔で送る例です。

import { Subject } from "rxjs";
import {
  concatMap,
  exhaustMap,
  mergeMap,
  switchMap,
} from "rxjs/operators";

type ObjectType = {
  keyword: string;
  delay: number;
};
const subject: Subject<ObjectType> = new Subject();

subject
  .pipe(
    concatMap(({ keyword, delay }) => // ここの『concatMap』を変える
      new Promise((resolve) => {
        setTimeout(() => { resolve(`output: ${keyword}`)}, delay);
      })
    )
  )
  .subscribe((next) => console.log(next));


// 処理に0.3msかかるメッセージを0.1ms後に送る
setTimeout(() => subject.next({ keyword: "1st request, 2nd finish", delay: 300 }), 100);

// 処理に0.1msかかるメッセージを0.2ms後に送る
setTimeout(() => subject.next({ keyword: "2nd request, 1st finish", delay: 100 }), 200);

// 処理に0.1msかかるメッセージを0.5ms後に送る
setTimeout(() => subject.next({ keyword: "3rd request, 3rd finish", delay: 100 }), 500);

concatMapの場合

メッセージを受け取った順にObservableを合成するため『request』順の出力となります。

output: 1st request, 2nd finish
output: 2nd request, 1st finish
output: 3rd request, 3rd finish

mergeMapの場合

メッセージの処理した順にObservableを合成するため『finish』順の出力となります。

output: 2nd request, 1st finish
output: 1st request, 2nd finish
output: 3rd request, 3rd finish

switchMapの場合

1st request, 2nd finishの処理中に2nd request, 1st finishのメッセージを受け取ったため、1st request, 2nd finishの処理が中断されます。
switchMapではメッセージの処理が被った場合は新しいメッセージのみを処理するため、requestfinishも昇順になっています。

output: 2nd request, 1st finish
output: 3rd request, 3rd finish

exhaustMapの場合

1st request, 2nd finishの処理中に2nd request, 1st finishのメッセージを受け取ったため、2nd request, 1st finishを破棄します。

exhaustMapではメッセージの処理が終わらない間は新しいメッセージを破棄するため、requestfinishも昇順になっています。

output: 1st request, 2nd finish
output: 3rd request, 3rd finish

Operatorの使い分け

concatMap, mergeMap, switchMap, exhaustMapの使い分けについて紹介します。

concatMap

concatMapは到着したメッセージ順に処理をし、メッセージの処理が終わっていなければ次のメッセージは待たされるため、キューイングのような動作になります。

ですのでconcatMapは「非同期処理が完了するまで処理待ちが発生するのはしかたない。とにかく発行された順番通りにメッセージ(イベントやアクションなど)を処理するのが大事」という時に利用します。

mergeMap

mergeMapはメッセージの処理が終わった順にObservableを合成するため、concatMapのようにメッセージの処理待ちが発生することはありません。

ですのでmergeMapは「メッセージ(イベントやアクションなど)に依存関係はないから、とにかく処理が終わった順に結果を返してほしい」というケースで利用します。
具体例としてはTodoアプリケーションのDone操作などが挙げられます。

switchMap

switchMapはメッセージの処理中に次のメッセージがきた場合は次のメッセージを優先します。

ですのでswitchMapは「最新のメッセージさえ処理されていれば十分」というケースで利用します。
具体例としてはインクリメンタルサーチ(入力した値をリアルタイムで検索すること)などが挙げられます。

exhaustMap

exhaustMapswitchMapの逆で、メッセージの処理中に次のメッセージがきた場合は次のメッセージを破棄します。

ですので、exhaustMapは「一度メッセージを処理すればそれで十分」というケースで利用します。
具体例としてはフォームの多重送信の防止などが挙げられます。

まとめ

xxxMapのうちどれを使うか悩んだ場合、まずはシンプルなmergeMapで実装するとよいでしょう。もしメッセージの処理順に依存関係があった場合はconcatMapに書き換えます。
また、最新のメッセージのみさえ処理すれば十分なのであればswitchMapを利用します。多重実行されると問題がある場合はexhaustMapを利用します。

さいごに

Twitter(@nishina555)やってます。フォローしてもらえるとうれしいです!

参考