2019. 6. 9. 04:38ㆍ[공부] 영상/Reactor
에러 처리
에러가 발생하면 어떻게 처리하라 라고 명시할 수 있다.
public class Part07Errors {
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
return mono.onErrorResume(error->Mono.just(User.SAUL));
}
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
return flux.onErrorResume(error->Flux.just(User.SAUL, User.JESSE));
}
Flux<User> capitalizeMany(Flux<User> flux) {
return flux.map(user->{
try{
return capitalizeUser(user);
}catch(GetOutOfHereException e){
throw Exceptions.propagate(e);
}
});
}
User capitalizeUser(User user) throws GetOutOfHereException {
if (user.equals(User.SAUL)) {
throw new GetOutOfHereException();
}
return new User(user.getUsername(), user.getFirstname(), user.getLastname());
}
protected final class GetOutOfHereException extends Exception {
}
}
onErrorReturn : 에러가 발생하면 어떤 값을 리턴
onErrorResume : 에러가 발생하면 원하는 엘리먼트(sequence) 를 리턴
Exception.propagate : 발생한 Exception 이 RuntimeException 을 상속하고 있지 않을 경우, CheckedException 이라는 형태로 발생한다. Exception.propagate 을 이용하면 CheckedException은 RuntimeException 으로 바꿔서 전파한다. RuntimeException 은 Subscriber 나 StepVerifier 에 의해서 잡히게된다. Subscriber 나 StepVerifier 는 RuntimeException 을 보고 다시 Checked Exception 타입으로 이해하여 처리할 수 있다.
flux.just("hello world")
.map(string->{
try{
return Integer.parseInt(string);
}catch(GetOutOfHereException e){
throw Exceptions.propagate(e);
}
})
.onErrorReturn(400)
.doOnNext(System.out::println)
.subscribe();
RxJava와 호환 (Adapt)
Reactive stream 의 구현체가 Reactor, RxJava... 등이 있는데 이 구현체들 끼리 서로 호환해서 사용할 수 있도록 하는 라이브러리다. 팩토리 메소드가 있으므로 이를 이용해서 호환하면 된다.
Reactor <-> RxJava
Mono<-> Single
Flux <-> Flowable
public class Part09Adapt {
Flowable<User> fromFluxToFlowable(Flux<User> flux) {
return Flowable.fromPublisher(flux);
}
Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
return Flux.from(flowable);
}
Observable<User> fromFluxToObservable(Flux<User> flux) {
return Observable.fromPublisher(flux);
}
Flux<User> fromObservableToFlux(Observable<User> observable) {
return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
}
Single<User> fromMonoToSingle(Mono<User> mono) {
return Single.fromPublisher(mono);
}
Mono<User> fromSingleToMono(Single<User> single) {
return Mono.from(single.toFlowable());
}
CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
return mono.toFuture();
}
Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
}
RxJava2 부터는 Observable 을 지원하지 않는 듯하다.
테스트를 통과하지 않는다. Observable 은 Backpressure 구현체가 없는 인터페이스 였던 듯하다.
CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->"hello");
completableFuture.thenApply(string->string.toUpperCase());
completableFuture.get();
막간 상식 : Reactor 는 자바 8 기반이며 자바 8 에 있는 Completable Future 를 이용하여 만들어진 듯하다.
Blocking 과 호환
Reactive -> Blocking
* 에러가 발생하면 exception 을 발생시킨다.
public class Part10ReactiveToBlocking {
User monoToValue(Mono<User> mono) {
return mono.block();
}
Iterable<User> fluxToValues(Flux<User> flux) {
return flux.toIterable();
}
}
Blocking -> Reactive
public class Part11BlockingToReactive {
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
.subscribeOn(Schedulers.elastic());
}
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.elastic())
.doOnNext(repository::save)
.then();
}
}
subscribeOn : subsribe -> onSubribe -> request
명시한 스케줄러에 맞춰서 위 순서에 맞는 일을 처리하라
then : operator 의 성공 여부를 알수 있다. flux 를 종료하는 mono 하나로 바꾼다
기타
public class Part08OtherOperations {
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
return Flux
.zip(usernameFlux, firstnameFlux, lastnameFlux)
.map((tuple)->new User(tuple.getT1(), tuple.getT2(), tuple.getT3()));
// getTN 을 사용해야 타입을 유지 시켜준다. get(N) 은 타입이 유지되지 않아서 문제가 있을 수도 있다.
}
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
return Mono.first(mono1, mono2); // 처음 publish 한 mono 를 사용
}
Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
return Flux.first(flux1, flux2); // 처음 publish 한 flux 를 사용
}
Mono<Void> fluxCompletion(Flux<User> flux) {
return flux.then(); // 종료
}
Mono<User> nullAwareUserToMono(User user) {
return Mono.justOrEmpty(user); // user가 null 이면 Mono.empty() 를 반환
}
Mono<User> emptyToSkyler(Mono<User> mono) {
return mono.defaultIfEmpty(User.SKYLER); // mono 가 empty 이면 ~ 를 반환하라
}
}
'[공부] 영상 > Reactor' 카테고리의 다른 글
리액터 강의 정리 ( 7강 ~ 8강 : Backpressure) (0) | 2019.06.09 |
---|---|
리액터 강의 정리 ( 3강 ~ 6강 : StepVerifier, Map, flatMap, Merge) (0) | 2019.06.09 |
리액터 강의 정리 ( 1강 ~ 2강 : 인트로 ) (0) | 2019.06.09 |