[RabbitMQ Java] 가장 간단한 프로듀서-컨슈머 사용해보기
원래 RabbitMQ 는 할 생각이 없었고, Kafka를 해보려고 했는데
Kafka Definitive Guide의 내용이 너무 방대해서 당장 애플리케이션을 만들기 위한 기술에는 부적합하다고 판단했다.
따라서, RabbitMQ를 공부해보고자 한다.
RabbitMQ는 메시지를 수신하고 전달하는 메시지 브로커이다.
간단하게 우체국에 비유해 보자.
우편물을 우체통에 넣으면 우편 배달부가 결국 수신자에게 우편물을 배달할 것이라고 확신할 수 있다.
이 비유에서 RabbitMQ는 우체통, 우체국, 그리고 우편 배달부이다.
RabbitMQ와 우체국의 주요 차이점은 RabbitMQ는 종이를 다루지 않고 대신 이진 데이터 블롭(메시지)을 수용, 저장, 전달한다는 점이다.
RabbitMQ와 메시징 전반에서는 몇 가지 전문 용어를 사용한다.
- 생산은 단순히 보내는 것을 의미한다. 메시지를 보내는 프로그램은 생산자(Producer) 이다 .
- 큐는 RabbitMQ의 포스트 박스를 지칭한다.
메시지는 RabbitMQ와 애플리케이션을 통해 전달되지만, 안에만 저장할 수 있다 .
큐는 호스트의 메모리 및 디스크 용량 제한에 의해서만 제한되며, 본질적으로 대용량 메시지 버퍼이다.- 여러 생산자가 하나의 대기열로 이동하는 메시지를 보낼 수 있으며,
여러 소비자가 하나의 대기열 에서 데이터를 수신하려고 시도할 수 있다. - 대기열을 표현하는 방법은 다음과 같다.
- 여러 생산자가 하나의 대기열로 이동하는 메시지를 보낼 수 있으며,
- 소비는 수신과 비슷한 의미를 갖는다. 소비자(Consumer)는 주로 메시지 수신을 기다리는 프로그램이다.
프로듀서, 컨슈머, 브로커가 반드시 같은 호스트에 상주할 필요는 없다.
실제로 대부분의 애플리케이션에서는 그렇지 않다.
즉, 애플리케이션은 프로듀서이면서 동시에 컨슈머일 수도 있다.
이제 본격적으로 RabbitMQ를 사용해보자.
주의: RabbitMQ가 설치 및 서비스로 실행이 된 상태라고 가정합니다.
Hello, world!
Java 클라이언트 사용
이 튜토리얼의 이 부분에서는 Java로 두 가지 프로그램을 작성한다.
프로듀서는 단일 메시지를 전송하고, 컨슈머는 메시지를 수신하여 출력한다.
Java API의 세부 사항은 간략하게 설명하고, 시작하기 위해 아주 간단한 부분에 집중하겠다.
메시징의 "Hello World"를 시작해보자.
아래 다이어그램에서 "P"는 프로듀서이고 "C"는 컨슈머이다.
가운데 상자는 큐, 즉 RabbitMQ가 컨슈머를 대신하여 보관하는 메시지 버퍼이다.
Java 클라이언트 라이브러리
RabbitMQ는 여러 프로토콜을 지원한다.
이 튜토리얼에서는 메시징을 위한 개방형 범용 프로토콜인 AMQP 0-9-1을 사용한다.
RabbitMQ용 클라이언트는 다양한 언어 로 제공되며,
여기에서는 RabbitMQ에서 제공하는 Java 클라이언트를 사용한다.
Clients Libraries and Developer Tools | RabbitMQ
<!--
www.rabbitmq.com
먼저 다음과 같이 프로젝트를 생성해준다.
이후 gradle에 다음 의존성을 추가해주었다.
rabbitmq는 내부적으로 slf4j와 같은 로깅 도구를 함께 사용하기에, 함께 의존성을 추가해주었다.
이제 Java 클라이언트와 종속성이 있으므로 코드를 작성할 수 있다.
Sending
우린 메시지 발행자(송신자)Send
와 메시지 소비자(수신자) Recv
를 호출해볼 것이다.
발행자는 RabbitMQ에 연결하여 단일 메시지를 전송한 후 종료할 것이다.
Send.java
에서 우리는 몇몇 클래스를 가져와야 한다.
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
클래스를 설정하고 큐 이름을 지정하자.
그러면 psvm에서 서버에 연결을 생성할 수 있게 된다.
연결은 소켓 연결을 추상화하고 프로토콜 버전 협상, 인증 등을 처리한다.
여기서는 로컬 머신의 RabbitMQ 노드에 연결하기 때문에, localhost 를 사용한다.
다른 머신의 노드에 연결하려면 호스트 이름이나 IP 주소만 지정하면 된다.
다음으로, 대부분의 작업을 처리하는 API가 있는 채널을 만들 것이다.
Connection과 Channel 모두 java.lang.AutoClosable을 구현하고 있기 때문에, try-with-resources 문과 implements 문 모두 사용할 수 있다.
생산자가 메시지를 보내려면 보낼 큐를 선언해야 한다.
그런 다음 큐에 메시지를 게시할 수 있다.
이 모든 작업은 try-with-sources 문에서 수행된다.
보내기를 위해, 우리는 큐를 먼저 선언하고, 메시지를 큐를 향해 발행할 수 있다.
이 모든 작업은 try-with-resources 문 내에서 실행된다.
여기서 큐를 선언하는 것은 멱등성을 지닌다.
즉, 큐가 존재하지 않는 경우에만 생성된다.
메시지 내용은 바이트 배열이므로 원하는 대로 인코딩할 수 있다.
Receiving
Publisher에 대한 내용은 여기까지이다.
Consumer는 RabbitMQ에서 메시지를 수신 대기한다.
따라서 단일 메시지를 발행하는 Publisher와 달리, Consumer는 메시지를 수신 대기하고 출력하기 위해 계속 실행된다.
코드(in Recv.java
)는 Send
와 거의 동일한 import를 갖는다 .
DeliverCallback
는 서버에서 우리에게 푸시된 메시지를 버퍼링하는 데 사용할 추가 인터페이스이다.
셋업은 퍼블리셔와 동일하다.
연결과 채널을 열고, 소비할 큐를 선언한다.
이 큐는 send
가 선언한 큐와 일치한다.
여기서도 큐를 선언한다. 게시자보다 먼저 컨슈머를 시작할 수 있으므로, 메시지를 소비하기 전에 큐가 존재하는지 확인해야 한다.
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
는 브로커(해당 vhost)에 이름이QUEUE_NAME
인 큐가 없으면 생성하고, 이미 같은 속성( durable/exclusive/autoDelete/args )으로 존재하면 아무 일도 안 하는 선언이다.- 만약 같은 이름의 큐가 이미 다른 속성으로 존재한다면,
PRECONDITION_FAILED (406)
로 채널이 닫힌다. - 따라서 여러 곳에서 반복 호출해도 중복 큐가 생기지 않으며(이름이 키), “한 번만 만들기”를 보장하려고 별도 싱글톤 패턴을 쓸 필요는 없다.
try-with-resource 문을 사용하여 채널과 연결을 자동으로 닫는 건 어떨까?
그렇게 하면 프로그램이 계속 진행되면서 모든 것을 닫고 종료될 것이다.
이렇게 하면 소비자가 메시지 도착을 비동기적으로 기다리는 동안 프로세스가 계속 실행되어야 하므로 불편할 것이다.
서버에 큐에서 메시지를 전달해 달라고 요청해보자.
서버는 메시지를 비동기적으로 푸시하기 때문에, 메시지를 사용할 준비가 될 때까지 버퍼링하는 객체 형태의 콜백을 제공해야 한다.
이것이 바로 DeliverCallback
서브클래스가 하는 일이다.
이제 Recv 를 실행해둔 뒤 Send를 실행하면, 다음과 같이 메시지를 받을 수 있다.