WEB BE Repository/RabbitMQ

[RabbitMQ Java] Pub-Sub 3: 토픽

조금씩 차근차근 2025. 9. 8. 18:49

이전 튜토리얼에서는 로깅 시스템을 개선했다.
단순히 브로드캐스팅만 가능한 fanout exchange 대신 direct exchange를 사용하여 선택적으로 로그를 받을 수 있도록 했었다.

 

 

direct exchange를 사용함으로써 시스템이 개선되었지만, 여전히 한계가 존재한다.
바로 여러 기준을 기반으로 라우팅할 수는 없다는 점이다.

우리의 로깅 시스템에서는 심각도(severity)뿐 아니라 로그를 발생시킨 소스(source)에 따라서도 구독하고 싶을 수 있다.
이 개념은 Unix의 syslog 도구에서 익숙할 수 있다.

syslog는 심각도(info/warn/crit...)와 facility(auth/cron/kern...) 모두를 기준으로 로그를 라우팅한다.

 

이를 지원하는 기법이 바로 토픽(Topic)이다.

 

이 방식은 많은 유연성을 제공한다.
예를 들어 cron에서 발생하는 치명적인 에러만 듣고 싶을 수도 있고, kern에서 발생하는 모든 로그를 받고 싶을 수도 있다.

 

이를 구현하기 위해서는 더 복잡한 topic exchange를 알아야 한다.


Topic exchange

topic exchange로 전송되는 메시지의 routing_key는 임의의 값일 수 없으며, 점(.)으로 구분된 단어들의 리스트여야 한다.
단어들은 무엇이든 가능하지만, 보통 메시지와 연결된 속성을 나타내게 된다.

  • stock.usd.nyse
  • nyse.vmw
  • quick.orange.rabbit

routing key는 최대 255바이트까지 원하는 만큼의 단어를 포함할 수 있다.

 

binding key 또한 같은 형식을 가져야 한다.
동작 원리는 direct exchange와 비슷하다.
즉, 특정 routing key로 전송된 메시지는 해당하는 binding key와 일치하는 모든 큐에 전달된다.
다만, 두 가지 특별한 와일드카드가 있습니다:

  • * (star): 정확히 한 단어를 대체함.
  • # (hash): 0개 이상의 단어를 대체함.

예시를 통해 이해해보자.

이 예시에서 우리는 동물을 설명하는 메시지를 보낼 것이다.
routing key는 <속도>.<색상>.<종>세 개의 단어로 구성된다.

  • Q1*.orange.*로 바인딩되어 있어 모든 "주황색 동물"을 구독한다.
  • Q2*.*.rabbitlazy.#로 바인딩되어 있어 모든 "토끼"와 모든 "게으른 동물"을 구독한다.

동작 예시

  • quick.orange.rabbit → Q1, Q2 모두 전달됨
  • lazy.orange.elephant → Q1, Q2 모두 전달됨
  • quick.orange.fox → Q1에만 전달됨
  • lazy.brown.fox → Q2에만 전달됨
  • lazy.pink.rabbit → Q2에 한 번만 전달됨 (두 바인딩에 모두 일치하지만 중복 없음)
  • quick.brown.fox → 어떤 큐에도 전달되지 않고 폐기됨

만약 규칙을 깨고 단어나 점 개수가 맞지 않는 메시지(orange, quick.orange.new.rabbit)를 보내면 바인딩과 일치하지 않아 손실된다.
그러나 lazy.orange.new.rabbit은 단어가 4개지만 lazy.#와 일치하므로 Q2에 전달되게 된다.


Topic exchange의 특징

  • #으로 바인딩하면 모든 메시지를 받게 되어 fanout exchange처럼 동작한다.
  • *# 같은 특수 문자를 사용하지 않으면 direct exchange처럼 동작한다.

구현

우리의 로깅 시스템에서는 topic exchange를 사용할 것이다.
우선 로그의 routing key는 아래 두 단어 구조로 구성된다고 가정하자.

  • <facility>.<severity>

코드 EmitLogTopic.java

package gogradually.logstopic;  

import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

import java.nio.charset.StandardCharsets;  

public class EmitLogTopic {  

    public static final String EXCHANGE_NAME = "topic_logs";  

    public static void main(String[] args) throws Exception{  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        try(Connection connection = factory.newConnection();  
            Channel channel = connection.createChannel()){  
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);  

            String routingKey = getRouting(args);  
            String message = getMessage(args);  

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));  
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");  
        }  
    }  


    private static String getRouting(String[] strings) {  
        if (strings.length < 1)  
            return "anonymous.info";  
        return strings[0];  
    }  

    private static String getMessage(String[] strings) {  
        if (strings.length < 2)  
            return "Hello World!";  
        return joinStrings(strings, " ", 1);  
    }  

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {  
        int length = strings.length;  
        if (length == 0) return "";  
        if (length <= startIndex) return "";  
        StringBuilder words = new StringBuilder(strings[startIndex]);  
        for (int i = startIndex + 1; i < length; i++) {  
            words.append(delimiter).append(strings[i]);  
        }  
        return words.toString();  
    }  
}

코드 ReceiveLogsTopic.java

package gogradually.logstopic;  

import com.rabbitmq.client.*;  

import java.nio.charset.StandardCharsets;  

public class ReceiveLogsTopic {  

    public static final String EXCHANGE_NAME = "topic_logs";  

    public static void main(String[] args) throws Exception{  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);  
        String queueName = channel.queueDeclare().getQueue();  

        if (args.length < 1){  
            System.err.println("Usage: ReceiveLogsTopic [bind_key]...");  
            System.exit(1);  
        }  

        for (String bindingKey : args) {  
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);  
        }  
        System.out.println(" [*] Waiting for message. To exit press CTRL+C");  

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);  
            System.out.println(" [*] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
        };  
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});  
    }  
}

실행결과, 적절히 구독한 메시지를 가져오는 것을 알 수 있다.

( EmitLogTopic.javaReceiveLogsTopic.java 의 전체 소스 코드 )

용어 정리

  • 라우팅 키
  • 토픽
  • 와일드카드
  • .
  • #
  • *

본 내용은 RabbitMQ Tutorial - 토픽 을 참고하여 작성되었습니다.