[RabbitMQ Java] Work Queue 직접 만들어보기와 메시지의 분배 방식
이번 튜토리얼에서는 시간이 많이 걸리는 작업을 여러 작업자 간에 분배하는 데 사용할 Work Queue(작업 큐)를 만들어 보자.
Work Queue(Task Queue)의 핵심 아이디어는 리소스 소모가 많은 작업을 즉시 실행하고 완료될 때까지 기다리는 것을 방지하는 것이다.
- 먼저, 우리는 이어질 작업이 나중에 수행되도록 작업을 예약시킨다.
- 또한 우리는 그 작업을 메시지로 캡슐화하여 큐에 보낼 것이다.
- 백그라운드에서 실행되는 worker 프로세스가 각자 작업을 팝(pop)하고 최종적으로 작업을 실행한다.
- 여러 worker를 실행하면 작업이 작업자 간에 공유되도록 만든다.
이 개념은 짧은 HTTP 요청 창 동안 복잡한 작업을 처리하는 것이 불가능한 웹 애플리케이션에서 특히 유용하다.
준비 작업
이 튜토리얼의 이전 부분에서는 "Hello World!"가 포함된 메시지를 전송했었다.
[RabbitMQ Java] 가장 간단한 프로듀서-컨슈머 사용해보기
원래 RabbitMQ 는 할 생각이 없었고, Kafka를 해보려고 했는데Kafka Definitive Guide의 내용이 너무 방대해서 당장 애플리케이션을 만들기 위한 기술에는 부적합하다고 판단했다.따라서, RabbitMQ를 공부해
dev.go-gradually.me
이제 본격적으로 복잡한 작업을 나타내는 문자열을 전송해보자.
- 이미지 크기를 조정하거나 PDF 파일을 렌더링하는 것과 같은 실제 작업은 없으므로,
Thread.sleep()
함수를 사용하여 바쁜 척하는 것으로 가정하자. - 문자열의 점 개수를 복잡성으로 간주하자.
- 각 점은 1초의 "작업"을 나타낸다.
- 예를 들어,
Hello...
로 설명된 가짜 작업은 3초가 걸린다.
이전 예제의 Send.java 코드를 약간 수정하여 명령줄에서 임의의 메시지를 전송할 수 있도록 해보자.
이 프로그램은 작업 대기열에 작업을 예약하므로, 이름을 NewTask.java
로 지정했다 .
기존 Recv.java 프로그램도 몇 가지 변경이 필요하다.
메시지 본문의 모든 점(dot)에 대해 1초의 작업을 가짜로 처리해야 한다.
전달된 메시지를 처리하고 해당 작업을 수행하므로, 이름을 Worker.java
로 지정하자 .
위에서 사용된 doWork 메소드는 다음과 같다.
이제 프로그램을 실행해보자.
먼저 Worker를 실행해보자.
이는 그냥 실행시켜주면 된다.
이제 새 작업을 건내줘보자.
실행하기 전에, 메시지를 실행할 때 인수로 넘겨줘야 하므로, 실행 구성을 편집을 수행해야 한다.
위에서 '구성 편집'을 선택하자.
이후 프로그램 인수에 메시지 "Hello....."를 입력한다.
이 코드는 워커가 작업을 수행하는 데 5초가 걸릴 것을 암시한다.
이제 이 NewTask를 실행하면, 아래와 같이 워커에서 5초가 걸린 뒤 작업이 완료되는 모습을 확인할 수 있다.
라운드 로빈 디스패칭
Work Queue를 사용하면 작업을 쉽게 병렬화할 수 있다는 장점이 있다.
작업 백로그를 구축하는 경우, 작업자를 추가하기만 하면 쉽게 확장할 수 있다.
먼저, 두 개의 워커 인스턴스를 동시에 실행해 보자.
두 인스턴스 모두 큐에서 메시지를 받게 되는데, 정확히 어떻게 되는 걸까? 알아보자.
우리는 세 개의 콘솔을 열어야 한다.
두 개는 워커 프로그램을 실행할 것이다.
이 두 콘솔은 C1과 C2라는 두 개의 소비자가 된다.
다음과 같이 좌측 상단의 '새 구성 추가'를 이용해, 병렬로 수행할 새로운 Worker를 생성하고 실행해보자.
다음과 같이 두 개의 워커를 동시에 실행시켰다.
세 번째 단계에서는 새로운 작업을 게시해보자.
각 워커에 어떠한 메시지들이 전달됐는지 확인해보자.
기본적으로 RabbitMQ는 각 메시지를 다음 컨슈머에게 순서대로 전송한다.
평균적으로 모든 컨슈머는 같은 수의 메시지를 받게 된다.
이러한 메시지 분배 방식을 라운드 로빈이라고 한다.
메시지 확인
작업 수행에는 몇 초가 걸릴 수 있다.
컨슈머가 긴 작업을 시작했는데 완료되기 전에 종료되면 어떻게 될지 궁금할 수 있다.
- 현재 코드에서는 RabbitMQ가 컨슈머에게 메시지를 전달하면 즉시 삭제 대상으로 표시한다.
- 이 경우 워커를 종료하면 처리 중이던 메시지가 손실된다.
- 해당 워커로 전송되었지만 아직 처리되지 않은 메시지도 손실된다.
하지만 일반적으로 우리는 어떤 작업도 손실되는 것은 원치 않는다.
워커가 죽으면 다른 워커에게 작업이 전달되도록 하고 싶다.
RabbitMQ는 메시지 손실 방지를 위해 메시지 확인(acknowledgement)을 지원한다.
소비자는 특정 메시지가 수신 및 처리되었으며 RabbitMQ가 해당 메시지를 삭제할 수 있음을 알리기 위해 확인 메시지를 보낸다.
컨슈머가 ack를 전송하지 않고 종료(채널이 닫히거나, 연결이 끊어지거나, TCP 연결이 끊어짐)되면 RabbitMQ는 메시지가 완전히 처리되지 않았음을 인식하고 다시 큐에 넣는다.
다른 컨슈머가 동시에 온라인 상태인 경우, RabbitMQ는 해당 메시지를 다른 컨슈머에게 신속하게 다시 전달한다.
이렇게 하면 워커가 가끔 종료되더라도 메시지가 손실되지 않도록 할 수 있다.
컨슈머 전달 확인에는 시간 제한(기본 30분)이 적용된다.
이는 전달을 확인하지 않는 버그가 있는(멈춘) 컨슈머를 감지하는 데 도움이 된다.
전달 확인 시간 제한 에 설명된 대로 사용자(우리)는 이 시간 제한을 늘릴 수 있다.
수동 메시지 확인은 기본적으로 켜져 있다.
이전 예제에서는 autoAck=true
플래그를 통해 명시적으로 해제했다.
작업이 완료되면 플래그를 false
로 설정 하고 워커로부터 적절한 확인 메시지를 보내야 한다.
이 코드를 사용하면 메시지를 처리하는 도중 CTRL+C를 사용하여 워커를 종료하더라도 아무 것도 손실되지 않도록 할 수 있다.
워커가 종료된 직후, 확인되지 않은 모든 메시지는 RabbitMQ에 의해 다시 전달되게 된다.
확인은 전달을 수신한 것과 동일한 채널로 전송해야 한다. 다른 채널을 사용하여 확인하려고 하면 채널 수준 프로토콜 예외가 발생한다.
자세한 내용은 확인 관련 문서 가이드를 확인하세요.
참고) 잊어버린 Ack
basicAck
을(를) 놓치는 것은 흔한 실수이다 .
간단한 오류이지만, 그 결과는 심각하다.
클라이언트가 종료되면 메시지가 재전송되지만(무작위 재전송처럼 보일 수 있음), RabbitMQ는 확인되지 않은 메시지를 해제할 수 없기 때문에 점점 더 많은 메모리를 소모하게 된다.(메모리 누수)
이런 종류의 실수를 디버깅하려면 필드 messages_unacknowledged
를 인쇄하는 데 rabbitmqctl
를 사용할 수 있다.
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows에서는 sudo를 삭제하고 사용한다.
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
메시지 지속성 (Message Durability)
우리는 방금 컨슈머가 종료되더라도 작업이 손실되지 않도록 하는 방법을 배웠다.
하지만 RabbitMQ 서버가 중단되면 작업은 여전히 손실되고 있다.
RabbitMQ가 종료되거나 충돌하면 사용자가 지정하지 않는 한 큐와 메시지가 삭제된다.
메시지가 손실되지 않도록 하려면 두 가지가 필요하다.
- 큐가 지속 가능(durable)해야 한다.
- 메시지가 지속 가능(durable)해야 한다.
여기서 durable은 ACID의 D와 같은 의미로, 영속적인 저장소에 저장된 듯한 효과가 있어야 한다는 뜻이다.
먼저, RabbitMQ 노드 재시작 후에도 큐가 유지되는지 확인해야 한다.
이를 위해 큐를 durable 로 선언해야 한다 .
이 코드 자체는 올바르지만, 현재 설정에서는 작동하지 않는다.
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
이는 이미 영구적이지 않은 hello
큐를 정의했기 때문이다.
RabbitMQ는 기존 큐를 다른 매개변수로 재정의하는 것을 허용하지 않으며, 이를 시도하는 모든 프로그램에 오류를 반환한다.
하지만 간단한 해결 방법이 있다. 예를 들어, task_queue
라는 다른 이름으로 큐를 선언해 보자.
이 queueDeclare
변경 사항은 생산자 코드와 소비자 코드 모두에 적용되어야 한다.
이 시점에서 RabbitMQ가 재시작되더라도 task_queue
큐가 손실되지 않을 것이라는 확신이 생겼다.
이제 메시지를 영구 메시지로 표시해야 한다.
MessageProperties
(BasicProperties
를 구현하는 속성) 값을PERSISTENT_TEXT_PLAIN
로 설정하자 .
이 옵션을 키게 되면, RabbitMQ가 재시작하더라도 메시지는 유지된다.
메시지 지속성
메시지를 Persistant로 표시한다고 해서 메시지 손실이 완전히 배제되진 않는다. RabbitMQ가 메시지를 수신하고 디스크에 fsync로 내리기 전의 짧은 구간에는 손실 가능성이 있다(모든 메시지마다 fsync를 하지는 않는다).
또한 RabbitMQ는 모든 메시지에 대해 영구성을 보장하지 않는다.(캐시에만 저장되고 실제로 디스크에 기록되지 않을 수도 있다.)
영구성 보장은 강력하지는 않지만, 간단한 작업 대기열에는 충분하다. 더 강력한 보장이 필요하다면 publisher confirms 를 사용할 수 있다 .
공정한 배송
디스패칭이 완벽하게 '공정'을 보장하지는 않는다.
예를 들어, 두 개의 워커가 있는 상황에서 모든 홀수 메시지는 많고 짝수 메시지는 적은 경우, 한 워커는 계속 바쁘고 다른 워커는 거의 작업을 하지 않게 된다.
RabbitMQ는 이러한 상황을 전혀 인지하지 못하기 때문에 메시지를 균등하게 디스패칭한다.
이는 RabbitMQ가 메시지가 큐에 들어올 때만 메시지를 전송하기 때문에 발생한다.
컨슈머의 확인되지 않은 메시지 수를 확인하지 않고, n번째 메시지를 n번째 컨슈머에게 무조건 전송하기 때문이다.
이 문제를 해결하려면 basicQos
메서드를 prefetchCount=1
매개변수와 함께 사용할 수 있다.
이 설정은 RabbitMQ가 워커에게 한 번에 두 개 이상의 메시지를 전송하지 않도록 한다.
다시 말해, 이전 메시지를 처리하고 확인 응답을 받을 때까지 워커에게 새 메시지를 전송하지 않는다.
대신, 아직 사용 중이 아닌 다음 워커에게 메시지를 전송한다.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
큐 크기에 주의하자!
모든 작업자가 바쁘면 대기열이 가득 찰 수 있다.
이 부분을 주의 깊게 살펴보고 문제가 발생할 것 같으면 작업자를 추가하거나 다른 전략을 세우는 것이 좋다.
Channel
메소드와MessageProperties
에 대한 자세한 내용은 온라인 JavaDocs를 탐색보길 바란다 .
주요 용어/ 메소드 정리
- 기본적으로 Round-Robbin 방식으로 동작한다.
- basicConsume - autoAck
- Ack 메시지를 잊지 말자.
- Message Durability
channel.basicQos(prefetchCount=1)
본 내용은 RabbitMQ Tutorial - Work Queues를 참고하여 작성되었습니다.
RabbitMQ tutorial - Work Queues | RabbitMQ
<!--
www.rabbitmq.com