일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- 밋업프로젝트
- cicd
- jdbc
- Domain Driven Design
- rabbitmq-delayed-message-exchange
- 도메인 주도 개발 시작하기
- ddd
- delayed message plugin
- JPQL
- reactive operaton
- 최범균
- 한국대학생it경영학회
- 교육기획팀원
- 객체지향 쿼리 언어
- JPA
- 교육기획팀
- 자바 ORM 표준 JPA 프로그래밍
- 이펙티브자바
- springboot
- kusitms
- 30기
- RESTClient
- 큐시즘
- java
- Spring Batch
- GitHub Actions
- scheduling messages with rabbitmq
- Spring
- 자동처리
- 영속성
- Today
- Total
코딩은 마라톤
[Reactive Spring] 리액티브 프로그래밍과 오퍼레이션 (Flux, Mono) 본문
시작에 앞서,
애플리케이션 코드를 개발할 때는 명령형(imperative)과 리액티브(reactive, 반응형)의 두 가지 형태로 코드를 작성할 수 있다.
- 명령형 코드
- 순차적으로 연속되는 작업, 각 작업은 한 번에 하나씩 그리고 이전 작업 다음에 실행된다.
- 데이터는 모아서 처리되고 이전 작업이 데이터 처리를 끝낸 후에 다음 작업으로 넘어간다.
- 리액티브 코드
- 데이터 처리를 위해 일련의 작업들이 정의되지만, 이 작업들은 병렬로 실행될 수 있다.
- 각 작업은 부분 집합의 데이터를 처리할 수 있으며, 처리가 끝난 데이터를 다음 작업에 넘겨주고 다른 부분 집합의 데이터로 계속 작업할 수 있다.
리액티브 프로그래밍 이해하기
리액티브 프로그래밍은 명령형 프로그래밍의 대안이 되는 패러다임이다.
- 명령형 프로그래밍의 문제
- 작업이 수행되는 동안 특히 이 작업이 원격지 서버로부터 데이터베이스에 데이터를 쓰거나 가져오는 것과 같은 것이라면 이 작업이 완료될 때까지 아무 것도 할 수 없다. → 작업을 수행하는 스레드는 차단된다.
위 문제에 반해 리액티브 프로그래밍은 함수적이면서 선언적이다. 즉, 순차적으로 수행되는 작업 단계가 아닌 데이터가 흘러가는 파이프라인이나 스트림을 포함한다.
이런 리액티브 스트림은 데이터 전체를 사용할 수 있을 때까지 기다리지 않고 사용 가능한 데이터가 있을 때마다 처리되므로 사실상 입력되는 데이터는 무한할 수 있다.
리액티브 스트림 정의하기
리액티브 스트림은 4개의 인터페이스로 정의된다.
Publisher(발행자)
하나의 Subscription당 하나의 Subscriber에 발행(전송)하는 데이터를 생성한다.
따라서 Publisher 인터페이스에는 Subscriber가 Publisher를 구독 신청할 수 있는 subscribe() 메서드 한 개가 선언되어 있다.
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
Subscriber(구독자)
Subscriber가 구독 신청되면 Publisher로부터 이벤트를 수신할 수 있다.
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item)
public void onError(Throwable throwable);
public void onComplete();
}
Subscriber가 수신할 첫 번째 이벤트는 onSubscribe()의 호출을 통해 이루어진다.
Publisher가 onSubscribe()를 호출할 때 이 메서드의 인자로 Subscription 객체를 Subscriber에 전달한다.
Subscriber는 Subscription 객체를 통해서 구독을 관리할 수 있다.
Subscription(구독)
public static interface Subscription {
public void request(long n);
public void cancel();
}
Subscriber는 request()를 호출하여 전송되는 데이터를 요청하거나, 또는 더 이상 데이터를 수신하지 않고 구독을 취소하는 cancel()를 호출할 수 있다.
request()의 매개변수인 n은 받고자 하는 데이터 항목 수이다. (백 프레셔_데이터 생산자(Producer)가 소비자(Consumer)의 처리 속도보다 빠르게 데이터를 생성할 때 발생하는 문제를 해결하는 메커니즘)
→ 요청된 수의 데이터를 Publisher가 전송한 후에 Subscriber는 다시 request()를 호출하여 더 많은 요청을 할 수 있다.
요청 과정
→ 데이터가 스트림을 통해 전달
→ 전송할 데이터가 없으면 onComplete()를 호출하여 작업이 끝났다고 알려준다.
→ onNext() 메서드가 호출되어 Publisher가 전송하는 데이터가 Subscriber에게 전달 or onError()
-> Subscriber의 데이터 요청 완료
Processor(프로세서)
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
Processor는 Subscriber와 Publisher 인터페이스를 결합한 것이다.
요약 : "Publisher → 0 또는 그 이상의 Processor를 통해 데이터를 끌어옴 → Subscriber에 전달"
리액터 시작하기
리액티브 프로그래밍은 일련의 작업 단계를 기술하는 것(명령형)이 아닌 데이터가 전달될 파이프라인을 구성해야 한다.
@Test
void 명령형_프로그래밍_예시() {
String name = "Seongmin";
String capitalName = name.toUpperCase();
String greeting = "Hello " + capitalName + "!";
System.out.println(greeting);
}
@Test
void 라액티브_프로그래밍_예시() {
Mono.just("Seongmin")
.map(String::toUpperCase)
.map(cn -> "Hello " + cn + "!")
.subscribe(System.out::println);
}
위 예시(리액티브) 과정
- just() 오퍼레이션은 첫 번째 것을 생성 (Mono 생성)
- 첫 번째 Mono가 값을 방출하면 첫 번째 map() 오퍼레이션에 전달 (대문자 변경, Mono 생성)
- 두 번째 map() 오퍼레이션에 전달 (문자열 결합, Mono 생성)
- subscribe() 호출에서는 세 번째 Mono를 구독하여 데이터를 수신(출력)한다.
- Mono 생성 이유 : 리액티브 스트림(Mono, Flux)은 불변(Immutable)함.
위 예의 코드가 단계별로 실행되는 것처럼 보이지만, 실제로는 데이터가 전달되는 파이프라인을 구성하는 것이다.
그리고 파이프라인의 각 단계에서는 어떻게 하든 데이터가 변경된다. 또한 각 오퍼레이션은 같은 스레드로 실행되거나 다른 스레드로 실행될 수 있다.
Mono VS Flux
공통
- Publisher 인터페이스 구현체
차이
- Mono : 하나의 데이터 항목만 갖는 데이터셋에 최적
- Flux : 0, 1 또는 다수(무한)의 데이터를 갖는 파이프라인
10.3 리액티브 오퍼레이션 적용하기
Flux, Mono는 리액터가 제공하는 가장 핵심적인 구성 요소다. 500개 이상의 오퍼레이션이 있으며, 각 오퍼레이션은 다음과 같이 분류할 수 있다.
- 생성(Creation) 오퍼레이션
- 조합(Combination) 오퍼레이션
- 변환(Transformation) 오퍼레이션
- 로직(Logic) 오퍼레이션
리액티브 타입 생성하기
객체로부터 생성하기
Flux, Mono의 just() 메서드를 사용하여 생성할 수 있다.
@Test
void 객체로부터_생성하기() {
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
‼ 참고 (테스트 할 때, StepVerifier를 사용하자)
- 리액터의 StepVerifier를 사용하는 것이 Flux나 Mono를 테스트하는 더 좋은 방법이다.
- StepVerifier는 리액티브 타입을 구독한 후, 스트림을 통해 전달되는 데이터에 대해 assertion을 적용한다. 그리고 해당 스트림이 기대한 대로 동작하는지 검사한다.
컬렉션으로부터 생성하기
배열, Iterable 객체, 자바 Stream 객체로부터 생성할 수 있다.
- 배열 : fromArray()
- Iterable(List, Set, …) : fromIterable()
- Stream : fromStream()
리액티브 타입 조합하기
리액티브 타입 결합하기
하나의 Flux를 다른 것과 결합하려면 mergeWith() 오퍼레이션을 사용한다.
@Test
void Flux_결합하기() {
Flux<String> characterFlux = Flux
.just("짱구", "철수", "맹구")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("제육", "돈까스", "커피")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("짱구", "제육", "철수", "돈까스", "맹구", "커피")
.verifyComplete();
}
두 Flux 객체가 결합되면 하나의 Flux가 새로 생성된다. 그리고 StepVerifier가 구독할 때는 두개의 스트림을 번갈아 구독하게 된다.
다만 mergeWith()는 Flux들의 값이 완벽하게 번갈아 방출되게 보장할 수 없다. 이를 해결하고 싶다면 zip() 오퍼레이션을 사용할 수 있다.
@Test
void ZIP으로_결합하기() {
Flux<String> characterFlux = Flux
.just("짱구", "철수", "맹구");
Flux<String> foodFlux = Flux
.just("제육", "돈까스", "커피");
Flux<Tuple2<String, String>> mergedFlux = Flux.zip(characterFlux, foodFlux);
StepVerifier.create(mergedFlux)
.expectNextMatches(p ->
p.getT1().equals("짱구") && p.getT2().equals("제육")
)
.expectNextMatches(p ->
p.getT1().equals("철수") && p.getT2().equals("돈까스")
)
.expectNextMatches(p ->
p.getT1().equals("맹구") && p.getT2().equals("커피")
)
.verifyComplete();
}
Flux.zip()을 통해 방출되는 각 항목은 Tuple2(두 개의 다른 객체를 전달하는 컨테이너 객체)이다.
만약 Tuple2를 반환하고 싶지 않다면, 원하는 타입으로 변경하면 된다.
@Test
void ZIP으로_커스텀하게_결합하기() {
Flux<String> characterFlux = Flux
.just("짱구", "철수", "맹구");
Flux<String> foodFlux = Flux
.just("제육", "돈까스", "커피");
Flux<String> mergedFlux = Flux.zip(characterFlux, foodFlux, (c, f) -> c + " " + f);
StepVerifier.create(mergedFlux)
.expectNext("짱구 제육")
.expectNext("철수 돈까스")
.expectNext("맹구 커피")
.verifyComplete();
}
먼저 값을 방출하는 리액티브 타입 선택하기
first() 오퍼레이션은 두 Flux 객체 중 먼저 값을 방출하는 Flux의 값을 선택해서 발행한다.
→ first()가 deprecated되어, firstWithSignal()을 사용한다.
@Test
void 먼저_값을_방출하는_값을_발행하기() {
Flux<String> characterFlux = Flux
.just("짱구", "철수", "맹구")
.delaySubscription(Duration.ofMillis(100));
Flux<String> foodFlux = Flux
.just("제육", "돈까스", "커피");
Flux<String> mergedFlux = Flux.firstWithSignal(characterFlux, foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("제육", "돈까스", "커피")
.verifyComplete();
}
리액티브 스트림의 변환과 필터링
리액티브 타입으로부터 데이터 필터링하기
맨 앞부터 원하는 개수의 항목을 무시하고 싶다면 skip() 오퍼레이션을 사용한다.
@Test
void 지정된_수의_메시지를_건너뛰기() {
Flux<String> skipFlux = Flux
.just("짱구", "철수", "맹구", "유리", "훈이")
.skip(3); // 1번째부터 3번째까지 무시 후 4번째부터 발행한다.
StepVerifier.create(skipFlux)
.expectNext("유리", "훈이")
.verifyComplete();
}
맨 앞부터 원하는 개수의 항목만 출력한다면 take() 오퍼레이션을 사용한다.
@Test
void 지정된_수의_메시지만_방출하기() {
Flux<String> favoriteCharacter = Flux
.just("짱구", "철수", "맹구", "유리", "훈이")
.take(3);
StepVerifier.create(favoriteCharacter)
.expectNext("짱구", "철수", "맹구")
.verifyComplete();
}
skip, take의 매개변수로 항목의 개수뿐만 아닌 경과 시간을 사용할 수도 있다.
Flux의 값을 더 범용적으로 필터링을 할 때는 filter() 오퍼레이션을 사용한다.
@Test
void 지정된_조건식의_메시지만_방출하기() {
Flux<String> favoriteCharacter = Flux
.just("짱구", "철수", "맹구", "유리", "훈이")
.filter(k -> !k.equals("훈이"));
StepVerifier.create(favoriteCharacter)
.expectNext("짱구", "철수", "맹구", "유리")
.verifyComplete();
}
항목의 중복을 제거하고 싶다면 distinct() 오퍼레이션을 사용한다.
@Test
void 중복되지_않는_메시지만_방출하기() {
Flux<String> duplicateFlux = Flux
.just("짱구", "철수", "맹구", "짱구", "철수")
.distinct();
StepVerifier.create(duplicateFlux)
.expectNext("짱구", "철수", "맹구")
.verifyComplete();
}
리액티브 데이터 매핑하기
가장 많이 사용하는 오퍼레이션 중 하나는 발행된 항목을 다른 형태나 타입으로 매핑(변환)하는 것이다.
이때는 목적에 따라 map(), flatMap() 오퍼레이션을 사용한다.
map() 오퍼레이션은 변환을 수행하는 Flux를 생성한다.
@Test
void map으로_메시지를_변환하기() {
Flux<String> animation = Flux
.just("짱구는 못말려", "철수는 못말려", "맹구는 못말려")
.map(n -> {
String[] split = n.split(" ");
return split[0];
});
StepVerifier.create(animation)
.expectNext("짱구는", "철수는", "맹구는")
.verifyComplete();
}
map()은 각 항목이 동기적으로 매핑이 수행된다. 따라서 비동기적으로 매핑을 수행하고 싶다면 flatMap() 오퍼레이션을 사용해야 한다. (그러나 flatMap()은 어렵다..)
flatMap()은 각 객체를 새로운 Mono나 Flux로 매핑하며, 해당 Mono나 Flux들의 결과는 하나의 새로운 Flux가 된다. 또한 위에서 말했던, 비동기적으로 매핑을 수행하고 싶다면 flatMap() + subscribeOn()을 사용한다.
@Test
void flatMap으로_메시지를_변환하기() {
Flux<String> animation = Flux
.just("짱구는 못말려", "철수는 못말려", "맹구는 못말려")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = n.split(" ");
return split[0];
})
.subscribeOn(Schedulers.parallel())
);
List<String> characterList = List.of("짱구는", "철수는", "맹구는");
StepVerifier.create(animation)
.expectNextMatches(characterList::contains)
.expectNextMatches(characterList::contains)
.expectNextMatches(characterList::contains)
.verifyComplete();
}
flatMap()이나 subscribeOn()을 사용할 때의 장점
: 다수의 병행 스레드에 작업을 분할하여 스트림의 처리량을 증가시킬 수 있다. 다만 어떤 작업이 먼저 끝날지 순서 보장이 안되어 방출되는 순서를 알 방법이 없다.
리액티브 스트림의 데이터 버퍼링하기
Flux를 통해 데이터를 처리할 때 데이터 스트림을 작은 청크 단위로 분할할 때 buffer()를 사용한다.
@Test
void 데이터를_버퍼링하기() {
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier.create(bufferedFlux)
.expectNext(Arrays.asList("Apple", "Orange", "Grape"))
.expectNext(Arrays.asList("Banana", "Strawberry"))
.verifyComplete();
}
위 코드를 List 컬렉션으로 병행 처리한 코드는 다음과 같다.
@Test
void 데이터를_병행으로_버퍼링하기() {
Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
}
04:44:55.447 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
04:44:55.451 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- request(32)
04:44:55.452 [Test worker] INFO reactor.Flux.SubscribeOn.2 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
04:44:55.452 [Test worker] INFO reactor.Flux.SubscribeOn.2 -- request(32)
04:44:55.453 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(APPLE)
04:44:55.453 [parallel-2] INFO reactor.Flux.SubscribeOn.2 -- onNext(BANANA)
04:44:55.453 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(ORANGE)
04:44:55.454 [parallel-2] INFO reactor.Flux.SubscribeOn.2 -- onNext(STRAWBERRY)
04:44:55.454 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(GRAPE)
04:44:55.454 [parallel-2] INFO reactor.Flux.SubscribeOn.2 -- onComplete()
04:44:55.454 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onComplete()
여기서 보면 1 스레드에서 APPLE, ORANGE, GRAPE를 처리하고, 2 스레드에서 BANANA, STRAWBERRY를 처리하는 즉, 두 개의 버퍼가 병행 처리됨을 볼 수 있다.
모든 항목을 포함하는 List를 방출하고 싶다면 collectList()를 사용한다.
@Test
void 모든_항목을_방출하는_리스트를_방출하기() {
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier.create(fruitListMono)
.expectNext(Arrays.asList("Apple", "Orange", "Grape", "Banana", "Strawberry"))
.verifyComplete();
}
collectMap()을 사용해서 Map을 포함한 Mono를 생성할 수 있다.
@Test
void 모든_항목을_맵으로_방출하기() {
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Orange", "Grape");
Mono<Map<Character, String>> fruitMapMono = fruitFlux.collectMap(a -> a.charAt(0));
StepVerifier.create(fruitMapMono)
.expectNextMatches(map -> map.size() == 3 &&
map.get('A').equals("Apple") &&
map.get('O').equals("Orange") &&
map.get('G').equals("Grape"))
.verifyComplete();
}
리액티브 타입에 로직 오퍼레이션 수행하기
모든 항목이 조건을 충족하는지 확인하기 위해 all() 오퍼레이션을 사용한다.
@Test
void 모든_항목이_조건을_충족하는지_검사하기() {
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Strawberry");
// 모든 항목이 e를 가지기 때문에 true를 반환한다.
Mono<Boolean> hasBMono = fruitFlux.all(a -> a.contains("e"));
StepVerifier.create(hasBMono)
.expectNext(true)
.verifyComplete();
}
최소한 하나의 항목이 조건을 충족하는지 확인하기 위해 any() 오퍼레이션을 사용한다.
@Test
void 일부_항목이_조건을_충족하는지_검사하기() {
Flux<String> fruitFlux = Flux.just("Apple", "Orange", "Grape", "Strawberry");
// Apple만 A를 갖고 있기에 true를 반환한다.
Mono<Boolean> hasAMono = fruitFlux.any(a -> a.contains("A"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
}
출처 : 스프링 인 액션, Flux VS Mono
'Backend > Spring Boot' 카테고리의 다른 글
[SpringBoot + OpenAI(ChatGPT)] SpringBoot에서 OpenAI API를 이용해 연동하기 (3) | 2024.11.10 |
---|---|
[SpringBoot] 서블릿과 서블릿 컨테이너 (1) | 2024.06.28 |
[Springboot] Filter와 Interceptor (0) | 2024.05.08 |
Request DTO에서 @Getter를 쓰는 이유 (바인딩) (0) | 2024.04.08 |