Skip to content

NIFI-15483: Route PublishAMQP to failure on undeliverable messages instead of silent success#11213

Open
rakesh-rsky wants to merge 2 commits into
apache:mainfrom
rakesh-rsky:fix/NIFI-15483-publishamqp-route-on-failure
Open

NIFI-15483: Route PublishAMQP to failure on undeliverable messages instead of silent success#11213
rakesh-rsky wants to merge 2 commits into
apache:mainfrom
rakesh-rsky:fix/NIFI-15483-publishamqp-route-on-failure

Conversation

@rakesh-rsky
Copy link
Copy Markdown
Contributor

@rakesh-rsky rakesh-rsky commented May 6, 2026

Summary

PublishAMQP silently routes FlowFiles to REL_SUCCESS even when the AMQP broker cannot deliver the message, causing silent data loss.

Two failure modes are addressed:

  1. Undeliverable message (basic.return) — broker returns the message when no queue is bound to the exchange/routing-key. The fix uses AMQP Publisher Confirms + basic.return to detect and surface delivery failures, routing the FlowFile to REL_FAILURE.

  2. Exchange not found (ShutdownSignalException) — when the exchange does not exist, the broker closes the channel with 404 NOT_FOUND, causing waitForConfirms() to throw ShutdownSignalException. This is now caught and converted to AMQPException so the FlowFile routes to REL_FAILURE instead of causing an unhandled processor failure.

Testing

  • Publish to an exchange with no bound queues → FlowFile routes to failure with the broker's return reason
  • Publish to a non-existent exchange → FlowFile routes to failure instead of unhandled processor error
  • Normal publish (queue bound) → still routes to success
  • Added regression tests for all failure scenarios — verified to fail against unfixed code

Fixes: https://issues.apache.org/jira/browse/NIFI-15483

@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch from 5da2156 to ace38df Compare May 6, 2026 11:09
@turcsanyip
Copy link
Copy Markdown
Contributor

@rakesh-rsky Thanks for working on this issue. Please note the unit tests are failing.

I tried the new error handling in my local environment but I'm getting the following runtime error (instead of routing the FlowFile to failure):

2026-05-06 14:34:55,697 ERROR [Timer-Driven Process Thread-4] o.a.nifi.amqp.processors.PublishAMQP PublishAMQP[id=4fd4ee4e-8e3a-3969-7db4-953498193e4d] Processor failure
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dummy' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.waitForConfirms(ChannelN.java:219)
	at org.apache.nifi.amqp.processors.AMQPPublisher.publish(AMQPPublisher.java:108)
	at org.apache.nifi.amqp.processors.PublishAMQP.processResource(PublishAMQP.java:185)
	at org.apache.nifi.amqp.processors.PublishAMQP.processResource(PublishAMQP.java:52)
	at org.apache.nifi.amqp.processors.AbstractAMQPProcessor.onTrigger(AbstractAMQPProcessor.java:236)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1292)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:229)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'dummy' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:529)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:350)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:193)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:768)
	at com.rabbitmq.client.impl.AMQConnection.access$400(AMQConnection.java:49)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:695)
	... 1 common frames omitted

@turcsanyip turcsanyip self-requested a review May 6, 2026 12:43
@rakesh-rsky
Copy link
Copy Markdown
Contributor Author

@turcsanyip Thank you for validating this.

Root cause: When the exchange does not exist, the broker closes the channel with 404 NOT_FOUND, causing waitForConfirms() to throw ShutdownSignalException — which was not caught, resulting in an unhandled processor failure instead of routing to REL_FAILURE.

This has been fixed in the latest commits — ShutdownSignalException is now caught and converted to AMQPException, along with regression tests covering this and related broker failure scenarios.

@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch 5 times, most recently from 89f5fe6 to 99cf2e5 Compare May 20, 2026 20:34
@turcsanyip
Copy link
Copy Markdown
Contributor

turcsanyip commented May 20, 2026

@rakesh-rsky I checked the latest changes and the error handling is correct now functionally.

However, the synchronous wait led to significant performance degradation compared to the original version. According to my measurement:

  • local NiFi + local RabbitMQ in Docker: 10x slower
  • local NiFi + RabbitMQ on a cloud VM: 300-1000x slower (depending on the cloud location)

So even if the error handling is improved, I don't think we can add the change as-is due to the performance effect. I suggest adding a feature flag property (e.g. Use Message Confirmation = true / false or Delivery Guarantee = At least once / At most once) with proper documentation on the pros and cons. This way, the user can optionally enable this feature if they want the extra failure handling, at the cost of performance.

@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch from 99cf2e5 to 4c18f20 Compare May 21, 2026 15:47
@rakesh-rsky
Copy link
Copy Markdown
Contributor Author

@turcsanyip Thank you for the performance feedback and the suggestion.

I've added a Delivery Guarantee property to PublishAMQP with two options:

  • At most once (default) - original fire-and-forget behaviour, no performance impact. Undeliverable messages are logged as a warning and the FlowFile still routes to success.
  • At least once - enables Publisher Confirms. The processor waits for a broker ack/nack before routing. Undeliverable messages and NACKs route to failure, preventing silent data loss.

This keeps the default behaviour unchanged while giving users the option to trade throughput for delivery reliability.

…r cannot deliver message

PublishAMQP uses mandatory=true on basicPublish() so the broker returns
messages it cannot route to any queue. However, the return arrives
asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread
while the publishing thread had already moved on to session.transfer(REL_SUCCESS).
The UndeliverableMessageLogger only logged a warning — it never signaled
failure back to publish() or onTrigger(), so every unroutable message was
silently counted as a success despite never reaching any consumer.

Fix:
- Enabled Publisher Confirms (channel.confirmSelect()) in the constructor.
  The broker's basic.return frame for an unroutable message is guaranteed
  to arrive before the corresponding confirm frame, so waitForConfirms()
  acts as a synchronization barrier that makes return detection reliable.
- Added an AtomicReference<String> field (undeliverableReturnReason) that
  UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/
  replyCode/replyText when a message is returned.
- publish() now: resets the field before each call, calls waitForConfirms(5s)
  to synchronize with the broker, then checks the field and throws AMQPException
  if the message was returned — causing onTrigger() to route to REL_FAILURE.
- Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException
  because waitForConfirms() returns false on NACK.

- Added regression tests to verify that AMQPPublisher and PublishAMQP correctly
  route FlowFiles to REL_FAILURE for all broker-side failure modes:

- Added ShutdownSignalException to the catch block in AMQPPublisher.publish()
- Converts the channel-close signal into AMQPException so PublishAMQP routes
   the FlowFile to REL_FAILURE with a descriptive error message
- Added ShutdownSignalException import

Co-authored-by: Rakesh Kumar Singh <rsky.rakesh@gmail.com>
…rms opt-in

Added a new Delivery Guarantee property to PublishAMQP with two options:

At most once (default): works like the original - sends the message without
waiting for a broker reply. If the message cannot be delivered, only a warning
is logged and the FlowFile routes to success. Best for high throughput.

At least once: turns on RabbitMQ Publisher Confirms. The processor waits for
the broker to confirm the message before routing. If the message is returned
or the broker sends a NACK, the FlowFile routes to failure instead of success.
This prevents silent data loss but can be much slower, especially with remote
brokers.
@rakesh-rsky rakesh-rsky force-pushed the fix/NIFI-15483-publishamqp-route-on-failure branch from 4c18f20 to 96361df Compare May 22, 2026 05:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants