kok202
리액터 강의 정리 ( 1강 ~ 2강 : 인트로 )

2019. 6. 9. 00:55[공부] 영상/Reactor

강의 참조 : https://www.youtube.com/watch?v=VeSHa_Xsd2U&list=PLfI752FpVCS9hh_FE8uDuRVgPPnAivZTY

문서 참조 : https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro

 

Introduction to Reactive Programming - Reactive Programming with Reactor 3

Explore this playground and try new concepts right into your browser

tech.io

 

 

 

 

 

Reactor : JVM 기반의 Reactive Stream 을 구현한 라이브러리

Reative programming

  • 선언적인 코드를 사용해서 비동기적인 프로세스 파이프라인을 만든는 새로운 패러다임.
  • 함수형 프로그래밍과 유사한 선언적 코드를 작성하는 것을 목표로한다. 
  • 데이터를 Consumer 에게 push 해주는 이벤트 기반의 모델
  • 비동기적으로 이벤트를 처리한다

선언적인 코드 : 프로그래밍 어떻게 해야하는지를 나타내기 보다는 무엇과 같은지를 설명하는 경우를 선언형이라고함.

asynchronous = 동시에 일을 처리하는 것

non blocking = 중간 중간 코드가 block 이 되지 않는 것 (네트워크 통신과 같은 이유로)

 

 

 

 

 

publisher 가 데이터를 생성한다.

publisher 자체는 아무것도하지 않는다.

publisher 에 subscriber 가 등록되기 전까지 아무것도 하지않는다.

 

backpressure : 그냥 데이터를 막 push 하면 subscriber 가 부하를 감당하지 못하고 데이터를 유실하거나 실패하는 경우가 생긴다. 그러면 안되니까 publisher subscriber 구조에서 publisher 가 일방적으로 subscriber 에게 push 만하는 것은 아니다. pull (feedback)도한다. pull 할 때의 주요 정보는 '데이터를 n 개만 달라' 와 같은 정보다.

 

Operator

Reactor 에 추가된 개념

데이터를 어떤 스테이지를 거쳐서 가공되어야하는지 명세해둔 chained 된 명세

operator 를 적용한다는 것은 새로운 publisher 를 생성하는 것과 같다.

Flux<String> flux = Flux.just("A");
Flux<String> operatedFlux = flux.map(s -> "foo" + s);
operatedFlux.subscribe(System.out::println);

 

 

 

 

 

Mono = Reactive stream 에서 정의한 Publisher 중 하나 ( 0개부터 1 개 까지의 엘리먼트를 publish 할 수 있다.)

Flux = Reactive stream 에서 정의한 Publisher 중 하나 ( 0개부터 N 개 까지의 엘리먼트를 publish 할 수 있다.)

onNext를 통해 엘리먼트를 publish 하다가 onComplete 이나 onError 로 종료하게된다.

실제 flux 코드 예시

Flux.fromIterable(getSomeLongList())
    .delayElements(Duration.ofMillis(100))
    .doOnNext(serviceA::someObserver)
    .map(d -> d * 2)
    .take(3)
    .onErrorResumeWith(errorHandler::fallback)
    .doAfterTerminate(serviceM::incrementTerminate)
    .subscribe(System.out::println);

delayElements : 아이템 사이의 delay 시간

doOnNext : 아이템을 보낼 때 trigger 되는 콜백 메소드 명시

map : operator

take : 이 트림에서 n 개만 받고 끝내는 것 ( 스트림에 2개만 있었다면 2개만 받고 끝낸다. )

onErrorResumeWith : 에러가 발생하면 실행되는 콜백 메소드 명시

doAfterTerminate : flux 가 끝나면 해야할 콜백 메소드 명시

subscribe : 이 flux 를 Consumer 에게 subscribe 라고한다.

 

sequence 에 대한 처리는 비동기적이다.

비동기적이기 때문에 non blocking 하다

onErrorResumeWith 는 동기 처리방식의 try catch 를 이런식으로 구현한다고 생각하면좋다.

 

public class Part01Flux {
	Flux<String> emptyFlux() {
		return Flux.empty();
	}

	Flux<String> fooBarFluxFromValues() {
		return Flux.just("foo", "bar");
	}

	Flux<String> fooBarFluxFromList() {
		return Flux.fromIterable(Arrays.asList("foo", "bar"));
	}

	Flux<String> errorFlux() {
		return Flux.error(new IllegalStateException());
	}

	Flux<Long> counter() {
		return Flux.interval(Duration.ofMillis(100))
		           .take(10);
	}
}
public class Part02Mono {
	Mono<String> emptyMono() {
		return Mono.empty();
	}

	Mono<String> monoWithNoSignal() {
		return Mono.never();
	}

	Mono<String> fooMono() {
		return Mono.just("foo");
	}

	Mono<String> errorMono() {
		return Mono.error(new IllegalStateException());
	}
}

 

 

* Reactive 를 테스트할 때는 subscribe 가 비동기 적으로 동작하니 메인 스레드가 종료되는 상황이라면 메인 스레드를 반드시 멈추고 결과를 확인하자