코딩은 마라톤

[Reactive Spring] 리액티브 프로그래밍과 오퍼레이션 (Flux, Mono) 본문

Backend/Spring Boot

[Reactive Spring] 리액티브 프로그래밍과 오퍼레이션 (Flux, Mono)

anxi 2025. 3. 24. 18:31

시작에 앞서,

애플리케이션 코드를 개발할 때는 명령형(imperative)리액티브(reactive, 반응형)의 두 가지 형태로 코드를 작성할 수 있다.

  • 명령형 코드
    • 순차적으로 연속되는 작업, 각 작업은 한 번에 하나씩 그리고 이전 작업 다음에 실행된다.
    • 데이터는 모아서 처리되고 이전 작업이 데이터 처리를 끝낸 후에 다음 작업으로 넘어간다.
  • 리액티브 코드
    • 데이터 처리를 위해 일련의 작업들이 정의되지만, 이 작업들은 병렬로 실행될 수 있다.
    • 각 작업은 부분 집합의 데이터를 처리할 수 있으며, 처리가 끝난 데이터를 다음 작업에 넘겨주고 다른 부분 집합의 데이터로 계속 작업할 수 있다.

리액티브 프로그래밍 이해하기

리액티브 프로그래밍은 명령형 프로그래밍의 대안이 되는 패러다임이다.

  • 명령형 프로그래밍의 문제
    • 작업이 수행되는 동안 특히 이 작업이 원격지 서버로부터 데이터베이스에 데이터를 쓰거나 가져오는 것과 같은 것이라면 이 작업이 완료될 때까지 아무 것도 할 수 없다. → 작업을 수행하는 스레드는 차단된다.

위 문제에 반해 리액티브 프로그래밍은 함수적이면서 선언적이다. 즉, 순차적으로 수행되는 작업 단계가 아닌 데이터가 흘러가는 파이프라인이나 스트림을 포함한다.

이런 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지 않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한할 수 있다.

리액티브 스트림 정의하기

리액티브 스트림은 4개의 인터페이스로 정의된다.

Publisher(발행자)

하나의 Subscription당 하나의 Subscriber에 발행(전송)하는 데이터를 생성한다.

따라서 Publisher 인터페이스에는 Subscriber가 Publisher를 구독 신청할 수 있는 subscribe() 메서드 한 개가 선언되어 있다.

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

Subscriber(구독자)

Subscriber가 구독 신청되면 Publisher로부터 이벤트를 수신할 수 있다.

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item)
    public void onError(Throwable throwable);
    public void onComplete();
}

 

Subscriber가 수신할 첫 번째 이벤트는 onSubscribe()의 호출을 통해 이루어진다.

Publisher가 onSubscribe()를 호출할 때 이 메서드의 인자로 Subscription 객체를 Subscriber에 전달한다.

Subscriber는 Subscription 객체를 통해서 구독을 관리할 수 있다.

Subscription(구독)

public static interface Subscription {
    public void request(long n);
    public void cancel();
}

 

Subscriber는 request()를 호출하여 전송되는 데이터를 요청하거나, 또는 더 이상 데이터를 수신하지 않고 구독을 취소하는 cancel()를 호출할 수 있다.

request()의 매개변수인 n은 받고자 하는 데이터 항목 수이다. (백 프레셔_데이터 생산자(Producer)가 소비자(Consumer)의 처리 속도보다 빠르게 데이터를 생성할 때 발생하는 문제를 해결하는 메커니즘)

→ 요청된 수의 데이터를 Publisher가 전송한 후에 Subscriber는 다시 request()를 호출하여 더 많은 요청을 할 수 있다.

 

요청 과정

→ 데이터가 스트림을 통해 전달

→ 전송할 데이터가 없으면 onComplete()를 호출하여 작업이 끝났다고 알려준다.

onNext() 메서드가 호출되어 Publisher가 전송하는 데이터가 Subscriber에게 전달 or onError()

-> Subscriber의 데이터 요청 완료

Processor(프로세서)

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

Processor는 Subscriber와 Publisher 인터페이스를 결합한 것이다.

 

요약 : "Publisher → 0 또는 그 이상의 Processor를 통해 데이터를 끌어옴 → Subscriber에 전달"


리액터 시작하기

리액티브 프로그래밍은 일련의 작업 단계를 기술하는 것(명령형)이 아닌 데이터가 전달될 파이프라인을 구성해야 한다.

@Test
void 명령형_프로그래밍_예시() {
    String name = "Seongmin";
    String capitalName = name.toUpperCase();
    String greeting = "Hello " + capitalName + "!";
    System.out.println(greeting);
}

@Test
void 라액티브_프로그래밍_예시() {
    Mono.just("Seongmin")
            .map(String::toUpperCase)
            .map(cn -> "Hello " + cn + "!")
            .subscribe(System.out::println);
}

 

