diff --git a/topic/pom.xml b/topic/pom.xml
index d7cb1405c..c0be28e12 100644
--- a/topic/pom.xml
+++ b/topic/pom.xml
@@ -62,6 +62,20 @@
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ ydbplatform/local-ydb:trunk
+
+
+
+
+
+
jdk8-build
diff --git a/topic/src/main/java/tech/ydb/topic/description/Consumer.java b/topic/src/main/java/tech/ydb/topic/description/Consumer.java
index 9bda68b8d..23965e0fc 100644
--- a/topic/src/main/java/tech/ydb/topic/description/Consumer.java
+++ b/topic/src/main/java/tech/ydb/topic/description/Consumer.java
@@ -1,5 +1,6 @@
package tech.ydb.topic.description;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
@@ -21,12 +22,14 @@
* @author Nikolay Perfilov
*/
public class Consumer {
+
private final String name;
private final boolean important;
private final Instant readFrom;
private final List supportedCodecs;
private final Map attributes;
private final ConsumerStats stats;
+ private final Duration availabilityPeriod;
private Consumer(Builder builder) {
this.name = builder.name;
@@ -35,6 +38,7 @@ private Consumer(Builder builder) {
this.supportedCodecs = builder.supportedCodecs;
this.attributes = ImmutableMap.copyOf(builder.attributes);
this.stats = builder.stats;
+ this.availabilityPeriod = builder.availabilityPeriod;
}
public Consumer(YdbTopic.Consumer consumer) {
@@ -45,6 +49,8 @@ public Consumer(YdbTopic.Consumer consumer) {
.stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList());
this.attributes = consumer.getAttributesMap();
this.stats = new ConsumerStats(consumer.getConsumerStats());
+ this.availabilityPeriod = consumer.hasAvailabilityPeriod() ?
+ ProtobufUtils.protoToDuration(consumer.getAvailabilityPeriod()) : null;
}
public static Builder newBuilder() {
@@ -55,6 +61,12 @@ public String getName() {
return name;
}
+ /**
+ * Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+ * User should take care that such consumer never stalls, to prevent running out of disk space.
+ *
+ * @return Flag that this consumer is important.
+ */
public boolean isImportant() {
return important;
}
@@ -80,24 +92,44 @@ public Map getAttributes() {
@Nullable
public ConsumerStats getStats() {
return stats;
+
}
/**
- * BUILDER
+ * Message for this consumer will not expire due to retention for at least availabilityPeriod if they
+ * aren't committed.
+ *
+ * @return availability period for this consumer
*/
+ @Nullable
+ public Duration getAvailabilityPeriod() {
+ return availabilityPeriod;
+ }
+
public static class Builder {
+
private String name;
private boolean important = false;
private Instant readFrom = null;
private final List supportedCodecs = new ArrayList<>();
private Map attributes = new HashMap<>();
private ConsumerStats stats = null;
+ private Duration availabilityPeriod = null;
public Builder setName(@Nonnull String name) {
this.name = name;
return this;
}
+ /**
+ * Configure the importance for this consumer.
+ *
+ * An important consumer cannot have availabilityPeriod option
+ *
+ * @see Consumer#isImportant()
+ * @param important - this consumer importance flag
+ * @return this consumer builder
+ */
public Builder setImportant(boolean important) {
this.important = important;
return this;
@@ -108,6 +140,20 @@ public Builder setReadFrom(Instant readFrom) {
return this;
}
+ /**
+ * Configure availabilityPeriod for this consumer.
+ *
+ * Option availabilityPeriod is not compatible with important option
+ *
+ * @see Consumer#getAvailabilityPeriod()
+ * @param period - availability period value
+ * @return this consumer builder
+ */
+ public Builder setAvailabilityPeriod(Duration period) {
+ this.availabilityPeriod = period;
+ return this;
+ }
+
public Builder addSupportedCodec(Codec codec) {
this.supportedCodecs.add(codec);
return this;
@@ -151,16 +197,17 @@ public boolean equals(Object o) {
return false;
}
Consumer consumer = (Consumer) o;
- return important == consumer.important &&
- Objects.equals(name, consumer.name) &&
- Objects.equals(readFrom, consumer.readFrom) &&
- Objects.equals(supportedCodecs, consumer.supportedCodecs) &&
- Objects.equals(attributes, consumer.attributes) &&
- Objects.equals(stats, consumer.stats);
+ return important == consumer.important
+ && Objects.equals(name, consumer.name)
+ && Objects.equals(readFrom, consumer.readFrom)
+ && Objects.equals(supportedCodecs, consumer.supportedCodecs)
+ && Objects.equals(attributes, consumer.attributes)
+ && Objects.equals(stats, consumer.stats)
+ && Objects.equals(availabilityPeriod, consumer.availabilityPeriod);
}
@Override
public int hashCode() {
- return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats);
+ return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats, availabilityPeriod);
}
}
diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
index f4deaa637..7980ac790 100644
--- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
+++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
@@ -452,6 +452,10 @@ private static YdbTopic.Consumer toProto(Consumer consumer) {
consumerBuilder.setReadFrom(ProtobufUtils.instantToProto(consumer.getReadFrom()));
}
+ if (consumer.getAvailabilityPeriod() != null) {
+ consumerBuilder.setAvailabilityPeriod(ProtobufUtils.durationToProto(consumer.getAvailabilityPeriod()));
+ }
+
List supportedCodecs = consumer.getSupportedCodecsList();
if (!supportedCodecs.isEmpty()) {
YdbTopic.SupportedCodecs.Builder codecBuilder = YdbTopic.SupportedCodecs.newBuilder();
diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
index a4624e27b..86c5a42ee 100644
--- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
+++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java
@@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
+import tech.ydb.core.StatusCode;
import tech.ydb.test.junit4.GrpcTransportRule;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Consumer;
@@ -316,9 +317,11 @@ public void step09_describeTopicStats() {
for (Consumer consumer: withoutStats.getConsumers()) {
// TODO: fix it, must be null
Assert.assertNotNull(consumer.getStats());
+ Assert.assertNull(consumer.getAvailabilityPeriod());
}
for (Consumer consumer: withStats.getConsumers()) {
Assert.assertNotNull(consumer.getStats());
+ Assert.assertNull(consumer.getAvailabilityPeriod());
}
for (PartitionInfo partition: withoutStats.getPartitions()) {
@@ -329,4 +332,20 @@ public void step09_describeTopicStats() {
Assert.assertNotNull(partition.getPartitionStats());
}
}
+
+ @Test
+ public void step10_invalidConsumerTest() {
+ AlterTopicSettings settings = AlterTopicSettings.newBuilder()
+ .addAddConsumer(Consumer.newBuilder()
+ .setName("WRONG_CONSUMER")
+ // important and availability_period are incompatible
+ .setImportant(true)
+ .setAvailabilityPeriod(Duration.ofMinutes(5))
+ .build()
+ ).build();
+
+ Status status = client.alterTopic(TEST_TOPIC, settings).join();
+ Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess());
+ Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode());
+ }
}