ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RabbitMQ] 1. Hello World, basicPublish/basicConsume
    RabbitMQ 2018. 12. 30. 18:02

    - 소개

    RabbitMQ는 메시지 브로커이다. 메시지를 받아서 발송한다. 우체국으로 생각해도 된다.

    보내고싶은 메일을 우체통에 넣으면 우체부가 원하는 수신자에 배달을 보장한다.

    하지만 종이를 취급 하지는 않는다. 대신에 binary-data를 받아서 저장하고 발송한다.


    - 용어(Jargon)

    Producing(Sending) : 메시지를 발송하는 프로그램을 Producer 라고 한다.

    Queue(PostBox) : MQ 내부에 있는 우체국

    Consuming(Receiving) : 메시지를 받으려고 기다리는 프로그램을 Consumer 라고 한다.


    큐는 장비의 메모리 + 디스크 공간만 사용하며, 큰 메시지 버퍼라고 생각하면 된다.

    다수의 Producer가 하나의 Q로 메시지를 보낼 수 있고, 다수의 Consumer가 하나의 Q에서 데이터를 받을 수 있다.


    https://github.com/rabbitmq/rabbitmq-tutorials

    https://www.rabbitmq.com/tutorials/tutorial-one-java.html

    rabbitMq 설치하고 대충 따라 해보는 중..


    - Send

    1. ConnectionFactory.newConnection()

    2. Connection.createChannel()

    3. channel.queueDeclare()

    4. channel.basicPublish()

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

    - basicPublish 에 대해 좀더 자세히 알아보자

    /**
    * Publish a message.
    *
    * Publishing to a non-existent exchange will result in a channel-level
    * protocol exception, which closes the channel.
    *
    * @see com.rabbitmq.client.AMQP.Basic.Publish
    * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
    * @param exchange the exchange to publish the message to
    * @param routingKey the routing key
    * @param props other properties for the message - routing headers etc
    * @param body the message body
    * @throws java.io.IOException if an error is encountered
    */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

    exchange : 뭐하는 녀석인지 잘 모르겠음(일단 "")

    routingKey : QUEUE_NAME

    BasicProperties : ??(일단 null)

    body : message.getBytes()


    - AMQP(Advanced Message Queuing Protocol) 

    인터페이스 하나씩 읽어보면 좋을듯 하다. 주석은 하나도 안달려있다 --;

    Basic에는 통신에 필요한 인터페이스가 있는듯 하다.

                      


    - AMQP.Basic.Publish

                


    Channel의 구현체는 AutorecoveringChannel, basicPublish를 실행하는 구현체는 RecoveryAwareChannelN(basicPublish, basicConsume은 ChannelN에 구현되어 있다) 이었다. 끝까지 가보면 다음과 같이 커맨드를 날리고 있는것을 확인.

    /** Set of currently unconfirmed messages (i.e. messages that have
    * not been ack'd or nack'd by the server yet. */
    private final SortedSet<Long> unconfirmedSet =
    Collections.synchronizedSortedSet(new TreeSet<Long>());

    /** Public API - {@inheritDoc} */
    @Override
    public void basicPublish(String exchange, String routingKey,
    boolean mandatory, boolean immediate,
    BasicProperties props, byte[] body)
    throws IOException
    {
    if (nextPublishSeqNo > 0) {
    unconfirmedSet.add(getNextPublishSeqNo());
    nextPublishSeqNo++;
    }
    if (props == null) {
    props = MessageProperties.MINIMAL_BASIC;
    }
    AMQCommand command = new AMQCommand(
    new Basic.Publish.Builder()
    .exchange(exchange)
    .routingKey(routingKey)
    .mandatory(mandatory)
    .immediate(immediate)
    .build(), props, body);
    try {
    transmit(command);
    } catch (IOException e) {
    metricsCollector.basicPublishFailure(this, e);
    throw e;
    }
    metricsCollector.basicPublish(this);
    }

    TreeSet에 seqNum추가, 파라미터로 받은 값을 세팅해서 AMQCommand를 만들어 transmit하고 마지막으로 통신 상태를 집계한다


    - transmit은 어떻게 하는가?

    /**
    * Protected; used instead of synchronizing on the channel itself,
    * so that clients can themselves use the channel to synchronize
    * on.
    */
    protected final Object _channelMutex = new Object();
    public void transmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
    ensureIsOpen();
    quiescingTransmit(c);
    }
    }
    public void ensureIsOpen()
    throws AlreadyClosedException
    {
    if (!isOpen()) {
    throw new AlreadyClosedException(getCloseReason());
    }
    }
    @Override
    public boolean isOpen() {
    synchronized(this.monitor) {
    return this.shutdownCause == null;
    }
    }

    채널이 열려있는지 확인하고(일반적인 방식으로 모니터 Object로 동기화)

    (참고:https://www.rabbitmq.com/releases/rabbitmq-java-client/v2.3.1/rabbitmq-java-client-javadoc-2.3.1/com/rabbitmq/client/impl/AMQChannel.html)

    quiescing : (전산학) 개입 중단(介入中斷) -- 대충 정지라고 알아먹자; 정지 전송??;
    public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
    if (c.getMethod().hasContent()) {
    while (_blockContent) {
    try {
    _channelMutex.wait();
    } catch (InterruptedException ignored) {}

    // This is to catch a situation when the thread wakes up during
    // shutdown. Currently, no command that has content is allowed
    // to send anything in a closing state.
    ensureIsOpen();
    }
    }
    this._trafficListener.write(c);
    c.transmit(this);
    }
    }

    trafficListener를 설정한 Connection을 설정하면 통신할 때 뭔가 커스텀 할수있어 보인다. 기본 구현체는 아무것도 하지 않게 되어있다.

    AMQPCommand의 transmit 으로 들어가보면

    /**
    * Sends this command down the named channel on the channel's
    * connection, possibly in multiple frames.
    * @param channel the channel on which to transmit the command
    * @throws IOException if an error is encountered
    */
    public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
    Method m = this.assembler.getMethod();
    if (m.hasContent()) {
    byte[] body = this.assembler.getContentBody();

    Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);

    int frameMax = connection.getFrameMax();
    boolean cappedFrameMax = frameMax > 0;
    int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;

    if (cappedFrameMax && headerFrame.size() > frameMax) {
    String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
    throw new IllegalArgumentException(msg);
    }
    connection.writeFrame(m.toFrame(channelNumber));
    connection.writeFrame(headerFrame);

    for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
    int remaining = body.length - offset;

    int fragmentLength = (remaining < bodyPayloadMax) ? remaining
    : bodyPayloadMax;
    Frame frame = Frame.fromBodyFragment(channelNumber, body,
    offset, fragmentLength);
    connection.writeFrame(frame);
    }
    } else {
    connection.writeFrame(m.toFrame(channelNumber));
    }
    }

    connection.flush();
    }

    통신 프로토콜에 맞추어 헤더와 바디를 체크하고 Frame으로 조립 하는것을 볼 수 있다. 가장 아래쪽 IO 구현부로 들어가보면 적당히 이렇게 되있다.

    /**
    * Public API - writes this Frame to the given DataOutputStream
    */
    public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);
    os.writeShort(channel);
    if (accumulator != null) {
    os.writeInt(accumulator.size());
    accumulator.writeTo(os);
    } else {
    os.writeInt(payload.length);
    os.write(payload);
    }
    os.write(AMQP.FRAME_END);
    }


    - Recv

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    큐 선언 까지 Send와 동일, channel.basicConsume()으로 컨슘

    /**
    * Start a non-nolocal, non-exclusive consumer, with
    * a server-generated consumerTag.
    * @param queue the name of the queue
    * @param autoAck true if the server should consider messages
    * acknowledged once delivered; false if the server should expect
    * explicit acknowledgements
    * @param deliverCallback callback when a message is delivered
    * @param cancelCallback callback when the consumer is cancelled
    * @return the consumerTag generated by the server
    * @throws IOException if an error is encountered
    * @since 5.0
    */
    String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

    세세한 과정은 basicPublish와 동일, DeliveryCallback과 CancelCallback은 consumerFromDeliverCancelCallbacks 에서 Consumer 하나로 합쳐지고, 실제 동작 구현 코드는 아래와 같다.

    /** Public API - {@inheritDoc} */
    @Override
    public String basicConsume(String queue, final boolean autoAck, String consumerTag,
    boolean noLocal, boolean exclusive, Map<String, Object> arguments,
    final Consumer callback)
    throws IOException
    {
    final Method m = new Basic.Consume.Builder()
    .queue(queue)
    .consumerTag(consumerTag)
    .noLocal(noLocal)
    .noAck(autoAck)
    .exclusive(exclusive)
    .arguments(arguments)
    .build();
    BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
    @Override
    public String transformReply(AMQCommand replyCommand) {
    String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
    _consumers.put(actualConsumerTag, callback);

    // need to register consumer in stats before it actually starts consuming
    metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);

    dispatcher.handleConsumeOk(callback, actualConsumerTag);
    return actualConsumerTag;
    }
    };


    rpc(m, k);

    try {
    if(_rpcTimeout == NO_RPC_TIMEOUT) {
    return k.getReply();
    } else {
    try {
    return k.getReply(_rpcTimeout);
    } catch (TimeoutException e) {
    throw wrapTimeoutException(m, e);
    }
    }
    } catch(ShutdownSignalException ex) {
    throw wrap(ex);
    }
    }

    BlockingRpcContinuation 뭐하는 녀석??;

    public void rpc(Method m, RpcContinuation k)
    throws IOException
    {
    synchronized (_channelMutex) {
    ensureIsOpen();
    quiescingRpc(m, k);
    }
    }
    public void quiescingRpc(Method m, RpcContinuation k)
    throws IOException
    {
    synchronized (_channelMutex) {
    enqueueRpc(k);
    quiescingTransmit(m);
    }
    }

    quiescingTransmit은 send와 같은 방식이고, enqueueRpc는 reply와 callback을 담아둔 RpcContinuation을 넘긴다.

    public void enqueueRpc(RpcContinuation k)
    {
    doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
    }
    public class RpcContinuationRpcWrapper implements RpcWrapper {

    private final AMQChannel.RpcContinuation continuation;

    public RpcContinuationRpcWrapper(AMQChannel.RpcContinuation continuation) {
    this.continuation = continuation;
    }

    @Override
    public boolean canHandleReply(AMQCommand command) {
    return continuation.canHandleReply(command);
    }

    @Override
    public void complete(AMQCommand command) {
    continuation.handleCommand(command);
    }

    @Override
    public void shutdown(ShutdownSignalException signal) {
    continuation.handleShutdownSignal(signal);
    }
    }
    private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
    synchronized (_channelMutex) {
    boolean waitClearedInterruptStatus = false;
    while (_activeRpc != null) {
    try {
    _channelMutex.wait();
    } catch (InterruptedException e) {
    waitClearedInterruptStatus = true;
    }
    }
    if (waitClearedInterruptStatus) {
    Thread.currentThread().interrupt();
    }
    _activeRpc = rpcWrapperSupplier.get();
    }
    }

    _channelMutex.wait() 하다가 인터럽트를 받으면 현재 쓰레드를 깨우고 _activeRpc에 new RpcContinuationRpcWrapper를 생성한다.

    nio 방식도 사용 가능한것 같은데 기본값은 false로 되어있다. ConnectionFactoryConfigurator 을 참고하면 가능한 설정을 확인할 수 있다.

    댓글

Designed by Tistory.