위 예시(리액티브) 과정

  • just() 오퍼레이션은 첫 번째 것을 생성 (Mono 생성)
  • 첫 번째 Mono가 값을 방출하면 첫 번째 map() 오퍼레이션에 전달 (대문자 변경, Mono 생성)
  • 두 번째 map() 오퍼레이션에 전달 (문자열 결합, Mono 생성)
  • subscribe() 호출에서는 세 번째 Mono를 구독하여 데이터를 수신(출력)한다.
  • Mono 생성 이유 : 리액티브 스트림(Mono, Flux)은 불변(Immutable)함.

위 예의 코드가 단계별로 실행되는 것처럼 보이지만, 실제로는 데이터가 전달되는 파이프라인을 구성하는 것이다.
그리고 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경된다. 또한 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.

 

Mono VS Flux

공통

  • Publisher 인터페이스 구현체

차이

  • Mono : 하나의 데이터 항목만 갖는 데이터셋에 최적
  • Flux : 0, 1 또는 다수(무한)의 데이터를 갖는 파이프라인

10.3 리액티브 오퍼레이션 적용하기

Flux, Mono는 리액터가 제공하는 가장 핵심적인 구성 요소다. 500개 이상의 오퍼레이션이 있으며, 각 오퍼레이션은 다음과 같이 분류할 수 있다.

  • 생성(Creation) 오퍼레이션
  • 조합(Combination) 오퍼레이션
  • 변환(Transformation) 오퍼레이션
  • 로직(Logic) 오퍼레이션

리액티브 타입 생성하기

객체로부터 생성하기

Flux, Mono의 just() 메서드를 사용하여 생성할 수 있다.

@Test
void 객체로부터_생성하기() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
    StepVerifier.create(fruitFlux)
            .expectNext("Apple")
            .expectNext("Orange")
            .expectNext("Grape")
            .expectNext("Banana")
            .expectNext("Strawberry")
            .verifyComplete();
}

 

‼ 참고 (테스트 할 때, StepVerifier를 사용하자)

  • 리액터의 StepVerifier를 사용하는 것이 Flux나 Mono를 테스트하는 더 좋은 방법이다.
  • StepVerifier는 리액티브 타입을 구독한 후, 스트림을 통해 전달되는 데이터에 대해 assertion을 적용한다. 그리고 해당 스트림이 기대한 대로 동작하는지 검사한다.

컬렉션으로부터 생성하기

배열, Iterable 객체, 자바 Stream 객체로부터 생성할 수 있다.

  • 배열 : fromArray()
  • Iterable(List, Set, …) : fromIterable()
  • Stream : fromStream()

리액티브 타입 조합하기

리액티브 타입 결합하기

하나의 Flux를 다른 것과 결합하려면 mergeWith() 오퍼레이션을 사용한다.

@Test
void Flux_결합하기() {
    Flux<String> characterFlux = Flux
            .just("짱구", "철수", "맹구")
            .delayElements(Duration.ofMillis(500));
    Flux<String> foodFlux = Flux
            .just("제육", "돈까스", "커피")
            .delaySubscription(Duration.ofMillis(250))
            .delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

    StepVerifier.create(mergedFlux)
            .expectNext("짱구", "제육", "철수", "돈까스", "맹구", "커피")
            .verifyComplete();
}

 

두 Flux 객체가 결합되면 하나의 Flux가 새로 생성된다. 그리고 StepVerifier가 구독할 때는 두개의 스트림을 번갈아 구독하게 된다.

다만 mergeWith()는 Flux들의 값이 완벽하게 번갈아 방출되게 보장할 수 없다. 이를 해결하고 싶다면 zip() 오퍼레이션을 사용할 수 있다.

 

@Test
void ZIP으로_결합하기() {
    Flux<String> characterFlux = Flux
            .just("짱구", "철수", "맹구");
    Flux<String> foodFlux = Flux
            .just("제육", "돈까스", "커피");

    Flux<Tuple2<String, String>> mergedFlux = Flux.zip(characterFlux, foodFlux);

    StepVerifier.create(mergedFlux)
            .expectNextMatches(p ->
                p.getT1().equals("짱구") && p.getT2().equals("제육")
            )
            .expectNextMatches(p ->
                    p.getT1().equals("철수") && p.getT2().equals("돈까스")
            )
            .expectNextMatches(p ->
                    p.getT1().equals("맹구") && p.getT2().equals("커피")
            )
            .verifyComplete();
}

 

Flux.zip()을 통해 방출되는 각 항목은 Tuple2(두 개의 다른 객체를 전달하는 컨테이너 객체)이다.

만약 Tuple2를 반환하고 싶지 않다면, 원하는 타입으로 변경하면 된다.

