Java & Android

RxJava - Get Started

그레이트쪼 2017. 2. 6. 23:17

Basics

  • RxJava란 Reactive Programming을 하기위한 라이브러리인 Rx(또는 ReactiveX - Reactive Extensions)의 JVM용 버전
    • 하나의 JAR파일로 이루어져 매우 경량
    • Polyglot (다중 언어) 구현으로 Groovy, Clojure, JRuby, Kotlin, Scala와 같은 JVM 기반 (Java 6이상) 언어를 지원한다. 
  • Reactive 코드의 기본적인 building block은 Observable들과 Subscriber들이다. Observable은 아이템들을 발행 (emit)하고, Subscriber는 이 아이템들을 소비 (consume)한다. 
  • Observable은 데이터베이스 쿼리이고 Subscriber은 그 결과를 가지고 화면에 보여주는 것일 수 있다. Observable이 화면의 클릭이고 Subscriber은 그에 대한 반응이 될 수 있다. Observable은 인터넷에서 읽은 바이트 스트림이고, Subscriber은 이것을 디스크에 저장하는 것일 수 있다.
  • 아무도 subscribe하지 않으면 발행하지 않을 수도 있고 (이를 Cold Observable이라고 한다), 발행할 수도 있다 (이를 Hot Observable이라고 한다).
  • 아래는 Observable의 샘플 코드이다. "Hello"와 "world!" 문자열을 전달한다. 
1
2
3
4
5
6
7
8
9
10
Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("world!");
            subscriber.onCompleted();
        }
    }
);
  • 발행을하면 Observable이 가지고 있는 Observer (Subscriber의 일종)들의 onNext()가 몇 번 불리고 onComplete() 또는 onError()가 뒤이어 불린다. 아래는 Observer의 샘플코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observer<String> myObserver = new Observer<String>() {
    @Override
    public void onCompleted() {
    }
 
    @Override
    public void onError(Throwable e) {
    }
 
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};
  • 아래는 Observable의 subscribe 메소드에 Observer를 등록하여 실행하는 코드이다. 
1
myObservable.subscribe(myObserver);
  • 좀 더 간결한 코드 예제를 보자. RxJava에는 간결한 코드를 지원하기 위한 방법이 많이 존재한다. 아래의 코드에서는 from() 메소드를 사용하여 특정 배열이나 Iterable들의 원소로부터 Observerable을 생성할 수 있다. 또 Subscriber로는 Observer 대신 call() 메소드 한 개만 가지고 있는 Action1 인터페이스를 전달한다.
1
2
3
4
5
6
7
Observable<String> myObservable = Observable.from(new String[]{"Hello""world!"});
myObservable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
});

  • 위의 코드를 람다를 사용해서 아래와 같은 한 줄로 만들 수 있다. 
1
Observable.from(new String[]{"Hello""world!"}).subscribe(s -> System.out::println(s));
  • Action1 인터페이스를 이용해서 Subscriber의 각 파트를 독립적으로 정의할 수 있다. 예를 들어 아래와 같은 코드가 가능하다. 
1
myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);



Operators

  • 각종 Operator들을 활용하여 연쇄 반응을 유도할 수 있다. (method chaining)
  • map
    • map은 Observable을 또 다른 Observable로 가공, 변환하는 역할을 한다. 예를 들어 발행하는 문자열 뒤에 postfix를 붙여보자. (참고로 just()는 아이템을 하나 발행하는 Observable 생성 메소드이다)
1
2
3
4
5
6
7
8
Observable.just("Hello world!")
          .map(new Func1<StringString>() {
               @Override
               public String call(String s) {
                   return s + " (by map)";
               }
           })
          .subscribe(s -> System.out.println(s));
  • fatMap
    • flatMap 메소드는 2차원 배열과 같이 nested case에 대해서 분리된 Observable을 반환한다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String[][] twoWorlds = <<"Hello""world!"><"goodbye""world!">>;
Observable.from(twoWorlds)
        .flatMap(new Func1<String[], Observable<String>>() {
            @Override
            public Observable<String> call(String[] strings) {
                return Observable.from(strings);
            }
        })
        .map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        })
        .subscribe(i -> System.out.println(i));
    • flatMap의 Func1은 반환 값이 Observable이다. (라인 5) 2차원 배열의 각 배열에 대해 Observable을 반환하면 flatMap에서는 총 2개의 Observable이 생성되고 (라인 6) 다시 map에서는 4개의 문자열을 처리한다. (라인 12) subscribe에서는 최종적으로 4개의 문자열에대한 길이 값을 출력하게 된다. 
    • Lamda를 적용하면 아래와 같이 된다.
