[RabbitMQ] 1. Hello World, basicPublish/basicConsume
RabbitMQ 2018. 12. 30. 18:02
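- Introduction
RabbitMQ is a message broker: it accepts messages and forwards them. You can think of it as a post office.
When you drop a letter into a post box, you can rely on the mail carrier to deliver it to the intended recipient.
The difference is that RabbitMQ does not handle paper. Instead it accepts, stores, and forwards binary data.
- Jargon
Producing (Sending) : a program that sends messages is called a Producer.
Queue (Post Box) : the post box that lives inside RabbitMQ.
Consuming (Receiving) : a program that waits to receive messages is called a Consumer.
A queue is bound only by the host's memory and disk limits; think of it as a large message buffer.
Multiple Producers can send messages to one queue, and multiple Consumers can receive data from one queue.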
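As an in-process analogy only (no broker involved), the many-producers, one-queue, many-consumers relationship can be sketched with a JDK BlockingQueue. The class name QueueAnalogy and the message names are made up for illustration; the point is just that each message put into the queue is taken out by exactly one consumer.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process analogy only: a BlockingQueue stands in for a RabbitMQ queue.
class QueueAnalogy {

    // Two producers publish into one queue, then two consumers drain it.
    // Returns how many messages each consumer received.
    static int[] run() {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

        // Each producer sends two binary messages to the same queue.
        Thread p1 = new Thread(() -> {
            queue.add("a1".getBytes(StandardCharsets.UTF_8));
            queue.add("a2".getBytes(StandardCharsets.UTF_8));
        });
        Thread p2 = new Thread(() -> {
            queue.add("b1".getBytes(StandardCharsets.UTF_8));
            queue.add("b2".getBytes(StandardCharsets.UTF_8));
        });
        p1.start();
        p2.start();
        try {
            p1.join();
            p2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // The two consumers take turns; poll() removes the message,
        // so no message is seen by both consumers.
        int[] received = new int[2];
        int turn = 0;
        while (queue.poll() != null) {
            received[turn]++;
            turn = 1 - turn;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("consumer 1: " + r[0] + ", consumer 2: " + r[1]);
    }
}
```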
https://github.com/rabbitmq/rabbitmq-tutorials
https://www.rabbitmq.com/tutorials/tutorial-one-java.html
I installed RabbitMQ and am loosely working through the tutorial..
- Send
1. ConnectionFactory.newConnection()
2. Connection.createChannel()
3. channel.queueDeclare()
4. channel.basicPublish()
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
- A closer look at basicPublish
/**
 * Publish a message.
 *
 * Publishing to a non-existent exchange will result in a channel-level
 * protocol exception, which closes the channel.
 *
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

exchange : "" for now. The empty name is the default exchange, which routes a message to the queue whose name equals the routing key.
routingKey : QUEUE_NAME
BasicProperties : message properties such as routing headers; null for now (the ChannelN code further down replaces null with MessageProperties.MINIMAL_BASIC).
body : message.getBytes("UTF-8")
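To make the role of the "" exchange concrete, here is a toy model of default-exchange routing. It is not the RabbitMQ client and does not talk to a broker; DefaultExchangeSketch and its methods are hypothetical names that only imitate the shape of queueDeclare and basicPublish. It shows a message published with routing key "hello" landing in the queue named "hello".

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model (not the RabbitMQ client): how the default exchange "" picks a queue.
class DefaultExchangeSketch {
    private final Map<String, Queue<byte[]>> queues = new HashMap<>();

    // Declaring a queue makes it reachable under its own name.
    void queueDeclare(String name) {
        queues.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Routes to the queue whose name equals routingKey.
    // Returns false when no such queue exists (a real broker drops
    // an unroutable message that is not flagged mandatory).
    boolean basicPublish(String exchange, String routingKey, byte[] body) {
        if (!exchange.isEmpty()) {
            throw new UnsupportedOperationException("only the default exchange is modelled");
        }
        Queue<byte[]> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    // Takes the next message from a queue as text, or null if it is empty.
    String poll(String queueName) {
        Queue<byte[]> queue = queues.get(queueName);
        byte[] body = (queue == null) ? null : queue.poll();
        return (body == null) ? null : new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.queueDeclare("hello");
        broker.basicPublish("", "hello", "Hello World!".getBytes(StandardCharsets.UTF_8));
        System.out.println(broker.poll("hello"));
    }
}
```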
- AMQP(Advanced Message Queuing Protocol)
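It would be worth reading through these interfaces one by one. None of them carries a single comment, though.
Basic seems to hold the interfaces needed for the actual communication.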
- AMQP.Basic.Publish
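The Channel implementation turned out to be AutorecoveringChannel, and the object that actually runs basicPublish was a RecoveryAwareChannelN (basicPublish and basicConsume themselves are implemented in ChannelN). Following the call to the end, you can see it send the command like this.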
/** Set of currently unconfirmed messages (i.e. messages that have
 *  not been ack'd or nack'd by the server yet. */
private final SortedSet<Long> unconfirmedSet =
    Collections.synchronizedSortedSet(new TreeSet<Long>());

/** Public API - {@inheritDoc} */
@Override
public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }
    if (props == null) {
        props = MessageProperties.MINIMAL_BASIC;
    }
    AMQCommand command = new AMQCommand(
        new Basic.Publish.Builder()
            .exchange(exchange)
            .routingKey(routingKey)
            .mandatory(mandatory)
            .immediate(immediate)
            .build(), props, body);
    try {
        transmit(command);
    } catch (IOException e) {
        metricsCollector.basicPublishFailure(this, e);
        throw e;
    }
    metricsCollector.basicPublish(this);
}

If nextPublishSeqNo > 0 (which is the case once publisher confirms are enabled on the channel), the sequence number is added to the TreeSet. The method then builds an AMQCommand from the parameters it received, transmits it, and finally records the publish (or the failure) in the metrics collector.
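The sequence-number bookkeeping can be tried out in isolation. The sketch below is a standalone approximation, not the client's code: ConfirmTrackerSketch is a hypothetical class, and the handleAck logic (a "multiple" ack clears every sequence number up to and including the given one) is my assumption about how the entries are removed again, since that part is not shown in the excerpt above.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

// Standalone approximation of the unconfirmedSet bookkeeping; not the client's code.
class ConfirmTrackerSketch {
    private final SortedSet<Long> unconfirmed =
        Collections.synchronizedSortedSet(new TreeSet<Long>());
    private long nextPublishSeqNo = 0; // 0 means confirm mode is off

    // Turning confirm mode on starts the numbering at 1.
    synchronized void confirmSelect() {
        if (nextPublishSeqNo == 0) {
            nextPublishSeqNo = 1;
        }
    }

    // Same guard as at the top of basicPublish: track only in confirm mode.
    synchronized void publish() {
        if (nextPublishSeqNo > 0) {
            unconfirmed.add(nextPublishSeqNo);
            nextPublishSeqNo++;
        }
    }

    // Assumed ack handling: multiple=true confirms everything <= seqNo.
    synchronized void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            unconfirmed.headSet(seqNo + 1).clear();
        } else {
            unconfirmed.remove(seqNo);
        }
    }

    synchronized int pending() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.confirmSelect();
        tracker.publish();
        tracker.publish();
        tracker.publish();
        tracker.handleAck(2, true);
        System.out.println("still unconfirmed: " + tracker.pending());
    }
}
```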
- How does transmit work?
/**
 * Protected; used instead of synchronizing on the channel itself,
 * so that clients can themselves use the channel to synchronize
 * on.
 */
protected final Object _channelMutex = new Object();

public void transmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingTransmit(c);
    }
}

public void ensureIsOpen()
    throws AlreadyClosedException
{
    if (!isOpen()) {
        throw new AlreadyClosedException(getCloseReason());
    }
}

@Override
public boolean isOpen() {
    synchronized(this.monitor) {
        return this.shutdownCause == null;
    }
}

It first checks that the channel is open (synchronizing on a monitor object, in the usual way) and then hands the command to quiescingTransmit.
- quiescing : the dictionary gloss is (computing) "suspension of activity". Let's read it roughly as "stopping", so a "stopping transmit"?? Note that, unlike transmit, quiescingTransmit does not call ensureIsOpen up front; it only checks after waking from the _blockContent wait.
public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        if (c.getMethod().hasContent()) {
            while (_blockContent) {
                try {
                    _channelMutex.wait();
                } catch (InterruptedException ignored) {}
                // This is to catch a situation when the thread wakes up during
                // shutdown. Currently, no command that has content is allowed
                // to send anything in a closing state.
                ensureIsOpen();
            }
        }
        this._trafficListener.write(c);
        c.transmit(this);
    }
}

If the Connection is set up with a trafficListener, it looks like you can hook custom behavior into the traffic. The default implementation does nothing.
Stepping into AMQCommand's transmit:
/**
 * Sends this command down the named channel on the channel's
 * connection, possibly in multiple frames.
 * @param channel the channel on which to transmit the command
 * @throws IOException if an error is encountered
 */
public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();
    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();
            Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
            int frameMax = connection.getFrameMax();
            boolean cappedFrameMax = frameMax > 0;
            int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
            if (cappedFrameMax && headerFrame.size() > frameMax) {
                String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                throw new IllegalArgumentException(msg);
            }
            connection.writeFrame(m.toFrame(channelNumber));
            connection.writeFrame(headerFrame);
            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;
                int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                    : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body,
                    offset, fragmentLength);
                connection.writeFrame(frame);
            }
        } else {
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }
    connection.flush();
}

Following the wire protocol, it checks the header and body and assembles them into Frames: one method frame, one content-header frame, and then as many body frames as frameMax requires. At the very bottom, in the IO layer, each frame is written out by Frame.writeTo.
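The body-splitting loop in transmit is easy to experiment with outside the client. The sketch below is a standalone approximation under a few assumptions: FramingSketch is a hypothetical class, the constants (8 bytes of frame overhead, frame type 3 for a body frame, end marker 206) are the AMQP 0-9-1 values as I understand them, and the guard against a tiny frameMax is my own addition.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Standalone approximation of body fragmentation and frame layout; not the client's code.
class FramingSketch {
    static final int EMPTY_FRAME_SIZE = 8; // 1 type + 2 channel + 4 size + 1 end marker
    static final int FRAME_BODY = 3;       // assumed AMQP 0-9-1 body frame type
    static final int FRAME_END = 206;      // assumed AMQP 0-9-1 frame end octet (0xCE)

    // Splits body into encoded body frames, none larger than frameMax.
    // frameMax == 0 means "no limit", as in the transmit code.
    static List<byte[]> bodyFrames(int channel, byte[] body, int frameMax) {
        if (frameMax > 0 && frameMax <= EMPTY_FRAME_SIZE) {
            throw new IllegalArgumentException("frameMax leaves no room for payload");
        }
        int bodyPayloadMax = frameMax > 0 ? frameMax - EMPTY_FRAME_SIZE : body.length;
        List<byte[]> frames = new ArrayList<>();
        for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
            int fragmentLength = Math.min(body.length - offset, bodyPayloadMax);
            frames.add(encode(FRAME_BODY, channel, body, offset, fragmentLength));
        }
        return frames;
    }

    // Writes type, channel, payload size, payload, and the end marker.
    static byte[] encode(int type, int channel, byte[] payload, int offset, int length) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream os = new DataOutputStream(bytes)) {
            os.writeByte(type);
            os.writeShort(channel);
            os.writeInt(length);
            os.write(payload, offset, length);
            os.write(FRAME_END);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = new byte[10];
        // With frameMax = 12 each frame carries at most 4 payload bytes.
        for (byte[] frame : bodyFrames(1, body, 12)) {
            System.out.println("frame of " + frame.length + " bytes");
        }
    }
}
```

The encode step here imitates what the client's own Frame.writeTo does, which is the code that follows.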
/**
 * Public API - writes this Frame to the given DataOutputStream
 */
public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);
    os.writeShort(channel);
    if (accumulator != null) {
        os.writeInt(accumulator.size());
        accumulator.writeTo(os);
    } else {
        os.writeInt(payload.length);
        os.write(payload);
    }
    os.write(AMQP.FRAME_END);
}

- Recv
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

Everything up to the queue declaration is the same as in Send; consuming is done with channel.basicConsume().
/**
 * Start a non-nolocal, non-exclusive consumer, with
 * a server-generated consumerTag.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param deliverCallback callback when a message is delivered
 * @param cancelCallback callback when the consumer is cancelled
 * @return the consumerTag generated by the server
 * @throws IOException if an error is encountered
 * @since 5.0
 */
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

The detailed flow is the same as for basicPublish. DeliverCallback and CancelCallback are merged into a single Consumer in consumerFromDeliverCancelCallbacks, and the code that does the actual work is as follows.
/** Public API - {@inheritDoc} */
@Override
public String basicConsume(String queue, final boolean autoAck, String consumerTag,
                           boolean noLocal, boolean exclusive, Map<String, Object> arguments,
                           final Consumer callback)
    throws IOException
{
    final Method m = new Basic.Consume.Builder()
        .queue(queue)
        .consumerTag(consumerTag)
        .noLocal(noLocal)
        .noAck(autoAck)
        .exclusive(exclusive)
        .arguments(arguments)
        .build();
    BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
        @Override
        public String transformReply(AMQCommand replyCommand) {
            String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
            _consumers.put(actualConsumerTag, callback);
            // need to register consumer in stats before it actually starts consuming
            metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
            dispatcher.handleConsumeOk(callback, actualConsumerTag);
            return actualConsumerTag;
        }
    };
    rpc(m, k);
    try {
        if(_rpcTimeout == NO_RPC_TIMEOUT) {
            return k.getReply();
        } else {
            try {
                return k.getReply(_rpcTimeout);
            } catch (TimeoutException e) {
                throw wrapTimeoutException(m, e);
            }
        }
    } catch(ShutdownSignalException ex) {
        throw wrap(ex);
    }
}

So what is BlockingRpcContinuation?? Judging from the code above, it holds the pending reply of a request/response style method: getReply blocks the calling thread until the broker's reply (here Basic.ConsumeOk) arrives, and transformReply turns that reply into the value that is returned.
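The blocking-reply idea can be imitated with plain JDK classes. The sketch below is only a stand-in to illustrate the pattern: BlockingReplySketch is a hypothetical class built on CompletableFuture, the reply is a plain String instead of an AMQCommand, and the "consume-ok:" prefix is invented for the demo. It is not how the client implements BlockingRpcContinuation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Stand-in for the idea behind BlockingRpcContinuation; not the client's implementation.
class BlockingReplySketch<R, T> {
    private final CompletableFuture<T> cell = new CompletableFuture<>();
    private final Function<R, T> transformReply;

    BlockingReplySketch(Function<R, T> transformReply) {
        this.transformReply = transformReply;
    }

    // Called by the thread that reads replies off the connection.
    void handleCommand(R reply) {
        cell.complete(transformReply.apply(reply));
    }

    // Called by the requesting thread; blocks until a reply arrives or time runs out.
    T getReply(long timeoutMillis) throws TimeoutException {
        try {
            return cell.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Demo: a second thread plays the broker and delivers the reply.
    static String demo() {
        BlockingReplySketch<String, String> k =
            new BlockingReplySketch<>(reply -> reply.substring("consume-ok:".length()));
        Thread reader = new Thread(() -> k.handleCommand("consume-ok:ctag-1"));
        reader.start();
        try {
            return k.getReply(1000);
        } catch (TimeoutException e) {
            return "timeout";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```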
public void rpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}

public void quiescingRpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}

quiescingTransmit works the same way as on the send path, and enqueueRpc registers the RpcContinuation, which holds the reply and the callback.
public void enqueueRpc(RpcContinuation k)
{
    doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}

public class RpcContinuationRpcWrapper implements RpcWrapper {
    private final AMQChannel.RpcContinuation continuation;
    public RpcContinuationRpcWrapper(AMQChannel.RpcContinuation continuation) {
        this.continuation = continuation;
    }
    @Override
    public boolean canHandleReply(AMQCommand command) {
        return continuation.canHandleReply(command);
    }
    @Override
    public void complete(AMQCommand command) {
        continuation.handleCommand(command);
    }
    @Override
    public void shutdown(ShutdownSignalException signal) {
        continuation.handleShutdownSignal(signal);
    }
}

private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
    synchronized (_channelMutex) {
        boolean waitClearedInterruptStatus = false;
        while (_activeRpc != null) {
            try {
                _channelMutex.wait();
            } catch (InterruptedException e) {
                waitClearedInterruptStatus = true;
            }
        }
        if (waitClearedInterruptStatus) {
            Thread.currentThread().interrupt();
        }
        _activeRpc = rpcWrapperSupplier.get();
    }
}

doEnqueueRpc waits on _channelMutex for as long as another RPC is still active (_activeRpc != null). If the thread is interrupted during the wait, it only remembers that and keeps waiting; once the loop ends it restores the thread's interrupt status, and then stores a new RpcContinuationRpcWrapper in _activeRpc.
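The wait loop with a deferred interrupt is a general pattern and can be reproduced with nothing but the JDK. In the sketch below, RpcSlotSketch, complete(), and demo() are hypothetical names; complete() in particular is my assumption of what the reply-handling side has to do (clear the slot and call notifyAll) for the waiting thread to make progress.

```java
// Standalone sketch of "one active RPC at a time" with interrupt status preserved.
class RpcSlotSketch {
    private final Object mutex = new Object();
    private Object activeRpc; // null means no RPC is outstanding

    // Waits until the slot is free, then claims it. An interrupt during the
    // wait does not abort the wait; it is re-asserted afterwards.
    void enqueue(Object rpc) {
        synchronized (mutex) {
            boolean waitClearedInterruptStatus = false;
            while (activeRpc != null) {
                try {
                    mutex.wait();
                } catch (InterruptedException e) {
                    waitClearedInterruptStatus = true;
                }
            }
            if (waitClearedInterruptStatus) {
                Thread.currentThread().interrupt();
            }
            activeRpc = rpc;
        }
    }

    // Assumed counterpart: the reply arrived, free the slot and wake waiters.
    void complete() {
        synchronized (mutex) {
            activeRpc = null;
            mutex.notifyAll();
        }
    }

    Object active() {
        synchronized (mutex) {
            return activeRpc;
        }
    }

    // Scenario: the slot is busy and the caller has a pending interrupt.
    static boolean demo() {
        RpcSlotSketch slot = new RpcSlotSketch();
        slot.enqueue("first");
        Thread replier = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                // fall through and free the slot anyway
            }
            slot.complete();
        });
        replier.start();
        Thread.currentThread().interrupt(); // makes the first wait() throw
        slot.enqueue("second");             // still waits for complete()
        boolean restored = Thread.interrupted(); // reads and clears the flag
        return restored && "second".equals(slot.active());
    }

    public static void main(String[] args) {
        System.out.println("interrupt preserved and slot claimed: " + demo());
    }
}
```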
NIO also seems to be available, but it is off by default (the default value is false). ConnectionFactoryConfigurator shows which settings can be configured.