ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RabbitMQ] 2. Work Queues, Qos
    RabbitMQ 2019. 1. 1. 18:18

    처리에 시간이 걸리는 메시지가 집중될 때를 대비해서  여러 컨슈머에 작업을 분배해서 배달할 수 있는 큐를 만들어보자

    작업이 나중에 끝나도록 스케줄 하는것 보다 작업을 메시지로 만들어 큐로 던진다.

    이러한 방법은 http 방식으로 처리하기 복잡한 웹 프로그램 환경에서 유용하다.

    기본 설정으로 MQ는 각 메시지를 다음 컨슈머에게 순차적으로 전달한다(라운드 로빈). 


    2개 이상 컨슈머를 실행시켜두고 여러 메시지를 보내니 순차적으로 핸들링 하는것을 확인 할 수 있었다.


    MQ는 메시지를 배달하는 즉시 해당 메시지에 삭제 마킹을 하는데 워커에서 작업을 처리중일 때 kill을 날리면 메시지를 유실한다. 메시지 유실을 방지하기 위해 MQ는 message acknowledgments 를 제공한다. 배달 후 메시지 삭제 마킹을 하기 전에 배달 한 메시지 처리가 완료되었다는 ack를 받고 삭제 마킹을 하면 메시지 유실을 방지 할 수 있다. 배달 후 ack가 오지않으면 어떠한 문제로 해당 메시지 작업 처리 실패로 간주하고 다른 워커에 다시 배달한다.


    그런데 ack에 대한 time아웃은 따로 없다고 한다 --; ack로 컨슈머가 죽는 것에 대한 메시지 유실이 보장하기는 한데 작업을 처리하다가 잘못되어 아주 오래 걸리고 있으면 컨슈머가 죽지도 않고 그 메시지를 다시 보내지도 않을 것이다.


    autoAck를 true로 하면 basicAck를 안만들어줘도 자동으로 ack를 날리고, false로 하면 컨슈머에서 ack를 만들어서 직접 날려줘야한다.

    rpc(new Basic.Consume.Builder()
    .queue(queue)
    .consumerTag(consumerTag)
    .noLocal(noLocal)
    .noAck(autoAck)
    .exclusive(exclusive)
    .arguments(arguments)
    .build(),
    k);


    - 메시지의 내구성

    컨슈머가 죽었을때 메시지 유실은 보장되었지만, MQ서버가 죽으면 여전히 메시지는 유실된다. 유실을 막기위해서는 큐와 메시지 모두 필요한 설정이 있다.

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    * @param durable true if we are declaring a durable queue (the queue will survive a server restart)

    이미 정의되어있는 큐 이름을 다른 파라미터로 queueDelare() 하면 에러가 난다.


    또한 메시지를 생성할 때 MessageProperties.PERSISTENT_TEXT_PLAIN 옵션을 주어서 생성한다.

    channel.basicPublish("", TASK_QUEUE_NAME,
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes("UTF-8"));
    /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
    public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
    null,
    null,
    2,
    0, null, null, null,
    null, null, null, null,
    null, null);


    - Fair Dispatch

    MQ에서 컨슈머에 라운드 로빈으로 분배한다고는 했지만, 완전히 공정하게 배분된 것이 아니다. 홀수번째 메시지는 오래 걸리는 작업이고, 짝수번째 메시지는 빨리 끝나는 작업이라면 처음 작업을 받는 큐만 정체될 것이다.

    이 문제를 해결하려면 컨슈머 채널에 다음 코드가 필요하다

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    컨슈머가 일을 1개 하고있으면, 다음 메시지를 또 보내지 말라는 설정이다. 그러면 MQ는 놀고있는 다른 컨슈머에게 메시지를 보낼 것이다.

    /**
    * Request a specific prefetchCount "quality of service" settings
    * for this channel.
    *
    * @see #basicQos(int, int, boolean)
    * @param prefetchCount maximum number of messages that the server
    * will deliver, 0 if unlimited
    * @throws java.io.IOException if an error is encountered
    */
    void basicQos(int prefetchCount) throws IOException;
    basicQos(0, prefetchCount, false);
    /** Public API - {@inheritDoc} */
    @Override
    public void basicQos(int prefetchSize, int prefetchCount, boolean global)
    throws IOException
    {
    exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
    }

    Basic.Qos 커맨드를 만들어 요청하나보다.


    참고

    - https://rabbitmq.github.io/rabbitmq-java-client/api/current/

    https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html








    댓글

Designed by Tistory.