2019. 6. 9. 03:09ㆍ[공부] 영상/Reactor
Backpressure ( = Request, Pull )
Volume control
Subscriber 가 처리할 수 있는 수준의 양을 받도록 Stream 의 양을 조절하는 개념
Subscriber 가 publisher 에게하는 피드백 메카니즘 이다.
얼마나 데이터를 처리하길 원하는지 publisher 에게 이야기하는 것
Subscription level 에서 처리된다.
publisher.subscribe 할 때 subscription 을 줄 수 있다.
publisher.subscribe 에 의해 onSubscribe 가 실행된다.
onSubscribe 에의해 전달받은 subscription 이 사용한다.
Backpressure ( = Request, Pull ) 의 크기를 보는 방법 : log
Flux.range(1, 100)
.log()
.doOnNext(System.out::println)
.subscribe();
log 를 찍어보면 request(unbounded) 라고 찍히는 것을 확인할 수 있다.
Flux.range(1, 100)
.log()
.doOnNext(System.out::println)
.subscribe(new Subscriber<Integer>(){
private Subscription subscription;
private int REQUEST_COUNT = 10;
private int count;
@Override
public void onSubscribe(Subscription subscription){
this.subscription = subscription;
this.subscription.request(REQUEST_COUNT);
}
@Override
public void onNext(Subscription subscription){
count++;
if(count % REQUEST_COUNT == 0){
this.subscription.request(REQUEST_COUNT);
}
}
@Override
public void onError(Subscription subscription){
}
@Override
public void onComplete(Subscription subscription){
}
});
log 를 찍어보면 request(10) 이라고 찍히는 것을 확인할 수 있다.
이후 10개의 데이터를 publish 하고 count 가 증가하면서 10 이되면 다시 또 10개를 요청하게된다.
public class Part06Request {
ReactiveRepository<User> repository = new ReactiveUserRepository();
StepVerifier requestAllExpectFour(Flux<User> flux) {
return StepVerifier.create(flux)
.expectNextCount(4)
.expectComplete();
}
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
return StepVerifier.create(flux, 1)
.expectNext(User.SKYLER)
.thenRequest(1)
.expectNext(User.JESSE)
.thenCancel();
}
Flux<User> fluxWithLog() {
return repository.findAll()
.log();
}
Flux<User> fluxWithDoOnPrintln() {
return repository.findAll()
.log()
.doOnSubscribe(sub->System.out.println("started"))
.doOnNext(user->System.out.println(user.getFirstname() + user.getLastname()))
.doOnComplete(()->System.out.println("ended"));
}
}
StepVerifier.create(flux, 1) : flux로 StepVerifier 를 만들고 하나를 가져와라
thenRequest(1) : 그 다음에 하나를 가져와라
thenCancel() : 여기까지 확인했으면 됬다. 이제 stream 을 그만하겠다.
스프링 부트에서는 Backpressure 알아서 조절 해주기도 하는 듯 싶다.
fluxWithLog 결과
2019-06-08 17:47:08 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
2019-06-08 17:47:08 [main] INFO reactor.Flux.Zip.1 - request(1)
2019-06-08 17:47:08 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
2019-06-08 17:47:08 [parallel-1] INFO reactor.Flux.Zip.1 - request(1)
2019-06-08 17:47:08 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
2019-06-08 17:47:08 [parallel-1] INFO reactor.Flux.Zip.1 - request(2)
2019-06-08 17:47:08 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
2019-06-08 17:47:09 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
2019-06-08 17:47:09 [parallel-1] INFO reactor.Flux.Zip.1 - onComplete()
fluxWithDoOnPrintln 결과
2019-06-08 17:47:19 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
started
2019-06-08 17:47:19 [main] INFO reactor.Flux.Zip.1 - request(unbounded)
2019-06-08 17:47:19 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
SkylerWhite
2019-06-08 17:47:19 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
JessePinkman
2019-06-08 17:47:19 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
WalterWhite
2019-06-08 17:47:19 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
SaulGoodman
2019-06-08 17:47:19 [parallel-1] INFO reactor.Flux.Zip.1 - onComplete()
ended
'[공부] 영상 > Reactor' 카테고리의 다른 글
리액터 강의 정리 ( 9강 ~ 12강 : 에러처리, 호환, 기타) (0) | 2019.06.09 |
---|---|
리액터 강의 정리 ( 3강 ~ 6강 : StepVerifier, Map, flatMap, Merge) (0) | 2019.06.09 |
리액터 강의 정리 ( 1강 ~ 2강 : 인트로 ) (0) | 2019.06.09 |