Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<environmentVariables>
<YDB_DOCKER_IMAGE>ydbplatform/local-ydb:trunk</YDB_DOCKER_IMAGE>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>jdk8-build</id>
Expand Down
63 changes: 55 additions & 8 deletions topic/src/main/java/tech/ydb/topic/description/Consumer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.description;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -21,12 +22,14 @@
* @author Nikolay Perfilov
*/
public class Consumer {

private final String name;
private final boolean important;
private final Instant readFrom;
private final List<Codec> supportedCodecs;
private final Map<String, String> attributes;
private final ConsumerStats stats;
private final Duration availabilityPeriod;

private Consumer(Builder builder) {
this.name = builder.name;
Expand All @@ -35,6 +38,7 @@ private Consumer(Builder builder) {
this.supportedCodecs = builder.supportedCodecs;
this.attributes = ImmutableMap.copyOf(builder.attributes);
this.stats = builder.stats;
this.availabilityPeriod = builder.availabilityPeriod;
}

public Consumer(YdbTopic.Consumer consumer) {
Expand All @@ -45,6 +49,8 @@ public Consumer(YdbTopic.Consumer consumer) {
.stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList());
this.attributes = consumer.getAttributesMap();
this.stats = new ConsumerStats(consumer.getConsumerStats());
this.availabilityPeriod = consumer.hasAvailabilityPeriod() ?
ProtobufUtils.protoToDuration(consumer.getAvailabilityPeriod()) : null;
}

public static Builder newBuilder() {
Expand All @@ -55,6 +61,12 @@ public String getName() {
return name;
}

/**
* Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
* User should take care that such consumer never stalls, to prevent running out of disk space.
*
* @return Flag that this consumer is important.
*/
public boolean isImportant() {
return important;
}
Expand All @@ -80,24 +92,44 @@ public Map<String, String> getAttributes() {
@Nullable
public ConsumerStats getStats() {
return stats;

}

/**
* BUILDER
* Message for this consumer will not expire due to retention for at least <code>availabilityPeriod</code> if they
* aren't committed.
*
* @return availability period for this consumer
*/
@Nullable
public Duration getAvailabilityPeriod() {
return availabilityPeriod;
}

public static class Builder {

private String name;
private boolean important = false;
private Instant readFrom = null;
private final List<Codec> supportedCodecs = new ArrayList<>();
private Map<String, String> attributes = new HashMap<>();
private ConsumerStats stats = null;
private Duration availabilityPeriod = null;

public Builder setName(@Nonnull String name) {
this.name = name;
return this;
}

/**
* Configure the importance for this consumer.
* <br>
* An important consumer cannot have <code>availabilityPeriod</code> option
*
* @see Consumer#isImportant()
* @param important - this consumer importance flag
* @return this consumer builder
*/
public Builder setImportant(boolean important) {
this.important = important;
return this;
Expand All @@ -108,6 +140,20 @@ public Builder setReadFrom(Instant readFrom) {
return this;
}

/**
* Configure <code>availabilityPeriod</code> for this consumer.
* <br>
* Option <code>availabilityPeriod</code> is not compatible with <code>important</code> option
*
* @see Consumer#getAvailabilityPeriod()
* @param period - availability period value
* @return this consumer builder
*/
public Builder setAvailabilityPeriod(Duration period) {
this.availabilityPeriod = period;
return this;
}

public Builder addSupportedCodec(Codec codec) {
this.supportedCodecs.add(codec);
return this;
Expand Down Expand Up @@ -151,16 +197,17 @@ public boolean equals(Object o) {
return false;
}
Consumer consumer = (Consumer) o;
return important == consumer.important &&
Objects.equals(name, consumer.name) &&
Objects.equals(readFrom, consumer.readFrom) &&
Objects.equals(supportedCodecs, consumer.supportedCodecs) &&
Objects.equals(attributes, consumer.attributes) &&
Objects.equals(stats, consumer.stats);
return important == consumer.important
&& Objects.equals(name, consumer.name)
&& Objects.equals(readFrom, consumer.readFrom)
&& Objects.equals(supportedCodecs, consumer.supportedCodecs)
&& Objects.equals(attributes, consumer.attributes)
&& Objects.equals(stats, consumer.stats)
&& Objects.equals(availabilityPeriod, consumer.availabilityPeriod);
}

@Override
public int hashCode() {
return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats);
return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats, availabilityPeriod);
}
}
4 changes: 4 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ private static YdbTopic.Consumer toProto(Consumer consumer) {
consumerBuilder.setReadFrom(ProtobufUtils.instantToProto(consumer.getReadFrom()));
}

if (consumer.getAvailabilityPeriod() != null) {
consumerBuilder.setAvailabilityPeriod(ProtobufUtils.durationToProto(consumer.getAvailabilityPeriod()));
}

List<Codec> supportedCodecs = consumer.getSupportedCodecsList();
if (!supportedCodecs.isEmpty()) {
YdbTopic.SupportedCodecs.Builder codecBuilder = YdbTopic.SupportedCodecs.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;

import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.test.junit4.GrpcTransportRule;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Consumer;
Expand Down Expand Up @@ -316,9 +317,11 @@ public void step09_describeTopicStats() {
for (Consumer consumer: withoutStats.getConsumers()) {
// TODO: fix it, must be null
Assert.assertNotNull(consumer.getStats());
Assert.assertNull(consumer.getAvailabilityPeriod());
}
for (Consumer consumer: withStats.getConsumers()) {
Assert.assertNotNull(consumer.getStats());
Assert.assertNull(consumer.getAvailabilityPeriod());
}

for (PartitionInfo partition: withoutStats.getPartitions()) {
Expand All @@ -329,4 +332,20 @@ public void step09_describeTopicStats() {
Assert.assertNotNull(partition.getPartitionStats());
}
}

@Test
public void step10_invalidConsumerTest() {
AlterTopicSettings settings = AlterTopicSettings.newBuilder()
.addAddConsumer(Consumer.newBuilder()
.setName("WRONG_CONSUMER")
// important and availability_period are incompatible
.setImportant(true)
.setAvailabilityPeriod(Duration.ofMinutes(5))
.build()
).build();

Status status = client.alterTopic(TEST_TOPIC, settings).join();
Assert.assertFalse("Alter must fail, but get status " + status, status.isSuccess());
Assert.assertEquals("Alter must fail, but get status " + status, StatusCode.BAD_REQUEST, status.getCode());
}
}