2024. 11. 17. 17:15ㆍProject Reactor
Reactive Streams는 비동기적 데이터 스트림 처리를 위한 표준 규격입니다. 주로 JVM(Java Virtual Machine) 환경에서 사용되며, 데이터의 흐름을 효율적으로 처리하고, 특히 데이터 소비자(consumer)가 데이터 생산자(producer)의 처리 속도를 조절하는 백프레셔(Backpressure)를 명확하게 지원합니다.
📌 Reactive Streams의 목적
Reactive Streams의 주요 목적은 다음과 같습니다.
- ✅ 비동기(Asynchronous) 데이터 처리
데이터를 비동기적으로 효율적이며 논블로킹(non-blocking) 방식으로 처리합니다. - ✅ 백프레셔(Backpressure) 지원
데이터 소비자가 처리 속도에 따라 데이터 생산자로부터 데이터를 요청(request)하여 속도를 조절할 수 있게 합니다. - ✅ 상호운용성(Interoperability)
서로 다른 라이브러리와 시스템 간의 호환성을 보장합니다.
🚩 Reactive Streams의 주요 구성 요소
Reactive Streams는 다음의 4가지 인터페이스로 구성됩니다.
인터페이스 | 설명 |
---|---|
Publisher |
데이터를 발행하는 생산자입니다. |
Subscriber |
Publisher가 발행하는 데이터를 소비하는 소비자입니다. |
Subscription |
Publisher와 Subscriber 간의 연결을 나타내며, 데이터 요청(request)과 구독 취소(cancel) 기능을 제공합니다. |
Processor |
데이터를 받아 가공하고 다시 발행하는 역할을 수행하는 중간 처리기입니다. Publisher와 Subscriber 역할을 동시에 수행할 수 있습니다. |
🎯 Reactive Streams 흐름 과정
Reactive Streams는 주로 아래의 과정을 따릅니다.
- 구독(Subscribe)
- Subscriber가 Publisher에게 데이터를 받겠다고 알립니다.
- 구독 성공 시 Subscription 생성
- Publisher는 Subscription을 생성하여 Subscriber에게 전달합니다.
- 데이터 요청(Request)
- Subscriber는 Subscription의
request(n)
메서드를 통해 Publisher에게 처리 가능한 데이터 양을 요청합니다.
- Subscriber는 Subscription의
- 데이터 전달(onNext)
- Publisher는 Subscriber가 요청한 만큼의 데이터를 Subscriber에게 전달합니다.
- 완료(onComplete) 또는 에러(onError) 알림
- Publisher는 더 이상 전달할 데이터가 없으면
onComplete()
를 호출하거나, 에러가 발생하면onError(Throwable)
를 호출합니다.
- Publisher는 더 이상 전달할 데이터가 없으면
📌 Reactive Streams의 예시 (Java)
간단한 예시로 이해를 돕겠습니다.
Publisher<Integer> publisher = new Publisher<>() {
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
int count = 1;
boolean canceled = false;
public void request(long n) {
for (int i = 0; i < n && !canceled; i++) {
subscriber.onNext(count++);
}
if (!canceled) {
subscriber.onComplete();
}
}
public void cancel() {
canceled = true;
}
});
}
};
Subscriber<Integer> subscriber = new Subscriber<>() {
Subscription subscription;
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(3); // 데이터 3개 요청
}
public void onNext(Integer data) {
System.out.println("데이터 받음: " + data);
}
public void onError(Throwable t) {
System.out.println("에러 발생: " + t.getMessage());
}
public void onComplete() {
System.out.println("모든 데이터 전달 완료");
}
};
publisher.subscribe(subscriber);
출력:
데이터 받음: 1
데이터 받음: 2
데이터 받음: 3
모든 데이터 전달 완료
🚀 Reactive Streams의 구현체들
Reactive Streams는 대표적으로 다음과 같은 다양한 라이브러리에서 구현됩니다.
- Project Reactor
- RxJava
- Akka Streams
- Vert.x
이러한 구현체들은 모두 Reactive Streams의 인터페이스를 따르기 때문에 서로 간에 호환성을 유지할 수 있습니다.
🎖️ 결론 및 핵심 요약
Reactive Streams는 효율적인 데이터 처리, 특히 백프레셔를 지원하는 비동기 데이터 스트림 처리 표준입니다. Java 생태계 내 다양한 리액티브 프로그래밍 라이브러리가 Reactive Streams를 통해 통합되어 성능과 안정성을 동시에 향상시킬 수 있습니다. 🌟🚀
'Project Reactor' 카테고리의 다른 글
Non-Reactive(비반응적) vs. Reactive(반응적): 우유 공장 예제 (0) | 2024.11.21 |
---|---|
How to read marble diagrams? (0) | 2024.11.20 |