[RabbitMQ Java] Pub-Sub 2: 메시지의 라우팅, Direct Binding
이전 튜토리얼 에서는 간단한 로깅 시스템을 구축했다.
[RabbitMQ Java] Pub-Sub 모델, 그리고 Exchange
이전 튜토리얼 에서는 Work Queue를 생성했다. [RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식이번 튜토리얼에서는 시간이 많이 걸리는 작업을 여러 작업자 간에 분배하는 데 사용할 Wo
dev.go-gradually.me
이를 통해 로그 메시지를 여러 수신자에게 브로드캐스트할 수 있었다.
이 튜토리얼에서는 여기에 기능을 추가해볼 것이다.
바로 메시지의 일부만 구독할 수 있도록 하는 것이다.
예를 들어, 디스크 공간을 절약하기 위해 중요한 오류 메시지만 로그 파일에 저장하면서도 모든 로그 메시지를 콘솔에 출력할 수 있다.
바인딩
이전 예제에서는 이미 바인딩을 생성하고 있었다. 다음과 같은 코드가 기억날 것이다.
channel.queueBind(queueName, EXCHANGE_NAME, "");
바인딩은 교환과 큐 사이의 관계이다.
즉, 큐가 이 교환의 메시지에 관심이 있다는 뜻이다.
바인딩은 추가 routingKey
매개변수를 가질 수 있다.basic_publish
매개변수와의 혼동을 피하기 위해 이를 binding key
라고 부르겠다 .
다음과 같이 키를 사용하여 바인딩을 만들 수 있다.
channel.queueBind(queueName, EXCHANGE_NAME, "black");
바인딩 키의 의미는 exchange 유형에 따라 달라진다.
이전에 사용했던 fanout exchange은 해당 값을 무시했었다.
Direct Exchange
이전 튜토리얼의 로깅 시스템은 모든 메시지를 모든 소비자에게 브로드캐스트했다.
이제 '심각도' 에 따라 메시지를 필터링할 수 있도록 이 시스템을 확장하려고 한다.
예를 들어, 심각한 오류만 디스크에 기록하고 경고나 정보 로그 메시지에 디스크 공간을 낭비하지 않도록 로그 메시지를 디스크에 기록하는 프로그램을 만들 수 있다.
우리는 별로 유연성이 없는 fanout
exchange를 이용하고 있었다.
그저 무의미한 방송만 할 수 있었을 뿐이었다.
이제 대신 direct exchange을 사용할 것이다 .direct exchange
의 라우팅 알고리즘은 간단하다.
메시지는 메시지의 라우팅 키와 정확히 일치하는 바인팅 키를 갖는 큐로 전송된다.
이를 설명하기 위해 다음과 같은 구성을 생각해보자.
이 구성에서는 두 개의 큐가 바인딩된 direct
exchange을 볼 수 있다.
첫 번째 큐는 binding key orange와 바인딩되어 있고, 두 번째 큐는 black과 green 두 가지 binding key와 바인딩되어 있다.
이런 구성이면, routing key가 orange인 메시지를 exchange에 퍼블리시하면 메시지는 Q₁으로 라우팅될 것이다.
routing key가 black이나 green인 메시지는 Q₂로 전달될 것이다.
그 외의 메시지는 모두 폐기된다.
다중 바인딩
동일한 binding key로 여러 큐를 바인딩하는 것은 아무런 문제가 없다.
예를 들어, 우리의 예시에서 X와 Q₁을 binding key black
으로 바인딩할 수도 있다.
이 경우 direct exchange는 fanout처럼 동작하여, 메시지를 일치하는 모든 큐에 브로드캐스트한다.
즉, routing key가 black
인 메시지는 Q₁과 Q₂ 모두에 전달된다.
로그 Emit
이제 이 모델을 로깅 시스템에 적용해 보자.
fanout 대신 메시지를 direct exchange로 보낼 것이며, 로그의 심각도(severity) 를 routing key로 전달할 것이다.
이렇게 하면 수신 프로그램이 원하는 심각도만 선택해서 받을 수 있다.
먼저 로그를 발행(emitting)하는 부분에 집중해 보자.
언제나 그렇듯, 먼저 exchange를 생성해야 한다.
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
이제 메시지를 보낼 준비가 되었다.
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
단순화를 위해, 여기서 severity
는 info, warning, error 중 하나라고 가정하겠다.
구독 (Subscribing)
메시지 수신은 이전 튜토리얼과 동일하게 동작한다.
단 하나의 차이점은, 우리가 관심 있는 각 심각도(severity)에 대해 새로운 바인딩을 생성한다는 점이다.
String queueName = channel.queueDeclare().getQueue();for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity);}
구현
클래스 코드 EmitLogDirect.java
package gogradually.logsdirect;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class EmitLogDirect {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String serverity = getSeverity(args);
String message = getMessage(args);
channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + serverity + "':'" + message + "'");
}
}
private static String getSeverity(String[] strings) {
if (strings.length < 1)
return "info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length <= startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
코드 ReceiveLogsDirect.java
package gogradually.logsdirect;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class ReceiveLogsDirect {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
if (args.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : args) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [*] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
이제 이를 실행해보자.
먼제 info를 받는 Reveiver와 error를 받는 receiver를 각각 정의해주었다.
이후 EmitLogDirect를 실행해보니, 정상적으로 자신이 구독한 로그만 받는 것을 확인할 수 있었다.
용어/메소드 정리
- 라우팅
- 바인딩 키
- 라우팅 키
- 메시지의 라우팅 키 = 큐의 바인딩 키
- Direct Exchange(직접 교환기)
- fanout exchange
본 내용은 RabbitMQ Tutorial - 라우팅을 참고하여 작성되었습니다.