diff --git a/topic/pom.xml b/topic/pom.xml index d7cb1405c..c0be28e12 100644 --- a/topic/pom.xml +++ b/topic/pom.xml @@ -62,6 +62,20 @@ + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ydbplatform/local-ydb:trunk + + + + + + jdk8-build diff --git a/topic/src/main/java/tech/ydb/topic/description/Consumer.java b/topic/src/main/java/tech/ydb/topic/description/Consumer.java index 9bda68b8d..23965e0fc 100644 --- a/topic/src/main/java/tech/ydb/topic/description/Consumer.java +++ b/topic/src/main/java/tech/ydb/topic/description/Consumer.java @@ -1,5 +1,6 @@ package tech.ydb.topic.description; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; @@ -21,12 +22,14 @@ * @author Nikolay Perfilov */ public class Consumer { + private final String name; private final boolean important; private final Instant readFrom; private final List supportedCodecs; private final Map attributes; private final ConsumerStats stats; + private final Duration availabilityPeriod; private Consumer(Builder builder) { this.name = builder.name; @@ -35,6 +38,7 @@ private Consumer(Builder builder) { this.supportedCodecs = builder.supportedCodecs; this.attributes = ImmutableMap.copyOf(builder.attributes); this.stats = builder.stats; + this.availabilityPeriod = builder.availabilityPeriod; } public Consumer(YdbTopic.Consumer consumer) { @@ -45,6 +49,8 @@ public Consumer(YdbTopic.Consumer consumer) { .stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList()); this.attributes = consumer.getAttributesMap(); this.stats = new ConsumerStats(consumer.getConsumerStats()); + this.availabilityPeriod = consumer.hasAvailabilityPeriod() ? + ProtobufUtils.protoToDuration(consumer.getAvailabilityPeriod()) : null; } public static Builder newBuilder() { @@ -55,6 +61,12 @@ public String getName() { return name; } + /** + * Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention. + * User should take care that such consumer never stalls, to prevent running out of disk space. + * + * @return Flag that this consumer is important. + */ public boolean isImportant() { return important; } @@ -80,24 +92,44 @@ public Map getAttributes() { @Nullable public ConsumerStats getStats() { return stats; + } /** - * BUILDER + * Message for this consumer will not expire due to retention for at least availabilityPeriod if they + * aren't committed. + * + * @return availability period for this consumer */ + @Nullable + public Duration getAvailabilityPeriod() { + return availabilityPeriod; + } + public static class Builder { + private String name; private boolean important = false; private Instant readFrom = null; private final List supportedCodecs = new ArrayList<>(); private Map attributes = new HashMap<>(); private ConsumerStats stats = null; + private Duration availabilityPeriod = null; public Builder setName(@Nonnull String name) { this.name = name; return this; } + /** + * Configure the importance for this consumer. + *
+ * An important consumer cannot have availabilityPeriod option + * + * @see Consumer#isImportant() + * @param important - this consumer importance flag + * @return this consumer builder + */ public Builder setImportant(boolean important) { this.important = important; return this; @@ -108,6 +140,20 @@ public Builder setReadFrom(Instant readFrom) { return this; } + /** + * Configure availabilityPeriod for this consumer. + *
+ * Option availabilityPeriod is not compatible with important option + * + * @see Consumer#getAvailabilityPeriod() + * @param period - availability period value + * @return this consumer builder + */ + public Builder setAvailabilityPeriod(Duration period) { + this.availabilityPeriod = period; + return this; + } + public Builder addSupportedCodec(Codec codec) { this.supportedCodecs.add(codec); return this; @@ -151,16 +197,17 @@ public boolean equals(Object o) { return false; } Consumer consumer = (Consumer) o; - return important == consumer.important && - Objects.equals(name, consumer.name) && - Objects.equals(readFrom, consumer.readFrom) && - Objects.equals(supportedCodecs, consumer.supportedCodecs) && - Objects.equals(attributes, consumer.attributes) && - Objects.equals(stats, consumer.stats); + return important == consumer.important + && Objects.equals(name, consumer.name) + && Objects.equals(readFrom, consumer.readFrom) + && Objects.equals(supportedCodecs, consumer.supportedCodecs) + && Objects.equals(attributes, consumer.attributes) + && Objects.equals(stats, consumer.stats) + && Objects.equals(availabilityPeriod, consumer.availabilityPeriod); } @Override public int hashCode() { - return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats); + return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats, availabilityPeriod); } } diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java index f4deaa637..7980ac790 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java @@ -452,6 +452,10 @@ private static YdbTopic.Consumer toProto(Consumer consumer) { consumerBuilder.setReadFrom(ProtobufUtils.instantToProto(consumer.getReadFrom())); } + if (consumer.getAvailabilityPeriod() != null) { + consumerBuilder.setAvailabilityPeriod(ProtobufUtils.durationToProto(consumer.getAvailabilityPeriod())); + } + List supportedCodecs = consumer.getSupportedCodecsList(); if (!supportedCodecs.isEmpty()) { YdbTopic.SupportedCodecs.Builder codecBuilder = YdbTopic.SupportedCodecs.newBuilder(); diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java index a4624e27b..86c5a42ee 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java @@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; import tech.ydb.test.junit4.GrpcTransportRule; import tech.ydb.topic.TopicClient; import tech.ydb.topic.description.Consumer; @@ -316,9 +317,11 @@ public void step09_describeTopicStats() { for (Consumer consumer: withoutStats.getConsumers()) { // TODO: fix it, must be null Assert.assertNotNull(consumer.getStats()); + Assert.assertNull(consumer.getAvailabilityPeriod()); } for (Consumer consumer: withStats.getConsumers()) { Assert.assertNotNull(consumer.getStats()); + Assert.assertNull(consumer.getAvailabilityPeriod()); } for (PartitionInfo partition: withoutStats.getPartitions()) { @@ -329,4 +332,20 @@ public void step09_describeTopicStats() { Assert.assertNotNull(partition.getPartitionStats()); } } + + @Test + public void step10_invalidConsumerTest() { + AlterTopicSettings settings = AlterTopicSettings.newBuilder() + .addAddConsumer(Consumer.newBuilder() + .setName("WRONG_CONSUMER") + // important and availability_period are incompatible + .setImportant(true) + .setAvailabilityPeriod(Duration.ofMinutes(5)) + .build() + ).build(); + + Status status = client.alterTopic(TEST_TOPIC, settings).join(); + Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess()); + Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode()); + } }