Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ public class SolaceIO {
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;
private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD =
Duration.standardSeconds(30);
private static final Duration DEFAULT_ACK_DEADLINE = Duration.standardSeconds(30);
public static final int DEFAULT_WRITER_NUM_SHARDS = 20;
public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4;
public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false;
Expand Down Expand Up @@ -461,7 +462,8 @@ public static Read<Solace.Record> read() {
.setParseFn(SolaceRecordMapper::map)
.setTimestampFn(SENDER_TIMESTAMP_FUNCTION)
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)
.setAckDeadline(DEFAULT_ACK_DEADLINE));
}

/**
Expand Down Expand Up @@ -490,7 +492,8 @@ public static <T> Read<T> read(
.setParseFn(parseFn)
.setTimestampFn(timestampFn)
.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD));
.setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)
.setAckDeadline(DEFAULT_ACK_DEADLINE));
}

/**
Expand Down Expand Up @@ -576,6 +579,16 @@ public Read<T> withWatermarkIdleDurationThreshold(Duration idleDurationThreshold
return this;
}

/**
* Optional. Sets the deadline for acknowledging messages. If a checkpoint is not finalized
* within this duration, the messages in that checkpoint will be negatively acknowledged
* (Nacked) to the broker. The default ack deadline is 30 seconds.
*/
public Read<T> withAckDeadline(Duration ackDeadline) {
configurationBuilder.setAckDeadline(ackDeadline);
return this;
}

/**
* Optional, default: false. Set to deduplicate messages based on the {@link
* BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the
Expand Down Expand Up @@ -689,6 +702,8 @@ abstract static class Configuration<T> {

abstract Duration getWatermarkIdleDurationThreshold();

abstract Duration getAckDeadline();

public static <T> Builder<T> builder() {
Builder<T> builder =
new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder<T>();
Expand Down Expand Up @@ -719,6 +734,8 @@ abstract Builder<T> setParseFn(

abstract Builder<T> setWatermarkIdleDurationThreshold(Duration idleDurationThreshold);

abstract Builder<T> setAckDeadline(Duration ackDeadline);

abstract Configuration<T> build();
}
}
Expand Down Expand Up @@ -756,7 +773,8 @@ public PCollection<T> expand(PBegin input) {
coder,
configuration.getTimestampFn(),
configuration.getWatermarkIdleDurationThreshold(),
configuration.getParseFn())));
configuration.getParseFn(),
configuration.getAckDeadline())));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker
* JVM using weak references.
*
* <p>This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating
* reader and perform sequential acknowledgments.
*/
class ActiveReadersRegistry {
private static final Cache<String, UnboundedSolaceReader<?>> registry =
CacheBuilder.newBuilder().weakValues().build();

public static void register(String uuid, UnboundedSolaceReader<?> reader) {
registry.put(uuid, reader);
}

public static void unregister(String uuid) {
registry.invalidate(uuid);
}

public static @Nullable UnboundedSolaceReader<?> get(String uuid) {
return registry.getIfPresent(uuid);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.io.solace.read;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.util.Objects;
import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,33 +36,41 @@
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
private transient Queue<BytesXMLMessage> safeToAck;
private String readerUuid;
private long checkpointId;
Comment thread
iht marked this conversation as resolved.

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
private SolaceCheckpointMark() {
this.readerUuid = "";
this.checkpointId = 0;
}

/**
* Creates a new {@link SolaceCheckpointMark}.
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
* @param readerUuid - the UUID of the originating reader.
* @param checkpointId - the unique ID of this checkpoint.
*/
SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
SolaceCheckpointMark(String readerUuid, long checkpointId) {
this.readerUuid = readerUuid;
this.checkpointId = checkpointId;
}

@Override
public void finalizeCheckpoint() {
BytesXMLMessage msg;
while ((msg = safeToAck.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
LOG.error(
"SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.",
msg.getApplicationMessageId(),
msg.getAckMessageId(),
e);
}
if (readerUuid == null || readerUuid.isEmpty()) {
LOG.warn("SolaceIO.Read: Checkpoint has no reader UUID, cannot finalize.");
return;
}
UnboundedSolaceReader<?> reader = ActiveReadersRegistry.get(readerUuid);
if (reader != null) {
reader.finalizeCheckpoint(checkpointId);
} else {
LOG.warn(
"SolaceIO.Read: Reader with UUID {} not found in registry. "
+ "Checkpoint {} cannot be finalized. Messages will be redelivered if session is closed.",
readerUuid,
checkpointId);
}
}

Expand All @@ -81,14 +86,11 @@ public boolean equals(@Nullable Object o) {
return false;
}
SolaceCheckpointMark that = (SolaceCheckpointMark) o;
return safeToAck == that.safeToAck
|| (safeToAck != null
&& that.safeToAck != null
&& Iterables.elementsEqual(safeToAck, that.safeToAck));
return checkpointId == that.checkpointId && Objects.equals(readerUuid, that.readerUuid);
}

@Override
public int hashCode() {
return Objects.hash(safeToAck);
return Objects.hash(readerUuid, checkpointId);
}
}
Loading
Loading