消息发送
amq消息发送中同步和异步
- 同步发送: producer发送消息后,会一直阻塞知道broker反馈一个确认消息,表示broker已经处理了消息
- 异步发送: producer不需要等待broker的反馈,性能会高一些,但是可能出现消息丢失的情况
amq中默认的消息发送策略:
- 非持久化的消息都是异步发送的
- 持久化消息在非事务模式下是同步发送的
- 在开启事务的情况下,消息都是异步发送
消息发送过程
producerWindowSize
producerWindow主要是针对异步发送消息时,控制允许能够积压的消息累计大小,这些消息没有得到broker的ack,在得到ack之后会减去相应的消息size,释放producerWindow
发送消息源码
// ActiveMQMessageProducer.send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { checkClosed(); if (destination == null) { if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); }
ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); }
if (transformer != null) { Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } }
if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } }
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
stats.onMessage(); }
|
// ActiveMQSession.send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } synchronized (sendMutex) { doStartTransaction(); TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence();
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } }
} }
|
消息消费和ack应答
consumer 阻塞消费
// ActiveMQMessageConsumer.receive
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Override public Message receive() throws JMSException {
checkClosed(); checkMessageListener(); sendPullCommand(0); MessageDispatch md = dequeue(-1); if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); }
|
在分析具体方法之前,说明几个属性:
- protected final MessageDispatchChannel unconsumedMessages:未消费的消息通道,里面用来储存未消费的消息,该通道容纳的最大消息数为预取值
- protected final LinkedListdeliveredMessages = new LinkedList():分发给该消费者但未应答的消息链表,列表中的消息顺序和被消费的顺序是相反的。
- private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages: 事务完整性保证,用于做本地消息重发
sendPullCommand
sendPullCommand检查unconsumedMessages(未曾消费消息)是否为空和PrefetchSize是不是等于0,满足才会异步发送拉取消息指令到broker,broker会推送消息到客户端的unconsumedMessages里面
1 2 3 4 5 6 7 8 9 10
| protected void sendPullCommand(long timeout) throws JMSException { clearDeliveredList(); if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); } }
|
dequeue
这个方法是从本地消息队列unconsumedMessages
出队列一条消息进行消费。
broker将消息推送到本地 unconsumedMessages
的流程大概如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public MessageDispatch dequeue(long timeout) throws InterruptedException { synchronized (mutex) { while (timeout != 0 && !closed && (list.isEmpty() || !running)) { if (timeout == -1) { mutex.wait(); } else { mutex.wait(timeout); break; } } if (closed || !running || list.isEmpty()) { return null; } return list.removeFirst(); } }
|
beforeMessageIsConsumed
这里面主要是做消息消费之前的一些准备工作,如果ack不是topic的DUPS_OK_ACKNOWLEDGE
,则所有的消息先放到deliveredMessages链表的开头
如果当前是 事务类型的会话,还需要特殊操作,判断transactedIndividualAck,如果为true,表示单条消息直接返回ack。 否则,调用ackLater,消息放入pendingack,等待处理。批量应答, client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这 些消息的条数达到一定阀值时,session.commit指令把它们全部确认;这比对每条消息都逐个确认,在性能上 要提高很多,pendingACK就是事务的批量ack
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { md.setDeliverySequenceId(session.getNextDeliveryId()); lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); if (!isAutoAcknowledgeBatch()) { synchronized(deliveredMessages) { deliveredMessages.addFirst(md);s } if (session.getTransacted()) { if (transactedIndividualAck) { immediateIndividualTransactedAck(md); } else { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } } }
|
afterMessageIsConsumed
afterMessageIsConsumed
主要是对消息进行ack操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; } if (messageExpired) { acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); if (session.getTransacted()) { } else if (isAutoAcknowledgeEach()) { if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++;
if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack); optimizeAckTimestamp = System.currentTimeMillis(); } if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } } } else { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack!=null) { deliveredMessages.clear(); session.sendAck(ack); } } } } deliveryingAcknowledgements.set(false); } } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { boolean messageUnackedByConsumer = false; synchronized (deliveredMessages) { messageUnackedByConsumer = deliveredMessages.contains(md); } if (messageUnackedByConsumer) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } else { throw new IllegalStateException("Invalid session state."); } } }
|
optimizeAcknowledge
是开启批量回传的标记,可以设置批量回传阀值来优化回传效率。
综上我们可以看到
队列延迟确认(DUPS_OK_ACKNOWLEDGE)或者自动确认(AUTO_ACKNOWLEDGE):如开启optimizeAcknowledge优化回传,相当于批量回传,达到设置的阀值之后自动批量回传ack 。没有开启的话就都是马上自动发送标准的ack,回传单条数据
topic的延时确认(DUPS_OK_ACKNOWLEDGE):统一都是批量确认,达到设置的阀值之后自动批量回传ack
手动确认方式,类似session.commit
消息重发
activeMQ中的消息重发,指的是消息可以被broker重新分派给消费者,不一定的之前的消费者。重发消息之后,消费者可以重新消费。消息重发的情况有以下几种:
- 事务会话中,当还未进行session.commit()时,进行session.rollback(),那么所有还没commit的消息都会进行重发
- 使用客户端手动确认的方式时,还未进行确认并且执行Session.recover(),那么所有还没acknowledge的消息都会进行重发
- 所有未ack的消息,当进行session.closed()关闭事务,那么所有还没ack的消息broker端都会进行重发,而且是马上重发
- 消息被消费者拉取之后,超时没有响应ack,消息会被broker重发