1
2
3
4
5
String[][] twoWorlds = <<"Hello""world!"><"goodbye""world!">>;
Observable.from(twoWorlds)
        .flatMap(strings -> Observable.from(strings))
        .map(s -> s.length())
        .subscribe(i -> System.out.println(i));


  • merge
    • flatMap을 사용하지 않고 merge를 사용할 수도 있다. merge는 복수의 Observable을 합성하는 기능을 가진다. 
1
2
3
4
String[][] twoWorlds = <<"Hello""world!"><"goodbye""world!">>;
Observable.merge(Observable.from(twoWorlds[0]), Observable.from(twoWorlds[1]))
        .map(s -> s.length())
        .subscribe(i -> System.out.println(i));


  • filter
    • filter를 사용하면 결과를 취사선택하는 것이 가능하다. 여기에서는 문자열이 “world!”와 일차하는것을 배제한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
String[][] twoWorlds = <<"Hello""world!"><"goodbye""world!">>;
Observable.from(twoWorlds)
        .flatMap(new Func1<String[], Observable<String>>() {
            @Override
            public Observable<String> call(String[] strings) {
                return Observable.from(strings);
            }
        })
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return !s.equals("world!");
            }
        })
        .map(s -> s.length())
        .subscribe(i -> System.out.println(i));
    • "Hello"와 "goodbye"에 대해서만 문자열 길이를 출력한다. 즉, 결과는 5와 7을 출력하는 것이다. 
  • take
    • take는 발행되는 아이템들 중 (at most) 특정 개수의 것만 취하는 operator이다. 
    • 지금까지와는 다른 예제를 보자. 아래 코드에서 query() 메소드는 검색 결과와 매칭되는 url들을 반환하는 메소드이다. 
    • 이때 반환 값은 url들의 List<String>. 여기에 url 하나당 웹 사이트 타이틀을 여러 개 반환하는 메소드 getTitle()을 적용한다고 해보자. 역시 flatMap을 사용하여야 할 것이다. 
    • 그런데 getTitle()이 반환하는 값 역시 title들의 List<String>. 각 title에 대해서 null 체크를 하려면 또한번 flatMap을 적용한 뒤, filter를 적용해야 한다. 
    • 마지막으로 take(5)를 적용해서 5개의 아이템들만 취한다. 
1
2
3
4
5
6
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .subscribe(title -> System.out.println(title));
  • doOnNext
    • doOnNext는 발행되는 아이템에 대해 추가적인 행위를 적용할 때 사용한다. 
    • 로 위의 예제에서 title을 저장하는 행위를 추가해보자. 
1
2
3
4
5
6
7
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls)) 
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(title -> saveTitle(title))
    .subscribe(title -> System.out.println(title));



Error Handling

  • RxJava에서의 error handling 방식 역시 조금 새롭다. 다시 subscriber의 모습을 보자. error 상황이 발생했을 때 불려지는 callback method가 별도로 존재한다. 
1
2
3
4
5
6
7
8
9
10
Observer<String> myObserver = new Observer<String>() {
    @Override
    public void onCompleted() { }
 
    @Override
    public void onError(Throwable e) { }
 
    @Override
    public void onNext(String s) { System.out.println(s); }
};
  • onError()는 예외가 발생하는 어떠한 경우에도 호출된다. 즉, 에러 처리 부분이 한군데 모이고 나머지 코드들은 더 간결해 질 수 있다는 뜻이다. 따라서 중간의 operator들은 예외를 처리할 필요가 없다. 예외를 어떻게 다룰지는 subscriber의 몫이다. 이로써 일반적인 상황에서의 code flow를 이해하기 더 쉬워진다. 


Schedulers

  • 반적으로 right code on right thread가 까다롭다. RxJava에서는 Observer가 어떤 스레드에서 수행되어야 하는지 지정할 수 있다. subscribeOn()은 Observable이 수행될 thread, observeOn()은 Subscriber가 수행될 thread를 지정한다. 
