【RxJS入門】Observable、Observer、subscribe、Operatorの概要・関係性

JavaScript

リアクティブプログラミングについて

リアクティブプログラミングとは時間経過によって変化するデータを観測し、変更が生じた際にあらかじめ宣言した操作するというプログラミングのパラダイムです。

リアクティブプログラミングでは時間の経過によって変化するデータのことをストリームと呼びます。また、ストリームに存在する値のことをメッセージと呼びます。

RxJSについて

RxとはReactive Extensionsの略で、リアクティブプログラミング用のライブラリです。
RxではObservableと呼ばれるストリームを用いて非同期処理を記述します。Rxを利用することでリアクティブプログラミングを簡潔かつ可読性高く記述できます。

Rxの始まりはMicrosoftが開発した、C#向けのライブラリ『Rx.NET』です。現在ではさまざまな言語へ移植が進んでいます。RxJSはJavaScript向けに開発されたライブラリです。

RxJSで登場する概念について

RxJSを構成する要素のうち、今回は以下の項目について紹介します。

  • Observable
  • Observer
  • subscribe
  • Operator

Observableについて

ObaservableはRxJSにおけるストリームのクラスです。
RxJSではObservableオブジェクトを操作することでリアクティブプログラミングを実現します。

Observerについて

Observerは受け取ったObaservableのデータを処理するためのクラスです。
Observerはnext, error, completeという3つのメソッドを持っています。

nextはObservableからデータを受け取った際に実行されるメソッドです。
errorおよびcompleteはObaservableの最後のデータを受け取ったタイミングで一度だけ実行されるメソッドです。1

subscribeについて

subscribeはObservableのメソッドです。Observableのデータはsubscribeを利用することで取得できます。subscribeはObserverを引数にとります。

subscribeとObserverを利用してObservableのデータを取得する例は以下の通りです。
今回の場合、1 → 2 → 3という数値の並びがObservable(ストリーム)、各数値(1, 2, 3)がメッセージとなります。

example.ts

import { of } from 'rxjs';

// of: 可変長引数をストリームに変換するメソッド
const observable = of(1, 2, 3);

type Observer = {
  next: (value: number) => void;
  error: (error: any) => void;
  complete: () => void;
}


const observer: Observer = {
  next: value => console.log('value: ' + value),
  error: error => console.log('error: ' + error),
  complete: () => console.log('completed'),
}

observable.subscribe(observer);

実行結果

value: 1
value: 2
value: 3
completed

なお、subscribeの引数にはObserverオブジェクトの代わりに直接メソッドを渡すことも可能です。
直接メソッドを渡す場合、第1引数がObserverのnext、第2引数がObserverのerror、第3引数がObserverのcompleteとして扱われます。

つまり、先ほどの例は以下のように表現できます。

import { of } from "rxjs";

const observable = of(1, 2, 3);

// Observaerオブジェクトのかわりに直接メソッドを指定する記述方法
observable.subscribe(
  (value) => console.log("value: " + value),
  (error) => console.log("error: " + error),
  () => console.log("completed"),
);

RxJS 6.4以降からのsubjectの引数の記述方法について

RxJS 6.4からはerrorやcompleteを定義する場合、直接メソッドを引数にする方法は非推奨となっています。2

RxJS 6.4における推奨・非推奨の記述方法の比較は以下の通りです。

import { of } from "rxjs";

const observable = of(1, 2, 3);

// 非推奨
observable.subscribe(
  (value) => console.log("value: " + value),
  (error) => console.log("error: " + error),
  () => console.log("completed"),
);

// 推奨
observable.subscribe({
  next: (value) => console.log("value: " + value),
  error: (error) => console.log("error: " + error),
  complete: () => console.log("completed"),
});

completeのみを定義する場合は以下のように記述します。

import { of } from "rxjs";

const observable = of(1, 2, 3);

// 非推奨
observable.subscribe(null, null, () => console.log("completed"));

// 推奨
observable.subscribe({
  complete: () => console.log("completed"),
});

ただし、RxJS 6.4以降でもnextのみを定義する場合は直接メソッドを引数にしても問題ありません。

