Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
private static final Logger LOGGER = LoggerFactory.getLogger(BlobStoreDeletedMessageVault.class);
Expand All @@ -72,12 +73,13 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
private final BucketNameGenerator nameGenerator;
private final Clock clock;
private final VaultConfiguration vaultConfiguration;
private final BlobIdTimeGenerator blobIdTimeGenerator;
private final BlobStoreVaultGarbageCollectionTask.Factory taskFactory;

@Inject
public BlobStoreDeletedMessageVault(MetricFactory metricFactory, DeletedMessageMetadataVault messageMetadataVault,
BlobStore blobStore, BlobStoreDAO blobStoreDAO, BucketNameGenerator nameGenerator,
Clock clock,
Clock clock, BlobIdTimeGenerator blobIdTimeGenerator,
VaultConfiguration vaultConfiguration) {
this.metricFactory = metricFactory;
this.messageMetadataVault = messageMetadataVault;
Expand All @@ -86,22 +88,45 @@ public BlobStoreDeletedMessageVault(MetricFactory metricFactory, DeletedMessageM
this.nameGenerator = nameGenerator;
this.clock = clock;
this.vaultConfiguration = vaultConfiguration;
this.blobIdTimeGenerator = blobIdTimeGenerator;
this.taskFactory = new BlobStoreVaultGarbageCollectionTask.Factory(this);
}

@Deprecated
@VisibleForTesting
public Publisher<Void> appendV1(DeletedMessage deletedMessage, InputStream mimeMessage) {
Preconditions.checkNotNull(deletedMessage);
Preconditions.checkNotNull(mimeMessage);
BucketName bucketName = nameGenerator.currentBucket();

return metricFactory.decoratePublisherWithTimerMetric(
APPEND_METRIC_NAME,
appendMessageV1(deletedMessage, mimeMessage, bucketName));
}

private Mono<Void> appendMessageV1(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) {
return Mono.from(blobStore.save(bucketName, mimeMessage, LOW_COST))
.map(blobId -> StorageInformation.builder()
.bucketName(bucketName)
.blobId(blobId))
.map(storageInformation -> new DeletedMessageWithStorageInformation(deletedMessage, storageInformation))
.flatMap(message -> Mono.from(messageMetadataVault.store(message)))
.then();
}

@Override
public Publisher<Void> append(DeletedMessage deletedMessage, InputStream mimeMessage) {
Preconditions.checkNotNull(deletedMessage);
Preconditions.checkNotNull(mimeMessage);
BucketName bucketName = nameGenerator.currentBucket();
BucketName bucketName = BucketName.of(vaultConfiguration.getSingleBucketName());

return metricFactory.decoratePublisherWithTimerMetric(
APPEND_METRIC_NAME,
appendMessage(deletedMessage, mimeMessage, bucketName));
}

private Mono<Void> appendMessage(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) {
return Mono.from(blobStore.save(bucketName, mimeMessage, LOW_COST))
return Mono.from(blobStore.save(bucketName, mimeMessage, withTimePrefixBlobId(), LOW_COST))
.map(blobId -> StorageInformation.builder()
.bucketName(bucketName)
.blobId(blobId))
Expand All @@ -110,6 +135,12 @@ private Mono<Void> appendMessage(DeletedMessage deletedMessage, InputStream mime
.then();
}

private BlobStore.BlobIdProvider<InputStream> withTimePrefixBlobId() {
return data -> Mono.just(Tuples.of(
blobIdTimeGenerator.currentBlobId(),
data));
}

@Override
public Publisher<InputStream> loadMimeMessage(Username username, MessageId messageId) {
Preconditions.checkNotNull(username);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.james.task.Task;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
import org.apache.james.vault.search.CriterionFactory;
import org.apache.james.vault.search.Query;
import org.junit.jupiter.api.Test;
Expand All @@ -53,7 +54,7 @@
public interface DeletedMessageVaultContract {
Clock CLOCK = Clock.fixed(NOW.toInstant(), NOW.getZone());

DeletedMessageVault getVault();
BlobStoreDeletedMessageVault getVault();

UpdatableTickingClock getClock();

Expand Down Expand Up @@ -243,7 +244,7 @@ default void deleteExpiredMessagesTaskShouldCompleteWhenNoMail() throws Interrup

@Test
default void deleteExpiredMessagesTaskShouldCompleteWhenAllMailsDeleted() throws InterruptedException {
Mono.from(getVault().append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().delete(USERNAME, DELETED_MESSAGE.getMessageId())).block();

Task.Result result = getVault().deleteExpiredMessagesTask().run();
Expand All @@ -253,7 +254,7 @@ default void deleteExpiredMessagesTaskShouldCompleteWhenAllMailsDeleted() throws

@Test
default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyRecentMails() throws InterruptedException {
Mono.from(getVault().append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

Task.Result result = getVault().deleteExpiredMessagesTask().run();

Expand All @@ -262,7 +263,7 @@ default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyRecentMails() throws

@Test
default void deleteExpiredMessagesTaskShouldCompleteWhenOnlyOldMails() throws InterruptedException {
Mono.from(getVault().append(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

Task.Result result = getVault().deleteExpiredMessagesTask().run();

Expand All @@ -279,7 +280,7 @@ default void deleteExpiredMessagesTaskShouldDoNothingWhenEmpty() throws Interrup

@Test
default void deleteExpiredMessagesTaskShouldNotDeleteRecentMails() throws InterruptedException {
Mono.from(getVault().append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

getVault().deleteExpiredMessagesTask().run();

Expand All @@ -289,7 +290,7 @@ default void deleteExpiredMessagesTaskShouldNotDeleteRecentMails() throws Interr

@Test
default void deleteExpiredMessagesTaskShouldDeleteOldMails() throws InterruptedException {
Mono.from(getVault().append(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

getClock().setInstant(NOW.plusYears(2).toInstant());
getVault().deleteExpiredMessagesTask().run();
Expand All @@ -300,11 +301,11 @@ default void deleteExpiredMessagesTaskShouldDeleteOldMails() throws InterruptedE

@Test
default void deleteExpiredMessagesTaskShouldDeleteOldMailsWhenRunSeveralTime() throws InterruptedException {
Mono.from(getVault().append(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
getClock().setInstant(NOW.plusYears(2).toInstant());
getVault().deleteExpiredMessagesTask().run();

Mono.from(getVault().append(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
getClock().setInstant(NOW.plusYears(4).toInstant());
getVault().deleteExpiredMessagesTask().run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.stream.IntStream;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.core.MailAddress;
Expand All @@ -53,6 +54,7 @@
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.mime4j.dom.Message;
import org.apache.james.server.blob.deduplication.BlobStoreFactory;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
import org.apache.james.vault.blob.BucketNameGenerator;
import org.apache.james.vault.memory.metadata.MemoryDeletedMessageMetadataVault;
Expand Down Expand Up @@ -115,12 +117,14 @@ private ComposedMessageId appendMessage(MessageManager messageManager) throws Ex
void setUp() throws Exception {
clock = Clock.fixed(DELETION_DATE.toInstant(), ZoneOffset.UTC);
MemoryBlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO();
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
messageVault = new BlobStoreDeletedMessageVault(new RecordingMetricFactory(), new MemoryDeletedMessageMetadataVault(),
BlobStoreFactory.builder()
.blobStoreDAO(blobStoreDAO)
.blobIdFactory(new PlainBlobId.Factory())
.blobIdFactory(blobIdFactory)
.defaultBucketName()
.passthrough(), blobStoreDAO, new BucketNameGenerator(clock), clock,
new BlobIdTimeGenerator(blobIdFactory, clock),
VaultConfiguration.ENABLED_DEFAULT);

DeletedMessageConverter deletedMessageConverter = new DeletedMessageConverter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import static org.apache.james.vault.DeletedMessageFixture.CONTENT;
import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE;
import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_2;
import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_GENERATOR;
import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_WITH_SUBJECT;
import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID;
import static org.apache.james.vault.DeletedMessageFixture.NOW;
import static org.apache.james.vault.DeletedMessageFixture.OLD_DELETED_MESSAGE;
import static org.apache.james.vault.DeletedMessageFixture.SUBJECT;
import static org.apache.james.vault.DeletedMessageFixture.USERNAME;
import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.APPEND_METRIC_NAME;
import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.DELETE_EXPIRED_MESSAGES_METRIC_NAME;
Expand All @@ -37,24 +40,29 @@
import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.server.blob.deduplication.BlobStoreFactory;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.james.vault.DeletedMessageVault;
import org.apache.james.vault.DeletedMessage;
import org.apache.james.vault.DeletedMessageVaultContract;
import org.apache.james.vault.DeletedMessageVaultSearchContract;
import org.apache.james.vault.VaultConfiguration;
import org.apache.james.vault.memory.metadata.MemoryDeletedMessageMetadataVault;
import org.apache.james.vault.search.CriterionFactory;
import org.apache.james.vault.search.Query;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


class BlobStoreDeletedMessageVaultTest implements DeletedMessageVaultContract, DeletedMessageVaultSearchContract.AllContracts {
private BlobStoreDeletedMessageVault messageVault;
private UpdatableTickingClock clock;
Expand All @@ -65,17 +73,19 @@ void setUp() {
clock = new UpdatableTickingClock(NOW.toInstant());
metricFactory = new RecordingMetricFactory();
MemoryBlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO();
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
messageVault = new BlobStoreDeletedMessageVault(metricFactory, new MemoryDeletedMessageMetadataVault(),
BlobStoreFactory.builder()
.blobStoreDAO(blobStoreDAO)
.blobIdFactory(new PlainBlobId.Factory())
.blobIdFactory(blobIdFactory)
.defaultBucketName()
.passthrough(),
blobStoreDAO, new BucketNameGenerator(clock), clock, VaultConfiguration.ENABLED_DEFAULT);
blobStoreDAO, new BucketNameGenerator(clock), clock, new BlobIdTimeGenerator(blobIdFactory, clock),
VaultConfiguration.ENABLED_DEFAULT);
}

@Override
public DeletedMessageVault getVault() {
public BlobStoreDeletedMessageVault getVault() {
return messageVault;
}

Expand All @@ -87,9 +97,9 @@ public UpdatableTickingClock getClock() {
@Test
void retentionQualifiedBucketsShouldReturnOnlyBucketsFullyBeforeBeginningOfRetentionPeriod() {
clock.setInstant(Instant.parse("2007-12-03T10:15:30.00Z"));
Mono.from(getVault().append(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
clock.setInstant(Instant.parse("2008-01-03T10:15:30.00Z"));
Mono.from(getVault().append(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();

ZonedDateTime beginningOfRetention = ZonedDateTime.parse("2008-01-30T10:15:30.00Z");
assertThat(messageVault.retentionQualifiedBuckets(beginningOfRetention).toStream())
Expand All @@ -99,9 +109,9 @@ void retentionQualifiedBucketsShouldReturnOnlyBucketsFullyBeforeBeginningOfReten
@Test
void retentionQualifiedBucketsShouldReturnAllWhenAllBucketMonthAreBeforeBeginningOfRetention() {
clock.setInstant(Instant.parse("2007-12-03T10:15:30.00Z"));
Mono.from(getVault().append(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(OLD_DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
clock.setInstant(Instant.parse("2008-01-30T10:15:30.00Z"));
Mono.from(getVault().append(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();

assertThat(messageVault.retentionQualifiedBuckets(ZonedDateTime.parse("2008-02-01T10:15:30.00Z")).toStream())
.containsOnly(
Expand Down Expand Up @@ -163,4 +173,67 @@ void deleteExpiredMessagesTaskShouldPublishRetentionTimerMetrics() throws Except
assertThat(metricFactory.executionTimesFor(DELETE_EXPIRED_MESSAGES_METRIC_NAME))
.hasSize(1);
}

@Test
public void loadMimeMessageShouldReturnOldMessage() {
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

assertThat(Mono.from(getVault().loadMimeMessage(USERNAME, MESSAGE_ID)).blockOptional())
.isNotEmpty()
.satisfies(maybeContent -> assertThat(maybeContent.get()).hasSameContentAs(new ByteArrayInputStream(CONTENT)));
}

@Test
public void loadMimeMessageShouldReturnEmptyWhenOldMessageDeleted() {
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

Mono.from(getVault().delete(USERNAME, MESSAGE_ID)).block();

assertThat(Mono.from(getVault().loadMimeMessage(USERNAME, MESSAGE_ID)).blockOptional())
.isEmpty();
}

@Test
public void searchAllShouldReturnOldMessage() {
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();

assertThat(Flux.from(getVault().search(USERNAME, ALL)).collectList().block())
.containsOnly(DELETED_MESSAGE);
}

@Test
public void searchAllShouldReturnOldAndNewMessages() {
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();
Comment thread
Arsnael marked this conversation as resolved.
Outdated

assertThat(Flux.from(getVault().search(USERNAME, ALL)).collectList().block())
.containsOnly(DELETED_MESSAGE, DELETED_MESSAGE_2);
}

@Test
public void searchAllShouldSupportLimitQueryWithOldAndNewMessages() {
Mono.from(getVault().appendV1(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();
DeletedMessage deletedMessage3 = DELETED_MESSAGE_GENERATOR.apply(InMemoryMessageId.of(33).getRawId());
Mono.from(getVault().appendV1(deletedMessage3, new ByteArrayInputStream(CONTENT))).block();
Comment thread
Arsnael marked this conversation as resolved.
Outdated

assertThat(Flux.from(getVault().search(USERNAME, Query.of(1, List.of()))).collectList().block())
.hasSize(1);
assertThat(Flux.from(getVault().search(USERNAME, Query.of(3, List.of()))).collectList().block())
.containsExactlyInAnyOrder(DELETED_MESSAGE, DELETED_MESSAGE_2, deletedMessage3);
assertThat(Flux.from(getVault().search(USERNAME, Query.of(4, List.of()))).collectList().block())
.containsExactlyInAnyOrder(DELETED_MESSAGE, DELETED_MESSAGE_2, deletedMessage3);
}

@Test
public void searchShouldReturnMatchingOldMessages() {
Mono.from(getVault().appendV1(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block();
Mono.from(getVault().appendV1(DELETED_MESSAGE_WITH_SUBJECT, new ByteArrayInputStream(CONTENT))).block();

assertThat(
Flux.from(getVault().search(USERNAME,
Query.of(CriterionFactory.subject().containsIgnoreCase(SUBJECT))))
.collectList().block())
.containsOnly(DELETED_MESSAGE_WITH_SUBJECT);
}
}
Loading