Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public PreviouslyDeliveredMap(TransactionId transactionId) {
class PreviouslyDelivered {
org.apache.activemq.command.Message message;
boolean redelivered;
boolean prefetchedOnly; // true if message was only prefetched, not delivered to application

PreviouslyDelivered(MessageDispatch messageDispatch) {
message = messageDispatch.getMessage();
Expand All @@ -122,6 +123,12 @@ class PreviouslyDelivered {
message = messageDispatch.getMessage();
this.redelivered = redelivered;
}

PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered, boolean prefetchedOnly) {
message = messageDispatch.getMessage();
this.redelivered = redelivered;
this.prefetchedOnly = prefetchedOnly;
}
}

private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
Expand Down Expand Up @@ -770,8 +777,12 @@ void clearMessagesInProgress() {
LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
List<MessageDispatch> list = unconsumedMessages.removeAll();
final boolean isTransacted = session.isTransacted();
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
if (isTransacted) {
capturePrefetchedMessagesForDuplicateSuppression(old);
}
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
Expand Down Expand Up @@ -933,6 +944,16 @@ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
if (session.isTransacted()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand on how this is only impacted by transacted sessions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bug (prefetched messages not redelivered after transaction failure) was related to transactions only. So I did not wanted to impact other scenarios. But there is outside of this a rational.

The previouslyDeliveredMessages map (and prefetchedOnly) is only used to track rollback/redelivery behavior, which only exists for transacted sessions. In a non‑transacted session there is no rollback; delivery state is driven by the ack mode (AUTO/CLIENT/DUPS_OK).

PreviouslyDelivered entry = null;
if (previouslyDeliveredMessages != null) {
entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
}
if (entry != null && entry.prefetchedOnly) {
entry.prefetchedOnly = false;
entry.redelivered = true;
}
}
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
Expand Down Expand Up @@ -1382,6 +1403,7 @@ private void rollbackPreviouslyDeliveredAndNotRedelivered() {
removeFromDeliveredMessages(entry.message.getMessageId());
}
}
// Clear everything on rollback - prefetched messages will be redelivered by broker
clearPreviouslyDelivered();
}
}
Expand Down Expand Up @@ -1420,7 +1442,8 @@ public void dispatch(MessageDispatch md) {
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
// deliverySequenceId non zero means previously queued dispatch
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || isPrefetchedRedelivery(md)
|| !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
Expand Down Expand Up @@ -1570,6 +1593,33 @@ private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedeliver
LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
}

private void capturePrefetchedMessagesForDuplicateSuppression(final MessageDispatch pending) {
if (pending.getMessage() == null) {
return; // nothing to track
}
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<>(session.getTransactionContext().getTransactionId());
}
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
LOG.trace("{} tracking existing transacted {} prefetched ({})", getConsumerId(), previouslyDeliveredMessages.transactionId, pending);
}

private boolean isPrefetchedRedelivery(final MessageDispatch md) {
if (!session.isTransacted()) {
return false;
}
if (md.getMessage() == null) {
return false;
}
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
PreviouslyDelivered entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
return entry != null && entry.prefetchedOnly;
}
}
return false;
}

public int getMessageSize() {
return unconsumedMessages.size();
}
Expand Down Expand Up @@ -1689,4 +1739,11 @@ public boolean isConsumerExpiryCheckEnabled() {
public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
}

// Protected method for testing
protected int getPreviouslyDeliveredMessagesSize() {
synchronized(deliveredMessages) {
return previouslyDeliveredMessages != null ? previouslyDeliveredMessages.size() : 0;
}
}
}
Loading
Loading