AMQ-9855: VMTransport, Defensive copy of messages to prevent mutation #1659
|
Have you observed a bug or problem with message mutation? If so, please provide the scenario and reproducible test case. ActiveMQConnectionFactory already has copyMessageOnSend enabled by default. I need help understanding why (in effect) 2 copies are needed. |
|
@pradeep85841 can you please provide some details here? Do you have an issue/test case? It's not obvious to me whether it's a cosmetic or an actual issue. |
|
This is based on an issue with vm:// and topics (AMQ-9855). With VMTransport, the same ActiveMQMessage instance is dispatched to multiple consumers. If one consumer reads or mutates the body (a Camel split/processor does this), other consumers can see an empty body. This does not happen over tcp:// because marshal/unmarshal creates a copy. copyMessageOnSend only applies at the producer-broker boundary. The problem here happens inside the broker during dispatch, so that setting does not help. The change makes the VM transport behave consistently with the TCP transport and avoids shared mutable state. Happy to add a test if needed. |
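The failure mode described in this comment can be illustrated without any broker at all. The following is a minimal sketch, assuming a hypothetical `SketchMessage` class with a mutable body; none of these names are ActiveMQ classes, and the sketch does not claim to reproduce the actual VMTransport code path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message with a mutable body; not an ActiveMQ class.
final class SketchMessage {
    private byte[] body;

    SketchMessage(byte[] body) {
        this.body = body;
    }

    byte[] getBody() {
        return body;
    }

    // Models a consumer-side operation that consumes or clears the body.
    void clearBody() {
        body = null;
    }

    // Independent copy, analogous to what a marshal/unmarshal round trip yields.
    SketchMessage copy() {
        return new SketchMessage(body == null ? null : body.clone());
    }
}

final class SharedDispatchSketch {
    // Hands the same instance to every consumer (the behaviour reported for vm://).
    static List<SketchMessage> dispatchShared(SketchMessage message, int consumers) {
        List<SketchMessage> delivered = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            delivered.add(message);
        }
        return delivered;
    }

    // Hands an independent copy to every consumer.
    static List<SketchMessage> dispatchCopied(SketchMessage message, int consumers) {
        List<SketchMessage> delivered = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            delivered.add(message.copy());
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<SketchMessage> shared = dispatchShared(new SketchMessage("hello".getBytes()), 2);
        shared.get(0).clearBody(); // first consumer mutates
        System.out.println("shared: second body is null = " + (shared.get(1).getBody() == null));

        List<SketchMessage> copied = dispatchCopied(new SketchMessage("hello".getBytes()), 2);
        copied.get(0).clearBody(); // first consumer mutates its own copy only
        System.out.println("copied: second body is null = " + (copied.get(1).getBody() == null));
    }
}
```

With shared dispatch, the second consumer observes the first consumer's mutation; with per-consumer copies it does not.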
|
OK, let me take a new look. I'm adding the Jira id in the title of the PR to avoid confusion. Thanks. |
|
This probably needs a config flag that is a corollary to copyMessageOnSend, i.e. copyMessageOnDispatch |
|
Sounds good, thanks. |
|
Applied a defensive copy in VMTransport.java to ensure each consumer on vm:// topics receives an independent message. Added VMTransportDefensiveCopyTest.java to reproduce the issue and verify the fix. Confirms that message bodies are not shared/mutated between consumers. Addresses the intermittent null/empty body problem reported in AMQ-9855. |
|
Should this be enabled on the client connection factory? JMS messages are supposed to be immutable. If someone is modifying it post recv, seems like that is on the app-side to ensure the spec is not violated when using vm transport. |
|
Thanks for the suggestion. The issue in AMQ-9855 happens inside the vm:// transport dispatch path. The same ActiveMQMessage instance is delivered to multiple consumers in-memory. With tcp://, marshal/unmarshal creates separate instances, so this does not occur. So this is not about client code mutating a received JMS message, it’s about vm:// sharing the same object reference across consumers, which makes its behaviour inconsistent with other transports. |
|
The consumer-side may perform the copy, no? |
|
The consumer side could defensively copy the message, but that would not solve the root cause. By the time the consumer performs the copy, the same message instance may already have been observed or mutated by another consumer. In that case, the copy would simply duplicate an already-modified state. |
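The ordering argument in this reply can be shown with a small sketch. It assumes a hypothetical mutable `Msg` class and simulates "consumer A mutates, then consumer B copies" sequentially; it is an illustration of the reasoning, not ActiveMQ code.

```java
final class LateCopySketch {
    // Hypothetical mutable message; not an ActiveMQ class.
    static final class Msg {
        String body;

        Msg(String body) {
            this.body = body;
        }

        Msg copy() {
            return new Msg(body);
        }
    }

    // Consumer-side copy: consumer A has already touched the shared instance,
    // so consumer B's defensive copy duplicates the modified state.
    static Msg copyAfterMutation(Msg shared) {
        shared.body = "";       // consumer A mutates first
        return shared.copy();   // consumer B copies too late
    }

    // Dispatch-side copy: the copy is taken before any consumer sees the instance.
    static Msg copyBeforeMutation(Msg shared) {
        Msg forConsumerB = shared.copy();
        shared.body = "";       // consumer A mutates its own reference
        return forConsumerB;
    }

    public static void main(String[] args) {
        System.out.println("late copy empty: " + copyAfterMutation(new Msg("payload")).body.isEmpty());
        System.out.println("early copy intact: " + copyBeforeMutation(new Msg("payload")).body.equals("payload"));
    }
}
```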
|
This is an interesting PR because it's a good way to bring the VM transport behavior close to that of a "remote" transport connector. However, the VM transport has a purpose: direct communication without marshaling. |
|
@mattrpav I would appreciate it if you didn't remove me as a reviewer when I requested it. Thanks. |
|
@pradeep85841 That's a very good catch. And I think it makes a lot of sense to have this to make sure vm transport behaves the same as any remote protocol. |
|
Thanks for the feedback. I’ve removed the Dockerfile and entrypoint.sh changes from this PR since they belong to the separate non-root Docker work. This PR is now strictly focused on the VM transport change for AMQ-9855. |
jeanouii
left a comment
Overall looking good to me. Great job!
See comments below
    import java.sql.PreparedStatement;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
This is discouraged in the project, even though it's not a blocker

    import org.apache.activemq.command.ShutdownInfo;
    import jakarta.jms.JMSException;
    import org.apache.activemq.command.*;
Same here FYI
    private volatile int receiveCounter;

    private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();
I'm probably blind or my search/replace does not work properly. Where are the 2 fields used so far?
        toSend = wf.unmarshal(data); // deep copy
    } catch (IOException e) {
        LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
        toSend = command;
I'm wondering whether this is desired, to be honest. To me, the goal of this PR (and it's great) is to have the VM transport behave the same as other remote transports, the benefit being that others in the same JVM can't mutate the message. Great!
Now, if we can't serialize/deserialize to create a deep copy and we still send the original, we might introduce a case where VM works when remote does not. So I'm tempted to just fail here. What do you think?
command.copy() should suffice here
edit: Agree, we definitely don't want to instantiate an OpenWireFormat object per-message here.
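The two options discussed in this thread (fall back to the original on a copy failure, or fail the send) can be contrasted in a small sketch. The `Copier` interface and both method names are hypothetical and introduced only for illustration; the actual patch may handle this differently.

```java
import java.io.IOException;

final class CopyOrFailSketch {
    // Hypothetical copy strategy; stands in for a marshal/unmarshal round trip or a copy() call.
    interface Copier<T> {
        T copy(T original) throws IOException;
    }

    // Fallback variant: on failure the original, still shared, object is sent.
    // This is the case the reviewer worries about: vm:// succeeds where a remote transport would not.
    static <T> T copyOrOriginal(T original, Copier<T> copier) {
        try {
            return copier.copy(original);
        } catch (IOException e) {
            return original;
        }
    }

    // Fail-fast variant: the failure propagates to the caller, so the send fails.
    static <T> T copyOrFail(T original, Copier<T> copier) throws IOException {
        return copier.copy(original);
    }
}
```

The fail-fast variant keeps the in-JVM path and the remote path consistent, at the cost of surfacing an error to the sender.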
        ByteSequence data = wf.marshal(original);
        toSend = (ActiveMQMessage) wf.unmarshal(data);
    } catch (IOException e) {
        LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
Same
    }

    // Dispatch to listener
    dispatch(peer, peer.messageQueue, toSend);
Should we have a return after this one to avoid the second dispatch below?
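The concern about a second dispatch can be sketched as follows. The control flow here is an assumption reconstructed from the review question, not the actual `oneway()` body, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SingleDispatchSketch {
    // Records every delivery so duplicates are visible.
    final List<String> delivered = new ArrayList<>();

    // Without an early return, control falls through to the second dispatch.
    void onewayWithoutReturn(String command, boolean isMessage) {
        if (isMessage) {
            delivered.add("copy of " + command);
        }
        delivered.add(command);
    }

    // With the early return, a message command is delivered exactly once.
    void onewayWithReturn(String command, boolean isMessage) {
        if (isMessage) {
            delivered.add("copy of " + command);
            return;
        }
        delivered.add(command);
    }
}
```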
This PR should be rebased against apache/main so that this class is no longer included here
cshannon
left a comment
In the past I have also noticed some odd behavior with the VM transport when using clients (producers/consumers) that could be a thread safety bug (it is used for bridges though and seems fine there so that is interesting).
However, this proposed fix in this PR doesn't make sense to me at all because the entire point is to NOT need to marshal the message. If you want to marshal the object then you should just use the TCP transport as you lose the benefits.
The VM transport is supposed to already do a deep copy during dispatch to prevent issues with multi-threading. It's possible there is an issue with that, so that's probably where to look for a fix vs adding in marshaling.
Here is where the copying is done on dispatch in the connection:
activemq/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
Lines 1906 to 1929 in a8fa4b0
    public Response processMessageDispatch(MessageDispatch md) throws Exception {
        waitForTransportInterruptionProcessingToComplete();
        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
        if (dispatcher != null) {
            // Copy in case a embedded broker is dispatching via
            // vm://
            // md.getMessage() == null to signal end of queue
            // browse.
            Message msg = md.getMessage();
            if (msg != null) {
                msg = msg.copy();
                msg.setReadOnlyBody(true);
                msg.setReadOnlyProperties(true);
                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                msg.setConnection(ActiveMQConnection.this);
                msg.setMemoryUsage(null);
                md.setMessage(msg);
            }
            dispatcher.dispatch(md);
        } else {
            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
        }
        return null;
    }
I would start there when investigating whether that is the issue
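The copy-then-mark-read-only pattern in the quoted method can be modelled in isolation. This is a minimal sketch with a hypothetical `Msg` class; it mirrors only the shape of the quoted code and makes no claim about how deep the real `copy()` is or whether it shares underlying buffers.

```java
final class ReadOnlyCopySketch {
    // Hypothetical message with a read-only flag; not an ActiveMQ class.
    static final class Msg {
        private String body;
        private boolean readOnlyBody;

        Msg(String body) {
            this.body = body;
        }

        Msg copy() {
            return new Msg(body);
        }

        void setReadOnlyBody(boolean readOnlyBody) {
            this.readOnlyBody = readOnlyBody;
        }

        String getBody() {
            return body;
        }

        void setBody(String body) {
            if (readOnlyBody) {
                throw new IllegalStateException("body is read-only");
            }
            this.body = body;
        }
    }

    // Copy first, then mark the copy read-only before handing it to the consumer.
    static Msg prepareForConsumer(Msg dispatched) {
        if (dispatched == null) {
            return null; // the quoted code uses a null message to signal end of queue browse
        }
        Msg copy = dispatched.copy();
        copy.setReadOnlyBody(true);
        return copy;
    }
}
```

If the reported symptom persists despite a copy of this kind, one place to look is whether anything reachable from the copy is still shared with the original.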
|
I think @cshannon has a very good point. The purpose of the VM transport is to "bypass" marshaling (direct JVM communication). That was my first question about the use case. If the issue is about multi-threading/consumers, it's worth investigating there (and not changing the intentional behavior). |
- HEALTHCHECK uses Jolokia search on Broker MBean
- Ensures broker is running and accessible
- Auth and Origin headers included for Jolokia security
- Docker CI-ready and deterministic
Thanks for the background @cshannon |
- Messages are now copied via OpenWireFormat marshal/unmarshal
- Prevents consumers from mutating original message
- Includes fallback if marshal/unmarshal fails
- Verified with unit tests: original message remains intact

- Updated VMTransport to create defensive copy of ActiveMQMessage during dispatch. This prevents shared mutable state across consumers in VM transport.
- Added VMTransportDefensiveCopyTest to reproduce AMQ-9855 scenario and verify fix.
- Test ensures that message body remains intact across multiple consumers.
- Patch aligns VM transport behavior with TCP transport and avoids Camel route failures.
3891225 to 3b71d42
|
I’ve applied all requested changes: cleaned up imports and unused fields in VMTransport.java, ensured proper deep-copy handling in oneway() so VM transport behaves like other remote transports, added a return to avoid duplicate dispatch, and updated the unit tests to focus on the AMQ-9855 scenario. All changes are tested and ready for review. |
Updated VMTransport.java doDispatch() to create a defensive copy of ActiveMQMessage to prevent shared message mutation.
Verified locally with a helper: original message body remains unchanged and copy is correctly dispatched.
No new files added; only VMTransport.java modified.