kok202
리액터 강의 정리 ( 7강 ~ 8강 : Backpressure)

2019. 6. 9. 03:09[공부] 영상/Reactor

Backpressure ( = Request, Pull )

Volume control

Subscriber 가 처리할 수 있는 수준의 양을 받도록 Stream 의 양을 조절하는 개념

Subscriber 가 publisher 에게하는 피드백 메카니즘 이다.

얼마나 데이터를 처리하길 원하는지 publisher 에게 이야기하는 것

 

출처 : https://grokonez.com/java/java-9/java-9-flow-api-reactive-streams

Subscription level 에서 처리된다.

publisher.subscribe 할 때 subscription 을 줄 수 있다.

publisher.subscribe 에 의해 onSubscribe 가 실행된다.

onSubscribe 에의해 전달받은 subscription 이 사용한다.

 

 

 

 

 

Backpressure ( = Request, Pull ) 의 크기를 보는 방법 : log

Flux.range(1, 100)
    .log()
    .doOnNext(System.out::println)
    .subscribe();

log 를 찍어보면 request(unbounded) 라고 찍히는 것을 확인할 수 있다.

 

Flux.range(1, 100)
    .log()
    .doOnNext(System.out::println)
    .subscribe(new Subscriber<Integer>(){
        private Subscription subscription;
        private int REQUEST_COUNT = 10;
        private int count;
        
        @Override
        public void onSubscribe(Subscription subscription){
            this.subscription = subscription;
            this.subscription.request(REQUEST_COUNT);
        }
        
        @Override
        public void onNext(Subscription subscription){
            count++;
            if(count % REQUEST_COUNT == 0){
                this.subscription.request(REQUEST_COUNT);
            }
        }
        
        @Override
        public void onError(Subscription subscription){
        }
        
        @Override
        public void onComplete(Subscription subscription){
        }
    });

log 를 찍어보면 request(10) 이라고 찍히는 것을 확인할 수 있다.

이후 10개의 데이터를 publish 하고 count 가 증가하면서 10 이되면 다시 또 10개를 요청하게된다.

 

 

 

 

 

public class Part06Request {
	ReactiveRepository<User> repository = new ReactiveUserRepository();

	StepVerifier requestAllExpectFour(Flux<User> flux) {
		return StepVerifier.create(flux)
				.expectNextCount(4)
				.expectComplete();
	}

	StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
		return StepVerifier.create(flux, 1)
				.expectNext(User.SKYLER)
				.thenRequest(1)
				.expectNext(User.JESSE)
				.thenCancel();
	}

	Flux<User> fluxWithLog() {
		return repository.findAll()
				.log();
	}

	Flux<User> fluxWithDoOnPrintln() {
		return repository.findAll()
				.log()
				.doOnSubscribe(sub->System.out.println("started"))
				.doOnNext(user->System.out.println(user.getFirstname() + user.getLastname()))
				.doOnComplete(()->System.out.println("ended"));
	}

}

StepVerifier.create(flux, 1) : flux로 StepVerifier 를 만들고 하나를 가져와라

thenRequest(1) : 그 다음에 하나를 가져와라

thenCancel() : 여기까지 확인했으면 됬다. 이제 stream 을 그만하겠다.

스프링 부트에서는 Backpressure 알아서 조절 해주기도 하는 듯 싶다.

 

fluxWithLog 결과

2019-06-08 17:47:08 [main] INFO  reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
2019-06-08 17:47:08 [main] INFO  reactor.Flux.Zip.1 - request(1)
2019-06-08 17:47:08 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
2019-06-08 17:47:08 [parallel-1] INFO  reactor.Flux.Zip.1 - request(1)
2019-06-08 17:47:08 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
2019-06-08 17:47:08 [parallel-1] INFO  reactor.Flux.Zip.1 - request(2)
2019-06-08 17:47:08 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
2019-06-08 17:47:09 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
2019-06-08 17:47:09 [parallel-1] INFO  reactor.Flux.Zip.1 - onComplete()

fluxWithDoOnPrintln 결과

2019-06-08 17:47:19 [main] INFO  reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
started
2019-06-08 17:47:19 [main] INFO  reactor.Flux.Zip.1 - request(unbounded)
2019-06-08 17:47:19 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
SkylerWhite
2019-06-08 17:47:19 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
JessePinkman
2019-06-08 17:47:19 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
WalterWhite
2019-06-08 17:47:19 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
SaulGoodman
2019-06-08 17:47:19 [parallel-1] INFO  reactor.Flux.Zip.1 - onComplete()
ended