Реактивное программирование - это программирование, основанное на асинхронных потоках данных и на распространении изменений. Под потоком здесь понимается массив данных, отсортированных по времени, который может сообщать, что данные изменились. Потоки могут транслировать данные или подписываться на них. В течение своего жизненного цикла потоки могут транслировать три сигнала: данные, ошибка, завершение.
Можно сказать, что смысл реактивного программирования заключается в том, чтобы реактивно реагировать на какие-либо события, при необходимости преобразовывать их и использовать / распространять результат. При этом каждая такая задача может выполняться в собственном потоке, несколько задач может выполняться одновременно. С помощью этого подхода мы можем избежать блокировки основного потока приложения.
Самая популярная реализация реактивного подхода - это библиотеки ReactiveX. Библиотек много, каждая из них написана для использования с конкретным языком программирования. Например, библиотека RxJava может быть использована для написания кода на Java / Kotlin.
Общая концепция ReactiveX
ReactiveX - это реализация принципов реактивного программирования для создания асинхронных программ, основанных на событиях, путем наблюдения за потоками/последовательностями.
Основана библиотека на паттерне проектирования Observer (или Наблюдатель), который поддерживает поток данных и позволяет добавлять к нему различные операторы (для трансформации, фильтрации данных итд.). Благодаря этому мы можем абстрагироваться от таких вещей как потоки (threads) и их безопасность (thread-safety), синхронизация, неблокирующий ввод/вывод.
Ключевые типы
Rx базируется на двух фундаментальных типах - Observable и Observer.
Observable
Observable - это источник данных, который может сообщать три вида событий:
- Данные.
- Сигнал о завершении. Означает, что данные больше не будут поступать.
- Ошибка.
Подписаться на источник данных можно с помощью метода subscribe().
Observable бывают:
hot- порождает данные постоянно, даже если на него никто не подписан. Таким образом существует зависимость от того, когда именно произошла подписка на такой источник данных, т.к. наблюдатели могут пропустить какую-то часть элементов. Наиболее распространённым примером являются события пользовательского интерфейса в Android (например, клики по кнопке или по экрану). Существует много способов реализации горячихObservable. Один из них -Subjects. Также можно реализовать при помощиConnectableObservable.cold- порождает данные только если у него есть хотя бы один подписчик. У каждого наблюдателя будет свой набор элементов. Идея здесь в том, что данные или операция воспроизводятся для каждого наблюдателя.
Observer
Observer - это потребитель данных. У него есть методы, которые вызываются в зависимости от поступившего события от Observable:
onNext()- вызывается для каждого элемента из потока данных.onComplete()- вызывается, если поток завершён и данные больше не будут поступать.onError()- вызывается, если произошла ошибка.
Реализация Observable и Observer
Чтобы подписаться на Observable, совсем нет необходимости в реализации Observer. Существуют различные перегрузки метода subscribe(), которые в качестве аргументов принимают функции onNext(), onComplete(), onSubscribe() и onError(). При этом можно предоставить их все, либо только часть из них.
Отмена подписки
При вызове метода Observable.subscribe() возвращается объект класса Disposable. Он представляет собой связь между вашими Observable и Observer. В дальнейшем его можно использовать для отмены подписки - disposable.dispose().
При отмене подписки останавливается вся цепочка вне зависимости от того, какой именно код сейчас выполняется.
Источники данных
Observable
Observable - наиболее универсальный источник данных. Умеет и генерировать данные, и не производить их вовсе. Если не подходит ни один из других источников данных, то смело использовать его.
Недостаток: не умеет обрабатывать backpressure.
При подписке можно реализовать метода:
onNext()- вызывается при поступление элемента из потока данных.onError()- уведомление об ошибке.onComplete()- уведомление о завершении.
ConnectableObservable
ConnectableObservable - начинает выдавать данные в момент вызова метода connect(). Сделано это для того, чтобы несколько наблюдателей могли обозревать один поток событий, не перезапуская его при каждой подписке.
Flowable
Flowable - источник, предоставляющий дополнительные операторы для обработки backpressure. Когда требуется обрабатывать более 10000 событий, происходящих быстро одно за другим, рекомендуется использовать Flowable вместо Observable.
ConnectableFlowable
ConnectableFlowable - источник, который открывает те же возможности, что и ConnectableObservable, т.е. начинает выдавать данные в момент вызова метода connect(). Но при этом имеет преимущества Flowable.
Single
Single - это Observable, который генерирует только один элемент или выдаёт ошибку.
При подписке можно реализовать методы:
onSuccess()- возвращает результат.onError()- возвращает ошибку.
Пример использования:
- одноразовый сетевой запрос.
Преобразование:
toObservable()- преобразовывает вObservable.singleOrError()- преобразовывает изObservable. Если в потоке данных более одного элемента, то будет выброшено исключение.
Maybe
Maybe - источник, который либо генерирует один элемент, либо ничего не генерирует. Название говорит само за себе, этот источник как бы ничего вам не обещает: будут данные - передам, не будут - не передам. При этом в случае отсутствия данных, не будет выброшена ошибка (в отличие от Single).
При подписке можно реализовать методы:
onSuccess()- возвращает результат.onComplete()- уведомляет об отсутствии элементов.onError()- возвращает ошибку.
При этом методы onSuccess() и onComplete() - взаимоисключающие, т.е. в случае вызова первого можно не ждать вызова второго.
Преобразование:
toObservable()- преобразовывает вObservable.- Нет простого способа преобразовать
ObservableвMaybe. Рекомендуют сначала преобразовать вSingle, а затем вMaybeпри помощиtoMaybe().
Completable
Completable - операция, которая либо может быть выполнена, либо нет. Полезно, когда вас интересует только то, что операция выполняется правильно и вам не нужно отображать результат или какие-либо данные.
Completable не может быть создан при помощи метода just().
При подписке можно реализовать методы:
onComplete()- уведомляет о том, что операция прошла успешно.onError()- уведомляет об ошибке.
Преобразование:
toObservable()- преобразовывает вObservable.toCompletable()- deprecated. Рекомендуют сначала преобразовать вSingle, а затем использоватьignoreElement().
Subject
Subject - это класс, который может быть и источником, и наблюдателем. Это позволяет использовать его, например, в разного рода контроллерах, которые будут отдавать его наружу в виде Observable и внутри оповещать как Observer.
У этого класса есть несколько реализаций.
AsyncSubject / AsyncProcessor
AsyncSubject / AsyncProcessor - держит последнее событие до корректного завершения потока, после чего отдаёт его подписчикам. При возникновении ошибки никакие события проброшены не будут.
PublishSubject / PublishProcessor
PublishSubject / PublishProcessor - пробрасывает приходящие в него события дальше, пока не поступит терминальный сигнал. После конца потока или ошибки он возвращает соответствующие события.
BehaviorSubject / BehaviorProcessor
BehaviorSubject / BehaviorProcessor - работает аналогично PublishSubject, но при подписке возвращает последнее событие, если оно есть и если Subject не перешёл в терминальное состояние.
ReplaySubject / ReplayProcessor
ReplaySubject / ReplayProcessor - возвращает не одно последнее событие, а сколько душе угодно. Если подписаться на завершённый ReplaySubject, то будут получены все накопленные данные.
CompletableSubject, MaybeSubject и SingleSubject
CompletableSubject, MaybeSubject и SingleSubject работают аналогично PublishSubject, только рассчитаны на использование с Completable, Maybe и Single соответственно.
UnicastSubject
UnicastSubject - это фактически ReplaySubject, который следит, чтобы у него был только один подписчик. Он выбрасывает IllegalStateException при попытке повторной подписки.
MulticastProcessor
MulticastProcessor - работает по аналогии с PublishProcessor, за исключением одной небольшой особенности. Он умеет обрабатывать backpressure для входящего в него потока. MulticastProcessor позволяет задать размер буфера, в котором он будет предзапрашивать элементы из upstream для будущих подписчиков.
Операторы
Операторы - это некий промежуточный шаг (после получения данных, но до получения их подписчиками), который может быть использован для трансформации данных. Есть множество стандартных операторов (поставляемых вместе с библиотекой), но также при желании и умении можно написать собственные.
Операторы позволяют делать с потоком данных всё, что угодно.
Несколько фактов об операторах:
- Операторы, привязанные к
Observable, будут вести себя как егоObserver. - Этот промежуточный
Observerможет работать с каждым элементов из источника. - Порядок, в котором вызываются операторы, имеет значение.
- Операторы также являются
Observable, на который можно подписаться.
Marble диаграммы
Прежде чем изучать операторы следует разобраться с такой штукой как Marble диаграммы, т.к. они будут часто встречаться и в документации Rx, и в различных статьях о нём.
Marble диаграммы - визуально передают то, что происходит с входными данными после прохождения через оператор. То есть по сути они просто визуально передают смысл того или иного оператора.
Структура Marble диаграмм:

