-
[RabbitMQ] 2. Work Queues, QosRabbitMQ 2019. 1. 1. 18:18
처리에 시간이 걸리는 메시지가 집중될 때를 대비해서 여러 컨슈머에 작업을 분배해서 배달할 수 있는 큐를 만들어보자
작업이 나중에 끝나도록 스케줄 하는것 보다 작업을 메시지로 만들어 큐로 던진다.
이러한 방법은 http 방식으로 처리하기 복잡한 웹 프로그램 환경에서 유용하다.
기본 설정으로 MQ는 각 메시지를 다음 컨슈머에게 순차적으로 전달한다(라운드 로빈).
2개 이상 컨슈머를 실행시켜두고 여러 메시지를 보내니 순차적으로 핸들링 하는것을 확인 할 수 있었다.
MQ는 메시지를 배달하는 즉시 해당 메시지에 삭제 마킹을 하는데 워커에서 작업을 처리중일 때 kill을 날리면 메시지를 유실한다. 메시지 유실을 방지하기 위해 MQ는 message acknowledgments 를 제공한다. 배달 후 메시지 삭제 마킹을 하기 전에 배달 한 메시지 처리가 완료되었다는 ack를 받고 삭제 마킹을 하면 메시지 유실을 방지 할 수 있다. 배달 후 ack가 오지않으면 어떠한 문제로 해당 메시지 작업 처리 실패로 간주하고 다른 워커에 다시 배달한다.
그런데 ack에 대한 time아웃은 따로 없다고 한다 --; ack로 컨슈머가 죽는 것에 대한 메시지 유실이 보장하기는 한데 작업을 처리하다가 잘못되어 아주 오래 걸리고 있으면 컨슈머가 죽지도 않고 그 메시지를 다시 보내지도 않을 것이다.
autoAck를 true로 하면 basicAck를 안만들어줘도 자동으로 ack를 날리고, false로 하면 컨슈머에서 ack를 만들어서 직접 날려줘야한다.
rpc(new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build(),
k);- 메시지의 내구성
컨슈머가 죽었을때 메시지 유실은 보장되었지만, MQ서버가 죽으면 여전히 메시지는 유실된다. 유실을 막기위해서는 큐와 메시지 모두 필요한 설정이 있다.
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
이미 정의되어있는 큐 이름을 다른 파라미터로 queueDelare() 하면 에러가 난다.
또한 메시지를 생성할 때 MessageProperties.PERSISTENT_TEXT_PLAIN 옵션을 주어서 생성한다.
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);- Fair Dispatch
MQ에서 컨슈머에 라운드 로빈으로 분배한다고는 했지만, 완전히 공정하게 배분된 것이 아니다. 홀수번째 메시지는 오래 걸리는 작업이고, 짝수번째 메시지는 빨리 끝나는 작업이라면 처음 작업을 받는 큐만 정체될 것이다.
이 문제를 해결하려면 컨슈머 채널에 다음 코드가 필요하다
int prefetchCount = 1; channel.basicQos(prefetchCount);
컨슈머가 일을 1개 하고있으면, 다음 메시지를 또 보내지 말라는 설정이다. 그러면 MQ는 놀고있는 다른 컨슈머에게 메시지를 보낼 것이다.
/**
* Request a specific prefetchCount "quality of service" settings
* for this channel.
*
* @see #basicQos(int, int, boolean)
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchCount) throws IOException;basicQos(0, prefetchCount, false);
/** Public API - {@inheritDoc} */
@Override
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
throws IOException
{
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
}Basic.Qos 커맨드를 만들어 요청하나보다.
참고
- https://rabbitmq.github.io/rabbitmq-java-client/api/current/
- https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html
'RabbitMQ' 카테고리의 다른 글
[RabbitMQ] 5. Topics, Topic(Pattern) Exchange (0) 2019.01.07 [RabbitMQ] 4. Routing, Direct Exchange (0) 2019.01.06 [RabbitMQ] 3. Publish/Subscribe, Fanout(Broadcasting) Exchange (0) 2019.01.05 [RabbitMQ] 1. Hello World, basicPublish/basicConsume (0) 2018.12.30