Реактивное программирование - это программирование, основанное на асинхронных потоках данных и на распространении изменений. Под потоком здесь понимается массив данных, отсортированных по времени, который может сообщать, что данные изменились. Потоки могут транслировать данные или подписываться на них. В течение своего жизненного цикла потоки могут транслировать три сигнала: данные, ошибка, завершение.
Можно сказать, что смысл реактивного программирования заключается в том, чтобы реактивно реагировать на какие-либо события, при необходимости преобразовывать их и использовать / распространять результат. При этом каждая такая задача может выполняться в собственном потоке, несколько задач может выполняться одновременно. С помощью этого подхода мы можем избежать блокировки основного потока приложения.
Самая популярная реализация реактивного подхода - это библиотеки ReactiveX. Библиотек много, каждая из них написана для использования с конкретным языком программирования. Например, библиотека RxJava может быть использована для написания кода на Java / Kotlin.
Общая концепция ReactiveX
ReactiveX - это реализация принципов реактивного программирования для создания асинхронных программ, основанных на событиях, путем наблюдения за потоками/последовательностями.
Основана библиотека на паттерне проектирования Observer (или Наблюдатель), который поддерживает поток данных и позволяет добавлять к нему различные операторы (для трансформации, фильтрации данных итд.). Благодаря этому мы можем абстрагироваться от таких вещей как потоки (threads) и их безопасность (thread-safety), синхронизация, неблокирующий ввод/вывод.
Ключевые типы
Rx базируется на двух фундаментальных типах - Observable и Observer.
Observable
Observable
- это источник данных, который может сообщать три вида событий:
- Данные.
- Сигнал о завершении. Означает, что данные больше не будут поступать.
- Ошибка.
Подписаться на источник данных можно с помощью метода subscribe()
.
Observable
бывают:
hot
- порождает данные постоянно, даже если на него никто не подписан. Таким образом существует зависимость от того, когда именно произошла подписка на такой источник данных, т.к. наблюдатели могут пропустить какую-то часть элементов. Наиболее распространённым примером являются события пользовательского интерфейса в Android (например, клики по кнопке или по экрану). Существует много способов реализации горячихObservable
. Один из них -Subjects
. Также можно реализовать при помощиConnectableObservable
.cold
- порождает данные только если у него есть хотя бы один подписчик. У каждого наблюдателя будет свой набор элементов. Идея здесь в том, что данные или операция воспроизводятся для каждого наблюдателя.
Observer
Observer
- это потребитель данных. У него есть методы, которые вызываются в зависимости от поступившего события от Observable
:
onNext()
- вызывается для каждого элемента из потока данных.onComplete()
- вызывается, если поток завершён и данные больше не будут поступать.onError()
- вызывается, если произошла ошибка.
Реализация Observable и Observer
Чтобы подписаться на Observable
, совсем нет необходимости в реализации Observer
. Существуют различные перегрузки метода subscribe()
, которые в качестве аргументов принимают функции onNext()
, onComplete()
, onSubscribe()
и onError()
. При этом можно предоставить их все, либо только часть из них.
Отмена подписки
При вызове метода Observable.subscribe()
возвращается объект класса Disposable
. Он представляет собой связь между вашими Observable
и Observer
. В дальнейшем его можно использовать для отмены подписки - disposable.dispose()
.
При отмене подписки останавливается вся цепочка вне зависимости от того, какой именно код сейчас выполняется.
Источники данных
Observable
Observable
- наиболее универсальный источник данных. Умеет и генерировать данные, и не производить их вовсе. Если не подходит ни один из других источников данных, то смело использовать его.
Недостаток: не умеет обрабатывать backpressure.
При подписке можно реализовать метода:
onNext()
- вызывается при поступление элемента из потока данных.onError()
- уведомление об ошибке.onComplete()
- уведомление о завершении.
ConnectableObservable
ConnectableObservable
- начинает выдавать данные в момент вызова метода connect()
. Сделано это для того, чтобы несколько наблюдателей могли обозревать один поток событий, не перезапуская его при каждой подписке.
Flowable
Flowable
- источник, предоставляющий дополнительные операторы для обработки backpressure. Когда требуется обрабатывать более 10000 событий, происходящих быстро одно за другим, рекомендуется использовать Flowable
вместо Observable
.
ConnectableFlowable
ConnectableFlowable
- источник, который открывает те же возможности, что и ConnectableObservable
, т.е. начинает выдавать данные в момент вызова метода connect()
. Но при этом имеет преимущества Flowable
.
Single
Single
- это Observable
, который генерирует только один элемент или выдаёт ошибку.
При подписке можно реализовать методы:
onSuccess()
- возвращает результат.onError()
- возвращает ошибку.
Пример использования:
- одноразовый сетевой запрос.
Преобразование:
toObservable()
- преобразовывает вObservable
.singleOrError()
- преобразовывает изObservable
. Если в потоке данных более одного элемента, то будет выброшено исключение.
Maybe
Maybe
- источник, который либо генерирует один элемент, либо ничего не генерирует. Название говорит само за себе, этот источник как бы ничего вам не обещает: будут данные - передам, не будут - не передам. При этом в случае отсутствия данных, не будет выброшена ошибка (в отличие от Single
).
При подписке можно реализовать методы:
onSuccess()
- возвращает результат.onComplete()
- уведомляет об отсутствии элементов.onError()
- возвращает ошибку.
При этом методы onSuccess()
и onComplete()
- взаимоисключающие, т.е. в случае вызова первого можно не ждать вызова второго.
Преобразование:
toObservable()
- преобразовывает вObservable
.- Нет простого способа преобразовать
Observable
вMaybe
. Рекомендуют сначала преобразовать вSingle
, а затем вMaybe
при помощиtoMaybe()
.
Completable
Completable
- операция, которая либо может быть выполнена, либо нет. Полезно, когда вас интересует только то, что операция выполняется правильно и вам не нужно отображать результат или какие-либо данные.
Completable
не может быть создан при помощи метода just()
.
При подписке можно реализовать методы:
onComplete()
- уведомляет о том, что операция прошла успешно.onError()
- уведомляет об ошибке.
Преобразование:
toObservable()
- преобразовывает вObservable
.toCompletable()
- deprecated. Рекомендуют сначала преобразовать вSingle
, а затем использоватьignoreElement()
.
Subject
Subject
- это класс, который может быть и источником, и наблюдателем. Это позволяет использовать его, например, в разного рода контроллерах, которые будут отдавать его наружу в виде Observable
и внутри оповещать как Observer
.
У этого класса есть несколько реализаций.
AsyncSubject / AsyncProcessor
AsyncSubject / AsyncProcessor
- держит последнее событие до корректного завершения потока, после чего отдаёт его подписчикам. При возникновении ошибки никакие события проброшены не будут.
PublishSubject / PublishProcessor
PublishSubject / PublishProcessor
- пробрасывает приходящие в него события дальше, пока не поступит терминальный сигнал. После конца потока или ошибки он возвращает соответствующие события.
BehaviorSubject / BehaviorProcessor
BehaviorSubject / BehaviorProcessor
- работает аналогично PublishSubject
, но при подписке возвращает последнее событие, если оно есть и если Subject не перешёл в терминальное состояние.
ReplaySubject / ReplayProcessor
ReplaySubject / ReplayProcessor
- возвращает не одно последнее событие, а сколько душе угодно. Если подписаться на завершённый ReplaySubject
, то будут получены все накопленные данные.
CompletableSubject, MaybeSubject и SingleSubject
CompletableSubject
, MaybeSubject
и SingleSubject
работают аналогично PublishSubject
, только рассчитаны на использование с Completable
, Maybe
и Single
соответственно.
UnicastSubject
UnicastSubject
- это фактически ReplaySubject
, который следит, чтобы у него был только один подписчик. Он выбрасывает IllegalStateException
при попытке повторной подписки.
MulticastProcessor
MulticastProcessor
- работает по аналогии с PublishProcessor
, за исключением одной небольшой особенности. Он умеет обрабатывать backpressure для входящего в него потока. MulticastProcessor
позволяет задать размер буфера, в котором он будет предзапрашивать элементы из upstream для будущих подписчиков.
Операторы
Операторы - это некий промежуточный шаг (после получения данных, но до получения их подписчиками), который может быть использован для трансформации данных. Есть множество стандартных операторов (поставляемых вместе с библиотекой), но также при желании и умении можно написать собственные.
Операторы позволяют делать с потоком данных всё, что угодно.
Несколько фактов об операторах:
- Операторы, привязанные к
Observable
, будут вести себя как егоObserver
. - Этот промежуточный
Observer
может работать с каждым элементов из источника. - Порядок, в котором вызываются операторы, имеет значение.
- Операторы также являются
Observable
, на который можно подписаться.
Marble диаграммы
Прежде чем изучать операторы следует разобраться с такой штукой как Marble диаграммы, т.к. они будут часто встречаться и в документации Rx, и в различных статьях о нём.
Marble диаграммы - визуально передают то, что происходит с входными данными после прохождения через оператор. То есть по сути они просто визуально передают смысл того или иного оператора.
Структура Marble диаграмм:
Пояснения:
- Ось X - это время.
- Ось Y - слой, который показывает трансформацию входных данных.
- Круглые точки - это элемент из потока данных.
- Треугольные значки - обрабатываемые элементы.
- Если произошла ошибка, то она отмечается крестом -
Х
. - Если событие завершено успешно, то это отмечается прямой линией -
|
.
Создание Observable или Observable Factories
Методы, с помощью которых создаются Observable
также считаются операторами, поскольку они преобразуют входные данные в Observable
.
just()
- наиболее простой оператор, чаще всего встречается в различных примерах по демонстрации Rx. Он обёртывает введённые данные вObservable
и возвращает их как элементы. Может принимать от 1 до 10 однотипных элементов. Если по какой-то причине во входном выражении произошло исключение, то оно не будет передано в методonError()
. Вместо этого вы можете столкнуться сRuntimeException
.create()
- создаётObservable
с нуля, в качестве аргументов принимает методы наблюдателя.start()
- создаётObservable
, который который испускает возвращаемое значение функции.from()
- это целая группа операторов, которая преобразовывает какой-либо объект или структуру данных вObservable
и по очереди рассылает их наблюдателям. Виды:fromIterable()
- этот оператор полезен в тех случаях, когда на вход передаётся коллекция и при этом нужно обработать каждый элемент этой коллекции.fromArray()
- похож наfromIterable()
, но вместо этого принимает переменное количество аргументов. Он используется только при использовании более чем с одним параметром.fromCallable()
- принимает в качестве входных данных экземплярCallable<V>
, который будет вызван только в случае подписки. Любое обнаруженное исключение будет передано в методonError()
.fromFuture()
- принимает в качестве входных данных экземплярFuture
.fromPublisher()
- обычно используется для потенциально неограниченного потока данных
empty()
- ничего не отправляет, вызывает уведомление о завершении операции.never()
- ничего не отправляет, даже уведомления о завершении операции.error()
- ничего не отправляет, вызывает уведомление об ошибке.range()
- на вход принимает два числа -start
иcount
- и генерирует последовательность целых чисел (int), начиная соstart
и заканчиваяstart + count - 1
. Можно использоватьrangeLong()
для больших чисел.timer()
- позволяет указать время задержки перед отправкой события. Также отправляет одинLong
со значение0L
перед завершением.interval()
- похож на операторtimer()
, но с передачей последовательности целых чисел, разделённых заданным временным интервалом.intervalRange()
- сочетает в себе операторыrange()
иinterval()
. С его помощью можно генерировать инкрементные значения в пределах диапазона с заданным временным интервалом.repeat()
- повторяет входные данные заданное количество раз. Если не задать количество повторений, то данные будут повторятьсяLong.MAX_VALUE
раз.repeatUntil()
- повторяет входные данные до тех пор, пока не будет выполнено заданное условие.repeatWhen()
defer()
- работает по тому же принципу, что иfromCallable()
, но более подробный. Не создаётObservable
до тех пор, пока не появится наблюдатель. При этом создаёт новыйObservable
для каждого наблюдателя.
Оператотры трансформации
map()
- преобразует один элемент данных в другой (например, с помощью него можно трансформировать строку, добавив к ней какое-либо значение). Также может преобразовать один тип данных в другой (например,String
вInt
).flatMap()
- принимает данные от одногоObservable
, применяя к каждому его элементу переданную вами функцию, которая возвращает новыйObservable
. Т.е. подменяет одинObservable
на другой. НовыйObservable
- это то, что в итоге увидитObserver
. При этом не заботится о том, в каком порядке эти данные придут подписчику (могут прийти в ином порядке, чем при изначальном создании данных).switchMap()
-concatMap()
- поддерживает порядок элементов и ожидает, пока текущийObservable
завершит свою работу, прежде чем передать следующий. Подходит, если вы хотите сохранить порядок выполнения.buffer()
- периодически собирает элементыObservable
вBundle
, после чего испускает этиBundle
‘ы, вместо того, чтобы передавать элементы по одному.groupBy()
- группируетObservable
на наборObservable
, каждый из которых испускает отдельную группу элементов из исходногоObservable
.scan()
- (пример использования - поиск факториала)
Операторы фильтрации
debounce()
- добавляет задержку перед отправкой данных.distinct()
- убирает дубли.elementAt()
- принимает индекс, возвращает один элемент по заданному индексу.filter()
- позволяет фильтровать данные. Достаточно передать условие и тогда будут передаваться только те данные, которые прошли заданную проверку.first()
- отправляет только первый элемент изObservable
или первый элемент, который соответствует заданному условию.ignoreElemnts()
- игнорирует все элементы, но отображает уведомление о том, что всё успешно прошло.last()
- отправляет только последний элемент изObservable
.sample()
- отправляет только последний элемент изObservable
с заданным временным интервалом.skip()
- с начала потока будет исключено заданное количество элементов.skipLast()
- с конца потока будет исключено заданное количество элементов.take()
- позволяет задать количество элементов, которые будут переданы подписчикам. Если фактически поступило меньшее количество элементов, чем заданное этим оператором, тоtake()
просто завершит свою работу раньше.takeLast()
- отправляет подписчикам заданное количество элементов из конца потока.
Операторы комбинирования
combineLatest()
- объединяет последние элементы из несколькихObservable
и возвращает полученный результат.join()
- объединяет элементы, излучаемые двумяObservable
, всякий раз, когда элемент из одногоObservable
испускается в течение временного окна, определённого элементом, испускаемым другимObservable
.merge()
- слияние двухObservable
в один. Не заботится о том, в каком порядке выдаются данные из этих потоков.startWith()
- испускать указанную последовательность элементов перед тем, как начать испускать элементы из источникаObservable
.switch()
- преобразуетObservable
, который испускаетObservables
в одинObservable
. Он отправляет элементы, испускаемые самым последним из этихObservable
.switchOnNext()
- отправляет данные из одного источника данных, пока не появятся данные из более приоритетного источника.
zip()
- объединяет элементы несколькихObservable
вместе и отправляет отдельно каждую получившуюся комбинацию.zipWith()
- работа с двумя запросами как с одним. Если для кого-то пары не нашлось - будет пропущен. Выдаёт результаты по порядку.concat()
- слияние двух потоков, выдача сначала данных из первого потока (по порядку), затем из второго (тоже по порядку).
Операторы для обработки ошибок
catch()
- восстанавливает поток после получения уведомления об ошибке вonError()
.retry()
- после получения уведомления об ошибке вonError()
повторно подписывается на поток в надежде, что он завершится без ошибок.
Вспомогательные операторы
delay()
- добавляет задержку перед отправкой данных.do
- это своего рода набор операторов, которые можно использовать для получения уведомлений перед отправкой их в соответствующий методObserver
‘а.doOnNext()
- позволяет добавить какое-либо дополнительное действие, которое будет применяться к каждому новому элементу.
observeOn()
- указывает на scheduler, в которомObserver
будет наблюдать заObservable
.subscribeOn()
- указывает на то, в каком scheduler начнёт работатьObservable
и операторы. При этом не важно, в каком месте всей цепочки он был вызван.timeInterval()
- испускает то, сколько времени прошло с момента поступления предыдущего элемента.timeout()
- позволяет указать время, за которое должны поступить данные. Если не поступили - выбрасываетсяTimeoutException
.timestamp()
- прикрепляет временную метку каждому элементу.using()
- создаёт disposable, срок жизни которого аналогиченObservable
.
Условные и логические операторы
all()
- позволяет проверить, что все элементы, испускаемыеObservable
, соответствуют заданным критериям.contains()
- позволяет проверить, если ли вObservable
конкретный элемент.defaultIfEmpty()
- испускает либо элементы изObservable
, либо элемент по умолчанию, еслиObservable
пуст.
Математические операторы
count()
- количество элементов вObservable
.
Остальные операторы
Реализация собственных операторов.
Backpressure
Backpressure — ситуация, когда новые события поступают существенно быстрее, чем успевают обрабатываться, и поэтому начинают скапливаться в буфере, переполняя его. Это может привести к неприятностям вроде OutOfMemoryError
. Подробнее можно посмотреть тут.
Rx vs Android
При написании приложений под Android могут быть использованы библиотеки:
RxJava
- основа.RxAndroid
- поверх RxJava добавлены специфичные для платформы Android классы. Может быть использована совместно с RxJava. Что в нём специфичного:- Класс
AndroidSchedulers
- предоставляет готовые Schedulers для потоков, специфичных для Android. - Класс
AndroidObservable
- предоставляющий возможности по работе с жизненным циклом некоторых классов из Android SDK. В нём есть операторы:bindActivity()
иbindFragment()
- останавливают поток данных, если вашиActivity
илиFragment
уничтожаются/уничтожены.fromBroadcast()
- позволяет создатьObservable
, который работает какBroadcastReceiver
.
- Класс
ViewObservable
- добавляет привязки кView
. Операторы:clicks()
- для получения уведомлений всякий раз, когда происходит нажатие наView
.text()
- срабатывает всякий раз, когдаTextView
изменяет своё содержимое.
- Класс
RxKotlin
- это обёртка RxJava на языке Kotlin, с более лаконичными решениями, которые позволяют сократить количество кода. Может быть использована самостоятельно, без дополнительных зависимостей в виде RxJava, но RxAndroid при необходимости придётся подключить.RxBinding
- библиотека, которая осуществляет привязку View и тем самым превращает View в источник данных.
Rx vs Retrofit
Retrofit
поддерживает библиотеку RxJava
, благодаря чему открываются некоторые возможности::
Вместо использования
Callback
в ApiInterface можно использоватьObservable
.1 2 3 4
@GET("pictures") fun getPictures( @Query("query") query: String, ): Observable<PictureResponse>
Но в таком случае потребуется адаптировать Rx типы под Retrofit. Осуществляется с помощью специального адаптера, созданного разработчиками Retrofit. Его необходимо добавить в зависимости, после чего достаточно дописать одну строчку кода при создании API клиента:
1 2 3 4 5
retrofit = Retrofit.Builder() .baseUrl(BASE_URL) .addConverterFactory(...) .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // адаптер для Rx .build()
Так же есть ещё одна библиотека с адаптером, но только для RxJava3. Принцип работы аналогичный.
Комбинирование нескольких запросов вместе.
Полезные ссылки
Статьи на русском:
Реактивное программирование. Начало - немного про реактивное программирование в целом.
ReactiveX 2.0 с примерами, или грокаем реактивное программирование 2.0. Часть 1: Observable vs Flowable, Backpressure - серия статей, написана на примерах из RxJava v.1.x, полезна для общего понимая.
Введение в RxJava: Почему Rx? - серия статей. Является переводом туториала по RxJava Крисса Фруссиоса.
Справочник по источникам событий в Rx - коротко и понятно об источниках данных.
Статьи на английском:
RxJava Ninja: Introduction to Reactive Programming - серия статей по RxJava.
Видео на русском:
RxJava - Observable, Flowable (часть 1)
RxJava - Transformation, Filter (часть 2)
RxJava - Combination, Utility, Binding (часть 3)
Примеры приложений:
MVVM with Hilt, RxJava 3, Retrofit, Room, Live Data and View Binding - пример на Java.
How to make complex requests simple with RxJava in Kotlin - пример на Kotlin.