目次
リアクティブプログラミングについて
リアクティブプログラミングとは時間経過によって変化するデータを観測し、変更が生じた際にあらかじめ宣言した操作するというプログラミングのパラダイムです。
リアクティブプログラミングでは時間の経過によって変化するデータのことをストリームと呼びます。また、ストリームに存在する値のことをメッセージと呼びます。
RxJSについて
RxとはReactive Extensionsの略で、リアクティブプログラミング用のライブラリです。
RxではObservableと呼ばれるストリームを用いて非同期処理を記述します。Rxを利用することでリアクティブプログラミングを簡潔かつ可読性高く記述できます。
Rxの始まりはMicrosoftが開発した、C#向けのライブラリ『Rx.NET』です。現在ではさまざまな言語へ移植が進んでいます。RxJSはJavaScript向けに開発されたライブラリです。
RxJSで登場する概念について
RxJSを構成する要素のうち、今回は以下の項目について紹介します。
- Observable
- Observer
- subscribe
- Operator
Observableについて
ObaservableはRxJSにおけるストリームのクラスです。
RxJSではObservableオブジェクトを操作することでリアクティブプログラミングを実現します。
Observerについて
Observerは受け取ったObaservableのデータを処理するためのクラスです。
Observerはnext
, error
, complete
という3つのメソッドを持っています。
next
はObservableからデータを受け取った際に実行されるメソッドです。
error
およびcomplete
はObaservableの最後のデータを受け取ったタイミングで一度だけ実行されるメソッドです。1
subscribeについて
subscribeはObservableのメソッドです。Observableのデータはsubscribeを利用することで取得できます。subscribeはObserverを引数にとります。
subscribeとObserverを利用してObservableのデータを取得する例は以下の通りです。
今回の場合、1 → 2 → 3
という数値の並びがObservable(ストリーム)、各数値(1
, 2
, 3
)がメッセージとなります。
example.ts
import { of } from 'rxjs';
// of: 可変長引数をストリームに変換するメソッド
const observable = of(1, 2, 3);
type Observer = {
next: (value: number) => void;
error: (error: any) => void;
complete: () => void;
}
const observer: Observer = {
next: value => console.log('value: ' + value),
error: error => console.log('error: ' + error),
complete: () => console.log('completed'),
}
observable.subscribe(observer);
実行結果
value: 1
value: 2
value: 3
completed
なお、subscribeの引数にはObserverオブジェクトの代わりに直接メソッドを渡すことも可能です。
直接メソッドを渡す場合、第1引数がObserverのnext
、第2引数がObserverのerror
、第3引数がObserverのcomplete
として扱われます。
つまり、先ほどの例は以下のように表現できます。
import { of } from "rxjs";
const observable = of(1, 2, 3);
// Observaerオブジェクトのかわりに直接メソッドを指定する記述方法
observable.subscribe(
(value) => console.log("value: " + value),
(error) => console.log("error: " + error),
() => console.log("completed"),
);
RxJS 6.4以降からのsubjectの引数の記述方法について
RxJS 6.4からはerrorやcompleteを定義する場合、直接メソッドを引数にする方法は非推奨となっています。2RxJS 6.4における推奨・非推奨の記述方法の比較は以下の通りです。
import { of } from "rxjs";
const observable = of(1, 2, 3);
// 非推奨
observable.subscribe(
(value) => console.log("value: " + value),
(error) => console.log("error: " + error),
() => console.log("completed"),
);
// 推奨
observable.subscribe({
next: (value) => console.log("value: " + value),
error: (error) => console.log("error: " + error),
complete: () => console.log("completed"),
});
complete
のみを定義する場合は以下のように記述します。
import { of } from "rxjs";
const observable = of(1, 2, 3);
// 非推奨
observable.subscribe(null, null, () => console.log("completed"));
// 推奨
observable.subscribe({
complete: () => console.log("completed"),
});
ただし、RxJS 6.4以降でもnextのみを定義する場合は直接メソッドを引数にしても問題ありません。
import { of } from "rxjs";
const observable = of(1, 2, 3);
// nextのみ定義する場合は直接メソッドを引数にしてOK
observable.subscribe((value) => console.log("value: " + value));
Operatorについて
OperatorはObservableを操作するメソッドです。Observableを受け取り、新たなObservableを返します。
OperatorはObservableのメソッドであるpipe
の中で呼び出す必要があります。
Operatorにはさまざまなメソッドがありますが、以下ではfilter
を利用した例について紹介します。
example.ts
import { of } from "rxjs";
import { filter } from "rxjs/operators";
const observable = of(1, 2, 3);
observable
.pipe(
// 偶数のメッセージのみ取り出す
filter((v) => v % 2 === 0)
)
.subscribe({
next: (value) => console.log("value: " + value),
error: (error) => console.log("error: " + error),
complete: () => console.log("completed"),
});
実行結果
value: 2
completed
Operatorはメソッドチェーンのようにメソッドを複数組み合わせられます。以下はOperatorであるfilter
とmap
を組み合わせた例です。
example.ts
import { filter, map } from "rxjs/operators";
const observable = of(1, 2, 3);
observable
.pipe(
filter((v) => v % 2 === 0),
map((value) => value * 2)
)
.subscribe({
next: (value) => console.log("value: " + value),
error: (error) => console.log("error: " + error),
complete: () => console.log("completed"),
});
実行結果
value: 4
completed
RxJSを利用した非同期API通信の具体例
以下のようなTodoリストをRxJSを利用して取得する例について紹介します。
// curl http://localhost:4000/todos
[
{
"id": 1,
"content": "do something",
"completed": true
},
{
"id": 2,
"content": "go somewhere",
"completed": false
}
]
RxJS 6.0以前はfromPromise
、RxJS 6.0以降はfrom
を利用することでPromiseオブジェクトからObservableを作成できます。3 4
PromiseオブジェクトからObservableを生成し、subscribeによって結果を取得する例は以下の通りです。
import { from } from "rxjs";
const fulfilledObservable = from(Promise.resolve(10));
fulfilledObservable.subscribe({
next: (value) => console.log(value),
error: (message) => console.log("error " + message),
});
// 10
const rejectedObservable = from(Promise.reject('bad request'));
rejectedObservable.subscribe({
next: (value) => console.log(value),
error: (message) => console.log("error: " + message),
});
// error: bad request
たとえばHTTPクライアントにaxios
を利用した場合、axios
の戻り値はPromiseオブジェクトであるためRxJSを利用した非同期API通信の処理は以下のようになります。
example.ts
import axios from "axios";
import { from } from "rxjs";
type Todo = {
id: number;
content: string;
completed: boolean;
};
const apiResponse = axios.get("http://localhost:4000/todos")
const observable = from(apiResponse);
observable.subscribe({
next: (response) => {
const todos: Todo[] = response.data;
todos.map(({ id, content, completed }) =>
console.log(
"id: " + id + ", content: " + content + ", completed: " + completed
)
);
},
error: (error) => console.log("error: " + error.message),
});
実行結果
# 成功の場合
id: 1, content: do something, completed: true
id: 2, content: go somewhere, completed: false
# 失敗の場合(ネットワークエラーが原因の場合)
error: Network Error
なお、Promiseオブジェクトの詳細解説はJavaScriptの非同期処理で理解必須!Promiseの概要・挙動まとめで紹介しています。
RxJSのまとめ
- RxJSはJS版リアクティブプログラミング用ライブラリ
- Observableはストリームを表すクラス
- subcribeはObservableのデータを取得するためのメソッド
- ObserverはObservableのデータを処理するためのクラス
- Observerはnext, error, completeのメソッドを持つ
- subscribeはObserverを引数にとる
- OperatorはObservableを受け取り、新たなObservableを返すメソッド
さいごに
Twitter(@nishina555)やってます。フォローしてもらえるとうれしいです!