import { of } from "rxjs";

const observable = of(1, 2, 3);

// nextのみ定義する場合は直接メソッドを引数にしてOK
observable.subscribe((value) => console.log("value: " + value));

Operatorについて

OperatorはObservableを操作するメソッドです。Observableを受け取り、新たなObservableを返します。
OperatorはObservableのメソッドであるpipeの中で呼び出す必要があります。

Operatorにはさまざまなメソッドがありますが、以下ではfilterを利用した例について紹介します。

example.ts

import { of } from "rxjs";
import { filter } from "rxjs/operators";

const observable = of(1, 2, 3);

observable
  .pipe(
    // 偶数のメッセージのみ取り出す
    filter((v) => v % 2 === 0)
  )
  .subscribe({
  next: (value) => console.log("value: " + value),
  error: (error) => console.log("error: " + error),
  complete: () => console.log("completed"),
});

実行結果

value: 2
completed

Operatorはメソッドチェーンのようにメソッドを複数組み合わせられます。以下はOperatorであるfiltermapを組み合わせた例です。

example.ts

import { filter, map } from "rxjs/operators";

const observable = of(1, 2, 3);

observable
  .pipe(
    filter((v) => v % 2 === 0),
    map((value) => value * 2)
  )
  .subscribe({
    next: (value) => console.log("value: " + value),
    error: (error) => console.log("error: " + error),
    complete: () => console.log("completed"),
  });

実行結果

value: 4
completed

RxJSを利用した非同期API通信の具体例

以下のようなTodoリストをRxJSを利用して取得する例について紹介します。

// curl http://localhost:4000/todos

[
  {
    "id": 1,
    "content": "do something",
    "completed": true
  },
  {
    "id": 2,
    "content": "go somewhere",
    "completed": false
  }
]

RxJS 6.0以前はfromPromise、RxJS 6.0以降はfromを利用することでPromiseオブジェクトからObservableを作成できます。3 4

PromiseオブジェクトからObservableを生成し、subscribeによって結果を取得する例は以下の通りです。

import { from } from "rxjs";

const fulfilledObservable = from(Promise.resolve(10));

fulfilledObservable.subscribe({
  next: (value) => console.log(value),
  error: (message) => console.log("error " + message),
});
// 10

const rejectedObservable = from(Promise.reject('bad request'));

rejectedObservable.subscribe({
  next: (value) => console.log(value),
  error: (message) => console.log("error: " + message),
});
// error: bad request

たとえばHTTPクライアントにaxiosを利用した場合、axiosの戻り値はPromiseオブジェクトであるためRxJSを利用した非同期API通信の処理は以下のようになります。

example.ts

import axios from "axios";
import { from } from "rxjs";

type Todo = {
  id: number;
  content: string;
  completed: boolean;
};

const apiResponse = axios.get("http://localhost:4000/todos")

const observable = from(apiResponse);

observable.subscribe({
  next: (response) => {
    const todos: Todo[] = response.data;
    todos.map(({ id, content, completed }) =>
      console.log(
        "id: " + id + ", content: " + content + ", completed: " + completed
      )
    );
  },
  error: (error) => console.log("error: " + error.message),
});

実行結果

# 成功の場合
id: 1, content: do something, completed: true
id: 2, content: go somewhere, completed: false

# 失敗の場合(ネットワークエラーが原因の場合)
error: Network Error

なお、Promiseオブジェクトの詳細解説はJavaScriptの非同期処理で理解必須!Promiseの概要・挙動まとめで紹介しています。

RxJSのまとめ

RxJSのまとめ
  • RxJSはJS版リアクティブプログラミング用ライブラリ
  • Observableはストリームを表すクラス
  • subcribeはObservableのデータを取得するためのメソッド
  • ObserverはObservableのデータを処理するためのクラス
  • Observerはnext, error, completeのメソッドを持つ
  • subscribeはObserverを引数にとる
  • OperatorはObservableを受け取り、新たなObservableを返すメソッド

さいごに

Twitter(@nishina555)やってます。フォローしてもらえるとうれしいです!

参考