RxAndroidで勘違いしていたこと

Observable.subscribe()したら必ずsubscribe()の戻り値であるSubscriptionのunsubscribe()を呼ばなければいけない

そう思っていた時期が僕にもありました。
Subscriptionのleakが起きるのはonCompletedが呼ばれないObservableをsubscribe()したままにしてしまう場合です。以下のコード例はonCompletedが呼ばれてunsubscribeされるので明示的に呼び出す必要はありません。
ただし、Activityのライフサイクルより処理が長くなる場合は不要な処理となるのでCompositeSubscription等を使ってonPause()で明示的にunsubscribe()してあげましょう。

Observable.just(...).subscribe(...);
Observable.from(...).subscribe(...);
Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        subscriber.onNext(new Object());
        subscriber.onCompleted();
    }
}).subscribe(...);
Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        subscriber.onError(new IllegalStateException());
    }
}).subscribe(...);

参考

Top 7 Tips for RxJava on Android

RxAndroidでよくやる間違い(実行スレッド編その1)

observeOn(…)とsubscribeOn(…)正しく使えてますか

以下の2つのコードはobserveOn(…)とsubscribOn(…)の位置が異なります。
表示の問題上、ブロック内での実装は省略しています。
どのポイントがどのスレッドで動くか確認します。

コードその1

Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        // ポイント1
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<Object, Object>() {
    @Override
    public Object call(Object o) {
        // ポイント2
        return o;
    }
})
.subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        // ポイント3
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Object o) {
    }
});

コードその2

Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {
        // ポイント1
    }
})
.map(new Func1<Object, Object>() {
    @Override
    public Object call(Object o) {
        // ポイント2
        return o;
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        // ポイント3
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Object o) {
    }
});

それぞれのポイントへ以下のようにログを仕込みます。

Log.d("Thread", String.format("%s threadId:%d","OnSubscribe", Thread.currentThread().getId()));
Log.d("Thread", String.format("%s threadId:%d","map", Thread.currentThread().getId()));
Log.d("Thread", String.format("%s threadId:%d","onCompleted", Thread.currentThread().getId()));

コードその1の場合はmapがUIスレッドで実行されているのに対し

D/Thread﹕ OnSubscribe threadId:199
D/Thread﹕ map threadId:1
D/Thread﹕ onCompleted threadId:1

コードその2の場合はmapがonSubscribe(…)のスレッドで実行されています。

D/Thread﹕ OnSubscribe threadId:201
D/Thread﹕ map threadId:201
D/Thread﹕ onCompleted threadId:1

mapで重い処理を走らせてANRを発生させないためにも、BuildConfig.DEBUGがtrueの時はエラーを投げるよう工夫しましょう。(戒め