WEB BE Repository/RabbitMQ

[RabbitMQ Java] Pub-Sub 2: 메시지의 라우팅, Direct Binding

조금씩 차근차근 2025. 9. 8. 18:22

이전 튜토리얼 에서는 간단한 로깅 시스템을 구축했다.

 

[RabbitMQ Java] Pub-Sub 모델, 그리고 Exchange

이전 튜토리얼 에서는 Work Queue를 생성했다. [RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식이번 튜토리얼에서는 시간이 많이 걸리는 작업을 여러 작업자 간에 분배하는 데 사용할 Wo

dev.go-gradually.me

이를 통해 로그 메시지를 여러 수신자에게 브로드캐스트할 수 있었다.

 

이 튜토리얼에서는 여기에 기능을 추가해볼 것이다.
바로 메시지의 일부만 구독할 수 있도록 하는 것이다.
예를 들어, 디스크 공간을 절약하기 위해 중요한 오류 메시지만 로그 파일에 저장하면서도 모든 로그 메시지를 콘솔에 출력할 수 있다.

바인딩

이전 예제에서는 이미 바인딩을 생성하고 있었다. 다음과 같은 코드가 기억날 것이다.

channel.queueBind(queueName, EXCHANGE_NAME, "");

바인딩은 교환과 큐 사이의 관계이다.
즉, 큐가 이 교환의 메시지에 관심이 있다는 뜻이다.

 

바인딩은 추가 routingKey매개변수를 가질 수 있다.
basic_publish매개변수와의 혼동을 피하기 위해 이를 binding key라고 부르겠다 .
다음과 같이 키를 사용하여 바인딩을 만들 수 있다.

channel.queueBind(queueName, EXCHANGE_NAME, "black");

바인딩 키의 의미는 exchange 유형에 따라 달라진다.

이전에 사용했던 fanout exchange은 해당 값을 무시했었다.

Direct Exchange

이전 튜토리얼의 로깅 시스템은 모든 메시지를 모든 소비자에게 브로드캐스트했다.
이제 '심각도' 에 따라 메시지를 필터링할 수 있도록 이 시스템을 확장하려고 한다.
예를 들어, 심각한 오류만 디스크에 기록하고 경고나 정보 로그 메시지에 디스크 공간을 낭비하지 않도록 로그 메시지를 디스크에 기록하는 프로그램을 만들 수 있다.

 

우리는 별로 유연성이 없는 fanout exchange를 이용하고 있었다.

그저 무의미한 방송만 할 수 있었을 뿐이었다.

 

이제 대신 direct exchange을 사용할 것이다 .
direct exchange의 라우팅 알고리즘은 간단하다.
메시지는 메시지의 라우팅 키와 정확히 일치하는 바인팅 키를 갖는 큐로 전송된다.

이를 설명하기 위해 다음과 같은 구성을 생각해보자.


이 구성에서는 두 개의 큐가 바인딩된 directexchange을 볼 수 있다.
첫 번째 큐는 binding key orange와 바인딩되어 있고, 두 번째 큐는 black과 green 두 가지 binding key와 바인딩되어 있다.

 

 

이런 구성이면, routing key가 orange인 메시지를 exchange에 퍼블리시하면 메시지는 Q₁으로 라우팅될 것이다.
routing key가 black이나 green인 메시지는 Q₂로 전달될 것이다.
그 외의 메시지는 모두 폐기된다.

다중 바인딩


동일한 binding key로 여러 큐를 바인딩하는 것은 아무런 문제가 없다.
예를 들어, 우리의 예시에서 X와 Q₁을 binding key black으로 바인딩할 수도 있다.
이 경우 direct exchange는 fanout처럼 동작하여, 메시지를 일치하는 모든 큐에 브로드캐스트한다.
즉, routing key가 black인 메시지는 Q₁과 Q₂ 모두에 전달된다.

로그 Emit

이제 이 모델을 로깅 시스템에 적용해 보자.
fanout 대신 메시지를 direct exchange로 보낼 것이며, 로그의 심각도(severity) 를 routing key로 전달할 것이다.
이렇게 하면 수신 프로그램이 원하는 심각도만 선택해서 받을 수 있다.
먼저 로그를 발행(emitting)하는 부분에 집중해 보자.

 

언제나 그렇듯, 먼저 exchange를 생성해야 한다.

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

이제 메시지를 보낼 준비가 되었다.

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

단순화를 위해, 여기서 severityinfo, warning, error 중 하나라고 가정하겠다.

구독 (Subscribing)

메시지 수신은 이전 튜토리얼과 동일하게 동작한다.
단 하나의 차이점은, 우리가 관심 있는 각 심각도(severity)에 대해 새로운 바인딩을 생성한다는 점이다.

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){  channel.queueBind(queueName, EXCHANGE_NAME, severity);}

구현


클래스 코드 EmitLogDirect.java

package gogradually.logsdirect;  

import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  

import java.nio.charset.StandardCharsets;  

public class EmitLogDirect {  

    public static final String EXCHANGE_NAME = "direct_logs";  

    public static void main(String[] args) throws Exception{  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        try(Connection connection = factory.newConnection();  
            Channel channel = connection.createChannel()){  
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);  

            String serverity = getSeverity(args);  
            String message = getMessage(args);  

            channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes(StandardCharsets.UTF_8));  
            System.out.println(" [x] Sent '" + serverity + "':'" + message + "'");  
        }  
    }  


    private static String getSeverity(String[] strings) {  
        if (strings.length < 1)  
            return "info";  
        return strings[0];  
    }  

    private static String getMessage(String[] strings) {  
        if (strings.length < 2)  
            return "Hello World!";  
        return joinStrings(strings, " ", 1);  
    }  

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {  
        int length = strings.length;  
        if (length == 0) return "";  
        if (length <= startIndex) return "";  
        StringBuilder words = new StringBuilder(strings[startIndex]);  
        for (int i = startIndex + 1; i < length; i++) {  
            words.append(delimiter).append(strings[i]);  
        }  
        return words.toString();  
    }  
}

코드 ReceiveLogsDirect.java

package gogradually.logsdirect;  

import com.rabbitmq.client.*;  

import java.nio.charset.StandardCharsets;  

public class ReceiveLogsDirect {  

    public static final String EXCHANGE_NAME = "direct_logs";  

    public static void main(String[] args) throws Exception{  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);  
        String queueName = channel.queueDeclare().getQueue();  

        if (args.length < 1){  
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");  
            System.exit(1);  
        }  

        for (String severity : args) {  
            channel.queueBind(queueName, EXCHANGE_NAME, severity);  
        }  
        System.out.println(" [*] Waiting for message. To exit press CTRL+C");  

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);  
            System.out.println(" [*] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");  
        };  
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});  
    }  
}

이제 이를 실행해보자.
먼제 info를 받는 Reveiver와 error를 받는 receiver를 각각 정의해주었다.

 

이후 EmitLogDirect를 실행해보니, 정상적으로 자신이 구독한 로그만 받는 것을 확인할 수 있었다.

 

용어/메소드 정리

  • 라우팅
  • 바인딩 키
  • 라우팅 키
  • 메시지의 라우팅 키 = 큐의 바인딩 키
  • Direct Exchange(직접 교환기)
  • fanout exchange

 

본 내용은 RabbitMQ Tutorial - 라우팅을 참고하여 작성되었습니다.