Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
"**/Thumbs.db": true,
".github": false,
".vscode": false
}
},
"typescript.tsdk": "node_modules/typescript/lib"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ resource "aws_dynamodb_table" "letter_queue" {
billing_mode = "PAY_PER_REQUEST"

hash_key = "supplierId"
range_key = "queueTimestamp"
range_key = "letterId"

ttl {
attribute_name = "ttl"
enabled = true
}

local_secondary_index {
name = "letterId-index"
range_key = "letterId"
name = "queueTimestamp-index"
range_key = "queueTimestamp"
projection_type = "ALL"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ data "aws_iam_policy_document" "update_letter_queue_lambda" {

actions = [
"dynamodb:PutItem",
"dynamodb:DeleteItem",
]

resources = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { GetCommand } from "@aws-sdk/lib-dynamodb";
import { Logger } from "pino";
import {
DBContext,
Expand All @@ -7,8 +8,9 @@ import {
} from "./db";
import LetterQueueRepository from "../letter-queue-repository";
import { InsertPendingLetter } from "../types";
import { LetterAlreadyExistsError } from "../errors";
import { LetterAlreadyExistsError } from "../letter-already-exists-error";
import { createTestLogger } from "./logs";
import { LetterDoesNotExistError } from "../letter-does-not-exist-error";

function createLetter(letterId = "letter1"): InsertPendingLetter {
return {
Expand Down Expand Up @@ -77,6 +79,7 @@ describe("LetterQueueRepository", () => {
expect(timestampInMillis).toBeGreaterThanOrEqual(before);
expect(timestampInMillis).toBeLessThanOrEqual(after);
assertTtl(pendingLetter.ttl, before, after);
expect(await letterExists(db, "supplier1", "letter1")).toBe(true);
});

it("throws LetterAlreadyExistsError when creating a letter which already exists", async () => {
Expand All @@ -101,4 +104,48 @@ describe("LetterQueueRepository", () => {
).rejects.toThrow("Cannot do operations on a non-existent table");
});
});

describe("deleteLetter", () => {
it("deletes a letter from the database", async () => {
await letterQueueRepository.putLetter(createLetter());

await letterQueueRepository.deleteLetter("supplier1", "letter1");

expect(await letterExists(db, "supplier1", "letter1")).toBe(false);
});

it("throws an error when the letter does not exist", async () => {
await expect(
letterQueueRepository.deleteLetter("supplier1", "letter1"),
).rejects.toThrow(LetterDoesNotExistError);
});

it("rethrows errors from DynamoDB when deleting a letter", async () => {
const misconfiguredRepository = new LetterQueueRepository(
db.docClient,
logger,
{
...db.config,
letterQueueTableName: "nonexistent-table",
},
);
await expect(
misconfiguredRepository.deleteLetter("supplier1", "letter1"),
).rejects.toThrow("Cannot do operations on a non-existent table");
});
});
});

async function letterExists(
db: DBContext,
supplierId: string,
letterId: string,
): Promise<boolean> {
const result = await db.docClient.send(
new GetCommand({
TableName: db.config.letterQueueTableName,
Key: { supplierId, letterId },
}),
);
return result.Item !== undefined;
}
3 changes: 2 additions & 1 deletion internal/datastore/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./types";
export * from "./errors";
export * from "./letter-already-exists-error";
export * from "./letter-does-not-exist-error";
export * from "./mi-repository";
export * from "./letter-repository";
export * from "./supplier-repository";
Expand Down
15 changes: 15 additions & 0 deletions internal/datastore/src/letter-does-not-exist-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Error thrown when attempting to delete a letter that does not exist in the database.
*/
// eslint-disable-next-line import-x/prefer-default-export
export class LetterDoesNotExistError extends Error {
constructor(
public readonly supplierId: string,
public readonly letterId: string,
) {
super(
`Letter does not exist: supplierId=${supplierId}, letterId=${letterId}`,
);
this.name = "LetterDoesNotExistError";
}
}
29 changes: 27 additions & 2 deletions internal/datastore/src/letter-queue-repository.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";
import {
DeleteCommand,
DynamoDBDocumentClient,
PutCommand,
} from "@aws-sdk/lib-dynamodb";
import { Logger } from "pino";
import {
InsertPendingLetter,
PendingLetter,
PendingLetterSchema,
} from "./types";
import { LetterAlreadyExistsError } from "./errors";
import { LetterAlreadyExistsError } from "./letter-already-exists-error";
import { LetterDoesNotExistError } from "./letter-does-not-exist-error";

type LetterQueueRepositoryConfig = {
letterQueueTableName: string;
Expand Down Expand Up @@ -52,4 +57,24 @@ export default class LetterQueueRepository {
}
return PendingLetterSchema.parse(pendingLetter);
}

async deleteLetter(supplierId: string, letterId: string): Promise<void> {
try {
await this.ddbClient.send(
new DeleteCommand({
TableName: this.config.letterQueueTableName,
Key: { supplierId, letterId },
ConditionExpression: "attribute_exists(letterId)",
}),
);
} catch (error) {
if (
error instanceof Error &&
error.name === "ConditionalCheckFailedException"
) {
throw new LetterDoesNotExistError(supplierId, letterId);
}
throw error;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
Letter,
LetterAlreadyExistsError,
LetterDoesNotExistError,
LetterQueueRepository,
} from "@internal/datastore";
import { mockDeep } from "jest-mock-extended";
Expand All @@ -21,6 +22,7 @@ import { LetterStatus } from "../../../api-handler/src/contracts/letters";
const mockedDeps: jest.Mocked<Deps> = {
letterQueueRepository: {
putLetter: jest.fn(),
deleteLetter: jest.fn(),
} as unknown as LetterQueueRepository,
logger: {
info: jest.fn(),
Expand Down Expand Up @@ -50,7 +52,7 @@ function generateLetter(status: LetterStatus, id?: string): Letter {
}

beforeEach(() => {
jest.clearAllMocks();
jest.resetAllMocks();
});

describe("update-letter-queue Lambda", () => {
Expand All @@ -74,23 +76,25 @@ describe("update-letter-queue Lambda", () => {
expect(result.batchItemFailures).toEqual([]);
});

it("does not publish updates", async () => {
it("deletes letters that are no longer pending", async () => {
const handler = createHandler(mockedDeps);
const oldLetter = generateLetter("PENDING");
const newLetter = generateLetter("PENDING");
const newLetter = generateLetter("ACCEPTED");

const testData = generateKinesisEvent([
generateModifyRecord(oldLetter, newLetter),
]);
const result = await handler(testData, mockDeep<Context>(), jest.fn());

expect(mockedDeps.letterQueueRepository.putLetter).not.toHaveBeenCalled();
expect(
mockedDeps.letterQueueRepository.deleteLetter,
).toHaveBeenCalledWith("supplier1", "1");
expect(result.batchItemFailures).toEqual([]);
});

it("does not publish non-PENDING letters", async () => {
const handler = createHandler(mockedDeps);
const newLetter = generateLetter("PRINTED");
const newLetter = generateLetter("ACCEPTED");

const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
const result = await handler(testData, mockDeep<Context>(), jest.fn());
Expand All @@ -99,6 +103,22 @@ describe("update-letter-queue Lambda", () => {
expect(result.batchItemFailures).toEqual([]);
});

it("does not delete letters that are still PENDING", async () => {
const handler = createHandler(mockedDeps);
const oldLetter = generateLetter("PENDING");
const newLetter = generateLetter("PENDING");

const testData = generateKinesisEvent([
generateModifyRecord(oldLetter, newLetter),
]);
const result = await handler(testData, mockDeep<Context>(), jest.fn());

expect(
mockedDeps.letterQueueRepository.deleteLetter,
).not.toHaveBeenCalled();
expect(result.batchItemFailures).toEqual([]);
});

it("handles empty Records array", async () => {
const handler = createHandler(mockedDeps);
const testData = { Records: [] } as unknown as KinesisStreamEvent;
Expand All @@ -116,11 +136,10 @@ describe("update-letter-queue Lambda", () => {
const newLetter = { id: "1", status: "PENDING" } as Letter;

const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
await expect(
handler(testData, mockDeep<Context>(), jest.fn()),
).rejects.toThrow();
const result = await handler(testData, mockDeep<Context>(), jest.fn());

expect(mockedDeps.letterQueueRepository.putLetter).not.toHaveBeenCalled();
expect(result.batchItemFailures).toEqual([{ itemIdentifier: "seq-0" }]);
});

it("returns on the first failure", async () => {
Expand All @@ -143,7 +162,7 @@ describe("update-letter-queue Lambda", () => {
expect(result.batchItemFailures).toEqual([{ itemIdentifier: "seq-0" }]);
});

it("does not treat a replayed event as a failure", async () => {
it("does not treat a replayed insert as a failure", async () => {
const handler = createHandler(mockedDeps);
const newLetter1 = generateLetter("PENDING", "1");
const newLetter2 = generateLetter("PENDING", "2");
Expand All @@ -160,6 +179,25 @@ describe("update-letter-queue Lambda", () => {
expect(result.batchItemFailures).toEqual([]);
});

it("does not treat a replayed delete as a failure", async () => {
const handler = createHandler(mockedDeps);
const oldLetter1 = generateLetter("PENDING", "1");
const oldLetter2 = generateLetter("PENDING", "2");
const newLetter1 = generateLetter("ACCEPTED", "1");
const newLetter2 = generateLetter("ACCEPTED", "2");
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
.mockRejectedValueOnce(new LetterDoesNotExistError("supplier1", "1"))
.mockResolvedValueOnce({});

const testData = generateKinesisEvent([
generateModifyRecord(oldLetter1, newLetter1),
generateModifyRecord(oldLetter2, newLetter2),
]);
const result = await handler(testData, mockDeep<Context>(), jest.fn());

expect(result.batchItemFailures).toEqual([]);
});

it("throws error when Kinesis payload cannot be parsed as JSON", async () => {
const handler = createHandler(mockedDeps);
const invalidJsonPayload = "not valid json {{{";
Expand Down Expand Up @@ -191,11 +229,12 @@ describe("update-letter-queue Lambda", () => {
describe("Metrics", () => {
it("emits success metrics when all letters are processed successfully", async () => {
const handler = createHandler(mockedDeps);
const newLetter1 = generateLetter("PENDING", "1");
const oldLetter1 = generateLetter("PENDING", "1");
const newLetter1 = generateLetter("ACCEPTED", "1");
const newLetter2 = generateLetter("PENDING", "2");

const testData = generateKinesisEvent([
generateInsertRecord(newLetter1),
generateModifyRecord(oldLetter1, newLetter1),
generateInsertRecord(newLetter2),
]);
await handler(testData, mockDeep<Context>(), jest.fn());
Expand All @@ -204,7 +243,7 @@ describe("update-letter-queue Lambda", () => {
assertFailureMetricLogged(0);
});

it("emits failure metrics when a letter fails to process", async () => {
it("emits failure metrics when a letter fails to be inserted", async () => {
const handler = createHandler(mockedDeps);
const newLetter1 = generateLetter("PENDING", "1");
const newLetter2 = generateLetter("PENDING", "2");
Expand All @@ -222,10 +261,31 @@ describe("update-letter-queue Lambda", () => {
assertFailureMetricLogged(1);
});

it("does not count a reprocessed event as a success or failure", async () => {
it("emits failure metrics when a letter fails to be deleted", async () => {
const handler = createHandler(mockedDeps);
const oldLetter1 = generateLetter("PENDING", "1");
const oldLetter2 = generateLetter("PENDING", "2");
const newLetter1 = generateLetter("ACCEPTED", "1");
const newLetter2 = generateLetter("ACCEPTED", "2");
(mockedDeps.letterQueueRepository.deleteLetter as jest.Mock)
.mockResolvedValueOnce({})
.mockRejectedValueOnce(new Error("DynamoDB error"));

const testData = generateKinesisEvent([
generateModifyRecord(oldLetter1, newLetter1),
generateModifyRecord(oldLetter2, newLetter2),
]);
await handler(testData, mockDeep<Context>(), jest.fn());

assertSuccessMetricLogged(1);
assertFailureMetricLogged(1);
});

it("does not count a replayed insert as a success or failure", async () => {
const handler = createHandler(mockedDeps);
const newLetter1 = generateLetter("PENDING", "1");
const newLetter2 = generateLetter("PENDING", "2");

(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
.mockResolvedValueOnce({});
Expand All @@ -240,6 +300,26 @@ describe("update-letter-queue Lambda", () => {
assertFailureMetricLogged(0);
});

it("does not count a replayed delete as a success or failure", async () => {
const handler = createHandler(mockedDeps);
const oldLetter1 = generateLetter("PENDING", "1");
const oldLetter2 = generateLetter("PENDING", "2");
const newLetter1 = generateLetter("ACCEPTED", "1");
const newLetter2 = generateLetter("ACCEPTED", "2");
(mockedDeps.letterQueueRepository.deleteLetter as jest.Mock)
.mockRejectedValueOnce(new LetterDoesNotExistError("supplier1", "1"))
.mockResolvedValueOnce({});

const testData = generateKinesisEvent([
generateModifyRecord(oldLetter1, newLetter1),
generateModifyRecord(oldLetter2, newLetter2),
]);
await handler(testData, mockDeep<Context>(), jest.fn());

assertSuccessMetricLogged(1);
assertFailureMetricLogged(0);
});

it("emits zero success metrics when no pending letters are in the batch", async () => {
const handler = createHandler(mockedDeps);
const newLetter = generateLetter("PRINTED");
Expand Down
Loading
Loading