@pradeep85841
Contributor

Updated VMTransport.java doDispatch() to create a defensive copy of ActiveMQMessage to prevent shared message mutation.

Verified locally with a helper: original message body remains unchanged and copy is correctly dispatched.

No new files added; only VMTransport.java modified.

@jbonofre jbonofre self-requested a review February 8, 2026 15:55
@mattrpav
Contributor

mattrpav commented Feb 8, 2026

Have you observed a bug or problem with message mutation? If so, please provide the scenario and reproducible test case.

ActiveMQConnectionFactory already has copyMessageOnSend enabled by default.

I need help understanding why (in effect) 2 copies are needed.

@jbonofre
Member

jbonofre commented Feb 8, 2026

@pradeep85841 can you please provide some details here? Do you have an issue or test case? It's not obvious to me whether this is cosmetic or an actual issue.

@pradeep85841
Contributor Author

This is based on an issue with vm:// and topics (AMQ-9855).

With VMTransport the same ActiveMQMessage instance is dispatched to multiple consumers. If one consumer reads or mutates the body (Camel split/processor does this), other consumers can see an empty body. This does not happen over tcp:// because marshal/unmarshal creates a copy.
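
To make the hazard described above concrete, here is a minimal, self-contained Java sketch. `SketchMessage` and `SharedDispatchSketch` are hypothetical stand-ins, not ActiveMQ classes. The sketch only illustrates what happens when several consumers hold one mutable instance versus independent copies; it makes no claim about what VMTransport actually does internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mutable message
// (not org.apache.activemq.command.ActiveMQMessage).
final class SketchMessage {
    private String body;

    SketchMessage(String body) {
        this.body = body;
    }

    String getBody() {
        return body;
    }

    // Simulates a consumer that clears the body while processing.
    void clearBody() {
        body = null;
    }

    SketchMessage copy() {
        return new SketchMessage(body);
    }
}

final class SharedDispatchSketch {
    // Hands the very same instance to every consumer.
    static List<SketchMessage> dispatchShared(SketchMessage message, int consumers) {
        List<SketchMessage> delivered = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            delivered.add(message);
        }
        return delivered;
    }

    // Hands each consumer its own independent copy.
    static List<SketchMessage> dispatchCopied(SketchMessage message, int consumers) {
        List<SketchMessage> delivered = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            delivered.add(message.copy());
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<SketchMessage> shared = dispatchShared(new SketchMessage("hello"), 2);
        shared.get(0).clearBody();
        System.out.println("shared: " + shared.get(1).getBody());

        List<SketchMessage> copied = dispatchCopied(new SketchMessage("hello"), 2);
        copied.get(0).clearBody();
        System.out.println("copied: " + copied.get(1).getBody());
    }
}
```

With the shared instance, the second consumer sees the cleared body; with copies, it still sees the original body.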

copyMessageOnSend only applies at the producer and broker boundary. The problem here happens inside the broker during dispatch, so that setting does not help.

The change makes the VM transport behave consistently with the TCP transport and avoids shared mutable state.

Happy to add a test if needed.

@jbonofre
Member

jbonofre commented Feb 8, 2026

OK, let me take a new look. I'm adding the Jira id in the title of the PR to avoid confusion. Thanks.

@jbonofre jbonofre changed the title from "VMTransport: Defensive copy of messages to prevent mutation" to "AMQ-9855: VMTransport, Defensive copy of messages to prevent mutation" Feb 8, 2026
@mattrpav
Contributor

mattrpav commented Feb 8, 2026

This probably needs a config flag that is a corollary to copyMessageOnSend,

i.e. copyMessageOnDispatch

@pradeep85841
Contributor Author

Sounds good, thanks.
Happy to make this configurable and I’ll wait for next steps.

@pradeep85841
Contributor Author

Applied a defensive copy in VMTransport.java to ensure each consumer on vm:// topics receives an independent message.

Added VMTransportDefensiveCopyTest.java to reproduce the issue and verify the fix.

Confirms that message bodies are not shared/mutated between consumers.

Addresses the intermittent null/empty body problem reported in AMQ-9855.

@mattrpav
Contributor

mattrpav commented Feb 10, 2026

Should this be enabled on the client connection factory? JMS messages are supposed to be immutable. If someone is modifying it post recv, seems like that is on the app-side to ensure the spec is not violated when using vm transport.

@mattrpav mattrpav requested review from mattrpav and removed request for jbonofre February 10, 2026 23:19
@pradeep85841
Contributor Author

Thanks for the suggestion.

The issue in AMQ-9855 happens inside the vm:// transport dispatch path. The same ActiveMQMessage instance is delivered to multiple consumers in-memory. With tcp://, marshal/unmarshal creates separate instances, so this does not occur.

So this is not about client code mutating a received JMS message, it’s about vm:// sharing the same object reference across consumers, which makes its behaviour inconsistent with other transports.

@mattrpav
Contributor

The consumer-side may perform the copy, no?

@pradeep85841
Contributor Author

The consumer side could defensively copy the message, but that would not solve the root cause. By the time the consumer performs the copy, the same message instance may already have been observed or mutated by another consumer. In that case, the copy would simply duplicate an already-modified state.

@jbonofre
Member

jbonofre commented Feb 11, 2026

This is an interesting PR because it's a good way to bring the VM transport's behavior closer to that of a "remote" transport connector. However, the VM transport has a purpose: direct communication without marshaling.
I will take a deeper look, but very interesting, thanks @pradeep85841 !

@jbonofre jbonofre self-requested a review February 11, 2026 10:46
@jbonofre
Member

@mattrpav I would appreciate it if you didn't remove me as a reviewer when I have requested the review. Thanks.

@jeanouii
Contributor

@pradeep85841 That's a very good catch. I think it makes a lot of sense to have this, to make sure the vm transport behaves the same as any remote protocol.
In the EJB world, we also have @Local and @Remote interfaces. Some app servers, like Apache TomEE, have the same kind of optimization. If you call a @Remote interface within the same JVM, the server will mimic the serialization / deserialization before actually calling the target bean in the JVM.
That makes a lot of sense.
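
The pass-by-value idea mentioned above can be sketched with plain Java serialization. This is only an illustration of the general technique, assuming a `Serializable` argument; `PassByValueSketch` and `roundTrip` are hypothetical names, and this is not presented as how TomEE or ActiveMQ implement it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

final class PassByValueSketch {
    // Serializes and deserializes the value so the callee gets an independent object graph.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            // Wrapped as unchecked only to keep the sketch short.
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> original = new ArrayList<>();
        original.add("a");

        ArrayList<String> copy = roundTrip(original);
        copy.add("b"); // mutating the copy leaves the original untouched

        System.out.println(original.size() + " " + copy.size());
    }
}
```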

@pradeep85841
Contributor Author

Thanks for the feedback.

I’ve removed the Dockerfile and entrypoint.sh changes from this PR since they belong to the separate non-root Docker work. This PR is now strictly focused on the VM transport change for AMQ-9855.

Contributor

@jeanouii jeanouii left a comment

Overall looking good to me. Great job!

See comments below

import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
Contributor

This is discouraged in the project, even though it's not a blocker


import org.apache.activemq.command.ShutdownInfo;
import jakarta.jms.JMSException;
import org.apache.activemq.command.*;
Contributor

Same here FYI

private volatile int receiveCounter;

private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
Contributor

I'm probably blind or my search/replace does not work properly. Where are the 2 fields used so far?

toSend = wf.unmarshal(data); // deep copy
} catch (IOException e) {
LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
toSend = command;
Contributor

I'm wondering whether this is desired, to be honest. To me, the goal of this PR (and it's great) is to have the VM transport behave the same as other remote transports, the benefit being that others in the same JVM can't mutate the message. Great!

Now if we can't serialize/de-serialize to create a deep copy and we still send the original, we might introduce a case where VM does work when remote does not. So I'm tempted to just fail here. What do you think?
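
The fail-instead-of-fall-back option raised above could look roughly like the following sketch. `FailFastCopySketch`, `Copier`, and `copyOrFail` are hypothetical names introduced for illustration and are not part of the PR or of ActiveMQ; the only point is that a copy failure is propagated rather than silently replaced by the original instance.

```java
import java.io.IOException;

final class FailFastCopySketch {
    // Hypothetical copy strategy; it could wrap marshal/unmarshal or a copy() call.
    interface Copier<T> {
        T copy(T original) throws IOException;
    }

    // Returns a copy or fails; never hands back the original as a fallback.
    static <T> T copyOrFail(T original, Copier<T> copier) throws IOException {
        try {
            return copier.copy(original);
        } catch (IOException e) {
            throw new IOException("Could not copy command for dispatch", e);
        }
    }

    public static void main(String[] args) {
        try {
            String copied = copyOrFail("payload", s -> new String(s));
            System.out.println("copied: " + copied);

            copyOrFail("payload", s -> {
                throw new IOException("simulated marshal failure");
            });
        } catch (IOException e) {
            System.out.println("dispatch failed: " + e.getMessage());
        }
    }
}
```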

Contributor

@mattrpav mattrpav Feb 11, 2026

command.copy() should suffice here

edit: Agree, we definitely don't want to instantiate an OpenWireFormat object per-message here.

ByteSequence data = wf.marshal(original);
toSend = (ActiveMQMessage) wf.unmarshal(data);
} catch (IOException e) {
LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
Contributor

Same

}

// Dispatch to listener
dispatch(peer, peer.messageQueue, toSend);
Contributor

Should we have a return after this one to avoid the second dispatch below?

Contributor

This PR should be rebased against apache/main so that this class is no longer included here

Contributor

@cshannon cshannon left a comment

In the past I have also noticed some odd behavior with the VM transport when using clients (producers/consumers) that could be a thread safety bug (it is used for bridges though and seems fine there so that is interesting).

However, this proposed fix in this PR doesn't make sense to me at all because the entire point is to NOT need to marshal the message. If you want to marshal the object then you should just use the TCP transport as you lose the benefits.

The VM transport is supposed to already do a deep copy during dispatch to prevent issues with multi-threading. It's possible there is an issue with that, so that's probably where to look for a fix vs adding in marshaling.

Here is where the copying is done on dispatch in the connection:

public Response processMessageDispatch(MessageDispatch md) throws Exception {
    waitForTransportInterruptionProcessingToComplete();
    ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
    if (dispatcher != null) {
        // Copy in case a embedded broker is dispatching via
        // vm://
        // md.getMessage() == null to signal end of queue
        // browse.
        Message msg = md.getMessage();
        if (msg != null) {
            msg = msg.copy();
            msg.setReadOnlyBody(true);
            msg.setReadOnlyProperties(true);
            msg.setRedeliveryCounter(md.getRedeliveryCounter());
            msg.setConnection(ActiveMQConnection.this);
            msg.setMemoryUsage(null);
            md.setMessage(msg);
        }
        dispatcher.dispatch(md);
    } else {
        LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
    }
    return null;
}

I would start there when investigating whether that is the issue
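
The copy-then-mark-read-only pattern in the snippet above can be illustrated in isolation with a small sketch. `ReadOnlyCopySketch`, `Msg`, and `prepareForConsumer` are hypothetical names, and the use of `IllegalStateException` is an assumption made for this sketch; it is not the ActiveMQ implementation, only a model of the idea that each consumer gets its own copy whose body can no longer be written.

```java
final class ReadOnlyCopySketch {
    // Hypothetical message model with a read-only flag for the body.
    static final class Msg {
        private String body;
        private boolean readOnlyBody;

        Msg(String body) {
            this.body = body;
        }

        String getBody() {
            return body;
        }

        void setBody(String body) {
            if (readOnlyBody) {
                throw new IllegalStateException("message body is read-only");
            }
            this.body = body;
        }

        void setReadOnlyBody(boolean readOnlyBody) {
            this.readOnlyBody = readOnlyBody;
        }

        Msg copy() {
            return new Msg(body);
        }
    }

    // Copies the message per consumer and locks the copy before handing it out.
    static Msg prepareForConsumer(Msg original) {
        Msg copy = original.copy();
        copy.setReadOnlyBody(true);
        return copy;
    }

    public static void main(String[] args) {
        Msg original = new Msg("payload");
        Msg delivered = prepareForConsumer(original);
        try {
            delivered.setBody("tampered");
        } catch (IllegalStateException e) {
            System.out.println("write rejected: " + e.getMessage());
        }
        System.out.println("delivered body: " + delivered.getBody());
    }
}
```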

@jbonofre
Member

I think @cshannon has a very good point. The purpose of the VM transport is to "bypass" the marshaling (direct JVM communication). That was my first question about the use case. If the issue is about multi-threading/consumers, it's worth investigating there (rather than changing the intentional behavior).

- HEALTHCHECK uses Jolokia search on Broker MBean
- Ensures broker is running and accessible
- Auth and Origin headers included for Jolokia security
- Docker CI-ready and deterministic
@jeanouii
Contributor

Thanks for the background @cshannon

Pradeep Kunchala added 4 commits February 11, 2026 23:20
- Messages are now copied via OpenWireFormat marshal/unmarshal
- Prevents consumers from mutating original message
- Includes fallback if marshal/unmarshal fails
- Verified with unit tests: original message remains intact
- Updated VMTransport to create defensive copy of ActiveMQMessage during dispatch.
  This prevents shared mutable state across consumers in VM transport.
- Added VMTransportDefensiveCopyTest to reproduce AMQ-9855 scenario and verify fix.
- Test ensures that message body remains intact across multiple consumers.
- Patch aligns VM transport behavior with TCP transport and avoids Camel route failures.
@pradeep85841 pradeep85841 force-pushed the AMQ-9855-vmtransport-defensive-copy branch from 3891225 to 3b71d42 Compare February 11, 2026 18:34
@pradeep85841
Contributor Author

I’ve applied all requested changes: cleaned up imports and unused fields in VMTransport.java, ensured proper deep-copy handling in oneway() so VM transport behaves like other remote transports, added a return to avoid duplicate dispatch, and updated the unit tests to focus on the AMQ-9855 scenario. All changes are tested and ready for review.
