WEB BE Repository/RabbitMQ

[RabbitMQ Java] RabbitMQ를 이용한 RPC

조금씩 차근차근 2025. 9. 8. 19:44

두 번째 튜토리얼에서 우리는 Work Queue를 사용하여 시간이 오래 걸리는 작업을 여러 워커(worker)에게 분산하는 방법을 배웠다.

 

하지만 어떤 함수를 원격 컴퓨터에서 실행하고 그 결과를 기다려야 한다면 어떨까?
이는 조금 다른 이야기가 된다.
이 패턴은 흔히 원격 프로시저 호출(Remote Procedure Call, RPC) 이라고 불린다.

 

 

이번 튜토리얼에서는 RabbitMQ를 사용하여 RPC 시스템(클라이언트와 확장 가능한 RPC 서버) 을 만들어보자.
분산할 만한 시간이 오래 걸리는 작업은 딱히 없으므로, 임의로 피보나치 수를 반환하는 단순한 RPC 서비스를 예제로 구현할 것이다.


클라이언트 인터페이스

RPC 서비스 사용 방식을 보여주기 위해 간단한 클라이언트 클래스를 만든다.
call이라는 메서드를 제공하여 RPC 요청을 보내고 응답을 받을 때까지 블로킹하도록 만들어두자.

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println("fib(4) is " + result);

RPC에 대한 주의사항

RPC는 흔히 사용되는 패턴이지만 비판도 많다.
가장 큰 문제는 개발자가 어떤 함수 호출이 로컬인지, 아니면 느린 RPC 호출인지 구분하지 못할 때 발생한다.
이런 혼란은 예측 불가능한 시스템을 만들고 디버깅을 복잡하게 만든다.
잘못 사용된 RPC는 소프트웨어를 단순하게 하기보다 유지보수 불가능한 스파게티 코드가 될 수 있다.

 

이를 방지하기 위한 조언은 다음과 같다.

  1. 어떤 함수가 로컬이고 어떤 함수가 원격인지 명확히 하라.
  2. 시스템을 문서화하고, 컴포넌트 간 의존성을 분명히 하라.
  3. 에러 상황을 처리하라. RPC 서버가 장시간 다운되면 클라이언트는 어떻게 대응해야 할까?
  4. 애매하면 RPC를 피하라. 가능하다면 비동기 파이프라인을 사용하라. RPC처럼 블로킹하지 않고, 결과를 다음 처리 단계로 비동기 전달하는 방식이 더 낫다.

콜백 큐 (Callback Queue)

RabbitMQ에서 요청-응답 패턴은 서버와 클라이언트 간의 단순한 상호작용으로 이루어진다.

  • 클라이언트는 요청 메시지를 보낸다.
  • 서버는 응답 메시지로 되돌려준다.

RPC는 결국 '응답'을 받아야 하기 때문에, 한 노드가 동시에 송신자/수신자가 된다.

 

응답을 받기 위해서는 요청과 함께 '콜백 큐(callback queue)' 이름을 보내야 한다.
이 큐는 서버가 이름을 정할 수도 있고, 클라이언트가 이름을 정할 수도 있다.
서버는 이 이름을 사용해 기본 exchange를 통해 응답을 보낸다.

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... 이후 callback_queue에서 응답 메시지를 읽는 코드 ...

여기서 우리는 새로운 import가 필요해졌다.

import com.rabbitmq.client.AMQP.BasicProperties;

메시지 속성 (Message Properties)

AMQP 0-9-1 프로토콜은 메시지와 함께 14개의 속성을 정의한다.
대부분은 잘 사용되지 않지만 다음 속성들은 자주 쓰이니 알아두자.

  • deliveryMode: 메시지를 지속성(persistent, 값=2) 으로 지정하거나 일시적(transient)으로 지정. (2번째 튜토리얼에서 다룸)
  • contentType: 인코딩의 MIME 타입 설명. 예: JSON을 쓸 경우 application/json으로 설정 권장.
  • replyTo: 콜백 큐 이름 지정.
  • correlationId: 요청과 응답을 매칭하기 위해 사용.

Correlation Id

매 요청마다 콜백 큐를 만드는 것은 비효율적이다.
대신 클라이언트당 하나의 콜백 큐를 만드는 것이 낫다.

 

문제는 응답이 그 큐에 들어올 때 어떤 요청에 대한 응답인지 구분하기 어렵다는 점이다.
여기서 correlationId 속성을 활용하면 좋다.
이 ID는 각 요청에 대해 고유한 값을 부여하고, 응답을 받을 때 이를 확인하여 요청과 매칭하는 방식으로 동작한다.

 

만약 알 수 없는 correlationId라면 안전하게 무시하면 된다.
이는 서버 측의 race condition 가능성 때문이다.
예를 들어, 서버가 응답은 보냈지만 요청 ack을 보내기 전에 죽어버렸다면, 재시작 후 같은 요청을 다시 처리할 수 있다.
따라서 클라이언트는 중복 응답을 허용하고, RPC는 가능하다면 멱등성(idempotent) 을 가져야 한다.


요약 (동작 방식)

  1. 클라이언트가 시작되면 고유한 콜백 큐를 생성한다.
  2. 요청 시 메시지에 reply_to (콜백 큐 이름)과 correlation_id (고유 값)를 포함시킨다.
  3. 요청은 rpc_queue로 전송된다.
  4. RPC 서버는 큐에서 요청을 받아 작업을 수행하고 결과를 reply_to 큐로 보낸다.
  5. 클라이언트는 응답 큐에서 메시지를 받고 correlationId를 확인하여 요청과 매칭한다.

