ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RabbitMQ] 3. Publish/Subscribe, Fanout(Broadcasting) Exchange
    RabbitMQ 2019. 1. 5. 18:07

    이번에는 같은 메시지를 여러 컨슈머에 배달하는 법을 알아본다. publish/subscribe 패턴이라고 한다.

    예시 상황으로 로그 메시지를 여러 컨슈머에 발송한다. 컨슈머 A는 로그를 콘솔 출력하고, 컨슈머 B는 디스크에 파일로 저장한다.


    * Wrap-Up

    • producer is a user application that sends messages.
    • queue is a buffer that stores messages.
    • consumer is a user application that receives messages.


    - Exchange

    무슨 뜻인지 정확하게 몰랐는데 드디어 설명이 나온다;

    RabbitMQ의 메시징 모델에서 핵심 아이디어는 producer는 어떤 메시지도 직접 queue로 메시지를 보내지는 않는다는 것이다. 

    사실, producer는 메시지가 어떤 consumer에 배달되는지도 모른다.

    대신에 exchange로만 메시지를 보낸다. 한쪽은 producer로부터 메시지를 받고, 다른쪽은 큐로 받은 메시지를 전달한다. 때문에 exchage는 받은 메시지를 어떻게 할지 결정할 수 있어야 한다. 특정 큐로 전달할지, 여러 큐로 보낼지, 버릴지 등등을 선택해야하는데, 이 규칙은 exchage type에 따라 정해진다.


    directtopicheadersfanout 4가지 타입이 있다는데 예제에서는 우선 fanout을 사용한다.

    channel.exchangeDeclare("logs", "fanout");

    fanout type은 이름대로 수신한 모든 메시지를 알고 있는 모든 큐에 Broadcast 한다. Enum이 없고 왜 String으로 넘기는지 잘 모르겠다--;


    툴을 사용해서 현재 등록되어있는 exchange를 볼 수 있다.

    rabbitmqctl list_exchanges
    channel.basicPublish("", "hello", null, message.getBytes());

    이전 예제들에서는 "" 기본 exchage를 사용했었다.


    - Temporary Queue

    큐마다 이름을 지정하는데 producer와 consumer가 같은 큐를 사용하기 위해 이름을 관리하는것이 매우 중요하다. 하지만 이번 로거 예제의 경우, 모든 메시지를 모든 큐에 전달해야한다. 또한, 현재 발생하고 있는 메시지만 처리하면 된다. 이를위해 producer가 MQ에 연결 할 때 마다 비어있는 새로운 큐가 필요한데 큐를 만들 때 마다 random 이름을 가진 큐를 만들거나 서버가 그런 이름을 만들어 줄 수도 있다. 또한, consumer가 연결을 해제하면 큐가 자동으로 삭제되어야한다.

    String queueName = channel.queueDeclare (). getQueue ();

    이렇게 파라미터에 아무것도 없이 정의하면 non-durable, exclusive, autodelete queue with a generated name(random name;amq.gen-JzTY20BRgKO-HjmUJj0wLg) 을 만들 수 있다고 한다.



    execlusive queue는 다음에 다시..;


    - Binding

    channel.queueBind(queueName, "logs", "");
    
    logs exchage에게 메시지를 받으면 queueName이라는 큐로 메시지를 보내라고 설정한다.

    /**
    * Bind a queue to an exchange, with no extra arguments.
    * @param queue the name of the queue
    * @param exchange the name of the exchange
    * @param routingKey the routing key to use for the binding
    * @return a binding-confirm method if the binding was successfully created
    */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

    routingKey는 basicPublish에서 routingKey에 QUEUE_NAME을 썻는데 뭔지 잘 모르겠다; 
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

    툴을 사용해 바인딩 목록도 볼 수 있다.

    rabbitmqctl list_bindings

    - Put it all together

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

    이제는 MQ의 구조에 맞게 정의한 exchange로 메시지를 publish한다. 이전과 다르게 큐를 정의하지 않고, 임시 큐를 사용한다. 발송할때는 항상 routing_key를 줘야한다고 하는데 "fanout"  exchange 에서는 무시된다고 한다. consumer 프로그램이 동작하면 임의의 큐를 만들고 같은 exchage에 바인딩을 연결 할 것이다.

    댓글

Designed by Tistory.