2019. 6. 9. 00:55ㆍ[공부] 영상/Reactor
강의 참조 : https://www.youtube.com/watch?v=VeSHa_Xsd2U&list=PLfI752FpVCS9hh_FE8uDuRVgPPnAivZTY
문서 참조 : https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro
Reactor : JVM 기반의 Reactive Stream 을 구현한 라이브러리
Reative programming
- 선언적인 코드를 사용해서 비동기적인 프로세스 파이프라인을 만든는 새로운 패러다임.
- 함수형 프로그래밍과 유사한 선언적 코드를 작성하는 것을 목표로한다.
- 데이터를 Consumer 에게 push 해주는 이벤트 기반의 모델
- 비동기적으로 이벤트를 처리한다
선언적인 코드 : 프로그래밍 어떻게 해야하는지를 나타내기 보다는 무엇과 같은지를 설명하는 경우를 선언형이라고함.
asynchronous = 동시에 일을 처리하는 것
non blocking = 중간 중간 코드가 block 이 되지 않는 것 (네트워크 통신과 같은 이유로)
publisher 가 데이터를 생성한다.
publisher 자체는 아무것도하지 않는다.
publisher 에 subscriber 가 등록되기 전까지 아무것도 하지않는다.
backpressure : 그냥 데이터를 막 push 하면 subscriber 가 부하를 감당하지 못하고 데이터를 유실하거나 실패하는 경우가 생긴다. 그러면 안되니까 publisher subscriber 구조에서 publisher 가 일방적으로 subscriber 에게 push 만하는 것은 아니다. pull (feedback)도한다. pull 할 때의 주요 정보는 '데이터를 n 개만 달라' 와 같은 정보다.
Operator
Reactor 에 추가된 개념
데이터를 어떤 스테이지를 거쳐서 가공되어야하는지 명세해둔 chained 된 명세
operator 를 적용한다는 것은 새로운 publisher 를 생성하는 것과 같다.
Flux<String> flux = Flux.just("A");
Flux<String> operatedFlux = flux.map(s -> "foo" + s);
operatedFlux.subscribe(System.out::println);
Mono = Reactive stream 에서 정의한 Publisher 중 하나 ( 0개부터 1 개 까지의 엘리먼트를 publish 할 수 있다.)
Flux = Reactive stream 에서 정의한 Publisher 중 하나 ( 0개부터 N 개 까지의 엘리먼트를 publish 할 수 있다.)
onNext를 통해 엘리먼트를 publish 하다가 onComplete 이나 onError 로 종료하게된다.
실제 flux 코드 예시
Flux.fromIterable(getSomeLongList())
.delayElements(Duration.ofMillis(100))
.doOnNext(serviceA::someObserver)
.map(d -> d * 2)
.take(3)
.onErrorResumeWith(errorHandler::fallback)
.doAfterTerminate(serviceM::incrementTerminate)
.subscribe(System.out::println);
delayElements : 아이템 사이의 delay 시간
doOnNext : 아이템을 보낼 때 trigger 되는 콜백 메소드 명시
map : operator
take : 이 트림에서 n 개만 받고 끝내는 것 ( 스트림에 2개만 있었다면 2개만 받고 끝낸다. )
onErrorResumeWith : 에러가 발생하면 실행되는 콜백 메소드 명시
doAfterTerminate : flux 가 끝나면 해야할 콜백 메소드 명시
subscribe : 이 flux 를 Consumer 에게 subscribe 라고한다.
sequence 에 대한 처리는 비동기적이다.
비동기적이기 때문에 non blocking 하다
onErrorResumeWith 는 동기 처리방식의 try catch 를 이런식으로 구현한다고 생각하면좋다.
public class Part01Flux {
Flux<String> emptyFlux() {
return Flux.empty();
}
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Flux<Long> counter() {
return Flux.interval(Duration.ofMillis(100))
.take(10);
}
}
public class Part02Mono {
Mono<String> emptyMono() {
return Mono.empty();
}
Mono<String> monoWithNoSignal() {
return Mono.never();
}
Mono<String> fooMono() {
return Mono.just("foo");
}
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
}
* Reactive 를 테스트할 때는 subscribe 가 비동기 적으로 동작하니 메인 스레드가 종료되는 상황이라면 메인 스레드를 반드시 멈추고 결과를 확인하자
'[공부] 영상 > Reactor' 카테고리의 다른 글
리액터 강의 정리 ( 9강 ~ 12강 : 에러처리, 호환, 기타) (0) | 2019.06.09 |
---|---|
리액터 강의 정리 ( 7강 ~ 8강 : Backpressure) (0) | 2019.06.09 |
리액터 강의 정리 ( 3강 ~ 6강 : StepVerifier, Map, flatMap, Merge) (0) | 2019.06.09 |