이전 튜토리얼 에서는 간단한 로깅 시스템을 구축했다.
[RabbitMQ Java] Pub-Sub 모델, 그리고 Exchange
이전 튜토리얼 에서는 Work Queue를 생성했다. [RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식이번 튜토리얼에서는 시간이 많이 걸리는 작업을 여러 작업자 간에 분배하는 데 사용할 Wo
dev.go-gradually.me
이를 통해 로그 메시지를 여러 수신자에게 브로드캐스트할 수 있었다.
이 튜토리얼에서는 여기에 기능을 추가해볼 것이다.
바로 메시지의 일부만 구독할 수 있도록 하는 것이다.
예를 들어, 디스크 공간을 절약하기 위해 중요한 오류 메시지만 로그 파일에 저장하면서도 모든 로그 메시지를 콘솔에 출력할 수 있다.
바인딩
이전 예제에서는 이미 바인딩을 생성하고 있었다. 다음과 같은 코드가 기억날 것이다.
channel.queueBind(queueName, EXCHANGE_NAME, "");
바인딩은 교환과 큐 사이의 관계이다.
즉, 큐가 이 교환의 메시지에 관심이 있다는 뜻이다.
바인딩은 추가 routingKey매개변수를 가질 수 있다.basic_publish매개변수와의 혼동을 피하기 위해 이를 binding key라고 부르겠다 .
다음과 같이 키를 사용하여 바인딩을 만들 수 있다.
channel.queueBind(queueName, EXCHANGE_NAME, "black");
바인딩 키의 의미는 exchange 유형에 따라 달라진다.
이전에 사용했던 fanout exchange은 해당 값을 무시했었다.
Direct Exchange
이전 튜토리얼의 로깅 시스템은 모든 메시지를 모든 소비자에게 브로드캐스트했다.
이제 '심각도' 에 따라 메시지를 필터링할 수 있도록 이 시스템을 확장하려고 한다.
예를 들어, 심각한 오류만 디스크에 기록하고 경고나 정보 로그 메시지에 디스크 공간을 낭비하지 않도록 로그 메시지를 디스크에 기록하는 프로그램을 만들 수 있다.
우리는 별로 유연성이 없는 fanout exchange를 이용하고 있었다.
그저 무의미한 방송만 할 수 있었을 뿐이었다.
이제 대신 direct exchange을 사용할 것이다 .direct exchange의 라우팅 알고리즘은 간단하다.
메시지는 메시지의 라우팅 키와 정확히 일치하는 바인팅 키를 갖는 큐로 전송된다.

이를 설명하기 위해 다음과 같은 구성을 생각해보자.

이 구성에서는 두 개의 큐가 바인딩된 directexchange을 볼 수 있다.
첫 번째 큐는 binding key orange와 바인딩되어 있고, 두 번째 큐는 black과 green 두 가지 binding key와 바인딩되어 있다.
이런 구성이면, routing key가 orange인 메시지를 exchange에 퍼블리시하면 메시지는 Q₁으로 라우팅될 것이다.
routing key가 black이나 green인 메시지는 Q₂로 전달될 것이다.
그 외의 메시지는 모두 폐기된다.
다중 바인딩

동일한 binding key로 여러 큐를 바인딩하는 것은 아무런 문제가 없다.
예를 들어, 우리의 예시에서 X와 Q₁을 binding key black으로 바인딩할 수도 있다.
이 경우 direct exchange는 fanout처럼 동작하여, 메시지를 일치하는 모든 큐에 브로드캐스트한다.
즉, routing key가 black인 메시지는 Q₁과 Q₂ 모두에 전달된다.
로그 Emit
이제 이 모델을 로깅 시스템에 적용해 보자.
fanout 대신 메시지를 direct exchange로 보낼 것이며, 로그의 심각도(severity) 를 routing key로 전달할 것이다.
이렇게 하면 수신 프로그램이 원하는 심각도만 선택해서 받을 수 있다.
먼저 로그를 발행(emitting)하는 부분에 집중해 보자.
언제나 그렇듯, 먼저 exchange를 생성해야 한다.
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
이제 메시지를 보낼 준비가 되었다.
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
단순화를 위해, 여기서 severity는 info, warning, error 중 하나라고 가정하겠다.
구독 (Subscribing)
메시지 수신은 이전 튜토리얼과 동일하게 동작한다.
단 하나의 차이점은, 우리가 관심 있는 각 심각도(severity)에 대해 새로운 바인딩을 생성한다는 점이다.
String queueName = channel.queueDeclare().getQueue();for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity);}
구현

클래스 코드 EmitLogDirect.java
package gogradually.logsdirect;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class EmitLogDirect {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String serverity = getSeverity(args);
String message = getMessage(args);
channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + serverity + "':'" + message + "'");
}
}
private static String getSeverity(String[] strings) {
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length <= startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
코드 ReceiveLogsDirect.java
package gogradually.logsdirect;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class ReceiveLogsDirect {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
if (args.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : args) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [*] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
이제 이를 실행해보자.
먼제 info를 받는 Reveiver와 error를 받는 receiver를 각각 정의해주었다.


이후 EmitLogDirect를 실행해보니, 정상적으로 자신이 구독한 로그만 받는 것을 확인할 수 있었다.


용어/메소드 정리
- 라우팅
- 바인딩 키
- 라우팅 키
- 메시지의 라우팅 키 = 큐의 바인딩 키
- Direct Exchange(직접 교환기)
- fanout exchange
본 내용은 RabbitMQ Tutorial - 라우팅을 참고하여 작성되었습니다.
'WEB BE Repository > RabbitMQ' 카테고리의 다른 글
| [RabbitMQ Java] RabbitMQ를 이용한 RPC (0) | 2025.09.08 |
|---|---|
| [RabbitMQ Java] Pub-Sub 3: 토픽 (0) | 2025.09.08 |
| [RabbitMQ Java] Pub-Sub 1: Pub-Sub 모델, 그리고 Exchange (0) | 2025.09.08 |
| [RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식 (0) | 2025.09.08 |
| [RabbitMQ Java] 가장 간단한 프로듀서-컨슈머 사용해보기 (0) | 2025.09.07 |