Пояснения:
- Ось X - это время.
- Ось Y - слой, который показывает трансформацию входных данных.
- Круглые точки - это элемент из потока данных.
- Треугольные значки - обрабатываемые элементы.
- Если произошла ошибка, то она отмечается крестом -
Х. - Если событие завершено успешно, то это отмечается прямой линией -
|.
Создание Observable или Observable Factories
Методы, с помощью которых создаются Observable также считаются операторами, поскольку они преобразуют входные данные в Observable.
just()- наиболее простой оператор, чаще всего встречается в различных примерах по демонстрации Rx. Он обёртывает введённые данные вObservableи возвращает их как элементы. Может принимать от 1 до 10 однотипных элементов. Если по какой-то причине во входном выражении произошло исключение, то оно не будет передано в методonError(). Вместо этого вы можете столкнуться сRuntimeException.create()- создаётObservableс нуля, в качестве аргументов принимает методы наблюдателя.start()- создаётObservable, который который испускает возвращаемое значение функции.from()- это целая группа операторов, которая преобразовывает какой-либо объект или структуру данных вObservableи по очереди рассылает их наблюдателям. Виды:fromIterable()- этот оператор полезен в тех случаях, когда на вход передаётся коллекция и при этом нужно обработать каждый элемент этой коллекции.fromArray()- похож наfromIterable(), но вместо этого принимает переменное количество аргументов. Он используется только при использовании более чем с одним параметром.fromCallable()- принимает в качестве входных данных экземплярCallable<V>, который будет вызван только в случае подписки. Любое обнаруженное исключение будет передано в методonError().fromFuture()- принимает в качестве входных данных экземплярFuture.fromPublisher()- обычно используется для потенциально неограниченного потока данных
empty()- ничего не отправляет, вызывает уведомление о завершении операции.never()- ничего не отправляет, даже уведомления о завершении операции.error()- ничего не отправляет, вызывает уведомление об ошибке.range()- на вход принимает два числа -startиcount- и генерирует последовательность целых чисел (int), начиная соstartи заканчиваяstart + count - 1. Можно использоватьrangeLong()для больших чисел.timer()- позволяет указать время задержки перед отправкой события. Также отправляет одинLongсо значение0Lперед завершением.interval()- похож на операторtimer(), но с передачей последовательности целых чисел, разделённых заданным временным интервалом.intervalRange()- сочетает в себе операторыrange()иinterval(). С его помощью можно генерировать инкрементные значения в пределах диапазона с заданным временным интервалом.repeat()- повторяет входные данные заданное количество раз. Если не задать количество повторений, то данные будут повторятьсяLong.MAX_VALUEраз.repeatUntil()- повторяет входные данные до тех пор, пока не будет выполнено заданное условие.repeatWhen()defer()- работает по тому же принципу, что иfromCallable(), но более подробный. Не создаётObservableдо тех пор, пока не появится наблюдатель. При этом создаёт новыйObservableдля каждого наблюдателя.
Оператотры трансформации
map()- преобразует один элемент данных в другой (например, с помощью него можно трансформировать строку, добавив к ней какое-либо значение). Также может преобразовать один тип данных в другой (например,StringвInt).flatMap()- принимает данные от одногоObservable, применяя к каждому его элементу переданную вами функцию, которая возвращает новыйObservable. Т.е. подменяет одинObservableна другой. НовыйObservable- это то, что в итоге увидитObserver. При этом не заботится о том, в каком порядке эти данные придут подписчику (могут прийти в ином порядке, чем при изначальном создании данных).switchMap()-concatMap()- поддерживает порядок элементов и ожидает, пока текущийObservableзавершит свою работу, прежде чем передать следующий. Подходит, если вы хотите сохранить порядок выполнения.buffer()- периодически собирает элементыObservableвBundle, после чего испускает этиBundle‘ы, вместо того, чтобы передавать элементы по одному.groupBy()- группируетObservableна наборObservable, каждый из которых испускает отдельную группу элементов из исходногоObservable.scan()- (пример использования - поиск факториала)
Операторы фильтрации
debounce()- добавляет задержку перед отправкой данных.distinct()- убирает дубли.elementAt()- принимает индекс, возвращает один элемент по заданному индексу.filter()- позволяет фильтровать данные. Достаточно передать условие и тогда будут передаваться только те данные, которые прошли заданную проверку.first()- отправляет только первый элемент изObservableили первый элемент, который соответствует заданному условию.ignoreElemnts()- игнорирует все элементы, но отображает уведомление о том, что всё успешно прошло.last()- отправляет только последний элемент изObservable.sample()- отправляет только последний элемент изObservableс заданным временным интервалом.skip()- с начала потока будет исключено заданное количество элементов.skipLast()- с конца потока будет исключено заданное количество элементов.take()- позволяет задать количество элементов, которые будут переданы подписчикам. Если фактически поступило меньшее количество элементов, чем заданное этим оператором, тоtake()просто завершит свою работу раньше.takeLast()- отправляет подписчикам заданное количество элементов из конца потока.
Операторы комбинирования
combineLatest()- объединяет последние элементы из несколькихObservableи возвращает полученный результат.join()- объединяет элементы, излучаемые двумяObservable, всякий раз, когда элемент из одногоObservableиспускается в течение временного окна, определённого элементом, испускаемым другимObservable.merge()- слияние двухObservableв один. Не заботится о том, в каком порядке выдаются данные из этих потоков.startWith()- испускать указанную последовательность элементов перед тем, как начать испускать элементы из источникаObservable.switch()- преобразуетObservable, который испускаетObservablesв одинObservable. Он отправляет элементы, испускаемые самым последним из этихObservable.switchOnNext()- отправляет данные из одного источника данных, пока не появятся данные из более приоритетного источника.
zip()- объединяет элементы несколькихObservableвместе и отправляет отдельно каждую получившуюся комбинацию.zipWith()- работа с двумя запросами как с одним. Если для кого-то пары не нашлось - будет пропущен. Выдаёт результаты по порядку.concat()- слияние двух потоков, выдача сначала данных из первого потока (по порядку), затем из второго (тоже по порядку).
Операторы для обработки ошибок
catch()- восстанавливает поток после получения уведомления об ошибке вonError().retry()- после получения уведомления об ошибке вonError()повторно подписывается на поток в надежде, что он завершится без ошибок.
Вспомогательные операторы
delay()- добавляет задержку перед отправкой данных.do- это своего рода набор операторов, которые можно использовать для получения уведомлений перед отправкой их в соответствующий методObserver‘а.doOnNext()- позволяет добавить какое-либо дополнительное действие, которое будет применяться к каждому новому элементу.
observeOn()- указывает на scheduler, в которомObserverбудет наблюдать заObservable.subscribeOn()- указывает на то, в каком scheduler начнёт работатьObservableи операторы. При этом не важно, в каком месте всей цепочки он был вызван.timeInterval()- испускает то, сколько времени прошло с момента поступления предыдущего элемента.timeout()- позволяет указать время, за которое должны поступить данные. Если не поступили - выбрасываетсяTimeoutException.timestamp()- прикрепляет временную метку каждому элементу.using()- создаёт disposable, срок жизни которого аналогиченObservable.
Условные и логические операторы
all()- позволяет проверить, что все элементы, испускаемыеObservable, соответствуют заданным критериям.contains()- позволяет проверить, если ли вObservableконкретный элемент.defaultIfEmpty()- испускает либо элементы изObservable, либо элемент по умолчанию, еслиObservableпуст.
Математические операторы
count()- количество элементов вObservable.
Остальные операторы
Реализация собственных операторов.
Backpressure
Backpressure — ситуация, когда новые события поступают существенно быстрее, чем успевают обрабатываться, и поэтому начинают скапливаться в буфере, переполняя его. Это может привести к неприятностям вроде OutOfMemoryError. Подробнее можно посмотреть тут.
Rx vs Android
При написании приложений под Android могут быть использованы библиотеки:
RxJava- основа.RxAndroid- поверх RxJava добавлены специфичные для платформы Android классы. Может быть использована совместно с RxJava. Что в нём специфичного:- Класс
AndroidSchedulers- предоставляет готовые Schedulers для потоков, специфичных для Android. - Класс
AndroidObservable- предоставляющий возможности по работе с жизненным циклом некоторых классов из Android SDK. В нём есть операторы:bindActivity()иbindFragment()- останавливают поток данных, если вашиActivityилиFragmentуничтожаются/уничтожены.fromBroadcast()- позволяет создатьObservable, который работает какBroadcastReceiver.
- Класс
ViewObservable- добавляет привязки кView. Операторы:clicks()- для получения уведомлений всякий раз, когда происходит нажатие наView.text()- срабатывает всякий раз, когдаTextViewизменяет своё содержимое.
- Класс
RxKotlin- это обёртка RxJava на языке Kotlin, с более лаконичными решениями, которые позволяют сократить количество кода. Может быть использована самостоятельно, без дополнительных зависимостей в виде RxJava, но RxAndroid при необходимости придётся подключить.RxBinding- библиотека, которая осуществляет привязку View и тем самым превращает View в источник данных.
Rx vs Retrofit
Retrofit поддерживает библиотеку RxJava, благодаря чему открываются некоторые возможности::
Вместо использования
Callbackв ApiInterface можно использоватьObservable.1 2 3 4
@GET("pictures") fun getPictures( @Query("query") query: String, ): Observable<PictureResponse>Но в таком случае потребуется адаптировать Rx типы под Retrofit. Осуществляется с помощью специального адаптера, созданного разработчиками Retrofit. Его необходимо добавить в зависимости, после чего достаточно дописать одну строчку кода при создании API клиента:
1 2 3 4 5
retrofit = Retrofit.Builder() .baseUrl(BASE_URL) .addConverterFactory(...) .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // адаптер для Rx .build()
Так же есть ещё одна библиотека с адаптером, но только для RxJava3. Принцип работы аналогичный.
Комбинирование нескольких запросов вместе.
Полезные ссылки
Статьи на русском:
Реактивное программирование. Начало - немного про реактивное программирование в целом.
ReactiveX 2.0 с примерами, или грокаем реактивное программирование 2.0. Часть 1: Observable vs Flowable, Backpressure - серия статей, написана на примерах из RxJava v.1.x, полезна для общего понимая.
Введение в RxJava: Почему Rx? - серия статей. Является переводом туториала по RxJava Крисса Фруссиоса.
Справочник по источникам событий в Rx - коротко и понятно об источниках данных.
Статьи на английском:
RxJava Ninja: Introduction to Reactive Programming - серия статей по RxJava.
Видео на русском:
RxJava - Observable, Flowable (часть 1)
RxJava - Transformation, Filter (часть 2)
RxJava - Combination, Utility, Binding (часть 3)
Примеры приложений:
MVVM with Hilt, RxJava 3, Retrofit, Room, Live Data and View Binding - пример на Java.
How to make complex requests simple with RxJava in Kotlin - пример на Kotlin.