@Test
void ZIP으로_커스텀하게_결합하기() {
    Flux<String> characterFlux = Flux
            .just("짱구", "철수", "맹구");
    Flux<String> foodFlux = Flux
            .just("제육", "돈까스", "커피");

    Flux<String> mergedFlux = Flux.zip(characterFlux, foodFlux, (c, f) -> c + " " + f);

    StepVerifier.create(mergedFlux)
            .expectNext("짱구 제육")
            .expectNext("철수 돈까스")
            .expectNext("맹구 커피")
            .verifyComplete();
}

먼저 값을 방출하는 리액티브 타입 선택하기

first() 오퍼레이션은 두 Flux 객체 중 먼저 값을 방출하는 Flux의 값을 선택해서 발행한다.

→ first()가 deprecated되어, firstWithSignal()을 사용한다.

@Test
void 먼저_값을_방출하는_값을_발행하기() {
    Flux<String> characterFlux = Flux
            .just("짱구", "철수", "맹구")
            .delaySubscription(Duration.ofMillis(100));
    Flux<String> foodFlux = Flux
            .just("제육", "돈까스", "커피");

    Flux<String> mergedFlux = Flux.firstWithSignal(characterFlux, foodFlux);

    StepVerifier.create(mergedFlux)
            .expectNext("제육", "돈까스", "커피")
            .verifyComplete();
}

리액티브 스트림의 변환과 필터링

리액티브 타입으로부터 데이터 필터링하기

맨 앞부터 원하는 개수의 항목을 무시하고 싶다면 skip() 오퍼레이션을 사용한다.

@Test
void 지정된_수의_메시지를_건너뛰기() {
    Flux<String> skipFlux = Flux
            .just("짱구", "철수", "맹구", "유리", "훈이")
            .skip(3); // 1번째부터 3번째까지 무시 후 4번째부터 발행한다.

    StepVerifier.create(skipFlux)
            .expectNext("유리", "훈이")
            .verifyComplete();
}

 

맨 앞부터 원하는 개수의 항목만 출력한다면 take() 오퍼레이션을 사용한다.

@Test
void 지정된_수의_메시지만_방출하기() {
    Flux<String> favoriteCharacter = Flux
            .just("짱구", "철수", "맹구", "유리", "훈이")
            .take(3);

    StepVerifier.create(favoriteCharacter)
            .expectNext("짱구", "철수", "맹구")
            .verifyComplete();
}

skip, take의 매개변수로 항목의 개수뿐만 아닌 경과 시간을 사용할 수도 있다.

 

Flux의 값을 더 범용적으로 필터링을 할 때는 filter() 오퍼레이션을 사용한다.

@Test
void 지정된_조건식의_메시지만_방출하기() {
    Flux<String> favoriteCharacter = Flux
            .just("짱구", "철수", "맹구", "유리", "훈이")
            .filter(k -> !k.equals("훈이"));

    StepVerifier.create(favoriteCharacter)
            .expectNext("짱구", "철수", "맹구", "유리")
            .verifyComplete();
}

 

항목의 중복을 제거하고 싶다면 distinct() 오퍼레이션을 사용한다.

@Test
void 중복되지_않는_메시지만_방출하기() {
    Flux<String> duplicateFlux = Flux
            .just("짱구", "철수", "맹구", "짱구", "철수")
            .distinct();

    StepVerifier.create(duplicateFlux)
            .expectNext("짱구", "철수", "맹구")
            .verifyComplete();
}

리액티브 데이터 매핑하기

가장 많이 사용하는 오퍼레이션 중 하나는 발행된 항목을 다른 형태나 타입으로 매핑(변환)하는 것이다.

이때는 목적에 따라 map(), flatMap() 오퍼레이션을 사용한다.

map() 오퍼레이션은 변환을 수행하는 Flux를 생성한다.

@Test
void map으로_메시지를_변환하기() {
    Flux<String> animation = Flux
            .just("짱구는 못말려", "철수는 못말려", "맹구는 못말려")
            .map(n -> {
                String[] split = n.split(" ");
                return split[0];
            });

    StepVerifier.create(animation)
            .expectNext("짱구는", "철수는", "맹구는")
            .verifyComplete();
}

map()은 각 항목이 동기적으로 매핑이 수행된다. 따라서 비동기적으로 매핑을 수행하고 싶다면 flatMap() 오퍼레이션을 사용해야 한다. (그러나 flatMap()은 어렵다..)

 

flatMap()은 각 객체를 새로운 Mono나 Flux로 매핑하며, 해당 Mono나 Flux들의 결과는 하나의 새로운 Flux가 된다. 또한 위에서 말했던, 비동기적으로 매핑을 수행하고 싶다면 flatMap() + subscribeOn()을 사용한다.

