diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index 63509126022b..b5c20bc4fc81 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -414,6 +414,7 @@ public class SolaceIO { private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD = Duration.standardSeconds(30); + private static final Duration DEFAULT_ACK_DEADLINE = Duration.standardSeconds(30); public static final int DEFAULT_WRITER_NUM_SHARDS = 20; public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4; public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false; @@ -461,7 +462,8 @@ public static Read read() { .setParseFn(SolaceRecordMapper::map) .setTimestampFn(SENDER_TIMESTAMP_FUNCTION) .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) - .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); + .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD) + .setAckDeadline(DEFAULT_ACK_DEADLINE)); } /** @@ -490,7 +492,8 @@ public static Read read( .setParseFn(parseFn) .setTimestampFn(timestampFn) .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) - .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); + .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD) + .setAckDeadline(DEFAULT_ACK_DEADLINE)); } /** @@ -576,6 +579,16 @@ public Read withWatermarkIdleDurationThreshold(Duration idleDurationThreshold return this; } + /** + * Optional. Sets the deadline for acknowledging messages. If a checkpoint is not finalized + * within this duration, the messages in that checkpoint will be negatively acknowledged + * (Nacked) to the broker. The default ack deadline is 30 seconds. + */ + public Read withAckDeadline(Duration ackDeadline) { + configurationBuilder.setAckDeadline(ackDeadline); + return this; + } + /** * Optional, default: false. Set to deduplicate messages based on the {@link * BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the @@ -689,6 +702,8 @@ abstract static class Configuration { abstract Duration getWatermarkIdleDurationThreshold(); + abstract Duration getAckDeadline(); + public static Builder builder() { Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder(); @@ -719,6 +734,8 @@ abstract Builder setParseFn( abstract Builder setWatermarkIdleDurationThreshold(Duration idleDurationThreshold); + abstract Builder setAckDeadline(Duration ackDeadline); + abstract Configuration build(); } } @@ -756,7 +773,8 @@ public PCollection expand(PBegin input) { coder, configuration.getTimestampFn(), configuration.getWatermarkIdleDurationThreshold(), - configuration.getParseFn()))); + configuration.getParseFn(), + configuration.getAckDeadline()))); } @VisibleForTesting diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java new file mode 100644 index 000000000000..e8838f219dc8 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker + * JVM using weak references. + * + *

This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating + * reader and perform sequential acknowledgments. + */ +class ActiveReadersRegistry { + private static final Cache> registry = + CacheBuilder.newBuilder().weakValues().build(); + + public static void register(String uuid, UnboundedSolaceReader reader) { + registry.put(uuid, reader); + } + + public static void unregister(String uuid) { + registry.invalidate(uuid); + } + + public static @Nullable UnboundedSolaceReader get(String uuid) { + return registry.getIfPresent(uuid); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index 83aed07374b3..8e9647549d76 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -17,15 +17,12 @@ */ package org.apache.beam.sdk.io.solace.read; -import com.solacesystems.jcsmp.BytesXMLMessage; import java.util.Objects; -import java.util.Queue; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,33 +36,41 @@ @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient Queue safeToAck; + private String readerUuid; + private long checkpointId; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction - private SolaceCheckpointMark() {} + private SolaceCheckpointMark() { + this.readerUuid = ""; + this.checkpointId = 0; + } /** * Creates a new {@link SolaceCheckpointMark}. * - * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. + * @param readerUuid - the UUID of the originating reader. + * @param checkpointId - the unique ID of this checkpoint. */ - SolaceCheckpointMark(Queue safeToAck) { - this.safeToAck = safeToAck; + SolaceCheckpointMark(String readerUuid, long checkpointId) { + this.readerUuid = readerUuid; + this.checkpointId = checkpointId; } @Override public void finalizeCheckpoint() { - BytesXMLMessage msg; - while ((msg = safeToAck.poll()) != null) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - } + if (readerUuid == null || readerUuid.isEmpty()) { + LOG.warn("SolaceIO.Read: Checkpoint has no reader UUID, cannot finalize."); + return; + } + UnboundedSolaceReader reader = ActiveReadersRegistry.get(readerUuid); + if (reader != null) { + reader.finalizeCheckpoint(checkpointId); + } else { + LOG.warn( + "SolaceIO.Read: Reader with UUID {} not found in registry. " + + "Checkpoint {} cannot be finalized. Messages will be redelivered if session is closed.", + readerUuid, + checkpointId); } } @@ -81,14 +86,11 @@ public boolean equals(@Nullable Object o) { return false; } SolaceCheckpointMark that = (SolaceCheckpointMark) o; - return safeToAck == that.safeToAck - || (safeToAck != null - && that.safeToAck != null - && Iterables.elementsEqual(safeToAck, that.safeToAck)); + return checkpointId == that.checkpointId && Objects.equals(readerUuid, that.readerUuid); } @Override public int hashCode() { - return Objects.hash(safeToAck); + return Objects.hash(readerUuid, checkpointId); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index dc84e0a07017..0fbafc013e89 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -20,28 +20,38 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.XMLMessage; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.solace.broker.SempClient; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -55,16 +65,27 @@ class UnboundedSolaceReader extends UnboundedReader { private final UnboundedSolaceSource currentSource; private final WatermarkPolicy watermarkPolicy; private final SempClient sempClient; - private final UUID readerUuid; + final String readerUuid; + private final ExecutorService ackExecutor; + @VisibleForTesting Supplier clock = System::currentTimeMillis; + private final Object lock = new Object(); private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; + private final Counter messagesReceived = + Metrics.counter(UnboundedSolaceReader.class, "messages_received"); + private final Counter messagesAcked = + Metrics.counter(UnboundedSolaceReader.class, "messages_acked"); + + private final Duration ackDeadline; /** - * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: - * Accessed by both reader and checkpointing threads. + * Map to track pending checkpoints and their messages. Accessed by both reader + * (getCheckpointMark) and finalizer (finalizeCheckpoint) threads. */ - private final Queue safeToAckMessages = new ConcurrentLinkedQueue<>(); + private final TreeMap pendingCheckpoints = new TreeMap<>(); + + private long nextCheckpointId = 1; /** * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a @@ -72,7 +93,7 @@ class UnboundedSolaceReader extends UnboundedReader { */ private final Queue receivedMessages = new ArrayDeque<>(); - private static final Cache sessionServiceCache; + private static final Cache sessionServiceCache; private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); static { @@ -81,7 +102,7 @@ class UnboundedSolaceReader extends UnboundedReader { CacheBuilder.newBuilder() .expireAfterAccess(cacheExpirationTimeout) .removalListener( - (RemovalNotification notification) -> { + (RemovalNotification notification) -> { LOG.info( "SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.", notification.getKey(), @@ -108,7 +129,9 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); this.sessionServiceFactory = currentSource.getSessionServiceFactory(); this.sempClient = currentSource.getSempClientFactory().create(); - this.readerUuid = UUID.randomUUID(); + this.readerUuid = UUID.randomUUID().toString(); + this.ackExecutor = Executors.newFixedThreadPool(4); + this.ackDeadline = java.time.Duration.ofMillis(currentSource.getAckDeadline().getMillis()); } private SessionService getSessionService() { @@ -136,8 +159,7 @@ public boolean start() { @Override public boolean advance() { - finalizeReadyMessages(); - + checkTimeouts(); BytesXMLMessage receivedXmlMessage; try { receivedXmlMessage = getSessionService().getReceiver().receive(); @@ -152,29 +174,93 @@ public boolean advance() { solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); receivedMessages.add(receivedXmlMessage); + messagesReceived.inc(); return true; } @Override public void close() { - finalizeReadyMessages(); sessionServiceCache.invalidate(readerUuid); + ActiveReadersRegistry.unregister(readerUuid); + ackExecutor.shutdown(); + try { + if (!ackExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + ackExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + ackExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + void finalizeCheckpoint(long checkpointId) { + List messagesToAck = new ArrayList<>(); + + synchronized (lock) { + SortedMap toAck = pendingCheckpoints.headMap(checkpointId, true); + for (PendingCheckpoint cp : toAck.values()) { + messagesToAck.addAll(cp.messages); + } + toAck.clear(); + } + + List> futures = new ArrayList<>(); + for (BytesXMLMessage msg : messagesToAck) { + futures.add( + CompletableFuture.runAsync( + () -> { + try { + msg.ackMessage(); + messagesAcked.inc(); + } catch (IllegalStateException e) { + LOG.warn( + "SolaceIO.Read: Failed to acknowledge message with applicationMessageId={}, ackMessageId={}. Session might be closed.", + msg.getApplicationMessageId(), + msg.getAckMessageId(), + e); + } + }, + ackExecutor)); + } + + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } catch (Exception e) { + LOG.warn("SolaceIO.Read: Exception waiting for message acknowledgements", e); + } } - public void finalizeReadyMessages() { - BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - safeToAckMessages.add(msg); // In case the error was transient, might succeed later - break; // Commit is only best effort + private void checkTimeouts() { + long now = clock.get(); + List expired = new ArrayList<>(); + synchronized (lock) { + while (!pendingCheckpoints.isEmpty()) { + long oldestId = pendingCheckpoints.firstKey(); + PendingCheckpoint oldest = pendingCheckpoints.get(oldestId); + if (oldest != null && now - oldest.timestamp > ackDeadline.toMillis()) { + pendingCheckpoints.remove(oldestId); + expired.add(oldest); + } else { + break; + } + } + } + + for (PendingCheckpoint cp : expired) { + LOG.warn( + "SolaceIO.Read: Checkpoint {} timed out after {}ms. Nacking messages.", + cp.id, + now - cp.timestamp); + for (BytesXMLMessage msg : cp.messages) { + ackExecutor.execute( + () -> { + try { + msg.settle(XMLMessage.Outcome.FAILED); + } catch (Exception e) { + LOG.warn("SolaceIO.Read: Failed to nack message", e); + } + }); } } } @@ -190,9 +276,14 @@ public Instant getWatermark() { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - safeToAckMessages.addAll(receivedMessages); + long checkpointId = nextCheckpointId++; + ImmutableList messages = ImmutableList.copyOf(receivedMessages); receivedMessages.clear(); - return new SolaceCheckpointMark(safeToAckMessages); + synchronized (lock) { + pendingCheckpoints.put( + checkpointId, new PendingCheckpoint(checkpointId, messages, clock.get())); + } + return new SolaceCheckpointMark(readerUuid, checkpointId); } @Override @@ -242,4 +333,16 @@ public long getTotalBacklogBytes() { return BACKLOG_UNKNOWN; } } + + private static class PendingCheckpoint { + final long id; + final List messages; + final long timestamp; + + PendingCheckpoint(long id, List messages, long timestamp) { + this.id = id; + this.messages = messages; + this.timestamp = timestamp; + } + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java index 1cb17a49fbdb..76feaaf5bb2a 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java @@ -49,6 +49,7 @@ public class UnboundedSolaceSource extends UnboundedSource timestampFn; private final Duration watermarkIdleDurationThreshold; private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn; + private final Duration ackDeadline; public Queue getQueue() { return queue; @@ -74,6 +75,10 @@ public Duration getWatermarkIdleDurationThreshold() { return parseFn; } + public Duration getAckDeadline() { + return ackDeadline; + } + public UnboundedSolaceSource( Queue queue, SempClientFactory sempClientFactory, @@ -83,7 +88,8 @@ public UnboundedSolaceSource( Coder coder, SerializableFunction timestampFn, Duration watermarkIdleDurationThreshold, - SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) { + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn, + Duration ackDeadline) { this.queue = queue; this.sempClientFactory = sempClientFactory; this.sessionServiceFactory = sessionServiceFactory; @@ -93,6 +99,7 @@ public UnboundedSolaceSource( this.timestampFn = timestampFn; this.watermarkIdleDurationThreshold = watermarkIdleDurationThreshold; this.parseFn = parseFn; + this.ackDeadline = ackDeadline; } @Override @@ -100,7 +107,9 @@ public UnboundedReader createReader( PipelineOptions options, @Nullable SolaceCheckpointMark checkpointMark) { // it makes no sense to resume a Solace Session with the previous checkpoint // so don't need the pass a checkpoint to new a Solace Reader - return new UnboundedSolaceReader<>(this); + UnboundedSolaceReader reader = new UnboundedSolaceReader<>(this); + ActiveReadersRegistry.register(reader.readerUuid, reader); + return reader; } @Override @@ -134,7 +143,8 @@ private List> getSolaceSources( coder, timestampFn, watermarkIdleDurationThreshold, - parseFn); + parseFn, + ackDeadline); sourceList.add(source); } return sourceList; diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index a1f80932eddf..44a3e526736c 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -97,7 +97,8 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi spec.inferCoder(pipeline, configuration.getTypeDescriptor()), configuration.getTimestampFn(), configuration.getWatermarkIdleDurationThreshold(), - configuration.getParseFn()); + configuration.getParseFn(), + configuration.getAckDeadline()); } @Test @@ -458,18 +459,16 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { // mark all consumed messages as ready to be acknowledged CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. + // consume 1 more message. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(4, countAckMessages.get()); + assertEquals(0, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); - // No change in the acknowledged messages, because they were acknowledged in the #advance() - // method. assertEquals(4, countAckMessages.get()); } @@ -542,7 +541,7 @@ public void testCheckpointMarkSafety() throws Exception { @Test public void testDefaultCoder() { Coder coder = - new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null) + new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null, null) .getCheckpointMarkCoder(); CoderProperties.coderSerializable(coder); } @@ -607,4 +606,64 @@ public void testTopicEncoding() { PAssert.that(destAreTopics).containsInAnyOrder(expected); pipeline.run(); } + + @Test + public void testLostCheckpointCatchUp() throws Exception { + AtomicInteger countConsumedMessages = new AtomicInteger(0); + AtomicInteger countAckMessages = new AtomicInteger(0); + + // Broker that creates input data + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; + + SessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + + Read spec = + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withMaxNumConnections(4); + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + + UnboundedReader reader = + initialSource.createReader(PipelineOptionsFactory.create(), null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(reader.advance()); + } + assertEquals(0, countAckMessages.get()); + + // Create Checkpoint T1 (contains 4 messages) + reader.getCheckpointMark(); + + // consume 3 more messages + for (int i = 0; i < 3; i++) { + assertTrue(reader.advance()); + } + assertEquals(0, countAckMessages.get()); + + // Create Checkpoint T2 (contains 3 messages) + CheckpointMark checkpointMark2 = reader.getCheckpointMark(); + + // We "lose" checkpointMark1 (do NOT finalize it) + // We finalize checkpointMark2 + checkpointMark2.finalizeCheckpoint(); + + // checkpointMark2 should have caught up and acked both T1 and T2 (4 + 3 = 7 messages) + assertEquals(7, countAckMessages.get()); + } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java index 9e04c4cfd276..2d2b9911f5de 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java @@ -26,6 +26,7 @@ import com.solacesystems.jcsmp.ReplicationGroupMessageId; import com.solacesystems.jcsmp.SDTMap; import com.solacesystems.jcsmp.User_Cos; +import com.solacesystems.jcsmp.XMLMessage.Outcome; import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl; import java.io.IOException; import java.io.InputStream; @@ -35,6 +36,7 @@ import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; import org.apache.beam.sdk.schemas.JavaBeanSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -129,11 +131,29 @@ public static BytesXMLMessage getBytesXmlMessage( return getBytesXmlMessage(payload, messageId, ackMessageFn, null); } + public static BytesXMLMessage getBytesXmlMessageWithSettle( + String payload, + String messageId, + SerializableFunction ackMessageFn, + Consumer settleCallback) { + return getBytesXmlMessageInternal(payload, messageId, ackMessageFn, null, settleCallback); + } + public static BytesXMLMessage getBytesXmlMessage( String payload, String messageId, SerializableFunction ackMessageFn, ReplicationGroupMessageId replicationGroupMessageId) { + return getBytesXmlMessageInternal( + payload, messageId, ackMessageFn, replicationGroupMessageId, null); + } + + private static BytesXMLMessage getBytesXmlMessageInternal( + String payload, + String messageId, + SerializableFunction ackMessageFn, + ReplicationGroupMessageId replicationGroupMessageId, + Consumer settleCallback) { long receiverTimestamp = 1708100477067L; long expiration = 1000L; long timeToLive = 1000L; @@ -654,7 +674,11 @@ public void setTopicNameLocation(int arg0, int arg1) {} public void setUserData(byte[] arg0) {} @Override - public void settle(Outcome arg0) throws JCSMPException {} + public void settle(Outcome arg0) throws JCSMPException { + if (settleCallback != null) { + settleCallback.accept(arg0); + } + } @Override public int writeAttachment(byte[] arg0) { diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReaderTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReaderTest.java new file mode 100644 index 000000000000..cf0a1b3353cb --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReaderTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.XMLMessage.Outcome; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.io.solace.MockSempClientFactory; +import org.apache.beam.sdk.io.solace.MockSessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class UnboundedSolaceReaderTest { + + @Test + public void testCheckpointTimeoutAndNack() throws Exception { + AtomicReference settledOutcome = new AtomicReference<>(); + + // 1. Create a mock message that captures the settle outcome + SerializableFunction recordFn = + index -> { + if (index == 0) { + return SolaceDataUtils.getBytesXmlMessageWithSettle( + "payload_test0", + "450", + null, // ackCallback + settledOutcome::set // settleCallback + ); + } + return null; + }; + + MockSessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(1).build(); + + // 2. Create the source directly + UnboundedSolaceSource source = + new UnboundedSolaceSource<>( + com.solacesystems.jcsmp.JCSMPFactory.onlyInstance().createQueue("queue"), + MockSempClientFactory.getDefaultMock(), + fakeSessionServiceFactory, + 1, // maxNumConnections + false, // enableDeduplication + null, // coder (not needed for direct reader test if we don't serialize) + input -> org.joda.time.Instant.ofEpochMilli(1000L), // timestampFn + org.joda.time.Duration.standardSeconds(1), // watermarkIdleDurationThreshold + input -> SolaceDataUtils.getSolaceRecord("payload_test0", "450"), // parseFn + org.joda.time.Duration.standardSeconds(30) // ackDeadline + ); + + UnboundedSolaceReader reader = + (UnboundedSolaceReader) + source.createReader(PipelineOptionsFactory.create(), null); + + // 3. Inject a controllable clock + AtomicReference currentTime = new AtomicReference<>(1000L); + reader.clock = currentTime::get; + + // 4. Start the reader (ingests message 0) + assertTrue(reader.start()); + + // 5. Create a checkpoint (T1) + reader.getCheckpointMark(); + + // Verify no nack yet + assertEquals(null, settledOutcome.get()); + + // 6. Advance time by 10 seconds (less than 30s timeout) and advance reader + currentTime.set(11000L); + reader.advance(); // This calls checkTimeouts() + assertEquals(null, settledOutcome.get()); + + // 7. Advance time by 21 more seconds (total 31 seconds, > 30s timeout) and advance reader + currentTime.set(32000L); + reader.advance(); // This calls checkTimeouts() which should trigger Nack + + // 8. Wait for async nack to complete (since it runs in ackExecutor) + // We shut down the reader which awaits termination of the executor + reader.close(); + + // 9. Verify that the message was Nacked with FAILED outcome + assertEquals(Outcome.FAILED, settledOutcome.get()); + } +}