1
2
3
4
myObservableServices.retrieveImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
  • 위의 코드에서 subscriber 이전의 모든 코드들은 I/O thread에서 실행되고 View를 처리하는 코드는 main thread에서 처리된다. (이 부분은 RxAndroid에서 지원하는 feature이다)
  • 이러한 방식은 thread 지정을 마치 operator처럼 할 수 있다는 것이 큰 장점이다. Android의 AsyncTask 사례를 보더라도 concurrent 프로그래밍을 위해서는 코드들이 산산히 흩어져서 원래의 로직과 flow는 보기 쉽지 않은데, RxJava의 경우에는 본래의 코드들이 그대로 살아 있다. 


Subscription

  • subscribe()의 return값은 Subscription인데 Observable과 Subscriber의 연결고리가 되겠다. 예를 들어, 이것을 통해 unsubscribe를 할 수 있다. 
1
2
3
4
5
6
Subscription subscription = Observable.just("Hello, World!")
    .subscribe(s -> System.out.println(s));
...
subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Outputs "Unsubscribed=true" 



RxAndroid

  • RxAndroid는 Android용 RxJava extension이다. 
  • 그중 대표적으로 Android threading system에 특화된 AndroidScheduler를 제공한다. 만약 UI thread에서 어떤 코드를 수행하고 싶다면 아래와 같이 작성하면 된다. 
1
2
3
4
retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
    • 아니면 자체 생성된 핸들러를 이용하는 스레드를 만들 수도 있다. 이 경우 HandlerThreadScheduler를 사용하면 된다. 
  • 리고 Android life cycle에 맞게 동작하는 여러 도구들을 제공하는 AndroidObservable이 있다. bindActivity()와 bindFragment()는 AndroidSchedulers.mainThread()에서 동작하는데 각각 Activity와 Fragment의 life cycle 동안 observing을 한다. 
1
2
3
AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
  • Observerable을 BroadcastReceiver처럼 동작하게 해주는 AndroidObservable.fromBroadcast()도 있다. 
1
2
3
IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));
  • ViewObservable은 Views와 연관이 있다. 예를 들어, ViewObservable.clicks()는 View가 클릭될 때마다 이벤트를 받는다. 또 ViewObservable.text()는 TextView의 변경이 있을 때 이벤트를 받는다. 
1
2
ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));


Retrofit

  • Android용 REST client인 Retrofit RxAndroid는 Android용 RxJava extension이다. 
    • 보통은 아래와 같이 코드를 작성하는데,
1
2
@GET("/user/{id}/photo")
void getUserPhoto(@Path("id"int id, Callback<Photo> cb);
    • RxJava가 있다면 아래와 같이 작성한다. 
1
2
@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id"int id);
  • Retrofit은 Observable이 multiple REST 콜을 결합할 수 있도록 지원한다. 예를 들어 사진을 가져오는 콜이 있고 그 다음 metadata를 가져와야 한다면 아래와 같이 이 둘을 묶을 수 있다. (flatMap이랑 비슷)
1
2
3
4
5
Observable.zip(
    service.getUserPhoto(id),
    service.getPhotoMetadata(id),
    (photo, metadata) -> createPhotoWithData(photo, metadata))
    .subscribe(photoWithData -> showPhoto(photoWithData));


Old, Slow code

  • Retrofit이야 RxJava를 잘 지원한다고 보여지지만 만일 RxJava를 지원하지 않는 일반 라이브러리라면 어떻게 할 것인가? 새롭게 작성하지 않고도 Observables를 제공하는 방법은?
  • Observable.just()와 Observable.from()을 활용하면 기존 코드로부터 Observable을 만들어낼 수 있다. 
1
2
3
4
5
private Object oldMethod() { ... }
 
public Observable<Object> newMethod() {
    return Observable.just(oldMethod());
}
    • 위와 같이 작성하면 성능상 문제는 없을까? 원래 빠른 코드였다면 뭐 크게 문제되지는 않겠지만 가뜩이나 느린 코드였다면 이렇게 변경하는 것이 부담될 수 있다. 이럴때는 수행이 느린 부분을 defer()로 감싸는 방법을 사용하면 된다. 
1
2
3
4
5
private Object slowBlockingMethod() { ... }
 
public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
    • 위와 같이 하면 subscribe를 할 때까지 Observable이 slowBlockingMethod()를 호출하지 않는다. 



References