[RabbitMQ Java] Pub-Sub 3: 토픽
이전 튜토리얼에서는 로깅 시스템을 개선했다.
단순히 브로드캐스팅만 가능한 fanout exchange 대신 direct exchange를 사용하여 선택적으로 로그를 받을 수 있도록 했었다.
direct exchange를 사용함으로써 시스템이 개선되었지만, 여전히 한계가 존재한다.
바로 여러 기준을 기반으로 라우팅할 수는 없다는 점이다.
우리의 로깅 시스템에서는 심각도(severity)뿐 아니라 로그를 발생시킨 소스(source)에 따라서도 구독하고 싶을 수 있다.
이 개념은 Unix의 syslog 도구에서 익숙할 수 있다.
syslog는 심각도(info/warn/crit...)와 facility(auth/cron/kern...) 모두를 기준으로 로그를 라우팅한다.
이를 지원하는 기법이 바로 토픽(Topic)이다.
이 방식은 많은 유연성을 제공한다.
예를 들어 cron
에서 발생하는 치명적인 에러만 듣고 싶을 수도 있고, kern
에서 발생하는 모든 로그를 받고 싶을 수도 있다.
이를 구현하기 위해서는 더 복잡한 topic exchange를 알아야 한다.
Topic exchange
topic exchange로 전송되는 메시지의 routing_key
는 임의의 값일 수 없으며, 점(.
)으로 구분된 단어들의 리스트여야 한다.
단어들은 무엇이든 가능하지만, 보통 메시지와 연결된 속성을 나타내게 된다.
stock.usd.nyse
nyse.vmw
quick.orange.rabbit
routing key는 최대 255바이트까지 원하는 만큼의 단어를 포함할 수 있다.
binding key 또한 같은 형식을 가져야 한다.
동작 원리는 direct exchange와 비슷하다.
즉, 특정 routing key로 전송된 메시지는 해당하는 binding key와 일치하는 모든 큐에 전달된다.
다만, 두 가지 특별한 와일드카드가 있습니다:
*
(star): 정확히 한 단어를 대체함.#
(hash): 0개 이상의 단어를 대체함.
예시를 통해 이해해보자.
이 예시에서 우리는 동물을 설명하는 메시지를 보낼 것이다.
routing key는 <속도>.<색상>.<종>
세 개의 단어로 구성된다.
Q1
은*.orange.*
로 바인딩되어 있어 모든 "주황색 동물"을 구독한다.Q2
는*.*.rabbit
과lazy.#
로 바인딩되어 있어 모든 "토끼"와 모든 "게으른 동물"을 구독한다.
동작 예시
quick.orange.rabbit
→ Q1, Q2 모두 전달됨lazy.orange.elephant
→ Q1, Q2 모두 전달됨quick.orange.fox
→ Q1에만 전달됨lazy.brown.fox
→ Q2에만 전달됨lazy.pink.rabbit
→ Q2에 한 번만 전달됨 (두 바인딩에 모두 일치하지만 중복 없음)quick.brown.fox
→ 어떤 큐에도 전달되지 않고 폐기됨
만약 규칙을 깨고 단어나 점 개수가 맞지 않는 메시지(orange
, quick.orange.new.rabbit
)를 보내면 바인딩과 일치하지 않아 손실된다.
그러나 lazy.orange.new.rabbit
은 단어가 4개지만 lazy.#
와 일치하므로 Q2에 전달되게 된다.
Topic exchange의 특징
#
으로 바인딩하면 모든 메시지를 받게 되어 fanout exchange처럼 동작한다.*
나#
같은 특수 문자를 사용하지 않으면 direct exchange처럼 동작한다.
구현
우리의 로깅 시스템에서는 topic exchange를 사용할 것이다.
우선 로그의 routing key는 아래 두 단어 구조로 구성된다고 가정하자.
<facility>.<severity>
코드 EmitLogTopic.java
package gogradually.logstopic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = getRouting(args);
String message = getMessage(args);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
private static String getRouting(String[] strings) {
if (strings.length < 1)
return "anonymous.info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length <= startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
코드 ReceiveLogsTopic.java
package gogradually.logstopic;
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class ReceiveLogsTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
if (args.length < 1){
System.err.println("Usage: ReceiveLogsTopic [bind_key]...");
System.exit(1);
}
for (String bindingKey : args) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for message. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [*] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
( EmitLogTopic.java 및 ReceiveLogsTopic.java 의 전체 소스 코드 )
용어 정리
- 라우팅 키
- 토픽
- 와일드카드
.
#
*
본 내용은 RabbitMQ Tutorial - 토픽 을 참고하여 작성되었습니다.