Java & Android
RxJava - Get Started
그레이트쪼
2017. 2. 6. 23:17
Basics
- RxJava란 Reactive Programming을 하기위한 라이브러리인 Rx(또는 ReactiveX - Reactive Extensions)의 JVM용 버전
- 하나의 JAR파일로 이루어져 매우 경량
- Polyglot (다중 언어) 구현으로 Groovy, Clojure, JRuby, Kotlin, Scala와 같은 JVM 기반 (Java 6이상) 언어를 지원한다.
- Reactive 코드의 기본적인 building block은 Observable들과 Subscriber들이다. Observable은 아이템들을 발행 (emit)하고, Subscriber는 이 아이템들을 소비 (consume)한다.
- Observable은 데이터베이스 쿼리이고 Subscriber은 그 결과를 가지고 화면에 보여주는 것일 수 있다. Observable이 화면의 클릭이고 Subscriber은 그에 대한 반응이 될 수 있다. Observable은 인터넷에서 읽은 바이트 스트림이고, Subscriber은 이것을 디스크에 저장하는 것일 수 있다.
- 아무도 subscribe하지 않으면 발행하지 않을 수도 있고 (이를 Cold Observable이라고 한다), 발행할 수도 있다 (이를 Hot Observable이라고 한다).
- 아래는 Observable의 샘플 코드이다. "Hello"와 "world!" 문자열을 전달한다.
1 2 3 4 5 6 7 8 9 10 | Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("world!"); subscriber.onCompleted(); } } ); |
- 발행을하면 Observable이 가지고 있는 Observer (Subscriber의 일종)들의 onNext()가 몇 번 불리고 onComplete() 또는 onError()가 뒤이어 불린다. 아래는 Observer의 샘플코드
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | Observer<String> myObserver = new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } }; |
- 아래는 Observable의 subscribe 메소드에 Observer를 등록하여 실행하는 코드이다.
1 | myObservable.subscribe(myObserver); |
- 좀 더 간결한 코드 예제를 보자. RxJava에는 간결한 코드를 지원하기 위한 방법이 많이 존재한다. 아래의 코드에서는 from() 메소드를 사용하여 특정 배열이나 Iterable들의 원소로부터 Observerable을 생성할 수 있다. 또 Subscriber로는 Observer 대신 call() 메소드 한 개만 가지고 있는 Action1 인터페이스를 전달한다.
1 2 3 4 5 6 7 | Observable<String> myObservable = Observable.from(new String[]{"Hello", "world!"}); myObservable.subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); |
- 위의 코드를 람다를 사용해서 아래와 같은 한 줄로 만들 수 있다.
1 | Observable.from(new String[]{"Hello", "world!"}).subscribe(s -> System.out::println(s)); |
- Action1 인터페이스를 이용해서 Subscriber의 각 파트를 독립적으로 정의할 수 있다. 예를 들어 아래와 같은 코드가 가능하다.
1 | myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction); |
Operators
- 각종 Operator들을 활용하여 연쇄 반응을 유도할 수 있다. (method chaining)
- map
- map은 Observable을 또 다른 Observable로 가공, 변환하는 역할을 한다. 예를 들어 발행하는 문자열 뒤에 postfix를 붙여보자. (참고로 just()는 아이템을 하나 발행하는 Observable 생성 메소드이다)
1 2 3 4 5 6 7 8 | Observable.just("Hello world!") .map(new Func1<String, String>() { @Override public String call(String s) { return s + " (by map)"; } }) .subscribe(s -> System.out.println(s)); |
- fatMap
- flatMap 메소드는 2차원 배열과 같이 nested case에 대해서 분리된 Observable을 반환한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | String[][] twoWorlds = <<"Hello", "world!">, <"goodbye", "world!">>; Observable.from(twoWorlds) .flatMap(new Func1<String[], Observable<String>>() { @Override public Observable<String> call(String[] strings) { return Observable.from(strings); } }) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.length(); } }) .subscribe(i -> System.out.println(i)); |
- flatMap의 Func1은 반환 값이 Observable이다. (라인 5) 2차원 배열의 각 배열에 대해 Observable을 반환하면 flatMap에서는 총 2개의 Observable이 생성되고 (라인 6) 다시 map에서는 4개의 문자열을 처리한다. (라인 12) subscribe에서는 최종적으로 4개의 문자열에대한 길이 값을 출력하게 된다.
- Lamda를 적용하면 아래와 같이 된다.
1 2 3 4 5 | String[][] twoWorlds = <<"Hello", "world!">, <"goodbye", "world!">>; Observable.from(twoWorlds) .flatMap(strings -> Observable.from(strings)) .map(s -> s.length()) .subscribe(i -> System.out.println(i)); |
- merge
- flatMap을 사용하지 않고 merge를 사용할 수도 있다. merge는 복수의 Observable을 합성하는 기능을 가진다.
1 2 3 4 | String[][] twoWorlds = <<"Hello", "world!">, <"goodbye", "world!">>; Observable.merge(Observable.from(twoWorlds[0]), Observable.from(twoWorlds[1])) .map(s -> s.length()) .subscribe(i -> System.out.println(i)); |
- filter
- filter를 사용하면 결과를 취사선택하는 것이 가능하다. 여기에서는 문자열이 “world!”와 일차하는것을 배제한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | String[][] twoWorlds = <<"Hello", "world!">, <"goodbye", "world!">>; Observable.from(twoWorlds) .flatMap(new Func1<String[], Observable<String>>() { @Override public Observable<String> call(String[] strings) { return Observable.from(strings); } }) .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return !s.equals("world!"); } }) .map(s -> s.length()) .subscribe(i -> System.out.println(i)); |
- "Hello"와 "goodbye"에 대해서만 문자열 길이를 출력한다. 즉, 결과는 5와 7을 출력하는 것이다.
- take
- take는 발행되는 아이템들 중 (at most) 특정 개수의 것만 취하는 operator이다.
- 지금까지와는 다른 예제를 보자. 아래 코드에서 query() 메소드는 검색 결과와 매칭되는 url들을 반환하는 메소드이다.
- 이때 반환 값은 url들의 List<String>. 여기에 url 하나당 웹 사이트 타이틀을 여러 개 반환하는 메소드 getTitle()을 적용한다고 해보자. 역시 flatMap을 사용하여야 할 것이다.
- 그런데 getTitle()이 반환하는 값 역시 title들의 List<String>. 각 title에 대해서 null 체크를 하려면 또한번 flatMap을 적용한 뒤, filter를 적용해야 한다.
- 마지막으로 take(5)를 적용해서 5개의 아이템들만 취한다.
1 2 3 4 5 6 | query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .filter(title -> title != null) .take(5) .subscribe(title -> System.out.println(title)); |
- doOnNext
- doOnNext는 발행되는 아이템에 대해 추가적인 행위를 적용할 때 사용한다.
- 바로 위의 예제에서 title을 저장하는 행위를 추가해보자.
1 2 3 4 5 6 7 | query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .filter(title -> title != null) .take(5) .doOnNext(title -> saveTitle(title)) .subscribe(title -> System.out.println(title)); |
Error Handling
- RxJava에서의 error handling 방식 역시 조금 새롭다. 다시 subscriber의 모습을 보자. error 상황이 발생했을 때 불려지는 callback method가 별도로 존재한다.
1 2 3 4 5 6 7 8 9 10 | Observer<String> myObserver = new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } }; |
- onError()는 예외가 발생하는 어떠한 경우에도 호출된다. 즉, 에러 처리 부분이 한군데 모이고 나머지 코드들은 더 간결해 질 수 있다는 뜻이다. 따라서 중간의 operator들은 예외를 처리할 필요가 없다. 예외를 어떻게 다룰지는 subscriber의 몫이다. 이로써 일반적인 상황에서의 code flow를 이해하기 더 쉬워진다.
Schedulers
- 일반적으로 right code on right thread가 까다롭다. RxJava에서는 Observer가 어떤 스레드에서 수행되어야 하는지 지정할 수 있다. subscribeOn()은 Observable이 수행될 thread, observeOn()은 Subscriber가 수행될 thread를 지정한다.
1 2 3 4 | myObservableServices.retrieveImage(url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(bitmap -> myImageView.setImageBitmap(bitmap)); |
- 위의 코드에서 subscriber 이전의 모든 코드들은 I/O thread에서 실행되고 View를 처리하는 코드는 main thread에서 처리된다. (이 부분은 RxAndroid에서 지원하는 feature이다)
- 이러한 방식은 thread 지정을 마치 operator처럼 할 수 있다는 것이 큰 장점이다. Android의 AsyncTask 사례를 보더라도 concurrent 프로그래밍을 위해서는 코드들이 산산히 흩어져서 원래의 로직과 flow는 보기 쉽지 않은데, RxJava의 경우에는 본래의 코드들이 그대로 살아 있다.
Subscription
- subscribe()의 return값은 Subscription인데 Observable과 Subscriber의 연결고리가 되겠다. 예를 들어, 이것을 통해 unsubscribe를 할 수 있다.
1 2 3 4 5 6 | Subscription subscription = Observable.just("Hello, World!") .subscribe(s -> System.out.println(s)); ... subscription.unsubscribe(); System.out.println("Unsubscribed=" + subscription.isUnsubscribed()); // Outputs "Unsubscribed=true" |
RxAndroid
- RxAndroid는 Android용 RxJava extension이다.
- 그중 대표적으로 Android threading system에 특화된 AndroidScheduler를 제공한다. 만약 UI thread에서 어떤 코드를 수행하고 싶다면 아래와 같이 작성하면 된다.
1 2 3 4 | retrofitService.getImage(url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(bitmap -> myImageView.setImageBitmap(bitmap)); |
- 아니면 자체 생성된 핸들러를 이용하는 스레드를 만들 수도 있다. 이 경우 HandlerThreadScheduler를 사용하면 된다.
- 그리고 Android life cycle에 맞게 동작하는 여러 도구들을 제공하는 AndroidObservable이 있다. bindActivity()와 bindFragment()는 AndroidSchedulers.mainThread()에서 동작하는데 각각 Activity와 Fragment의 life cycle 동안 observing을 한다.
1 2 3 | AndroidObservable.bindActivity(this, retrofitService.getImage(url)) .subscribeOn(Schedulers.io()) .subscribe(bitmap -> myImageView.setImageBitmap(bitmap)); |
- Observerable을 BroadcastReceiver처럼 동작하게 해주는 AndroidObservable.fromBroadcast()도 있다.
1 2 3 | IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION); AndroidObservable.fromBroadcast(context, filter) .subscribe(intent -> handleConnectivityChange(intent)); |
- ViewObservable은 Views와 연관이 있다. 예를 들어, ViewObservable.clicks()는 View가 클릭될 때마다 이벤트를 받는다. 또 ViewObservable.text()는 TextView의 변경이 있을 때 이벤트를 받는다.
1 2 | ViewObservable.clicks(mCardNameEditText, false) .subscribe(view -> handleClick(view)); |
Retrofit
- Android용 REST client인 Retrofit RxAndroid는 Android용 RxJava extension이다.
- 보통은 아래와 같이 코드를 작성하는데,
1 2 | @GET("/user/{id}/photo") void getUserPhoto(@Path("id") int id, Callback<Photo> cb); |
- RxJava가 있다면 아래와 같이 작성한다.
1 2 | @GET("/user/{id}/photo") Observable<Photo> getUserPhoto(@Path("id") int id); |
- Retrofit은 Observable이 multiple REST 콜을 결합할 수 있도록 지원한다. 예를 들어 사진을 가져오는 콜이 있고 그 다음 metadata를 가져와야 한다면 아래와 같이 이 둘을 묶을 수 있다. (flatMap이랑 비슷)
1 2 3 4 5 | Observable.zip( service.getUserPhoto(id), service.getPhotoMetadata(id), (photo, metadata) -> createPhotoWithData(photo, metadata)) .subscribe(photoWithData -> showPhoto(photoWithData)); |
Old, Slow code
- Retrofit이야 RxJava를 잘 지원한다고 보여지지만 만일 RxJava를 지원하지 않는 일반 라이브러리라면 어떻게 할 것인가? 새롭게 작성하지 않고도 Observables를 제공하는 방법은?
- Observable.just()와 Observable.from()을 활용하면 기존 코드로부터 Observable을 만들어낼 수 있다.
1 2 3 4 5 | private Object oldMethod() { ... } public Observable<Object> newMethod() { return Observable.just(oldMethod()); } |
- 위와 같이 작성하면 성능상 문제는 없을까? 원래 빠른 코드였다면 뭐 크게 문제되지는 않겠지만 가뜩이나 느린 코드였다면 이렇게 변경하는 것이 부담될 수 있다. 이럴때는 수행이 느린 부분을 defer()로 감싸는 방법을 사용하면 된다.
1 2 3 4 5 | private Object slowBlockingMethod() { ... } public Observable<Object> newMethod() { return Observable.defer(() -> Observable.just(slowBlockingMethod())); } |
- 위와 같이 하면 subscribe를 할 때까지 Observable이 slowBlockingMethod()를 호출하지 않는다.
References
- https://pluu.github.io/blog/rx/2015/04/29/rxjava/
- http://blog.danlew.net/2014/09/22/grokking-rxjava-part-2/
- http://blog.danlew.net/2014/09/30/grokking-rxjava-part-3/