kok202
리액터 강의 정리 ( 9강 ~ 12강 : 에러처리, 호환, 기타)

2019. 6. 9. 04:38[공부] 영상/Reactor

에러 처리

에러가 발생하면 어떻게 처리하라 라고 명시할 수 있다.

public class Part07Errors {
	Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
		return mono.onErrorResume(error->Mono.just(User.SAUL));
	}

	Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
		return flux.onErrorResume(error->Flux.just(User.SAUL, User.JESSE));
	}

	Flux<User> capitalizeMany(Flux<User> flux) {
		return flux.map(user->{
		    try{
	            return capitalizeUser(user);
		    }catch(GetOutOfHereException e){
                throw Exceptions.propagate(e);
		    }
		});
	}

	User capitalizeUser(User user) throws GetOutOfHereException {
		if (user.equals(User.SAUL)) {
			throw new GetOutOfHereException();
		}
		return new User(user.getUsername(), user.getFirstname(), user.getLastname());
	}

	protected final class GetOutOfHereException extends Exception {
	}

}

onErrorReturn : 에러가 발생하면 어떤 값을 리턴

onErrorResume : 에러가 발생하면 원하는 엘리먼트(sequence) 를 리턴

 

Exception.propagate : 발생한 Exception 이 RuntimeException 을 상속하고 있지 않을 경우, CheckedException 이라는 형태로 발생한다. Exception.propagate 을 이용하면 CheckedException은 RuntimeException 으로 바꿔서 전파한다. RuntimeException 은 Subscriber 나 StepVerifier 에 의해서 잡히게된다. Subscriber 나 StepVerifier 는 RuntimeException 을 보고 다시 Checked Exception 타입으로 이해하여 처리할 수 있다.

 

flux.just("hello world")
	.map(string->{
		try{
			return Integer.parseInt(string);
		}catch(GetOutOfHereException e){
			throw Exceptions.propagate(e);
		}
	})
	.onErrorReturn(400)
	.doOnNext(System.out::println)
	.subscribe();

 

 

 

 

 

RxJava와 호환 (Adapt)

Reactive stream 의 구현체가 Reactor, RxJava... 등이 있는데 이 구현체들 끼리 서로 호환해서 사용할 수 있도록 하는 라이브러리다. 팩토리 메소드가 있으므로 이를 이용해서 호환하면 된다.

Reactor <-> RxJava

Mono<-> Single

Flux <-> Flowable

public class Part09Adapt {
	Flowable<User> fromFluxToFlowable(Flux<User> flux) {
		return Flowable.fromPublisher(flux);
	}

	Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
		return Flux.from(flowable);
	}

	Observable<User> fromFluxToObservable(Flux<User> flux) {
		return Observable.fromPublisher(flux);
	}

	Flux<User> fromObservableToFlux(Observable<User> observable) {
		return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
	}

	Single<User> fromMonoToSingle(Mono<User> mono) {
		return Single.fromPublisher(mono);
	}

	Mono<User> fromSingleToMono(Single<User> single) {
		return Mono.from(single.toFlowable());
	}

	CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
		return mono.toFuture();
	}

	Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
		return Mono.fromFuture(future);
	}

}

RxJava2 부터는 Observable 을 지원하지 않는 듯하다.

테스트를 통과하지 않는다. Observable 은 Backpressure 구현체가 없는 인터페이스 였던 듯하다.

 

CompletableFuture

CompletableFuture<String> future = CompletableFuture.supplyAsync(()->"hello");
completableFuture.thenApply(string->string.toUpperCase());
completableFuture.get();

막간 상식 : Reactor 는 자바 8 기반이며 자바 8 에 있는 Completable Future 를 이용하여 만들어진 듯하다. 

 

 

 

 

 

Blocking 과 호환

Reactive -> Blocking

* 에러가 발생하면 exception 을 발생시킨다.

public class Part10ReactiveToBlocking {
	User monoToValue(Mono<User> mono) {
		return mono.block();
	}

	Iterable<User> fluxToValues(Flux<User> flux) {
		return flux.toIterable();
	}

}

 

Blocking -> Reactive

public class Part11BlockingToReactive {
	Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
		return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
			.subscribeOn(Schedulers.elastic());
	}

	Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
		return flux.publishOn(Schedulers.elastic())
			.doOnNext(repository::save)
			.then();
	}
	
}

subscribeOn : subsribe -> onSubribe -> request

명시한 스케줄러에 맞춰서 위 순서에 맞는 일을 처리하라

then : operator 의 성공 여부를 알수 있다. flux 를 종료하는 mono 하나로 바꾼다

 

 

 

 

기타

public class Part08OtherOperations {
	Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
		return Flux
		      .zip(usernameFlux, firstnameFlux, lastnameFlux)
		      .map((tuple)->new User(tuple.getT1(), tuple.getT2(), tuple.getT3()));
		      // getTN 을 사용해야 타입을 유지 시켜준다. get(N) 은 타입이 유지되지 않아서 문제가 있을 수도 있다.
	}

	Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
		return Mono.first(mono1, mono2); // 처음 publish 한 mono 를 사용
	}

	Flux<User> useFastestFlux(Flux<User> flux1, Flux<User> flux2) {
		return Flux.first(flux1, flux2); // 처음  publish 한 flux 를 사용
	}

	Mono<Void> fluxCompletion(Flux<User> flux) {
		return flux.then(); // 종료
	}

	Mono<User> nullAwareUserToMono(User user) {
		return Mono.justOrEmpty(user); // user가 null 이면 Mono.empty() 를 반환
	}

	Mono<User> emptyToSkyler(Mono<User> mono) {
		return mono.defaultIfEmpty(User.SKYLER); // mono 가 empty 이면 ~ 를 반환하라
	}

}