Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import jakarta.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.Username;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.vault.blob.BlobIdTimeGenerator;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
Expand All @@ -47,15 +47,15 @@ public class StorageInformationDAO {
private final PreparedStatement addStatement;
private final PreparedStatement removeStatement;
private final PreparedStatement readStatement;
private final BlobId.Factory blobIdFactory;
private final BlobIdTimeGenerator blobIdTimeGenerator;

@Inject
StorageInformationDAO(CqlSession session, BlobId.Factory blobIdFactory) {
StorageInformationDAO(CqlSession session, BlobIdTimeGenerator blobIdTimeGenerator) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.addStatement = prepareAdd(session);
this.removeStatement = prepareRemove(session);
this.readStatement = prepareRead(session);
this.blobIdFactory = blobIdFactory;
this.blobIdTimeGenerator = blobIdTimeGenerator;
Copy link
Copy Markdown
Contributor

@jeantil jeantil Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not have a custom BlobId.Factory implementation here to avoid changing the API and affecting the cassandra variant for a S3 only problem ?

check https://github.com/apache/james-project/blob/master/server/mailrepository/mailrepository-blob/src/main/scala/org/apache/james/mailrepository/blob/BlobMailRepositoryFactory.scala#L29 for an example

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @jeantil

-> I wonder if this piece of coude could not work with PlainBlobId as the cassandra implem is blobId transparent. That's a technical nitpick that do not address the core of your remarks but could be a nice simplification I presume. Worth testing...

a custom BlobId.Factory implementation

On the principle mostly agree. This would mean acting at the blobStoreDAO level and not on the BlobStore. Which do not seem like a major issue to me.

I;ll try to push a fix in this direction.

Copy link
Copy Markdown
Contributor

@jeantil jeantil Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would mean acting at the blobStoreDAO level and not on the BlobStore.

🤔 not sure I follow : it does require creating a custom blobstore instance but it has no impact on the blobstoreDAO itself. This is how we did it for the blob backed mailrepository

It could arguably be managed at the injection layer by using a Qualified blobid factory instance ( this is a limitation of the BlobMailRepositoryFactory which forces usage of a passthrough blobstore and cannot use deduplication but I was so happy to have something working I stopped without the deduplication, also deduplication makes less sense for a mail repository which I don't expect to store mails for years )

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see

    val metadataBlobStore = BlobStoreFactory.builder()
      .blobStoreDAO(blobStoreDao)
      .blobIdFactory(metadataIdFactory)
      .bucket(defaultBucketName)
      .passthrough()

So re-assembling a ad-hoc blobStore tailor made for the feature.

Interesting.

Sorry do like ... comment was hard for me to interprete.

Copy link
Copy Markdown
Contributor

@jeantil jeantil Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry do like ... comment was hard for me to interprete.

my apologies, I should have been clearer

}

private PreparedStatement prepareRead(CqlSession session) {
Expand Down Expand Up @@ -102,6 +102,6 @@ Mono<StorageInformation> retrieveStorageInformation(Username username, MessageId
.setString(MESSAGE_ID, messageId.serialize()))
.map(row -> StorageInformation.builder()
.bucketName(BucketName.of(row.getString(BUCKET_NAME)))
.blobId(blobIdFactory.parse(row.getString(BLOB_ID))));
.blobId(blobIdTimeGenerator.toDeletedMessageBlobId(row.getString(BLOB_ID))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.james.vault.metadata;

import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.vault.DeletedMessageFixture.NOW;
import static org.apache.james.vault.DeletedMessageFixture.USERNAME;
import static org.apache.james.vault.metadata.DeletedMessageMetadataDataDefinition.MODULE;
import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.BUCKET_NAME;
Expand All @@ -35,6 +36,8 @@
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
Expand All @@ -57,11 +60,12 @@ public class CassandraDeletedMessageMetadataVaultTest implements DeletedMessageM
@BeforeEach
void setUp(CassandraCluster cassandra) {
PlainBlobId.Factory blobIdFactory = new PlainBlobId.Factory();
BlobIdTimeGenerator blobIdTimeGenerator = new BlobIdTimeGenerator(blobIdFactory, new UpdatableTickingClock(NOW.toInstant()));
InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter(blobIdFactory, messageIdFactory, new InMemoryId.Factory());
DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter(blobIdTimeGenerator, messageIdFactory, new InMemoryId.Factory());

metadataDAO = new MetadataDAO(cassandra.getConf(), messageIdFactory, new MetadataSerializer(dtoConverter));
storageInformationDAO = new StorageInformationDAO(cassandra.getConf(), blobIdFactory);
storageInformationDAO = new StorageInformationDAO(cassandra.getConf(), blobIdTimeGenerator);
userPerBucketDAO = new UserPerBucketDAO(cassandra.getConf());

testee = new CassandraDeletedMessageMetadataVault(metadataDAO, storageInformationDAO, userPerBucketDAO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.james.vault.metadata;

import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID;
import static org.apache.james.vault.DeletedMessageFixture.NOW;
import static org.apache.james.vault.DeletedMessageFixture.USERNAME;
import static org.apache.james.vault.metadata.DeletedMessageMetadataDataDefinition.MODULE;
import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.BUCKET_NAME;
Expand All @@ -35,6 +36,8 @@
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -49,7 +52,9 @@ class MetadataDAOTest {
@BeforeEach
void setUp(CassandraCluster cassandra) {
DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter(
new PlainBlobId.Factory(), new InMemoryMessageId.Factory(), new InMemoryId.Factory());
new BlobIdTimeGenerator(new PlainBlobId.Factory(), new UpdatableTickingClock(NOW.toInstant())),
new InMemoryMessageId.Factory(),
new InMemoryId.Factory());

testee = new MetadataDAO(cassandra.getConf(), new InMemoryMessageId.Factory(),
new MetadataSerializer(dtoConverter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.james.vault.metadata;

import static org.apache.james.vault.DeletedMessageFixture.NOW;
import static org.apache.james.vault.metadata.DeletedMessageMetadataDataDefinition.MODULE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
Expand All @@ -32,14 +33,16 @@
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.core.Username;
import org.apache.james.mailbox.model.TestMessageId;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class StorageInformationDAOTest {
private static final BucketName BUCKET_NAME = BucketName.of("deletedMessages-2019-06-01");
private static final BucketName BUCKET_NAME_2 = BucketName.of("deletedMessages-2019-07-01");
private static final PlainBlobId.Factory BLOB_ID_FACTORY = new PlainBlobId.Factory();
private static final BlobIdTimeGenerator BLOB_ID_TIME_GENERATOR = new BlobIdTimeGenerator(new PlainBlobId.Factory(), new UpdatableTickingClock(NOW.toInstant()));
private static final Username OWNER = Username.of("owner");
private static final TestMessageId MESSAGE_ID = TestMessageId.of(36);
private static final BlobId BLOB_ID = new PlainBlobId.Factory().parse("05dcb33b-8382-4744-923a-bc593ad84d23");
Expand All @@ -54,7 +57,7 @@ class StorageInformationDAOTest {

@BeforeEach
void setUp(CassandraCluster cassandra) {
testee = new StorageInformationDAO(cassandra.getConf(), BLOB_ID_FACTORY);
testee = new StorageInformationDAO(cassandra.getConf(), BLOB_ID_TIME_GENERATOR);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import jakarta.inject.Inject;

import org.apache.james.backends.postgres.utils.PostgresExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.Username;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.jooq.Record;
import org.reactivestreams.Publisher;

Expand All @@ -46,15 +46,15 @@
public class PostgresDeletedMessageMetadataVault implements DeletedMessageMetadataVault {
private final PostgresExecutor postgresExecutor;
private final MetadataSerializer metadataSerializer;
private final BlobId.Factory blobIdFactory;
private final BlobIdTimeGenerator blobIdTimeGenerator;

@Inject
public PostgresDeletedMessageMetadataVault(PostgresExecutor postgresExecutor,
MetadataSerializer metadataSerializer,
BlobId.Factory blobIdFactory) {
BlobIdTimeGenerator blobIdTimeGenerator) {
this.postgresExecutor = postgresExecutor;
this.metadataSerializer = metadataSerializer;
this.blobIdFactory = blobIdFactory;
this.blobIdTimeGenerator = blobIdTimeGenerator;
}

@Override
Expand Down Expand Up @@ -93,7 +93,7 @@ public Publisher<StorageInformation> retrieveStorageInformation(Username usernam
private Function<Record, StorageInformation> toStorageInformation() {
return record -> StorageInformation.builder()
.bucketName(BucketName.of(record.get(BUCKET_NAME)))
.blobId(blobIdFactory.parse(record.get(BLOB_ID)));
.blobId(blobIdTimeGenerator.toDeletedMessageBlobId(record.get(BLOB_ID)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

package org.apache.james.vault.metadata;

import static org.apache.james.vault.DeletedMessageFixture.NOW;

import org.apache.james.backends.postgres.PostgresDataDefinition;
import org.apache.james.backends.postgres.PostgresExtension;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.utils.UpdatableTickingClock;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand All @@ -36,12 +40,13 @@ class PostgresDeletedMessageMetadataVaultTest implements DeletedMessageMetadataV
@Override
public DeletedMessageMetadataVault metadataVault() {
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
BlobIdTimeGenerator blobIdTimeGenerator = new BlobIdTimeGenerator(blobIdFactory, new UpdatableTickingClock(NOW.toInstant()));
InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter(blobIdFactory,
DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter(blobIdTimeGenerator,
messageIdFactory, new InMemoryId.Factory());

return new PostgresDeletedMessageMetadataVault(postgresExtension.getDefaultPostgresExecutor(),
new MetadataSerializer(dtoConverter),
blobIdFactory);
blobIdTimeGenerator);
}
}
9 changes: 9 additions & 0 deletions mailbox/plugin/deleted-messages-vault/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@
<groupId>${james.groupId}</groupId>
<artifactId>james-server-core</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-data-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-data-memory</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-task-json</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import jakarta.inject.Inject;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
Expand All @@ -37,6 +38,7 @@
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.task.Task;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.vault.DeletedMessage;
import org.apache.james.vault.DeletedMessageContentNotFoundException;
import org.apache.james.vault.DeletedMessageVault;
Expand Down Expand Up @@ -74,13 +76,14 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
private final Clock clock;
private final VaultConfiguration vaultConfiguration;
private final BlobIdTimeGenerator blobIdTimeGenerator;
private final UsersRepository usersRepository;
private final BlobStoreVaultGarbageCollectionTask.Factory taskFactory;

@Inject
public BlobStoreDeletedMessageVault(MetricFactory metricFactory, DeletedMessageMetadataVault messageMetadataVault,
BlobStore blobStore, BlobStoreDAO blobStoreDAO, BucketNameGenerator nameGenerator,
Clock clock, BlobIdTimeGenerator blobIdTimeGenerator,
VaultConfiguration vaultConfiguration) {
VaultConfiguration vaultConfiguration, UsersRepository usersRepository) {
this.metricFactory = metricFactory;
this.messageMetadataVault = messageMetadataVault;
this.blobStore = blobStore;
Expand All @@ -89,6 +92,7 @@ public BlobStoreDeletedMessageVault(MetricFactory metricFactory, DeletedMessageM
this.clock = clock;
this.vaultConfiguration = vaultConfiguration;
this.blobIdTimeGenerator = blobIdTimeGenerator;
this.usersRepository = usersRepository;
this.taskFactory = new BlobStoreVaultGarbageCollectionTask.Factory(this);
}

Expand Down Expand Up @@ -201,7 +205,7 @@ public Task deleteExpiredMessagesTask() {
return taskFactory.create();
}


@Deprecated
Flux<BucketName> deleteExpiredMessages(ZonedDateTime beginningOfRetentionPeriod) {
return Flux.from(
metricFactory.decoratePublisherWithTimerMetric(
Expand All @@ -216,6 +220,7 @@ ZonedDateTime getBeginningOfRetentionPeriod() {
}

@VisibleForTesting
@Deprecated
Flux<BucketName> retentionQualifiedBuckets(ZonedDateTime beginningOfRetentionPeriod) {
return Flux.from(messageMetadataVault.listRelatedBuckets())
.filter(bucketName -> isFullyExpired(beginningOfRetentionPeriod, bucketName));
Expand All @@ -235,4 +240,30 @@ private Mono<Void> deleteBucketData(BucketName bucketName) {
return Mono.from(blobStore.deleteBucket(bucketName))
.then(Mono.from(messageMetadataVault.removeMetadataRelatedToBucket(bucketName)));
}

Mono<Void> deleteUserExpiredMessages(ZonedDateTime beginningOfRetentionPeriod) {
Comment thread
Arsnael marked this conversation as resolved.
Outdated
BucketName bucketName = BucketName.of(vaultConfiguration.getSingleBucketName());

return Flux.from(metricFactory.decoratePublisherWithTimerMetric(
DELETE_EXPIRED_MESSAGES_METRIC_NAME,
Flux.from(usersRepository.listReactive())
.flatMap(username -> Flux.from(messageMetadataVault.listMessages(bucketName, username))
.filter(deletedMessage -> isMessageFullyExpired(beginningOfRetentionPeriod, deletedMessage))
.flatMap(deletedMessage -> messageMetadataVault.remove(bucketName, username, deletedMessage.getDeletedMessage().getMessageId())))))
.then();
}


Comment thread
Arsnael marked this conversation as resolved.
Outdated

private boolean isMessageFullyExpired(ZonedDateTime beginningOfRetentionPeriod, DeletedMessageWithStorageInformation deletedMessage) {
BlobId blobId = deletedMessage.getStorageInformation().getBlobId();
Optional<ZonedDateTime> maybeEndDate = blobIdTimeGenerator.blobIdEndTime(blobId);

if (maybeEndDate.isEmpty()) {
LOGGER.error("Pattern used for blobId used in deletedMessageVault is invalid and end date cannot be parsed {}", blobId);
Comment thread
Arsnael marked this conversation as resolved.
Outdated
}

return maybeEndDate.map(endDate -> endDate.isBefore(beginningOfRetentionPeriod))
.orElse(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private BlobStoreVaultGarbageCollectionTask(BlobStoreDeletedMessageVault deleted
public Result run() {
deletedMessageVault.deleteExpiredMessages(beginningOfRetentionPeriod)
.doOnNext(deletedBuckets::add)
.then()
.then(deletedMessageVault.deleteUserExpiredMessages(beginningOfRetentionPeriod))
.block();

return Result.COMPLETED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,38 @@
import jakarta.inject.Inject;
import jakarta.mail.internet.AddressException;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
import org.apache.james.core.Username;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.vault.DeletedMessage;
import org.apache.james.vault.blob.BlobIdTimeGenerator;
import org.apache.james.vault.metadata.DeletedMessageWithStorageInformation;
import org.apache.james.vault.metadata.StorageInformation;

import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;

public class DeletedMessageWithStorageInformationConverter {
private final BlobId.Factory blobFactory;
private final BlobIdTimeGenerator blobIdTimeGenerator;
private final MessageId.Factory messageIdFactory;
private final MailboxId.Factory mailboxIdFactory;

@Inject
public DeletedMessageWithStorageInformationConverter(BlobId.Factory blobFactory,
public DeletedMessageWithStorageInformationConverter(BlobIdTimeGenerator blobIdTimeGenerator,
MessageId.Factory messageIdFactory,
MailboxId.Factory mailboxIdFactory) {
this.blobFactory = blobFactory;
this.blobIdTimeGenerator = blobIdTimeGenerator;
this.messageIdFactory = messageIdFactory;
this.mailboxIdFactory = mailboxIdFactory;
}

public StorageInformation toDomainObject(DeletedMessageWithStorageInformationDTO.StorageInformationDTO storageInformationDTO) {
return StorageInformation.builder()
.bucketName(BucketName.of(storageInformationDTO.getBucketName()))
.blobId(blobFactory.parse(storageInformationDTO.getBlobId()));
.blobId(blobIdTimeGenerator.toDeletedMessageBlobId(storageInformationDTO.getBlobId()));
}

public DeletedMessage toDomainObject(DeletedMessageWithStorageInformationDTO.DeletedMessageDTO deletedMessageDTO) throws AddressException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public interface DeletedMessageFixture {
InMemoryId MAILBOX_ID_3 = InMemoryId.of(45);
Username USERNAME = Username.of("bob@apache.org");
Username USERNAME_2 = Username.of("dimitri@apache.org");
String PASSWORD = "123456";
ZonedDateTime DELIVERY_DATE = ZonedDateTime.parse("2014-10-30T14:12:00Z");
ZonedDateTime DELETION_DATE = ZonedDateTime.parse("2015-10-30T14:12:00Z");
ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z");
Expand Down
Loading