- 
                            
                            [RabbitMQ] 2. Work Queues, QosRabbitMQ 2019. 1. 1. 18:18처리에 시간이 걸리는 메시지가 집중될 때를 대비해서 여러 컨슈머에 작업을 분배해서 배달할 수 있는 큐를 만들어보자 작업이 나중에 끝나도록 스케줄 하는것 보다 작업을 메시지로 만들어 큐로 던진다. 이러한 방법은 http 방식으로 처리하기 복잡한 웹 프로그램 환경에서 유용하다. 기본 설정으로 MQ는 각 메시지를 다음 컨슈머에게 순차적으로 전달한다(라운드 로빈). 2개 이상 컨슈머를 실행시켜두고 여러 메시지를 보내니 순차적으로 핸들링 하는것을 확인 할 수 있었다. MQ는 메시지를 배달하는 즉시 해당 메시지에 삭제 마킹을 하는데 워커에서 작업을 처리중일 때 kill을 날리면 메시지를 유실한다. 메시지 유실을 방지하기 위해 MQ는 message acknowledgments 를 제공한다. 배달 후 메시지 삭제 마킹을 하기 전에 배달 한 메시지 처리가 완료되었다는 ack를 받고 삭제 마킹을 하면 메시지 유실을 방지 할 수 있다. 배달 후 ack가 오지않으면 어떠한 문제로 해당 메시지 작업 처리 실패로 간주하고 다른 워커에 다시 배달한다. 그런데 ack에 대한 time아웃은 따로 없다고 한다 --; ack로 컨슈머가 죽는 것에 대한 메시지 유실이 보장하기는 한데 작업을 처리하다가 잘못되어 아주 오래 걸리고 있으면 컨슈머가 죽지도 않고 그 메시지를 다시 보내지도 않을 것이다. autoAck를 true로 하면 basicAck를 안만들어줘도 자동으로 ack를 날리고, false로 하면 컨슈머에서 ack를 만들어서 직접 날려줘야한다. rpc(new Basic.Consume.Builder() 
 .queue(queue)
 .consumerTag(consumerTag)
 .noLocal(noLocal)
 .noAck(autoAck)
 .exclusive(exclusive)
 .arguments(arguments)
 .build(),
 k);- 메시지의 내구성 컨슈머가 죽었을때 메시지 유실은 보장되었지만, MQ서버가 죽으면 여전히 메시지는 유실된다. 유실을 막기위해서는 큐와 메시지 모두 필요한 설정이 있다. boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);* @param durable true if we are declaring a durable queue (the queue will survive a server restart) 이미 정의되어있는 큐 이름을 다른 파라미터로 queueDelare() 하면 에러가 난다. 또한 메시지를 생성할 때 MessageProperties.PERSISTENT_TEXT_PLAIN 옵션을 주어서 생성한다. channel.basicPublish("", TASK_QUEUE_NAME, 
 MessageProperties.PERSISTENT_TEXT_PLAIN,
 message.getBytes("UTF-8"));/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */ 
 public static final BasicProperties PERSISTENT_TEXT_PLAIN =
 new BasicProperties("text/plain",
 null,
 null,
 2,
 0, null, null, null,
 null, null, null, null,
 null, null);- Fair Dispatch MQ에서 컨슈머에 라운드 로빈으로 분배한다고는 했지만, 완전히 공정하게 배분된 것이 아니다. 홀수번째 메시지는 오래 걸리는 작업이고, 짝수번째 메시지는 빨리 끝나는 작업이라면 처음 작업을 받는 큐만 정체될 것이다. 이 문제를 해결하려면 컨슈머 채널에 다음 코드가 필요하다 int prefetchCount = 1; channel.basicQos(prefetchCount);컨슈머가 일을 1개 하고있으면, 다음 메시지를 또 보내지 말라는 설정이다. 그러면 MQ는 놀고있는 다른 컨슈머에게 메시지를 보낼 것이다. /** 
 * Request a specific prefetchCount "quality of service" settings
 * for this channel.
 *
 * @see #basicQos(int, int, boolean)
 * @param prefetchCount maximum number of messages that the server
 * will deliver, 0 if unlimited
 * @throws java.io.IOException if an error is encountered
 */
 void basicQos(int prefetchCount) throws IOException;basicQos(0, prefetchCount, false); /** Public API - {@inheritDoc} */ 
 @Override
 public void basicQos(int prefetchSize, int prefetchCount, boolean global)
 throws IOException
 {
 exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
 }Basic.Qos 커맨드를 만들어 요청하나보다. 참고 - https://rabbitmq.github.io/rabbitmq-java-client/api/current/ - https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html 'RabbitMQ' 카테고리의 다른 글[RabbitMQ] 5. Topics, Topic(Pattern) Exchange (0) 2019.01.07 [RabbitMQ] 4. Routing, Direct Exchange (0) 2019.01.06 [RabbitMQ] 3. Publish/Subscribe, Fanout(Broadcasting) Exchange (0) 2019.01.05 [RabbitMQ] 1. Hello World, basicPublish/basicConsume (0) 2018.12.30