@Test
void flatMap으로_메시지를_변환하기() {
    Flux<String> animation = Flux
            .just("짱구는 못말려", "철수는 못말려", "맹구는 못말려")
            .flatMap(n -> Mono.just(n)
                    .map(p -> {
                        String[] split = n.split(" ");
                        return split[0];
                    })
                    .subscribeOn(Schedulers.parallel())
            );

    List<String> characterList = List.of("짱구는", "철수는", "맹구는");

    StepVerifier.create(animation)
            .expectNextMatches(characterList::contains)
            .expectNextMatches(characterList::contains)
            .expectNextMatches(characterList::contains)
            .verifyComplete();
}

 

flatMap()이나 subscribeOn()을 사용할 때의 장점
: 다수의 병행 스레드에 작업을 분할하여 스트림의 처리량을 증가시킬 수 있다. 다만 어떤 작업이 먼저 끝날지 순서 보장이 안되어 방출되는 순서를 알 방법이 없다.

 

리액티브 스트림의 데이터 버퍼링하기

Flux를 통해 데이터를 처리할 때 데이터 스트림을 작은 청크 단위로 분할할 때 buffer()를 사용한다.

@Test
void 데이터를_버퍼링하기() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

    StepVerifier.create(bufferedFlux)
            .expectNext(Arrays.asList("Apple", "Orange", "Grape"))
            .expectNext(Arrays.asList("Banana", "Strawberry"))
            .verifyComplete();
}

 

위 코드를 List 컬렉션으로 병행 처리한 코드는 다음과 같다.

@Test
void 데이터를_병행으로_버퍼링하기() {
    Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
            .buffer(3)
            .flatMap(x ->
                    Flux.fromIterable(x)
                            .map(String::toUpperCase)
                            .subscribeOn(Schedulers.parallel())
                            .log()
            ).subscribe();
}
04:44:55.447 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
04:44:55.451 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- request(32)
04:44:55.452 [Test worker] INFO reactor.Flux.SubscribeOn.2 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
04:44:55.452 [Test worker] INFO reactor.Flux.SubscribeOn.2 -- request(32)
04:44:55.453 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(APPLE)
04:44:55.453 [parallel-2] INFO reactor.Flux.SubscribeOn.2 -- onNext(BANANA)
04:44:55.453 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(ORANGE)
04:44:55.454 [parallel-2] INFO reactor.Flux.SubscribeOn.2 -- onNext(STRAWBERRY)
04:44:55.454 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(GRAPE)
04:44:55.454 [parallel-2] INFO reactor.Flux.SubscribeOn.2 -- onComplete()
04:44:55.454 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onComplete()

 

여기서 보면 1 스레드에서 APPLE, ORANGE, GRAPE를 처리하고, 2 스레드에서 BANANA, STRAWBERRY를 처리하는 즉, 두 개의 버퍼가 병행 처리됨을 볼 수 있다.

 

모든 항목을 포함하는 List를 방출하고 싶다면 collectList()를 사용한다.

@Test
void 모든_항목을_방출하는_리스트를_방출하기() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");

    Mono<List<String>> fruitListMono = fruitFlux.collectList();

    StepVerifier.create(fruitListMono)
            .expectNext(Arrays.asList("Apple", "Orange", "Grape", "Banana", "Strawberry"))
            .verifyComplete();
}

 

collectMap()을 사용해서 Map을 포함한 Mono를 생성할 수 있다.

@Test
void 모든_항목을_맵으로_방출하기() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Orange", "Grape");

    Mono<Map<Character, String>> fruitMapMono = fruitFlux.collectMap(a -> a.charAt(0));

    StepVerifier.create(fruitMapMono)
            .expectNextMatches(map -> map.size() == 3 &&
                    map.get('A').equals("Apple") &&
                    map.get('O').equals("Orange") &&
                    map.get('G').equals("Grape"))
            .verifyComplete();
}

리액티브 타입에 로직 오퍼레이션 수행하기

모든 항목이 조건을 충족하는지 확인하기 위해 all() 오퍼레이션을 사용한다.

@Test
void 모든_항목이_조건을_충족하는지_검사하기() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Strawberry");

    // 모든 항목이 e를 가지기 때문에 true를 반환한다.
    Mono<Boolean> hasBMono = fruitFlux.all(a -> a.contains("e"));
    StepVerifier.create(hasBMono)
            .expectNext(true)
            .verifyComplete();
}

 

최소한 하나의 항목이 조건을 충족하는지 확인하기 위해 any() 오퍼레이션을 사용한다.

@Test
void 일부_항목이_조건을_충족하는지_검사하기() {
    Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Strawberry");

	// Apple만 A를 갖고 있기에 true를 반환한다.
    Mono<Boolean> hasAMono = fruitFlux.any(a -> a.contains("A"));
    StepVerifier.create(hasAMono)
            .expectNext(true)
            .verifyComplete();
}

 

출처 : 스프링 인 액션, Flux VS Mono