이전 튜토리얼 에서는 Work Queue를 생성했다.
[RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식
이번 튜토리얼에서는 시간이 많이 걸리는 작업을 여러 작업자 간에 분배하는 데 사용할 Work Queue(작업 큐)를 만들어 보자. Work Queue(Task Queue)의 핵심 아이디어는 리소스 소모가 많은 작업을 즉시
dev.go-gradually.me
Work Queue의 기본 가정은 각 작업이 정확히 하나의 워커에게 전달된다는 것이다.
이번 파트에서는 완전히 다른 작업을 수행할 것이다.
바로, 하나의 메시지가 이를 구독한 여러 컨슈머에게 메시지를 전달하는 형태이다.
이 패턴을 "Pub/Sub"이라고 한다.
즉, 다음과 같다.
- 생산자/소비자 모델은 하나의 메시지가 하나의 소비자에게 전달되고,
- 발행/구독 모델은 하나의 메시지가 이를 구독하는 모든 구독자에게 전달된다.
(n사의 면접에서 이 질문을 받았던 기억이 있을지도...?)
패턴을 설명하기 위해 간단한 로깅 시스템을 구축해 보자.
이 시스템은 두 개의 프로그램으로 구성된다.
- 로그 메시지를 내보내는 프로그램
- 로그 메시지를 받아서 출력하는 프로그램.
로깅 시스템에서는 실행 중인 모든 수신 프로그램 사본이 메시지를 수신한다.
이렇게 하면 하나의 수신 프로그램을 실행하여 로그를 디스크로 전송할 수 있고, 동시에 다른 수신 프로그램을 실행하여 화면에서 로그를 확인할 수 있다.
기본적으로 게시된 로그 메시지는 모든 수신자에게 브로드캐스트될 것이다.
교환기(Exchange)
이전 튜토리얼에서는 큐와 메시지를 주고받는 방법을 살펴보았다.
이제 Rabbit의 전체 메시징 모델을 살펴보자.
이전 튜토리얼에서는 아래 세가지를 각습했다.
- 프로듀서는 메시지를 보내는 사용자 애플리케이션이다.
- 큐는 메시지를 저장하는 버퍼이다.
- 소비자는 메시지를 수신하는 사용자 애플리케이션이다.
RabbitMQ 메시징 모델의 핵심 아이디어는 프로듀서가 큐에 직접 메시지를 전송하지 않는다는 것이다.
실제로 프로듀서는 메시지가 큐에 전달될지조차 모르는 경우가 많다.
대신, 프로듀서는 교환(exchange) 에만 메시지를 보낼 수 있다.
교환은 매우 간단하다. 한쪽에서는 프로듀서로부터 메시지를 수신하고 다른 쪽에서는 메시지를 대기열에 푸시한다.
여기서의 핵심은 수신한 메시지의 처리이다.
교환은 수신한 메시지를 어떻게 처리할지 정확히 알아야 한다.
- 특정 대기열에 추가해야 할까?
- 여러 대기열에 추가해야 할까?
- 아니면 폐기해야 할까?
이러한 규칙은 교환 유형(exchange type) 에 따라 정의된다 .
사용 가능한 교환 유형은 direct
, topic
, headers
, fanout
이다.
마지막 유형인 팬아웃에 집중해보자.
팬아웃 유형의 exchange를 만들고 logs
로 명명해 보자.
팬아웃 교환은 매우 간단하다.
이름에서 짐작할 수 있듯이, 해당 교환기에 바인딩된 모든 큐로 메시지를 복제해 보낸다. 이때 라우팅 키는 무시된다.
바로 이것이 로거에 필요한 기능이다.
exchange목록 리스트업 하기
서버의 exchange 리스트를 나열하려면 다음과 같이 rabbitmqctl
명령어를 실행하면 된다 .
sudo rabbitmqctl list_exchanges
이 목록에는 몇 개의 amq.*
exchange와 기본(이름이 지정되지 않은) exchange가 있다.
이 exchange들은 기본적으로 생성되지만, 지금 당장 사용할 필요는 없을 것 같다.
Nameless Exchange
이전 튜토리얼에서는 exchange에 대해 전혀 알지 못했지만, 큐에 메시지를 보낼 수 있었다.
이는 빈 문자열( ""
)로 식별되는 기본 교환을 사용했기 때문에 가능했던 것이었다.
이전에 우리가 어떻게 메시지를 게시했는지 떠올려 보자.
channel.basicPublish("", "hello", null, message.getBytes());
첫 번째 매개변수는 exchange의 이름이다.
빈 문자열은 기본 교환 또는 이름이 없는 교환을 나타낸다.
메시지는 routingKey
로 지정된 이름의 대기열(존재하는 경우)로 라우팅된다.
이제 우리는 명명된 exchange에 게시할 수 있게 됐다.
channel.basicPublish( "logs", "", null, message.getBytes());
임시 큐
이전에는 특정 이름(hello
, task_queue
)을 가진 큐를 사용했던 것을 기억할 것이다.
큐에 이름을 붙일 수 있다는 것은 우리에게 매우 중요했다(워커들이 같은 큐를 가리키도록 해야 했기 때문).
프로듀서와 컨슈머 간에 큐를 공유하려면 큐에 이름을 지정하는 것이 중요하다.
하지만 우리가 방금 선언한 로거는 그렇지 않다.
우리는 일부 로그 메시지만이 아니라 모든 로그 메시지를 수신하고 싶다.
또한, 이전 메시지에는 없는 현재 전송 중인 메시지에만 관심이 있다.
이 문제를 해결하려면 두 가지가 필요하다.
- 첫째, Rabbit에 연결할 때마다 비어 있는 새 대기열이 필요하다.
- 이를 위해 무작위 이름을 가진 대기열을 생성하거나
- 더 나은 방법으로 서버가 무작위 대기열 이름을 자동으로 선택하도록 할 수 있다.
- 둘째, 소비자와의 연결을 끊으면 대기열이 자동으로 삭제되어야 한다.
Java 클라이언트에서 queueDeclare()
에 매개변수를 제공하지 않으면
- 알아서 생성된 이름을 갖는
- 비영구적이고
- 배타적이며
- 자동 삭제 되는
대기열을 생성한다.
String queueName = channel.queueDeclare().getQueue();
exclusive
플래그 및 기타 대기열 속성 에 대한 자세한 내용은 대기열 가이드 에서 확인할 수 있다 .
위 queueName 변수에는 임의의 큐 이름이 들어갈 것이다.
바인딩
방금 우리는 팬아웃 exchange와 큐를 만들었다.
이제 exchange에 큐로 메시지를 보내도록 설정해야 한다.
exchange와 큐 사이의 이러한 관계를 바인딩 이라고 한다 .
channel.queueBind(queueName, "logs", "");
이제부터 logs
exchange는 대기열에 메시지를 추가할 것이다.
바인딩 목록
위에서 exchange 리스트를 봤듯이, 바인딩의 리스트도 리스트업할 수 있다.
rabbitmqctl list_bindings
구현
로그 메시지를 내보내는 프로듀서 프로그램은 이전 튜토리얼과 크게 다르지 않다.
가장 중요한 변경 사항은 이제 이름이 없는 exchange 대신 자신의 exchange인 logs
에 메시지를 게시한다는 것이다.
메시지를 전송할 때는 routingKey
를 제공해야 하지만, fanout
exchange에서는 해당 값이 무시된다.EmitLog.java
프로그램 코드는 다음과 같다.
보다시피, 연결을 설정한 후 exchange를 선언했다.
존재하지 않는 echange에 게시하는 것은 금지되어 있으므로 이 단계는 필수이다.
아직 exchange에 큐가 연결되어 있지 않으면 메시지가 손실되지만 우리에게는 문제가 없다.
아직 수신하는 소비자가 없으면 메시지를 안전하게 삭제할 수 있다.
이제 결과를 살펴보자.
서로 다른 두개의 수신기에서 모두 잘 받은 것을 확인할 수 있었다.
주요 용어/메소드 정리
- 발행/구독 모델
- exchange
- exchange type
- exchange에 큐 바인딩
라우팅은 다음 장에서 다룬다.
본 내용은 RabbitMQ Tutorial - Pub-Sub를 참고하여 작성되었습니다.
RabbitMQ tutorial - Publish/Subscribe | RabbitMQ
<!--
www.rabbitmq.com
'WEB BE Repository > RabbitMQ' 카테고리의 다른 글
[RabbitMQ Java] RabbitMQ를 이용한 RPC (0) | 2025.09.08 |
---|---|
[RabbitMQ Java] Pub-Sub 3: 토픽 (0) | 2025.09.08 |
[RabbitMQ Java] Pub-Sub 2: 메시지의 라우팅, Direct Binding (0) | 2025.09.08 |
[RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식 (0) | 2025.09.08 |
[RabbitMQ Java] 가장 간단한 프로듀서-컨슈머 사용해보기 (0) | 2025.09.07 |