Apr
MAY
Jun
31
2021
2022
2023
About this capture
T h e W a y b a c k M a c h i n e - h t t p : / / w e b . a r c h i v e . o r g / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a
S k i p t o c o n t e n t
S i g n up
●
●
F e a t u r e s
●
M o b i l e
●
A c t i o n s
●
C o d e s p a c e s
●
P a c k a g e s
●
S e c u r i t y
●
C o d e r e v i e w
●
I s s u e s
●
I n t e g r a t i o n s
●
G i t H u b S p o n s o r s
●
C u s t o m e r s t o r i e s
●
●
●
●
E x p l o r e G i t H u b
● L e a r n a n d c o n t r i b u t e
●
T o p i c s
●
C o l l e c t i o n s
●
T r e n d i n g
●
L e a r n i n g L a b
●
O p e n s o u r c e g u i d e s
● C o n n e c t w i t h o t h e r s
●
T h e R e a d M E P r o j e c t
●
E v e n t s
●
C o m m u n i t y f o r u m
●
G i t H u b E d u c a t i o n
●
G i t H u b S t a r s p r o g r a m
●
●
●
P l a n s
●
C o m p a r e p l a n s
●
C o n t a c t S a l e s
●
E d u c a t i o n
In this repository
All GitHub
↵
Jump to
↵
No suggested jump to results
{ { m e s s a g e } }
●
N o t i f i c a t i o n s
●
F o r k
7 . 6 k
S t a r
4 6 . 1 k
R x J a v a – R e a c t i v e E x t e n s i o n s f o r t h e J V M – a l i b r a r y f o r c o m p o s i n g a s y n c h r o n o u s a n d e v e n t - b a s e d p r o g r a m s u s i n g o b s e r v a b l e s e q u e n c e s f o r t h e J a v a V M .
L i c e n s e
A p a c h e - 2 . 0 l i c e n s e
4 6 . 1 k
s t a r s
7 . 6 k
f o r k s
S t a r
N o t i f i c a t i o n s
●
C o d e
●
I s s u e s
13
●
P u l l r e q u e s t s
0
●
A c t i o n s
●
P r o j e c t s
0
●
W i k i
●
S e c u r i t y
●
I n s i g h t s
M o r e
●
C o d e
●
I s s u e s
●
P u l l r e q u e s t s
●
A c t i o n s
●
P r o j e c t s
●
W i k i
●
S e c u r i t y
●
I n s i g h t s
T h i s c o m m i t d o e s n o t b e l o n g t o a n y b r a n c h o n t h i s r e p o s i t o r y , a n d m a y b e l o n g t o a f o r k o u t s i d e o f t h e r e p o s i t o r y .
C o u l d n o t l o a d b r a n c h e s
N o t h i n g t o s h o w
r a n c h e s
C o u l d n o t l o a d t a g s
N o t h i n g t o s h o w
5
b r a n c h e s
2 6 8
t a g s
C o d e
L a t e s t c o m m i t
d e p e n d a b o t [ b o t ]
C o - a u t h o r e d - b y : d e p e n d a b o t [ b o t ] <49699333+dependabot[bot]@users.noreply.github.com>" c l a s s = " L i n k - - p r i m a r y m a r k d o w n - t i t l e " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 8 8 4 5 3 7 1 1 e c 1 b 0 e 0 3 e b 7 b a 0 2 d 4 2 b 5 1 f e 1 3 3 0 b 3 a 7 3 " > B u m p m o c k i t o - c o r e f r o m 4 . 5 . 1 t o 4 . 6 . 0 ( # 7 4 2 6
C o - a u t h o r e d - b y : d e p e n d a b o t [ b o t ] <49699333+dependabot[bot]@users.noreply.github.com>" c l a s s = " L i n k - - p r i m a r y m a r k d o w n - t i t l e " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 8 8 4 5 3 7 1 1 e c 1 b 0 e 0 3 e b 7 b a 0 2 d 4 2 b 5 1 f e 1 3 3 0 b 3 a 7 3 " > )
…
8 8 4 5 3 7 1
M a y 3 0 , 2 0 2 2
B u m p m o c k i t o - c o r e f r o m 4 . 5 . 1 t o 4 . 6 . 0 ( # 7 4 2 6 )
Bumps [mockito-core](https://github.com/mockito/mockito ) from 4.5.1 to 4.6.0.
- [Release notes](https://github.com/mockito/mockito/releases )
- [Commits](mockito/mockito@v4.5.1...v4.6.0 )
---
updated-dependencies:
- dependency-name: org.mockito:mockito-core
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
8 8 4 5 3 7 1
G i t s t a t s
●
5 , 9 8 0
c o m m i t s
F i l e s
P e r m a l i n k
F a i l e d t o l o a d l a t e s t c o m m i t i n f o r m a t i o n .
T y p e
N a m e
L a t e s t c o m m i t m e s s a g e
C o m m i t t i m e
. g i t h u b
U n d o p e r m i s s i o n s o n s n a p s h o t a n d r e l e a s e G H A
M a y 1 8 , 2 0 2 2
c o n f i g
A d d e d J a v a d o c c h e c k s t o C h e c k s t y l e . F i x v i o l a t i n g J a v a d o c . ( # 7 2 1 0 )
M a r 1 2 , 2 0 2 1
d o c s
a d d i n g 1 . x 2 . x 3 . x j a v a d o c t o w i k i s i d e b a r ( # 7 1 3 3 )
D e c 1 4 , 2 0 2 0
g r a d l e
U p g r a d e t o G r a d l e 7 . 4 . 1 ( # 7 4 0 2 )
M a r 1 0 , 2 0 2 2
s r c
" c l a s s = " L i n k - - s e c o n d a r y " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 4 a 6 6 5 c 9 1 c 6 8 e 3 3 f a 2 9 0 7 c c c c c d f 7 f c 3 c f 0 5 3 c 7 c 9 " > R e m o v e @ n o n n u l l " c l a s s = " L i n k - - s e c o n d a r y " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 4 a 6 6 5 c 9 1 c 6 8 e 3 3 f a 2 9 0 7 c c c c c d f 7 f c 3 c f 0 5 3 c 7 c 9 " > a n n o t a t i o n s f o r f u n c t i o n s w i t h v o i d r e t u r n t y p e ( # 7 4 2 5 " c l a s s = " L i n k - - s e c o n d a r y " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 4 a 6 6 5 c 9 1 c 6 8 e 3 3 f a 2 9 0 7 c c c c c d f 7 f c 3 c f 0 5 3 c 7 c 9 " > )
M a y 2 7 , 2 0 2 2
. g i t a t t r i b u t e s
G i t A t t r i b u t e s f o r L i n e E n d i n g s
D e c 8 , 2 0 1 3
. g i t i g n o r e
A d d . e d i t o r c o n f i g t o . g i t i g n o r e
A u g 1 4 , 2 0 2 1
C O N T R I B U T I N G . m d
S t a n d a r d i z e j a v a s o u r c e h e a d e r s ( # 7 2 0 5 )
M a r 5 , 2 0 2 1
D E S I G N . m d
3 . x : A d j u s t i n f r a s t r u c t u r e v e r s i o n m a r k e r s ( # 6 4 2 1 )
J u n 6 , 2 0 1 9
L I C E N S E
R e s t o r e l i c e n s e t o c o r r e c t t e x t . ( # 5 7 3 5 )
N o v 2 1 , 2 0 1 7
R E A D M E . m d
A d d e d r e q u i r e d c o m m a t o R E A D M E . m d ( # 7 3 4 9 )
O c t 5 , 2 0 2 1
b u i l d . g r a d l e
C o - a u t h o r e d - b y : d e p e n d a b o t [ b o t ] <49699333+dependabot[bot]@users.noreply.github.com> " c l a s s = " L i n k - - s e c o n d a r y " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 8 8 4 5 3 7 1 1 e c 1 b 0 e 0 3 e b 7 b a 0 2 d 4 2 b 5 1 f e 1 3 3 0 b 3 a 7 3 " > B u m p m o c k i t o - c o r e f r o m 4 . 5 . 1 t o 4 . 6 . 0 ( # 7 4 2 6
C o - a u t h o r e d - b y : d e p e n d a b o t [ b o t ] <49699333+dependabot[bot]@users.noreply.github.com>" c l a s s = " L i n k - - s e c o n d a r y " h r e f = " / w e b / 2 0 2 2 0 5 3 1 1 1 1 8 1 8 / h t t p s : / / g i t h u b . c o m / R e a c t i v e X / R x J a v a / c o m m i t / 8 8 4 5 3 7 1 1 e c 1 b 0 e 0 3 e b 7 b a 0 2 d 4 2 b 5 1 f e 1 3 3 0 b 3 a 7 3 " > )
M a y 3 0 , 2 0 2 2
c o d e c o v . y m l
U p d a t e c o d e c o v . y m l ( # 5 0 6 6 )
F e b 3 , 2 0 1 7
g r a d l e . p r o p e r t i e s
3 . x : M o v e G r a d l e p r o p e r t i e s i n t o g r a d l e . p r o p e r t i e s f i l e ( # 7 2 6 0 )
M a y 7 , 2 0 2 1
g r a d l e w
U p g r a d e t o G r a d l e 7 . 4 . 1 ( # 7 4 0 2 )
M a r 1 0 , 2 0 2 2
g r a d l e w . b a t
3 . x : U p g r a d e G r a d l e t o 6 . 8 . 3 ( # 7 2 0 8 )
M a r 1 0 , 2 0 2 1
p m d . x m l
2 . x : c l e a n u p o f P M D s u g g e s t i o n s ( # 4 1 2 9 )
J u n 2 6 , 2 0 1 6
p u s h _ j a v a d o c . s h
F i x p u s h _ j a v a d o c . s h
N o v 2 0 , 2 0 2 0
s e t t i n g s . g r a d l e
N e b u l a B u i l d - S i n g l e M o d u l e
A u g 2 9 , 2 0 1 4
V i e w c o d e
R x J a v a : R e a c t i v e E x t e n s i o n s f o r t h e J V M
R x J a v a i s a J a v a V M i m p l e m e n t a t i o n o f R e a c t i v e E x t e n s i o n s : a l i b r a r y f o r c o m p o s i n g a s y n c h r o n o u s a n d e v e n t - b a s e d p r o g r a m s b y u s i n g o b s e r v a b l e s e q u e n c e s .
I t e x t e n d s t h e o b s e r v e r p a t t e r n t o s u p p o r t s e q u e n c e s o f d a t a / e v e n t s a n d a d d s o p e r a t o r s t h a t a l l o w y o u t o c o m p o s e s e q u e n c e s t o g e t h e r d e c l a r a t i v e l y w h i l e a b s t r a c t i n g a w a y c o n c e r n s a b o u t t h i n g s l i k e l o w - l e v e l t h r e a d i n g , s y n c h r o n i z a t i o n , t h r e a d - s a f e t y a n d c o n c u r r e n t d a t a s t r u c t u r e s .
● s i n g l e d e p e n d e n c y : R e a c t i v e - S t r e a m s
● J a v a 8 + o r A n d r o i d A P I 2 1 + r e q u i r e d
● J a v a 8 l a m b d a - f r i e n d l y A P I
● A n d r o i d d e s u g a r f r i e n d l y
● f i x e d A P I m i s t a k e s a n d m a n y l i m i t s o f R x J a v a 2
● i n t e n d e d t o b e a r e p l a c e m e n t f o r R x J a v a 2 w i t h r e l a t i v e l y f e w b i n a r y i n c o m p a t i b l e c h a n g e s
● n o n - o p i n i o n a t e d a b o u t t h e s o u r c e o f c o n c u r r e n c y ( t h r e a d s , p o o l s , e v e n t l o o p s , f i b e r s , a c t o r s , e t c . )
● a s y n c o r s y n c h r o n o u s e x e c u t i o n
● v i r t u a l t i m e a n d s c h e d u l e r s f o r p a r a m e t e r i z e d c o n c u r r e n c y
● t e s t a n d d i a g n o s t i c s u p p o r t v i a t e s t s c h e d u l e r s , t e s t c o n s u m e r s a n d p l u g i n h o o k s
L e a r n m o r e a b o u t R x J a v a i n g e n e r a l o n t h e W i k i H o m e .
ℹ ️ P l e a s e r e a d t h e W h a t ' s d i f f e r e n t i n 3 . 0 f o r d e t a i l s o n t h e c h a n g e s a n d m i g r a t i o n i n f o r m a t i o n w h e n u p g r a d i n g f r o m 2 . x .
V e r s i o n 2 . x
T h e 2 . x v e r s i o n i s e n d - o f - l i f e a s o f F e b r u a r y 2 8 , 2 0 2 1 . N o f u r t h e r d e v e l o p m e n t , s u p p o r t , m a i n t e n a n c e , P R s a n d u p d a t e s w i l l h a p p e n . T h e J a v a d o c o f t h e v e r y l a s t v e r s i o n , 2 . 2 . 2 1 , w i l l r e m a i n a c c e s s i b l e .
V e r s i o n 1 . x
T h e 1 . x v e r s i o n i s e n d - o f - l i f e a s o f M a r c h 3 1 , 2 0 1 8 . N o f u r t h e r d e v e l o p m e n t , s u p p o r t , m a i n t e n a n c e , P R s a n d u p d a t e s w i l l h a p p e n . T h e J a v a d o c o f t h e v e r y l a s t v e r s i o n , 1 . 3 . 8 , w i l l r e m a i n a c c e s s i b l e .
G e t t i n g s t a r t e d
S e t t i n g u p t h e d e p e n d e n c y
T h e f i r s t s t e p i s t o i n c l u d e R x J a v a 3 i n t o y o u r p r o j e c t , f o r e x a m p l e , a s a G r a d l e c o m p i l e d e p e n d e n c y :
implementation " io.reactivex.rxjava3:rxjava:3.x.y"
( P l e a s e r e p l a c e x a n d y w i t h t h e l a t e s t v e r s i o n n u m b e r s :
)
H e l l o W o r l d
T h e s e c o n d i s t o w r i t e t h e H e l l o W o r l d p r o g r a m :
package rxjava .examples ;
import io .reactivex .rxjava3 .core .*;
public class HelloWorld {
public static void main (String [] args ) {
Flowable .just ("Hello world" ).subscribe (System .out ::println );
}
}
N o t e t h a t R x J a v a 3 c o m p o n e n t s n o w l i v e u n d e r i o . r e a c t i v e x . r x j a v a 3 a n d t h e b a s e c l a s s e s a n d i n t e r f a c e s l i v e u n d e r i o . r e a c t i v e x . r x j a v a 3 . c o r e .
B a s e c l a s s e s
R x J a v a 3 f e a t u r e s s e v e r a l b a s e c l a s s e s y o u c a n d i s c o v e r o p e r a t o r s o n :
● i o . r e a c t i v e x . r x j a v a 3 . c o r e . F l o w a b l e : 0 . . N f l o w s , s u p p o r t i n g R e a c t i v e - S t r e a m s a n d b a c k p r e s s u r e
● i o . r e a c t i v e x . r x j a v a 3 . c o r e . O b s e r v a b l e : 0 . . N f l o w s , n o b a c k p r e s s u r e ,
● i o . r e a c t i v e x . r x j a v a 3 . c o r e . S i n g l e : a f l o w o f e x a c t l y 1 i t e m o r a n e r r o r ,
● i o . r e a c t i v e x . r x j a v a 3 . c o r e . C o m p l e t a b l e : a f l o w w i t h o u t i t e m s b u t o n l y a c o m p l e t i o n o r e r r o r s i g n a l ,
● i o . r e a c t i v e x . r x j a v a 3 . c o r e . M a y b e : a f l o w w i t h n o i t e m s , e x a c t l y o n e i t e m o r a n e r r o r .
S o m e t e r m i n o l o g y
U p s t r e a m , d o w n s t r e a m
T h e d a t a f l o w s i n R x J a v a c o n s i s t o f a s o u r c e , z e r o o r m o r e i n t e r m e d i a t e s t e p s f o l l o w e d b y a d a t a c o n s u m e r o r c o m b i n a t o r s t e p ( w h e r e t h e s t e p i s r e s p o n s i b l e t o c o n s u m e t h e d a t a f l o w b y s o m e m e a n s ) :
s o u r c e . o p e r a t o r 1 ( ) . o p e r a t o r 2 ( ) . o p e r a t o r 3 ( ) ) ; " > source .operator1 ().operator2 ().operator3 ().subscribe (consumer );
source .flatMap (value -> source .operator1 ().operator2 ().operator3 ());
H e r e , i f w e i m a g i n e o u r s e l v e s o n o p e r a t o r 2 , l o o k i n g t o t h e l e f t t o w a r d s t h e s o u r c e i s c a l l e d t h e u p s t r e a m . L o o k i n g t o t h e r i g h t t o w a r d s t h e s u b s c r i b e r / c o n s u m e r i s c a l l e d t h e d o w n s t r e a m . T h i s i s o f t e n m o r e a p p a r e n t w h e n e a c h e l e m e n t i s w r i t t e n o n a s e p a r a t e l i n e :
source
.operator1 ()
.operator2 ()
.operator3 ()
.subscribe (consumer )
O b j e c t s i n m o t i o n
I n R x J a v a ' s d o c u m e n t a t i o n , e m i s s i o n , e m i t s , i t e m , e v e n t , s i g n a l , d a t a a n d m e s s a g e a r e c o n s i d e r e d s y n o n y m s a n d r e p r e s e n t t h e o b j e c t t r a v e l i n g a l o n g t h e d a t a f l o w .
B a c k p r e s s u r e
W h e n t h e d a t a f l o w r u n s t h r o u g h a s y n c h r o n o u s s t e p s , e a c h s t e p m a y p e r f o r m d i f f e r e n t t h i n g s w i t h d i f f e r e n t s p e e d . T o a v o i d o v e r w h e l m i n g s u c h s t e p s , w h i c h u s u a l l y w o u l d m a n i f e s t i t s e l f a s i n c r e a s e d m e m o r y u s a g e d u e t o t e m p o r a r y b u f f e r i n g o r t h e n e e d f o r s k i p p i n g / d r o p p i n g d a t a , s o - c a l l e d b a c k p r e s s u r e i s a p p l i e d , w h i c h i s a f o r m o f f l o w c o n t r o l w h e r e t h e s t e p s c a n e x p r e s s h o w m a n y i t e m s a r e t h e y r e a d y t o p r o c e s s . T h i s a l l o w s c o n s t r a i n i n g t h e m e m o r y u s a g e o f t h e d a t a f l o w s i n s i t u a t i o n s w h e r e t h e r e i s g e n e r a l l y n o w a y f o r a s t e p t o k n o w h o w m a n y i t e m s t h e u p s t r e a m w i l l s e n d t o i t .
I n R x J a v a , t h e d e d i c a t e d F l o w a b l e c l a s s i s d e s i g n a t e d t o s u p p o r t b a c k p r e s s u r e a n d O b s e r v a b l e i s d e d i c a t e d t o t h e n o n - b a c k p r e s s u r e d o p e r a t i o n s ( s h o r t s e q u e n c e s , G U I i n t e r a c t i o n s , e t c . ) . T h e o t h e r t y p e s , S i n g l e , M a y b e a n d C o m p l e t a b l e d o n ' t s u p p o r t b a c k p r e s s u r e n o r s h o u l d t h e y ; t h e r e i s a l w a y s r o o m t o s t o r e o n e i t e m t e m p o r a r i l y .
A s s e m b l y t i m e
T h e p r e p a r a t i o n o f d a t a f l o w s b y a p p l y i n g v a r i o u s i n t e r m e d i a t e o p e r a t o r s h a p p e n s i n t h e s o - c a l l e d a s s e m b l y t i m e :
f l o w = F l o w a b l e . r a n g e ( 1 , 5 )
. m a p ( v - > v * v )
. f i l t e r ( v - > v % 3 = = 0 )
; " > Flowable <Integer > flow = Flowable .range (1 , 5 )
.map (v -> v * v )
.filter (v -> v % 3 == 0 )
;
A t t h i s p o i n t , t h e d a t a i s n o t f l o w i n g y e t a n d n o s i d e - e f f e c t s a r e h a p p e n i n g .
S u b s c r i p t i o n t i m e
T h i s i s a t e m p o r a r y s t a t e w h e n s u b s c r i b e ( ) i s c a l l e d o n a f l o w t h a t e s t a b l i s h e s t h e c h a i n o f p r o c e s s i n g s t e p s i n t e r n a l l y :
flow .subscribe (System .out ::println )
T h i s i s w h e n t h e s u b s c r i p t i o n s i d e - e f f e c t s a r e t r i g g e r e d ( s e e d o O n S u b s c r i b e ) . S o m e s o u r c e s b l o c k o r s t a r t e m i t t i n g i t e m s r i g h t a w a y i n t h i s s t a t e .
R u n t i m e
T h i s i s t h e s t a t e w h e n t h e f l o w s a r e a c t i v e l y e m i t t i n g i t e m s , e r r o r s o r c o m p l e t i o n s i g n a l s :
{
w h i l e ( ! e m i t t e r . i s D i s p o s e d ( ) ) {
l o n g t i m e = S y s t e m . c u r r e n t T i m e M i l l i s ( ) ;
e m i t t e r . o n N e x t ( t i m e ) ;
i f ( t i m e % 2 ! = 0 ) {
e m i t t e r . o n E r r o r ( n e w I l l e g a l S t a t e E x c e p t i o n ( " O d d m i l l i s e c o n d ! " ) ) ;
b r e a k ;
}
}
} )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n , T h r o w a b l e : : p r i n t S t a c k T r a c e ) ; " > Observable .create (emitter -> {
while (! emitter .isDisposed ()) {
long time = System .currentTimeMillis ();
emitter .onNext (time );
if (time % 2 != 0 ) {
emitter .onError (new IllegalStateException ("Odd millisecond!" ));
break ;
}
}
})
.subscribe (System .out ::println , Throwable ::printStackTrace );
P r a c t i c a l l y , t h i s i s w h e n t h e b o d y o f t h e g i v e n e x a m p l e a b o v e e x e c u t e s .
S i m p l e b a c k g r o u n d c o m p u t a t i o n
O n e o f t h e c o m m o n u s e c a s e s f o r R x J a v a i s t o r u n s o m e c o m p u t a t i o n , n e t w o r k r e q u e s t o n a b a c k g r o u n d t h r e a d a n d s h o w t h e r e s u l t s ( o r e r r o r ) o n t h e U I t h r e a d :
{
T h r e a d . s l e e p ( 1 0 0 0 ) ; / / i m i t a t e e x p e n s i v e c o m p u t a t i o n
r e t u r n " D o n e " ;
} )
. s u b s c r i b e O n ( S c h e d u l e r s . i o ( ) )
. o b s e r v e O n ( S c h e d u l e r s . s i n g l e ( ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n , T h r o w a b l e : : p r i n t S t a c k T r a c e ) ;
T h r e a d . s l e e p ( 2 0 0 0 ) ; / / <︲-- wait for the flow to finish"> import io .reactivex .rxjava3 .schedulers .Schedulers ;
Flowable .fromCallable (() -> {
Thread .sleep (1000 ); // imitate expensive computation
return "Done" ;
})
.subscribeOn (Schedulers .io ())
.observeOn (Schedulers .single ())
.subscribe (System .out ::println , Throwable ::printStackTrace );
Thread .sleep (2000 ); // <--- wait for the flow to finish
T h i s s t y l e o f c h a i n i n g m e t h o d s i s c a l l e d a f l u e n t A P I w h i c h r e s e m b l e s t h e b u i l d e r p a t t e r n . H o w e v e r , R x J a v a ' s r e a c t i v e t y p e s a r e i m m u t a b l e ; e a c h o f t h e m e t h o d c a l l s r e t u r n s a n e w F l o w a b l e w i t h a d d e d b e h a v i o r . T o i l l u s t r a t e , t h e e x a m p l e c a n b e r e w r i t t e n a s f o l l o w s :
s o u r c e = F l o w a b l e . f r o m C a l l a b l e ( ( ) - > {
T h r e a d . s l e e p ( 1 0 0 0 ) ; / / i m i t a t e e x p e n s i v e c o m p u t a t i o n
r e t u r n " D o n e " ;
} ) ;
F l o w a b l e r u n B a c k g r o u n d = s o u r c e . s u b s c r i b e O n ( S c h e d u l e r s . i o ( ) ) ;
F l o w a b l e s h o w F o r e g r o u n d = r u n B a c k g r o u n d . o b s e r v e O n ( S c h e d u l e r s . s i n g l e ( ) ) ;
s h o w F o r e g r o u n d . s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n , T h r o w a b l e : : p r i n t S t a c k T r a c e ) ;
T h r e a d . s l e e p ( 2 0 0 0 ) ; " > Flowable <String > source = Flowable .fromCallable (() -> {
Thread .sleep (1000 ); // imitate expensive computation
return "Done" ;
});
Flowable <String > runBackground = source .subscribeOn (Schedulers .io ());
Flowable <String > showForeground = runBackground .observeOn (Schedulers .single ());
showForeground .subscribe (System .out ::println , Throwable ::printStackTrace );
Thread .sleep (2000 );
T y p i c a l l y , y o u c a n m o v e c o m p u t a t i o n s o r b l o c k i n g I O t o s o m e o t h e r t h r e a d v i a s u b s c r i b e O n . O n c e t h e d a t a i s r e a d y , y o u c a n m a k e s u r e t h e y g e t p r o c e s s e d o n t h e f o r e g r o u n d o r G U I t h r e a d v i a o b s e r v e O n .
S c h e d u l e r s
R x J a v a o p e r a t o r s d o n ' t w o r k w i t h T h r e a d s o r E x e c u t o r S e r v i c e s d i r e c t l y b u t w i t h s o - c a l l e d S c h e d u l e r s t h a t a b s t r a c t a w a y s o u r c e s o f c o n c u r r e n c y b e h i n d a u n i f o r m A P I . R x J a v a 3 f e a t u r e s s e v e r a l s t a n d a r d s c h e d u l e r s a c c e s s i b l e v i a S c h e d u l e r s u t i l i t y c l a s s .
● S c h e d u l e r s . c o m p u t a t i o n ( ) : R u n c o m p u t a t i o n i n t e n s i v e w o r k o n a f i x e d n u m b e r o f d e d i c a t e d t h r e a d s i n t h e b a c k g r o u n d . M o s t a s y n c h r o n o u s o p e r a t o r s u s e t h i s a s t h e i r d e f a u l t S c h e d u l e r .
● S c h e d u l e r s . i o ( ) : R u n I / O - l i k e o r b l o c k i n g o p e r a t i o n s o n a d y n a m i c a l l y c h a n g i n g s e t o f t h r e a d s .
● S c h e d u l e r s . s i n g l e ( ) : R u n w o r k o n a s i n g l e t h r e a d i n a s e q u e n t i a l a n d F I F O m a n n e r .
● S c h e d u l e r s . t r a m p o l i n e ( ) : R u n w o r k i n a s e q u e n t i a l a n d F I F O m a n n e r i n o n e o f t h e p a r t i c i p a t i n g t h r e a d s , u s u a l l y f o r t e s t i n g p u r p o s e s .
T h e s e a r e a v a i l a b l e o n a l l J V M p l a t f o r m s b u t s o m e s p e c i f i c p l a t f o r m s , s u c h a s A n d r o i d , h a v e t h e i r o w n t y p i c a l S c h e d u l e r s d e f i n e d : A n d r o i d S c h e d u l e r s . m a i n T h r e a d ( ) , S w i n g S c h e d u l e r . i n s t a n c e ( ) or J a v a F X S c h e d u l e r s . g u i ( ) .
I n a d d i t i o n , t h e r e i s a n o p t i o n t o w r a p a n e x i s t i n g E x e c u t o r ( a n d i t s s u b t y p e s s u c h a s E x e c u t o r S e r v i c e ) i n t o a S c h e d u l e r v i a S c h e d u l e r s . f r o m ( E x e c u t o r ) . T h i s c a n b e u s e d , f o r e x a m p l e , t o h a v e a l a r g e r b u t s t i l l f i x e d p o o l o f t h r e a d s ( u n l i k e c o m p u t a t i o n ( ) a n d i o ( ) r e s p e c t i v e l y ) .
T h e T h r e a d . s l e e p ( 2 0 0 0 ) ; a t t h e e n d i s n o a c c i d e n t . I n R x J a v a t h e d e f a u l t S c h e d u l e r s r u n o n d a e m o n t h r e a d s , w h i c h m e a n s o n c e t h e J a v a m a i n t h r e a d e x i t s , t h e y a l l g e t s t o p p e d a n d b a c k g r o u n d c o m p u t a t i o n s m a y n e v e r h a p p e n . S l e e p i n g f o r s o m e t i m e i n t h i s e x a m p l e s i t u a t i o n s l e t s y o u s e e t h e o u t p u t o f t h e f l o w o n t h e c o n s o l e w i t h t i m e t o s p a r e .
C o n c u r r e n c y w i t h i n a f l o w
F l o w s i n R x J a v a a r e s e q u e n t i a l i n n a t u r e s p l i t i n t o p r o c e s s i n g s t a g e s t h a t m a y r u n c o n c u r r e n t l y w i t h e a c h o t h e r :
v * v )
. b l o c k i n g S u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > Flowable .range (1 , 10 )
.observeOn (Schedulers .computation ())
.map (v -> v * v )
.blockingSubscribe (System .out ::println );
T h i s e x a m p l e f l o w s q u a r e s t h e n u m b e r s f r o m 1 t o 1 0 o n t h e c o m p u t a t i o n S c h e d u l e r a n d c o n s u m e s t h e r e s u l t s o n t h e " m a i n " t h r e a d ( m o r e p r e c i s e l y , t h e c a l l e r t h r e a d o f b l o c k i n g S u b s c r i b e ) . H o w e v e r , t h e l a m b d a v - > v * v d o e s n ' t r u n i n p a r a l l e l f o r t h i s f l o w ; i t r e c e i v e s t h e v a l u e s 1 t o 1 0 o n t h e s a m e c o m p u t a t i o n t h r e a d o n e a f t e r t h e o t h e r .
P a r a l l e l p r o c e s s i n g
P r o c e s s i n g t h e n u m b e r s 1 t o 1 0 i n p a r a l l e l i s a b i t m o r e i n v o l v e d :
F l o w a b l e . j u s t ( v )
. s u b s c r i b e O n ( S c h e d u l e r s . c o m p u t a t i o n ( ) )
. m a p ( w - > w * w )
)
. b l o c k i n g S u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > Flowable .range (1 , 10 )
.flatMap (v ->
Flowable .just (v )
.subscribeOn (Schedulers .computation ())
.map (w -> w * w )
)
.blockingSubscribe (System .out ::println );
P r a c t i c a l l y , p a r a l l e l i s m i n R x J a v a m e a n s r u n n i n g i n d e p e n d e n t f l o w s a n d m e r g i n g t h e i r r e s u l t s b a c k i n t o a s i n g l e f l o w . T h e o p e r a t o r f l a t M a p d o e s t h i s b y f i r s t m a p p i n g e a c h n u m b e r f r o m 1 t o 1 0 i n t o i t s o w n i n d i v i d u a l F l o w a b l e , r u n s t h e m a n d m e r g e s t h e c o m p u t e d s q u a r e s .
N o t e , h o w e v e r , t h a t f l a t M a p d o e s n ' t g u a r a n t e e a n y o r d e r a n d t h e i t e m s f r o m t h e i n n e r f l o w s m a y e n d u p i n t e r l e a v e d . T h e r e a r e a l t e r n a t i v e o p e r a t o r s :
● c o n c a t M a p t h a t m a p s a n d r u n s o n e i n n e r f l o w a t a t i m e a n d
● c o n c a t M a p E a g e r w h i c h r u n s a l l i n n e r f l o w s " a t o n c e " b u t t h e o u t p u t f l o w w i l l b e i n t h e o r d e r t h o s e i n n e r f l o w s w e r e c r e a t e d .
A l t e r n a t i v e l y , t h e F l o w a b l e . p a r a l l e l ( ) o p e r a t o r a n d t h e P a r a l l e l F l o w a b l e t y p e h e l p a c h i e v e t h e s a m e p a r a l l e l p r o c e s s i n g p a t t e r n :
v * v )
. s e q u e n t i a l ( )
. b l o c k i n g S u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > Flowable .range (1 , 10 )
.parallel ()
.runOn (Schedulers .computation ())
.map (v -> v * v )
.sequential ()
.blockingSubscribe (System .out ::println );
D e p e n d e n t s u b - f l o w s
f l a t M a p i s a p o w e r f u l o p e r a t o r a n d h e l p s i n a l o t o f s i t u a t i o n s . F o r e x a m p l e , g i v e n a s e r v i c e t h a t r e t u r n s a F l o w a b l e , w e ' d l i k e t o c a l l a n o t h e r s e r v i c e w i t h v a l u e s e m i t t e d b y t h e f i r s t s e r v i c e :
i n v e n t o r y S o u r c e = w a r e h o u s e . g e t I n v e n t o r y A s y n c ( ) ;
i n v e n t o r y S o u r c e
. f l a t M a p ( i n v e n t o r y I t e m - > e r p . g e t D e m a n d A s y n c ( i n v e n t o r y I t e m . g e t I d ( ) )
. m a p ( d e m a n d - > " I t e m " + i n v e n t o r y I t e m . g e t N a m e ( ) + " h a s d e m a n d " + d e m a n d ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > Flowable <Inventory > inventorySource = warehouse .getInventoryAsync ();
inventorySource
.flatMap (inventoryItem -> erp .getDemandAsync (inventoryItem .getId ())
.map (demand -> "Item " + inventoryItem .getName () + " has demand " + demand ))
.subscribe (System .out ::println );
C o n t i n u a t i o n s
S o m e t i m e s , w h e n a n i t e m h a s b e c o m e a v a i l a b l e , o n e w o u l d l i k e t o p e r f o r m s o m e d e p e n d e n t c o m p u t a t i o n s o n i t . T h i s i s s o m e t i m e s c a l l e d c o n t i n u a t i o n s a n d , d e p e n d i n g o n w h a t s h o u l d h a p p e n a n d w h a t t y p e s a r e i n v o l v e d , m a y i n v o l v e v a r i o u s o p e r a t o r s t o a c c o m p l i s h .
D e p e n d e n t
T h e m o s t t y p i c a l s c e n a r i o i s t o g i v e n a v a l u e , i n v o k e a n o t h e r s e r v i c e , a w a i t a n d c o n t i n u e w i t h i t s r e s u l t :
s e r v i c e . a n o t h e r A p i C a l l ( v a l u e ) )
. f l a t M a p ( n e x t - > s e r v i c e . f i n a l C a l l ( n e x t ) ) " > service .apiCall ()
.flatMap (value -> service .anotherApiCall (value ))
.flatMap (next -> service .finalCall (next ))
I t i s o f t e n t h e c a s e a l s o t h a t l a t e r s e q u e n c e s w o u l d r e q u i r e v a l u e s f r o m e a r l i e r m a p p i n g s . T h i s c a n b e a c h i e v e d b y m o v i n g t h e o u t e r f l a t M a p i n t o t h e i n n e r p a r t s o f t h e p r e v i o u s f l a t M a p f o r e x a m p l e :
s e r v i c e . a n o t h e r A p i C a l l ( v a l u e )
. f l a t M a p ( n e x t - > s e r v i c e . f i n a l C a l l B o t h ( v a l u e , n e x t ) )
) " > service .apiCall ()
.flatMap (value ->
service .anotherApiCall (value )
.flatMap (next -> service .finalCallBoth (value , next ))
)
H e r e , t h e o r i g i n a l v a l u e w i l l b e a v a i l a b l e i n s i d e t h e i n n e r f l a t M a p , c o u r t e s y o f l a m b d a v a r i a b l e c a p t u r e .
N o n - d e p e n d e n t
I n o t h e r s c e n a r i o s , t h e r e s u l t ( s ) o f t h e f i r s t s o u r c e / d a t a f l o w i s i r r e l e v a n t a n d o n e w o u l d l i k e t o c o n t i n u e w i t h a q u a s i i n d e p e n d e n t a n o t h e r s o u r c e . H e r e , f l a t M a p w o r k s a s w e l l :
s o m e S i n g l e S o u r c e )
c o n t i n u e d . m a p ( v - > v . t o S t r i n g ( ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n , T h r o w a b l e : : p r i n t S t a c k T r a c e ) ; " > Observable continued = sourceObservable .flatMapSingle (ignored -> someSingleSource )
continued .map (v -> v .toString ())
.subscribe (System .out ::println , Throwable ::printStackTrace );
h o w e v e r , t h e c o n t i n u a t i o n i n t h i s c a s e s t a y s O b s e r v a b l e i n s t e a d o f t h e l i k e l y m o r e a p p r o p r i a t e S i n g l e . ( T h i s i s u n d e r s t a n d a b l e b e c a u s e
f r o m t h e p e r s p e c t i v e o f f l a t M a p S i n g l e , s o u r c e O b s e r v a b l e i s a m u l t i - v a l u e d s o u r c e a n d t h u s t h e m a p p i n g m a y r e s u l t i n m u l t i p l e v a l u e s a s w e l l ) .
O f t e n t h o u g h t h e r e i s a w a y t h a t i s s o m e w h a t m o r e e x p r e s s i v e ( a n d a l s o l o w e r o v e r h e a d ) b y u s i n g C o m p l e t a b l e a s t h e m e d i a t o r a n d i t s o p e r a t o r a n d T h e n t o r e s u m e w i t h s o m e t h i n g e l s e :
v . t o S t r i n g ( ) ) " > sourceObservable
.ignoreElements () // returns Completable
.andThen (someSingleSource )
.map (v -> v .toString ())
T h e o n l y d e p e n d e n c y b e t w e e n t h e s o u r c e O b s e r v a b l e a n d t h e s o m e S i n g l e S o u r c e i s t h a t t h e f o r m e r s h o u l d c o m p l e t e n o r m a l l y i n o r d e r f o r t h e l a t t e r t o b e c o n s u m e d .
D e f e r r e d - d e p e n d e n t
S o m e t i m e s , t h e r e i s a n i m p l i c i t d a t a d e p e n d e n c y b e t w e e n t h e p r e v i o u s s e q u e n c e a n d t h e n e w s e q u e n c e t h a t , f o r s o m e r e a s o n , w a s n o t f l o w i n g t h r o u g h t h e " r e g u l a r c h a n n e l s " . O n e w o u l d b e i n c l i n e d t o w r i t e s u c h c o n t i n u a t i o n s a s f o l l o w s :
c o u n t . i n c r e m e n t A n d G e t ( ) )
. i g n o r e E l e m e n t s ( )
. a n d T h e n ( S i n g l e . j u s t ( c o u n t . g e t ( ) ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > AtomicInteger count = new AtomicInteger ();
Observable .range (1 , 10 )
.doOnNext (ignored -> count .incrementAndGet ())
.ignoreElements ()
.andThen (Single .just (count .get ()))
.subscribe (System .out ::println );
U n f o r t u n a t e l y , t h i s p r i n t s 0 b e c a u s e S i n g l e . j u s t ( c o u n t . g e t ( ) ) i s e v a l u a t e d a t a s s e m b l y t i m e w h e n t h e d a t a f l o w h a s n ' t e v e n r u n y e t . W e n e e d s o m e t h i n g t h a t d e f e r s t h e e v a l u a t i o n o f t h i s S i n g l e s o u r c e u n t i l r u n t i m e w h e n t h e m a i n s o u r c e c o m p l e t e s :
c o u n t . i n c r e m e n t A n d G e t ( ) )
. i g n o r e E l e m e n t s ( )
. a n d T h e n ( S i n g l e . d e f e r ( ( ) - > S i n g l e . j u s t ( c o u n t . g e t ( ) ) ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > AtomicInteger count = new AtomicInteger ();
Observable .range (1 , 10 )
.doOnNext (ignored -> count .incrementAndGet ())
.ignoreElements ()
.andThen (Single .defer (() -> Single .just (count .get ())))
.subscribe (System .out ::println );
or
c o u n t . i n c r e m e n t A n d G e t ( ) )
. i g n o r e E l e m e n t s ( )
. a n d T h e n ( S i n g l e . f r o m C a l l a b l e ( ( ) - > c o u n t . g e t ( ) ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n ) ; " > AtomicInteger count = new AtomicInteger ();
Observable .range (1 , 10 )
.doOnNext (ignored -> count .incrementAndGet ())
.ignoreElements ()
.andThen (Single .fromCallable (() -> count .get ()))
.subscribe (System .out ::println );
T y p e c o n v e r s i o n s
S o m e t i m e s , a s o u r c e o r s e r v i c e r e t u r n s a d i f f e r e n t t y p e t h a n t h e f l o w t h a t i s s u p p o s e d t o w o r k w i t h i t . F o r e x a m p l e , i n t h e i n v e n t o r y e x a m p l e a b o v e , g e t D e m a n d A s y n c c o u l d r e t u r n a S i n g l e < D e m a n d R e c o r d > . I f t h e c o d e e x a m p l e i s l e f t u n c h a n g e d , t h i s w i l l r e s u l t i n a c o m p i l e - t i m e e r r o r ( h o w e v e r , o f t e n w i t h a m i s l e a d i n g e r r o r m e s s a g e a b o u t l a c k o f o v e r l o a d ) .
I n s u c h s i t u a t i o n s , t h e r e a r e u s u a l l y t w o o p t i o n s t o f i x t h e t r a n s f o r m a t i o n : 1 ) c o n v e r t t o t h e d e s i r e d t y p e o r 2 ) f i n d a n d u s e a n o v e r l o a d o f t h e s p e c i f i c o p e r a t o r s u p p o r t i n g t h e d i f f e r e n t t y p e .
C o n v e r t i n g t o t h e d e s i r e d t y p e
E a c h r e a c t i v e b a s e c l a s s f e a t u r e s o p e r a t o r s t h a t c a n p e r f o r m s u c h c o n v e r s i o n s , i n c l u d i n g t h e p r o t o c o l c o n v e r s i o n s , t o m a t c h s o m e o t h e r t y p e . T h e f o l l o w i n g m a t r i x s h o w s t h e a v a i l a b l e c o n v e r s i o n o p t i o n s :
Flowable
Observable
Single
Maybe
Completable
Flowable
toObservable
first, firstOrError, single, singleOrError, last, lastOrError1
firstElement, singleElement, lastElement
ignoreElements
Observable
toFlowable2
first, firstOrError, single, singleOrError, last, lastOrError1
firstElement, singleElement, lastElement
ignoreElements
Single
toFlowable3
toObservable
toMaybe
ignoreElement
Maybe
toFlowable3
toObservable
toSingle
ignoreElement
Completable
toFlowable
toObservable
toSingle
toMaybe
1 : W h e n t u r n i n g a m u l t i - v a l u e d s o u r c e i n t o a s i n g l e - v a l u e d s o u r c e , o n e s h o u l d d e c i d e w h i c h o f t h e m a n y s o u r c e v a l u e s s h o u l d b e c o n s i d e r e d a s t h e r e s u l t .
2 : T u r n i n g a n O b s e r v a b l e i n t o F l o w a b l e r e q u i r e s a n a d d i t i o n a l d e c i s i o n : w h a t t o d o w i t h t h e p o t e n t i a l u n c o n s t r a i n e d f l o w
o f t h e s o u r c e O b s e r v a b l e ? T h e r e a r e s e v e r a l s t r a t e g i e s a v a i l a b l e ( s u c h a s b u f f e r i n g , d r o p p i n g , k e e p i n g t h e l a t e s t ) v i a t h e B a c k p r e s s u r e S t r a t e g y p a r a m e t e r o r v i a s t a n d a r d F l o w a b l e o p e r a t o r s s u c h a s o n B a c k p r e s s u r e B u f f e r , o n B a c k p r e s s u r e D r o p , o n B a c k p r e s s u r e L a t e s t w h i c h a l s o
a l l o w f u r t h e r c u s t o m i z a t i o n o f t h e b a c k p r e s s u r e b e h a v i o r .
3 : W h e n t h e r e i s o n l y ( a t m o s t ) o n e s o u r c e i t e m , t h e r e i s n o p r o b l e m w i t h b a c k p r e s s u r e a s i t c a n b e a l w a y s s t o r e d u n t i l t h e d o w n s t r e a m i s r e a d y t o c o n s u m e .
U s i n g a n o v e r l o a d w i t h t h e d e s i r e d t y p e
M a n y f r e q u e n t l y u s e d o p e r a t o r h a s o v e r l o a d s t h a t c a n d e a l w i t h t h e o t h e r t y p e s . T h e s e a r e u s u a l l y n a m e d w i t h t h e s u f f i x o f t h e t a r g e t t y p e :
Operator
Overloads
flatMap
flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
concatMap
concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
switchMap
switchMapSingle, switchMapMaybe, switchMapCompletable
T h e r e a s o n t h e s e o p e r a t o r s h a v e a s u f f i x i n s t e a d o f s i m p l y h a v i n g t h e s a m e n a m e w i t h d i f f e r e n t s i g n a t u r e i s t y p e e r a s u r e . J a v a d o e s n ' t c o n s i d e r s i g n a t u r e s s u c h a s o p e r a t o r ( F u n c t i o n < T , S i n g l e < R > > ) a n d o p e r a t o r ( F u n c t i o n < T , M a y b e < R > > ) d i f f e r e n t ( u n l i k e C # ) a n d d u e t o e r a s u r e , t h e t w o o p e r a t o r s w o u l d e n d u p a s d u p l i c a t e m e t h o d s w i t h t h e s a m e s i g n a t u r e .
O p e r a t o r n a m i n g c o n v e n t i o n s
N a m i n g i n p r o g r a m m i n g i s o n e o f t h e h a r d e s t t h i n g s a s n a m e s a r e e x p e c t e d t o b e n o t l o n g , e x p r e s s i v e , c a p t u r i n g a n d e a s i l y m e m o r a b l e . U n f o r t u n a t e l y , t h e t a r g e t l a n g u a g e ( a n d p r e - e x i s t i n g c o n v e n t i o n s ) m a y n o t g i v e t o o m u c h h e l p i n t h i s r e g a r d ( u n u s a b l e k e y w o r d s , t y p e e r a s u r e , t y p e a m b i g u i t i e s , e t c . ) .
U n u s a b l e k e y w o r d s
I n t h e o r i g i n a l R x . N E T , t h e o p e r a t o r t h a t e m i t s a s i n g l e i t e m a n d t h e n c o m p l e t e s i s c a l l e d R e t u r n ( T ) . S i n c e t h e J a v a c o n v e n t i o n i s t o h a v e a l o w e r c a s e l e t t e r s t a r t a m e t h o d n a m e , t h i s w o u l d h a v e b e e n r e t u r n ( T ) w h i c h i s a k e y w o r d i n J a v a a n d t h u s n o t a v a i l a b l e . T h e r e f o r e , R x J a v a c h o s e t o n a m e t h i s o p e r a t o r j u s t ( T ) . T h e s a m e l i m i t a t i o n e x i s t s f o r t h e o p e r a t o r S w i t c h , w h i c h h a d t o b e n a m e d s w i t c h O n N e x t . Y e t a n o t h e r e x a m p l e i s C a t c h w h i c h w a s n a m e d o n E r r o r R e s u m e N e x t .
T y p e e r a s u r e
M a n y o p e r a t o r s t h a t e x p e c t t h e u s e r t o p r o v i d e s o m e f u n c t i o n r e t u r n i n g a r e a c t i v e t y p e c a n ' t b e o v e r l o a d e d b e c a u s e t h e t y p e e r a s u r e a r o u n d a F u n c t i o n < T , X > t u r n s s u c h m e t h o d s i g n a t u r e s i n t o d u p l i c a t e s . R x J a v a c h o s e t o n a m e s u c h o p e r a t o r s b y a p p e n d i n g t h e t y p e a s s u f f i x a s w e l l :
f l a t M a p ( F u n c t i o n super T, ? extends Publisher extends R> > m a p p e r )
F l o w a b l e f l a t M a p M a y b e ( F u n c t i o n super T, ? extends MaybeSource extends R> > m a p p e r ) " > Flowable <R > flatMap (Function <? super T , ? extends Publisher <? extends R >> mapper )
Flowable <R > flatMapMaybe (Function <? super T , ? extends MaybeSource <? extends R >> mapper )
T y p e a m b i g u i t i e s
E v e n t h o u g h c e r t a i n o p e r a t o r s h a v e n o p r o b l e m s f r o m t y p e e r a s u r e , t h e i r s i g n a t u r e m a y t u r n u p b e i n g a m b i g u o u s , e s p e c i a l l y i f o n e u s e s J a v a 8 a n d l a m b d a s . F o r e x a m p l e , t h e r e a r e s e v e r a l o v e r l o a d s o f c o n c a t W i t h t a k i n g t h e v a r i o u s o t h e r r e a c t i v e b a s e t y p e s a s a r g u m e n t s ( f o r p r o v i d i n g c o n v e n i e n c e a n d p e r f o r m a n c e b e n e f i t s i n t h e u n d e r l y i n g i m p l e m e n t a t i o n ) :
c o n c a t W i t h ( P u b l i s h e r extends T> o t h e r ) ;
F l o w a b l e c o n c a t W i t h ( S i n g l e S o u r c e extends T> o t h e r ) ; " > Flowable <T > concatWith (Publisher <? extends T > other );
Flowable <T > concatWith (SingleSource <? extends T > other );
B o t h P u b l i s h e r a n d S i n g l e S o u r c e a p p e a r a s f u n c t i o n a l i n t e r f a c e s ( t y p e s w i t h o n e a b s t r a c t m e t h o d ) a n d m a y e n c o u r a g e u s e r s t o t r y t o p r o v i d e a l a m b d a e x p r e s s i o n :
S i n g l e . j u s t ( 2 ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n , T h r o w a b l e : : p r i n t S t a c k T r a c e ) ; " > someSource .concatWith (s -> Single .just (2 ))
.subscribe (System .out ::println , Throwable ::printStackTrace );
U n f o r t u n a t e l y , t h i s a p p r o a c h d o e s n ' t w o r k a n d t h e e x a m p l e d o e s n o t p r i n t 2 a t a l l . I n f a c t , s i n c e v e r s i o n 2 . 1 . 1 0 , i t d o e s n ' t
e v e n c o m p i l e b e c a u s e a t l e a s t 4 c o n c a t W i t h o v e r l o a d s e x i s t a n d t h e c o m p i l e r f i n d s t h e c o d e a b o v e a m b i g u o u s .
T h e u s e r i n s u c h s i t u a t i o n s p r o b a b l y w a n t e d t o d e f e r s o m e c o m p u t a t i o n u n t i l t h e s o m e S o u r c e h a s c o m p l e t e d , t h u s t h e c o r r e c t
u n a m b i g u o u s o p e r a t o r s h o u l d h a v e b e e n d e f e r :
S i n g l e . j u s t ( 2 ) ) )
. s u b s c r i b e ( S y s t e m . o u t : : p r i n t l n , T h r o w a b l e : : p r i n t S t a c k T r a c e ) ; " > someSource .concatWith (Single .defer (() -> Single .just (2 )))
.subscribe (System .out ::println , Throwable ::printStackTrace );
S o m e t i m e s , a s u f f i x i s a d d e d t o a v o i d l o g i c a l a m b i g u i t i e s t h a t m a y c o m p i l e b u t p r o d u c e t h e w r o n g t y p e i n a f l o w :
m e r g e ( P u b l i s h e r extends Publisher extends T> > s o u r c e s ) ;
F l o w a b l e m e r g e A r r a y ( P u b l i s h e r extends T> . . . s o u r c e s ) ; " > Flowable <T > merge (Publisher <? extends Publisher <? extends T >> sources );
Flowable <T > mergeArray (Publisher <? extends T >... sources );
T h i s c a n g e t a l s o a m b i g u o u s w h e n f u n c t i o n a l i n t e r f a c e t y p e s g e t i n v o l v e d a s t h e t y p e a r g u m e n t T .
E r r o r h a n d l i n g
D a t a f l o w s c a n f a i l , a t w h i c h p o i n t t h e e r r o r i s e m i t t e d t o t h e c o n s u m e r ( s ) . S o m e t i m e s t h o u g h , m u l t i p l e s o u r c e s m a y f a i l a t w h i c h p o i n t t h e r e i s a c h o i c e w h e t h e r o r n o t w a i t f o r a l l o f t h e m t o c o m p l e t e o r f a i l . T o i n d i c a t e t h i s o p p o r t u n i t y , m a n y o p e r a t o r n a m e s a r e s u f f i x e d w i t h t h e D e l a y E r r o r w o r d s ( w h i l e o t h e r s f e a t u r e a d e l a y E r r o r or d e l a y E r r o r s b o o l e a n f l a g i n o n e o f t h e i r o v e r l o a d s ) :
c o n c a t ( P u b l i s h e r extends Publisher extends T> > s o u r c e s ) ;
F l o w a b l e c o n c a t D e l a y E r r o r ( P u b l i s h e r extends Publisher extends T> > s o u r c e s ) ; " > Flowable <T > concat (Publisher <? extends Publisher <? extends T >> sources );
Flowable <T > concatDelayError (Publisher <? extends Publisher <? extends T >> sources );
O f c o u r s e , s u f f i x e s o f v a r i o u s k i n d s m a y a p p e a r t o g e t h e r :
c o n c a t A r r a y E a g e r D e l a y E r r o r ( P u b l i s h e r extends T> . . . s o u r c e s ) ; " > Flowable <T > concatArrayEagerDelayError (Publisher <? extends T >... sources );
B a s e c l a s s v s b a s e t y p e
T h e b a s e c l a s s e s c a n b e c o n s i d e r e d h e a v y d u e t o t h e s h e e r n u m b e r o f s t a t i c a n d i n s t a n c e m e t h o d s o n t h e m . R x J a v a 3 ' s d e s i g n w a s h e a v i l y i n f l u e n c e d b y t h e R e a c t i v e S t r e a m s s p e c i f i c a t i o n , t h e r e f o r e , t h e l i b r a r y f e a t u r e s a c l a s s a n d a n i n t e r f a c e p e r e a c h r e a c t i v e t y p e :
Type
Class
Interface
Consumer
0..N backpressured
Flowable
Publisher1
Subscriber
0..N unbounded
Observable
ObservableSource2
Observer
1 element or error
Single
SingleSource
SingleObserver
0..1 element or error
Maybe
MaybeSource
MaybeObserver
0 element or error
Completable
CompletableSource
CompletableObserver
1 T h e o r g . r e a c t i v e s t r e a m s . P u b l i s h e r i s p a r t o f t h e e x t e r n a l R e a c t i v e S t r e a m s l i b r a r y . I t i s t h e m a i n t y p e t o i n t e r a c t w i t h o t h e r r e a c t i v e l i b r a r i e s t h r o u g h a s t a n d a r d i z e d m e c h a n i s m g o v e r n e d b y t h e R e a c t i v e S t r e a m s s p e c i f i c a t i o n .
2 T h e n a m i n g c o n v e n t i o n o f t h e i n t e r f a c e w a s t o a p p e n d S o u r c e t o t h e s e m i - t r a d i t i o n a l c l a s s n a m e . T h e r e i s n o F l o w a b l e S o u r c e s i n c e P u b l i s h e r i s p r o v i d e d b y t h e R e a c t i v e S t r e a m s l i b r a r y ( a n d s u b t y p i n g i t w o u l d n ' t h a v e h e l p e d w i t h i n t e r o p e r a t i o n e i t h e r ) . T h e s e i n t e r f a c e s a r e , h o w e v e r , n o t s t a n d a r d i n t h e s e n s e o f t h e R e a c t i v e S t r e a m s s p e c i f i c a t i o n a n d a r e c u r r e n t l y R x J a v a s p e c i f i c o n l y .
R 8 a n d P r o G u a r d s e t t i n g s
B y d e f a u l t , R x J a v a i t s e l f d o e s n ' t r e q u i r e a n y P r o G u a r d / R 8 s e t t i n g s a n d s h o u l d w o r k w i t h o u t p r o b l e m s . U n f o r t u n a t e l y , t h e R e a c t i v e S t r e a m s d e p e n d e n c y s i n c e v e r s i o n 1 . 0 . 3 h a s e m b e d d e d J a v a 9 c l a s s f i l e s i n i t s J A R t h a t c a n c a u s e w a r n i n g s w i t h t h e p l a i n P r o G u a r d :
Warning: org.reactivestreams.FlowAdapters$FlowPublisherFromReactive: can't find superclass or interface java.util.concurrent.Flow$Publisher
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveProcessor: can't find superclass or interface java.util.concurrent.Flow$Processor
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber: can't find superclass or interface java.util.concurrent.Flow$Subscriber
Warning: org.reactivestreams.FlowAdapters$FlowToReactiveSubscription: can't find superclass or interface java.util.concurrent.Flow$Subscription
Warning: org.reactivestreams.FlowAdapters: can't find referenced class java.util.concurrent.Flow$Publisher
I t i s r e c o m m e n d e d o n e s e t s u p t h e f o l l o w i n g - d o n t w a r n e n t r y i n t h e a p p l i c a t i o n ' s p r o g u a r d - r u l e s e t f i l e :
-dontwarn java.util.concurrent.Flow*
F o r R 8 , t h e R x J a v a j a r i n c l u d e s t h e M E T A - I N F / p r o g u a r d / r x j a v a 3 . p r o w i t h t h e s a m e n o - w a r n i n g c l a u s e a n d s h o u l d a p p l y a u t o m a t i c a l l y .
F u r t h e r r e a d i n g
F o r f u r t h e r d e t a i l s , c o n s u l t t h e w i k i .
C o m m u n i c a t i o n
● G o o g l e G r o u p : R x J a v a
● T w i t t e r : @ R x J a v a
● G i t H u b I s s u e s
● S t a c k O v e r f l o w : r x - j a v a a n d r x - j a v a 2
● G i t t e r . i m
V e r s i o n i n g
V e r s i o n 3 . x i s i n d e v e l o p m e n t . B u g f i x e s w i l l b e a p p l i e d t o b o t h 2 . x a n d 3 . x b r a n c h e s , b u t n e w f e a t u r e s w i l l o n l y b e a d d e d t o 3 . x .
M i n o r 3 . x i n c r e m e n t s ( s u c h a s 3 . 1 , 3 . 2 , e t c ) w i l l o c c u r w h e n n o n - t r i v i a l n e w f u n c t i o n a l i t y i s a d d e d o r s i g n i f i c a n t e n h a n c e m e n t s o r b u g f i x e s o c c u r t h a t m a y h a v e b e h a v i o r a l c h a n g e s t h a t m a y a f f e c t s o m e e d g e c a s e s ( s u c h a s d e p e n d e n c e o n b e h a v i o r r e s u l t i n g f r o m a b u g ) . A n e x a m p l e o f a n e n h a n c e m e n t t h a t w o u l d c l a s s i f y a s t h i s i s a d d i n g r e a c t i v e p u l l b a c k p r e s s u r e s u p p o r t t o a n o p e r a t o r t h a t p r e v i o u s l y d i d n o t s u p p o r t i t . T h i s s h o u l d b e b a c k w a r d s c o m p a t i b l e b u t d o e s b e h a v e d i f f e r e n t l y .
P a t c h 3 . x . y i n c r e m e n t s ( s u c h a s 3 . 0 . 0 - > 3 . 0 . 1 , 3 . 3 . 1 - > 3 . 3 . 2 , e t c ) w i l l o c c u r f o r b u g f i x e s a n d t r i v i a l f u n c t i o n a l i t y ( l i k e a d d i n g a m e t h o d o v e r l o a d ) . N e w f u n c t i o n a l i t y m a r k e d w i t h a n @ B e t a or @ E x p e r i m e n t a l a n n o t a t i o n c a n a l s o b e a d d e d i n t h e p a t c h r e l e a s e s t o a l l o w r a p i d e x p l o r a t i o n a n d i t e r a t i o n o f u n s t a b l e n e w f u n c t i o n a l i t y .
@ B e t a
A P I s m a r k e d w i t h t h e @ B e t a a n n o t a t i o n a t t h e c l a s s o r m e t h o d l e v e l a r e s u b j e c t t o c h a n g e . T h e y c a n b e m o d i f i e d i n a n y w a y , o r e v e n r e m o v e d , a t a n y t i m e . I f y o u r c o d e i s a l i b r a r y i t s e l f ( i . e . i t i s u s e d o n t h e C L A S S P A T H o f u s e r s o u t s i d e y o u r c o n t r o l ) , y o u s h o u l d n o t u s e b e t a A P I s , u n l e s s y o u r e p a c k a g e t h e m ( e . g . u s i n g P r o G u a r d , s h a d i n g , e t c ) .
@ E x p e r i m e n t a l
A P I s m a r k e d w i t h t h e @ E x p e r i m e n t a l a n n o t a t i o n a t t h e c l a s s o r m e t h o d l e v e l w i l l a l m o s t c e r t a i n l y c h a n g e . T h e y c a n b e m o d i f i e d i n a n y w a y , o r e v e n r e m o v e d , a t a n y t i m e . Y o u s h o u l d n o t u s e o r r e l y o n t h e m i n a n y p r o d u c t i o n c o d e . T h e y a r e p u r e l y t o a l l o w b r o a d t e s t i n g a n d f e e d b a c k .
@ D e p r e c a t e d
A P I s m a r k e d w i t h t h e @ D e p r e c a t e d a n n o t a t i o n a t t h e c l a s s o r m e t h o d l e v e l w i l l r e m a i n s u p p o r t e d u n t i l t h e n e x t m a j o r r e l e a s e , b u t i t i s r e c o m m e n d e d t o s t o p u s i n g t h e m .
i o . r e a c t i v e x . r x j a v a 3 . i n t e r n a l . *
A l l c o d e i n s i d e t h e i o . r e a c t i v e x . r x j a v a 3 . i n t e r n a l . * p a c k a g e s a r e c o n s i d e r e d p r i v a t e A P I a n d s h o u l d n o t b e r e l i e d u p o n a t a l l . I t c a n c h a n g e a t a n y t i m e .
F u l l D o c u m e n t a t i o n
● W i k i
● J a v a d o c
● L a t e s t s n a p h o t J a v a d o c
● J a v a d o c o f a s p e c i f i c r e l e a s e v e r s i o n : h t t p : / / r e a c t i v e x . i o / R x J a v a / 3 . x / j a v a d o c / 3 . x . y /
B i n a r i e s
B i n a r i e s a n d d e p e n d e n c y i n f o r m a t i o n f o r M a v e n , I v y , G r a d l e a n d o t h e r s c a n b e f o u n d a t h t t p : / / s e a r c h . m a v e n . o r g .
E x a m p l e f o r G r a d l e :
implementation ' io.reactivex.rxjava3:rxjava:x.y.z'
a n d f o r M a v e n :
i o . r e a c t i v e x . r x j a v a 3
r x j a v a
x . y . z
" > <dependency >
<groupId >io.reactivex.rxjava3</groupId >
<artifactId >rxjava</artifactId >
<version >x.y.z</version >
</dependency >
a n d f o r I v y :
" > <dependency org =" io.reactivex.rxjava3" name =" rxjava" rev =" x.y.z" />
S n a p s h o t s
S n a p s h o t s a f t e r M a y 1 s t , 2 0 2 1 a r e a v a i l a b l e v i a h t t p s : / / o s s . s o n a t y p e . o r g / c o n t e n t / r e p o s i t o r i e s / s n a p s h o t s / i o / r e a c t i v e x / r x j a v a 3 / r x j a v a /
repositories {
maven { url ' https://oss.sonatype.org/content/repositories/snapshots' }
}
dependencies {
implementation ' io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}
S n a p s h o t s b e f o r e M a y 1 s t , 2 0 2 1 a r e a v a i l a b l e v i a h t t p s : / / o s s . j f r o g . o r g / l i b s - s n a p s h o t / i o / r e a c t i v e x / r x j a v a 3 / r x j a v a /
( N o t e t h a t d u e t o t h e S u n s e t o f B i n t r a y , o u r j f r o g a c c e s s h a s b e e n s e v e r e d , h e n c e t h e n e w s n a p s h o t r e p o a b o v e . )
repositories {
maven { url ' https://oss.jfrog.org/libs-snapshot' }
}
dependencies {
implementation ' io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}
J a v a D o c s n a p s h o t s a r e a v a i l a b l e a t h t t p : / / r e a c t i v e x . i o / R x J a v a / 3 . x / j a v a d o c / s n a p s h o t
B u i l d
T o b u i l d :
$ git clone git@github.com:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
F u r t h e r d e t a i l s o n b u i l d i n g c a n b e f o u n d o n t h e G e t t i n g S t a r t e d p a g e o f t h e w i k i .
B u g s a n d F e e d b a c k
F o r b u g s , q u e s t i o n s a n d d i s c u s s i o n s p l e a s e u s e t h e G i t h u b I s s u e s .
L I C E N S E
Copyright (c ) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
A b o u t
R x J a v a – R e a c t i v e E x t e n s i o n s f o r t h e J V M – a l i b r a r y f o r c o m p o s i n g a s y n c h r o n o u s a n d e v e n t - b a s e d p r o g r a m s u s i n g o b s e r v a b l e s e q u e n c e s f o r t h e J a v a V M .
T o p i c s
j a v a
f l o w
r x j a v a
r e a c t i v e - s t r e a m s
R e s o u r c e s
R e a d m e
L i c e n s e
A p a c h e - 2 . 0 l i c e n s e
S t a r s
4 6 . 1 k
s t a r s
W a t c h e r s
2 . 3 k
w a t c h i n g
F o r k s
7 . 6 k
f o r k s
3 . 1 . 4
L a t e s t
M a r 2 1 , 2 0 2 2
+ 2 2 7 r e l e a s e s
N o p a c k a g e s p u b l i s h e d
+ 2 7 4 c o n t r i b u t o r s
L a n g u a g e s
●
J a v a
9 9 . 9 %
●
O t h e r
0 . 1 %
●
© 2 0 2 2 G i t H u b , I n c .
● T e r m s
● P r i v a c y
● S e c u r i t y
● S t a t u s
● D o c s
● C o n t a c t G i t H u b
● P r i c i n g
● A P I
● T r a i n i n g
● B l o g
● A b o u t
Y o u c a n ’ t p e r f o r m t h a t a c t i o n a t t h i s t i m e .
Y o u s i g n e d i n w i t h a n o t h e r t a b o r w i n d o w . R e l o a d t o r e f r e s h y o u r s e s s i o n .
Y o u s i g n e d o u t i n a n o t h e r t a b o r w i n d o w . R e l o a d t o r e f r e s h y o u r s e s s i o n .