직접 해보기 - 피보나치 예제

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

간단히 피보나치 수를 구하는 함수를 정의해 두었다. (큰 숫자에는 적합하지 않고, DP를 사용하지 않은 재귀 구현이다.)


RPC 서버 코드 (RPCServer.java)

package gogradually.rpc;  
import com.rabbitmq.client.*;  

import java.nio.charset.StandardCharsets;  

public class RPCServer {  

    public static final String RPC_QUEUE_NAME = "rpc_queue";  

    private static int fib(int n) {  
        if (n == 0) return 0;  
        if (n == 1) return 1;  
        return fib(n - 1) + fib(n - 2);  
    }  

    public static void main(String[] args) throws Exception {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  

        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  
        channel.queuePurge(RPC_QUEUE_NAME);  

        channel.basicQos(1);  

        System.out.println(" [x] Awaiting RPC requests");  

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()  
                    .correlationId(delivery.getProperties().getCorrelationId())  
                    .build();  

            String response = "";  
            try {  
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);  
                int n = Integer.parseInt(message);  

                System.out.println(" [.] fib("+ message + ")");  
                response += fib(n);  
            }catch (RuntimeException e){  
                System.out.println(" [.] " + e);  
            }finally {  
                channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));  
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
            }  
        };  

        channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}) );  
    }  
}
  1. 연결, 채널, 큐 선언
  2. 여러 서버 프로세스를 실행 가능 → 부하 분산을 위해 channel.basicQos에서 prefetchCount 설정 필요
  3. basicConsume을 사용해 큐에 접근, DeliverCallback 객체를 통해 응답 처리 및 전송
  • 참고: queuePurge는 큐 내부에 저장된 메시지를 모두 삭제하는 메소드이다.

RPC 클라이언트 코드 (RPCClient.java)

package gogradually.rpc;  

import com.rabbitmq.client.*;  

import java.io.IOException;  
import java.nio.charset.StandardCharsets;  
import java.util.UUID;  
import java.util.concurrent.CompletableFuture;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.TimeoutException;  

public class RPCClient implements AutoCloseable {  
    private Connection connection;  
    private Channel channel;  
    private String requestQueueName = "rpc_queue";  

    public RPCClient() throws IOException, TimeoutException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("localhost");  

        connection = factory.newConnection();  
        channel = connection.createChannel();  
    }  

    public static void main(String[] args) {  
        try (RPCClient fibonacciRpc = new RPCClient()){  
            for(int i = 40; i<60; i++){  
                String i_str = Integer.toString(i);  
                System.out.println(" [x] Requesting fib(" + i_str + ")");  
                String response = fibonacciRpc.call(i_str);  
                System.out.println(" [.] Got '" + response + "'");  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  

    public String call(String message) throws IOException, ExecutionException, InterruptedException {  
        final String corrId = UUID.randomUUID().toString();  

        String replyQueueName = channel.queueDeclare().getQueue();  
        AMQP.BasicProperties props = new AMQP.BasicProperties  
                .Builder()  
                .correlationId(corrId)  
                .replyTo(replyQueueName)  
                .build();  
        // replyTo와 correlationId 를 포함한 요청 메시지 전송  
        channel.basicPublish("", requestQueueName, props, message.getBytes(StandardCharsets.UTF_8));  

        final CompletableFuture<String> response = new CompletableFuture<>();  

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {  
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
                response.complete(new String(delivery.getBody(), StandardCharsets.UTF_8));  
            }  
        }, consumerTag -> {  
        });  

        // 응답을 기다림  
        String result = response.get();  
        channel.basicCancel(ctag);  
        return result;  
    }  

    @Override  
    public void close() throws Exception {  
        connection.close();  
    }  
}
  1. 연결과 채널 생성
  2. call 메서드에서 실제 RPC 요청 수행
  3. 요청마다 고유한 correlationId 생성
  4. 전용 콜백 큐 생성 및 구독
  5. 요청 메시지 전송 (replyTo, correlationId 속성 포함)
  6. 응답을 기다림 (메인 스레드를 잠시 멈추기 위해 CompletableFuture 사용 가능)
  7. 응답의 correlationId 확인 후 매칭되면 반환

실행 결과, 결과가 잘 나오는 것을 확인할 수 있었다.


RPC의 장점과 고려사항

장점

  • 서버가 느리면 추가 서버 실행만으로 손쉽게 확장 가능
  • 클라이언트는 요청당 메시지 1회 송수신만 필요 (추가 네트워크 왕복 없음)

고려할 점

  • 서버가 아예 없을 경우 클라이언트는 어떻게 해야 하는가?
  • RPC에 타임아웃이 필요하지 않은가?
  • 서버에서 예외 발생 시 클라이언트로 전달해야 하는가?
  • 잘못된 입력 메시지(범위, 타입 등)는 어떻게 방어할 것인가?

RabbitMQ 관리 UI를 사용하면 큐 상태를 확인하는 데 도움이 될 수 있다.

용어 정리

  • RPC 의 정의
  • RPC는 동기 통신(요청-응답 모델)이다.
  • queuePurge: 큐 내부에 저장된 메시지 모두 삭제
  • Message Properties
    • deliveryMode
    • contentType
    • replyTo
    • correlationId