xxxMap
という名前のOperatorの処理フローは以下の通りです。
- Observableから受け取ったメッセージをもとに新たなObservableを作成
- メッセージごとに作成された複数のObservableを合成し、新たな1つのObservableを作成
たとえば以下のコードでは、mergeMap
を利用して『1 → 2 → 3』というストリームを『1A → 1B → 1C → 2A → 2B → 2C → 3A → 3B → 3C』というストリームに変換しています。
const observable = of(1,2,3); // 『1 → 2 → 3』というストリーム
observable
.pipe(
// mergeMap内で作成された複数のストリーム(『1A → 1B → 1C』, 『2A → 2B → 2C』, 『3A → 3B → 3C』)を合成して新たなストリームを作成
mergeMap(message =>
// 『1』というメッセージを受け取って『1A → 1B → 1C』というストリームを作成
// 『2』というメッセージを受け取って『2A → 2B → 2C』というストリームを作成
// 『3』というメッセージを受け取って『3A → 3B → 3C』というストリームを作成
of(`${message}A`, `${message}B`, `${message}C`)
)
// mergeMapによって作成された新たなストリーム『1A → 1B → 1C → 2A → 2B → 2C → 3A → 3B → 3C』のメッセージを取得
.subscribe(message => console.log(message));
実行結果
1A
1B
1C
2A
2B
2C
3A
3B
3C
xxxMap
で異なる点はOperator内で作成された複数のObservableを合成するときのルールです。
今回はconcatMap
, mergeMap
, switchMap
, exhaustMap
の違いや使い分けについて紹介します。
RxJSに関する基礎知識がある前提で話を進めます。RxJSの基本情報の詳細解説は【RxJS入門】Observable、Observer、subscribe、Operatorの概要・関係性で紹介しています。
目次
Observableの合成ルールの違い
concatMap
, mergeMap
, switchMap
, exhaustMap
における、Observableの合成ルールの違いについて紹介します。
concatMap
受け取ったメッセージ順でObservableを合成します。
mergeMap
処理が終わったメッセージ順でObservableを合成します。
swithMap
処理が終わったメッセージのObservableを合成します。メッセージの処理中に次のメッセージを受け取った場合は処理中のメッセージを破棄し、次のメッセージを処理します。
exhaustMap
処理が終わったメッセージのObservableを合成します。メッセージの処理中に次のメッセージを受け取った場合は次のメッセージを破棄します。
合成されるObservableの具体例
xxxMap
によって合成されるObservableの違いを具体例を利用して比較します。
intervalを使った例
intervalは『指定した時間間隔で連続した整数値を出現させるObservable』を作成するOperatorです。
たとえば、interval(5000)
であれば5秒おきに、interval(1000)
であれば1秒おきに『0, 1, 2, 3…』という数字が生成されます。
以下のコードを例にとり、それぞれのOperatorによる結果の違いを紹介します。
import { interval } from "rxjs";
import {
concatMap,
exhaustMap,
mergeMap,
switchMap,
take
} from "rxjs/operators";
const observable = interval(5000);
observable.pipe(
concatMap(() => interval(1000) // ここの『concatMap』を変える
.pipe(
take(8) // 先頭から8件取得する
)
)
)
.subscribe(value => console.log(value));
concatMapの場合
interval(1000).pipe(take(8))
の処理が完了するまで次のメッセージは待ち状態になるため、以下のようになります。
実行結果
0
1
2
3
4
5
6
7
0
1
2
3
4
5
6
7
# (以下略)
図で表現すると以下のようになります。
mergeMapの場合
interval(1000).pipe(take(8))
を処理中でも、次のメッセージが到着したら次のメッセージの処理も開始するため、『0, 1, 2, 3…』が同時並行で出力されます。
実行結果
0 # interval(5000)の『0』がトリガーとなって生成された0
1
2
3
4
0 # interval(5000)の『1』がトリガーとなって生成された0
5 # 0 と同じタイミング
1
6 # 1 と同じタイミング
2
7 # 2 と同じタイミング
3
# (以下略)
図で表現すると以下のようになります。
siwitchMapの場合
interval(1000).pipe(take(8))
の処理中に次のメッセージを受け取ると処理が中断されるため整数値は3までしか出力されません。
次のメッセージを受け取った時点でinterval(1000).pipe(take(8))
の処理が再開されるため『0, 1, 2, 3』のループになります。
実行結果
0
1
2
3
# 1秒あく
0
1
2
3
# (以下略)
図で表現すると以下のようになります。
exhaustMapの場合
interval(1000).pipe(take(8))
の処理中に次のメッセージを受け取ると次のメッセージは破棄されます。
7の後に時間が空いているのは次のメッセージが到着するのを待っているためです。
実行結果
0 # interval(5000)の『0』がトリガーとなって生成された0
1
2
3
4
5
6
7
# 2秒あく
0 # interval(5000)の『2』がトリガーとなって生成された0
1
2
3
4
5
6
7
# 2秒あく
0 # interval(5000)の『4』がトリガーとなって生成された0
1
# (以下略)
図で表現すると以下のようになります。
処理時間の異なるメッセージを使った例
interval
の場合はメッセージを受け取る間隔および処理時間が均等でした。
次はメッセージの受信間隔と処理時間が均等でないケースを紹介します。
以下のコードは処理時間の異なる3つのメッセージを異なる間隔で送る例です。
import { Subject } from "rxjs";
import {
concatMap,
exhaustMap,
mergeMap,
switchMap,
} from "rxjs/operators";
type ObjectType = {
keyword: string;
delay: number;
};
const subject: Subject<ObjectType> = new Subject();
subject
.pipe(
concatMap(({ keyword, delay }) => // ここの『concatMap』を変える
new Promise((resolve) => {
setTimeout(() => { resolve(`output: ${keyword}`)}, delay);
})
)
)
.subscribe((next) => console.log(next));
// 処理に0.3msかかるメッセージを0.1ms後に送る
setTimeout(() => subject.next({ keyword: "1st request, 2nd finish", delay: 300 }), 100);
// 処理に0.1msかかるメッセージを0.2ms後に送る
setTimeout(() => subject.next({ keyword: "2nd request, 1st finish", delay: 100 }), 200);
// 処理に0.1msかかるメッセージを0.5ms後に送る
setTimeout(() => subject.next({ keyword: "3rd request, 3rd finish", delay: 100 }), 500);
concatMapの場合
メッセージを受け取った順にObservableを合成するため『request』順の出力となります。
output: 1st request, 2nd finish
output: 2nd request, 1st finish
output: 3rd request, 3rd finish
mergeMapの場合
メッセージの処理した順にObservableを合成するため『finish』順の出力となります。
output: 2nd request, 1st finish
output: 1st request, 2nd finish
output: 3rd request, 3rd finish
switchMapの場合
1st request, 2nd finish
の処理中に2nd request, 1st finish
のメッセージを受け取ったため、1st request, 2nd finish
の処理が中断されます。
switchMap
ではメッセージの処理が被った場合は新しいメッセージのみを処理するため、request
もfinish
も昇順になっています。
output: 2nd request, 1st finish
output: 3rd request, 3rd finish
exhaustMapの場合
1st request, 2nd finish
の処理中に2nd request, 1st finish
のメッセージを受け取ったため、2nd request, 1st finish
を破棄します。
exhaustMap
ではメッセージの処理が終わらない間は新しいメッセージを破棄するため、request
もfinish
も昇順になっています。
output: 1st request, 2nd finish
output: 3rd request, 3rd finish
Operatorの使い分け
concatMap
, mergeMap
, switchMap
, exhaustMap
の使い分けについて紹介します。
concatMap
concatMap
は到着したメッセージ順に処理をし、メッセージの処理が終わっていなければ次のメッセージは待たされるため、キューイングのような動作になります。
ですのでconcatMap
は「非同期処理が完了するまで処理待ちが発生するのはしかたない。とにかく発行された順番通りにメッセージ(イベントやアクションなど)を処理するのが大事」という時に利用します。
mergeMap
mergeMap
はメッセージの処理が終わった順にObservableを合成するため、concatMap
のようにメッセージの処理待ちが発生することはありません。
ですのでmergeMap
は「メッセージ(イベントやアクションなど)に依存関係はないから、とにかく処理が終わった順に結果を返してほしい」というケースで利用します。
具体例としてはTodoアプリケーションのDone操作などが挙げられます。
switchMap
switchMap
はメッセージの処理中に次のメッセージがきた場合は次のメッセージを優先します。
ですのでswitchMap
は「最新のメッセージさえ処理されていれば十分」というケースで利用します。
具体例としてはインクリメンタルサーチ(入力した値をリアルタイムで検索すること)などが挙げられます。
exhaustMap
exhaustMap
はswitchMap
の逆で、メッセージの処理中に次のメッセージがきた場合は次のメッセージを破棄します。
ですので、exhaustMap
は「一度メッセージを処理すればそれで十分」というケースで利用します。
具体例としてはフォームの多重送信の防止などが挙げられます。
まとめ
xxxMap
のうちどれを使うか悩んだ場合、まずはシンプルなmergeMap
で実装するとよいでしょう。もしメッセージの処理順に依存関係があった場合はconcatMap
に書き換えます。
また、最新のメッセージのみさえ処理すれば十分なのであればswitchMap
を利用します。多重実行されると問題がある場合はexhaustMap
を利用します。
さいごに
Twitter(@nishina555)やってます。フォローしてもらえるとうれしいです!