diff --git a/docs/content/docs/options.md b/docs/content/docs/options.md index d9f7abe..fe55ca2 100644 --- a/docs/content/docs/options.md +++ b/docs/content/docs/options.md @@ -121,6 +121,39 @@ The maximum amount of time (in milliseconds) that the in-memory lock for [stampe This is usually not needed, but can provide an extra layer of protection against theoretical deadlocks. +### `lockManager` + +Default: built-in in-memory lock manager. + +Levels: `global` + +A custom lock manager class. + +This lock manager is used for stampede protection and file driver writes. + +`LockManager` must return a lock compatible with `MutexInterface` from `async-mutex`. Bentocache exports `LockHandle` and `LockReleaser` as aliases for `MutexInterface` and `MutexInterface.Releaser`. + +```ts +import { Mutex, withTimeout } from 'async-mutex' + +class CustomLockManager { + #lock = new Mutex() + + getOrCreateForKey(key, timeout) { + return withTimeout(this.#lock, timeout ?? Infinity) + } + + release(key, releaser) { + releaser() + } +} + +const bento = new BentoCache({ + lockManager: CustomLockManager, + // ... +}) +``` + ### `onFactoryError` Default: `undefined` diff --git a/docs/content/docs/stampede_protection.md b/docs/content/docs/stampede_protection.md index 8623feb..cb1854a 100644 --- a/docs/content/docs/stampede_protection.md +++ b/docs/content/docs/stampede_protection.md @@ -54,3 +54,11 @@ Well, in truth, this is not really a problem. Indeed, there will be more than on Given the same scenario with the 10k users. Imagine that your application is running in cluster mode with PM2 and you have 10 instances of your app. Also, imagine that the 10k requests are distributed equally across the 10 instances. This results in 1k requests per instance. And so, it will lead to **10 queries to the database instead of 10k**, with the help of our protection. + +## Shared locks + +BentoCache allows you to use custom lock manager class so you can implement **shared** locks. + +For example, you can implement lock manager which uses patterns like [Redis distributed locks](https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/) where locks are created in L2 storage and shared across all application instances and don’t have process instance limitations. + +In multi-instance applications scenario, this will lead to **1 query to database instead of 10k**. diff --git a/packages/bentocache/src/bento_cache.ts b/packages/bentocache/src/bento_cache.ts index 63f88e4..cfd683c 100644 --- a/packages/bentocache/src/bento_cache.ts +++ b/packages/bentocache/src/bento_cache.ts @@ -69,11 +69,13 @@ export class BentoCache> implemen l1Driver: entry.l1?.factory({ prefix: driverItemOptions.prefix, logger: driverItemOptions.logger, + lockManager: driverItemOptions.lockManager, ...entry.l1.options, }), l2Driver: entry.l2?.factory({ prefix: driverItemOptions.prefix, logger: driverItemOptions.logger, + lockManager: driverItemOptions.lockManager, ...entry.l2.options, }), busDriver: entry.bus?.factory(entry.bus?.options), diff --git a/packages/bentocache/src/bento_cache_options.ts b/packages/bentocache/src/bento_cache_options.ts index e1ac0f5..911d313 100644 --- a/packages/bentocache/src/bento_cache_options.ts +++ b/packages/bentocache/src/bento_cache_options.ts @@ -3,10 +3,17 @@ import { ms } from '@julr/utils/string/ms' import { noopLogger } from '@julr/utils/logger' import { Logger } from './logger.js' +import { Locks } from './cache/locks.js' import { resolveTtl } from './helpers.js' import type { FactoryError } from './errors.js' import { JsonSerializer } from './serializers/json.js' -import type { CacheSerializer, Duration, Emitter, RawBentoCacheOptions } from './types/main.js' +import type { + CacheSerializer, + Duration, + Emitter, + LockManagerConstructor, + RawBentoCacheOptions, +} from './types/main.js' const defaultSerializer = new JsonSerializer() @@ -68,6 +75,11 @@ export class BentoCacheOptions { */ lockTimeout?: Duration = null + /** + * The lock manager class used throughout the library + */ + lockManager: LockManagerConstructor + /** * Duration for the circuit breaker to stay open * if l2 cache fails @@ -90,6 +102,7 @@ export class BentoCacheOptions { this.hardTimeout = this.#options.hardTimeout this.suppressL2Errors = this.#options.suppressL2Errors this.lockTimeout = this.#options.lockTimeout + this.lockManager = this.#options.lockManager ?? Locks this.grace = this.#options.grace! this.graceBackoff = this.#options.graceBackoff! diff --git a/packages/bentocache/src/cache/factory_runner.ts b/packages/bentocache/src/cache/factory_runner.ts index 5a73113..405e24b 100644 --- a/packages/bentocache/src/cache/factory_runner.ts +++ b/packages/bentocache/src/cache/factory_runner.ts @@ -1,12 +1,11 @@ import pTimeout from 'p-timeout' import { tryAsync } from '@julr/utils/functions' -import type { MutexInterface } from 'async-mutex' import { errors } from '../errors.js' -import type { Locks } from './locks.js' import type { CacheStack } from './cache_stack.js' import { cacheOperation } from '../tracing_channels.js' import type { GetSetFactory } from '../types/helpers.js' +import type { LockManager, LockReleaser } from '../types/main.js' import type { GetCacheValueReturn } from '../types/internals/index.js' import type { CacheOperationMessage } from '../types/tracing_channels.js' import type { CacheEntryOptions } from './cache_entry/cache_entry_options.js' @@ -15,7 +14,7 @@ interface RunFactoryParameters { key: string factory: GetSetFactory options: CacheEntryOptions - lockReleaser: MutexInterface.Releaser + lockReleaser: LockReleaser isBackground?: boolean gracedValue?: GetCacheValueReturn } @@ -24,11 +23,11 @@ interface RunFactoryParameters { * Factory Runner is responsible for executing factories */ export class FactoryRunner { - #locks: Locks + #locks: LockManager #stack: CacheStack #skipSymbol = Symbol('bentocache.skip') - constructor(stack: CacheStack, locks: Locks) { + constructor(stack: CacheStack, locks: LockManager) { this.#stack = stack this.#locks = locks } @@ -119,7 +118,7 @@ export class FactoryRunner { factory: GetSetFactory, gracedValue: GetCacheValueReturn | undefined, options: CacheEntryOptions, - lockReleaser: MutexInterface.Releaser, + lockReleaser: LockReleaser, ) { const hasGracedValue = !!gracedValue const timeout = options.factoryTimeout(hasGracedValue) diff --git a/packages/bentocache/src/cache/get_set/single_tier_handler.ts b/packages/bentocache/src/cache/get_set/single_tier_handler.ts index 8283207..b3d6e8a 100644 --- a/packages/bentocache/src/cache/get_set/single_tier_handler.ts +++ b/packages/bentocache/src/cache/get_set/single_tier_handler.ts @@ -1,6 +1,3 @@ -import type { MutexInterface } from 'async-mutex' - -import { Locks } from '../locks.js' import { errors } from '../../errors.js' import type { CacheStack } from '../cache_stack.js' import { FactoryRunner } from '../factory_runner.js' @@ -8,6 +5,7 @@ import type { Factory } from '../../types/helpers.js' import type { CacheEvent } from '../../types/events.js' import { cacheEvents } from '../../events/cache_events.js' import { cacheOperation } from '../../tracing_channels.js' +import type { LockManager, LockReleaser } from '../../types/main.js' import type { GetCacheValueReturn } from '../../types/internals/index.js' import type { CacheOperationMessage } from '../../types/tracing_channels.js' import type { CacheEntryOptions } from '../cache_entry/cache_entry_options.js' @@ -16,10 +14,11 @@ export class SingleTierHandler { /** * A map that will hold active locks for each key */ - #locks = new Locks() + #locks: LockManager #factoryRunner: FactoryRunner constructor(protected stack: CacheStack) { + this.#locks = new this.stack.options.lockManager() this.#factoryRunner = new FactoryRunner(this.stack, this.#locks) } @@ -140,7 +139,7 @@ export class SingleTierHandler { * If nothing is found in the remote cache, or if forceFresh is true, * we try to acquire a lock to run the factory */ - let releaser: MutexInterface.Releaser + let releaser: LockReleaser try { releaser = await this.#acquireLock(key, !!remoteItem, options) } catch (err) { diff --git a/packages/bentocache/src/cache/get_set/two_tier_handler.ts b/packages/bentocache/src/cache/get_set/two_tier_handler.ts index eb0f22c..bc52c8a 100644 --- a/packages/bentocache/src/cache/get_set/two_tier_handler.ts +++ b/packages/bentocache/src/cache/get_set/two_tier_handler.ts @@ -1,6 +1,3 @@ -import type { MutexInterface } from 'async-mutex' - -import { Locks } from '../locks.js' import { errors } from '../../errors.js' import type { CacheStack } from '../cache_stack.js' import { FactoryRunner } from '../factory_runner.js' @@ -8,6 +5,7 @@ import type { Factory } from '../../types/helpers.js' import type { CacheEvent } from '../../types/events.js' import { cacheEvents } from '../../events/cache_events.js' import { cacheOperation } from '../../tracing_channels.js' +import type { LockManager, LockReleaser } from '../../types/main.js' import type { GetCacheValueReturn } from '../../types/internals/index.js' import type { CacheOperationMessage } from '../../types/tracing_channels.js' import type { CacheEntryOptions } from '../cache_entry/cache_entry_options.js' @@ -16,10 +14,11 @@ export class TwoTierHandler { /** * A map that will hold active locks for each key */ - #locks = new Locks() + #locks: LockManager #factoryRunner: FactoryRunner constructor(protected stack: CacheStack) { + this.#locks = new this.stack.options.lockManager() this.#factoryRunner = new FactoryRunner(this.stack, this.#locks) } @@ -130,7 +129,7 @@ export class TwoTierHandler { * * We acquire a lock to prevent a cache stampede. */ - let releaser: MutexInterface.Releaser + let releaser: LockReleaser try { this.logger.trace({ key, cache: this.stack.name, opId: options.id }, 'acquiring lock...') releaser = await this.#acquireLock(key, !!localItem, options) diff --git a/packages/bentocache/src/cache/locks.ts b/packages/bentocache/src/cache/locks.ts index 162e3cb..1df4c93 100644 --- a/packages/bentocache/src/cache/locks.ts +++ b/packages/bentocache/src/cache/locks.ts @@ -1,11 +1,13 @@ import { is } from '@julr/utils/is' -import { Mutex, withTimeout, type MutexInterface } from 'async-mutex' +import { Mutex, withTimeout } from 'async-mutex' -export class Locks { +import type { LockHandle, LockManager, LockReleaser } from '../types/main.js' + +export class Locks implements LockManager { /** * A map that will hold active locks for each key */ - #locks = new Map() + #locks = new Map() /** * For a given key, get or create a new lock @@ -13,7 +15,7 @@ export class Locks { * @param key Key to get or create a lock for * @param timeout Time to wait to acquire the lock */ - getOrCreateForKey(key: string, timeout?: number) { + getOrCreateForKey(key: string, timeout?: number): LockHandle { let lock = this.#locks.get(key) if (!lock) { lock = new Mutex() @@ -23,7 +25,7 @@ export class Locks { return is.number(timeout) ? withTimeout(lock, timeout) : lock } - release(key: string, releaser: MutexInterface.Releaser) { + release(key: string, releaser: LockReleaser) { releaser() this.#locks.delete(key) } diff --git a/packages/bentocache/src/drivers/file/file.ts b/packages/bentocache/src/drivers/file/file.ts index dbc4ace..13760c2 100644 --- a/packages/bentocache/src/drivers/file/file.ts +++ b/packages/bentocache/src/drivers/file/file.ts @@ -10,6 +10,7 @@ import type { CreateDriverResult, DriverCommonInternalOptions, FileConfig, + LockManager, } from '../../types/main.js' /** @@ -42,13 +43,14 @@ export class FileDriver extends BaseDriver implements CacheDriver { * Worker thread that will clean up the expired files */ #cleanerWorker?: Worker - #locks = new Locks() + #locks: LockManager declare config: FileConfig & DriverCommonInternalOptions - constructor(config: FileConfig, isNamespace: boolean = false) { + constructor(config: FileConfig & DriverCommonInternalOptions, isNamespace: boolean = false) { super(config) + this.#locks = new (config.lockManager ?? Locks)() this.#directory = this.#sanitizePath(join(config.directory, config.prefix || '')) /** diff --git a/packages/bentocache/src/types/options/drivers_options.ts b/packages/bentocache/src/types/options/drivers_options.ts index 1b255b1..587c347 100644 --- a/packages/bentocache/src/types/options/drivers_options.ts +++ b/packages/bentocache/src/types/options/drivers_options.ts @@ -10,6 +10,7 @@ import type { import type { Logger } from '../../logger.js' import type { Duration } from '../helpers.js' +import type { LockManagerConstructor } from './options.js' /** * Options that are common to all drivers @@ -24,6 +25,7 @@ export type DriverCommonOptions = { export type DriverCommonInternalOptions = { logger?: Logger + lockManager?: LockManagerConstructor } /** diff --git a/packages/bentocache/src/types/options/options.ts b/packages/bentocache/src/types/options/options.ts index 35463de..c39f079 100644 --- a/packages/bentocache/src/types/options/options.ts +++ b/packages/bentocache/src/types/options/options.ts @@ -1,3 +1,5 @@ +import type { MutexInterface } from 'async-mutex' + import type { FactoryError } from '../../errors.js' import type { CacheSerializer, Duration, Emitter, Logger } from '../main.js' @@ -95,6 +97,16 @@ export type RawCommonOptions = { */ export type InternalOperationWrapper = (fn: () => T) => T +export type LockHandle = MutexInterface +export type LockReleaser = MutexInterface.Releaser + +export interface LockManager { + getOrCreateForKey(key: string, timeout?: number): LockHandle + release(key: string, releaser: LockReleaser): void +} + +export type LockManagerConstructor = new () => LockManager + export type RawBentoCacheOptions = { prefix?: string @@ -125,6 +137,11 @@ export type RawBentoCacheOptions = { * suppress or customize instrumentation. */ internalOperationWrapper?: InternalOperationWrapper + + /** + * Custom lock manager class used for stampede protection and file driver writes. + */ + lockManager?: LockManagerConstructor } & Omit /** diff --git a/packages/bentocache/tests/bento_cache_options.spec.ts b/packages/bentocache/tests/bento_cache_options.spec.ts index f376e3f..34c7bfb 100644 --- a/packages/bentocache/tests/bento_cache_options.spec.ts +++ b/packages/bentocache/tests/bento_cache_options.spec.ts @@ -1,6 +1,7 @@ import { test } from '@japa/runner' import { ms } from '@julr/utils/string/ms' +import { Locks } from '../src/cache/locks.js' import { BentoCacheOptions } from '../src/bento_cache_options.js' test.group('Bento Cache Options', () => { @@ -9,6 +10,7 @@ test.group('Bento Cache Options', () => { assert.deepEqual(options.ttl, ms.parse('30m')) assert.deepEqual(options.prefix, 'bentocache') + assert.strictEqual(options.lockManager, Locks) }) test('override defaults', ({ assert }) => { @@ -25,4 +27,13 @@ test.group('Bento Cache Options', () => { assert.deepEqual(options.prefix, 'foo') assert.deepEqual(options.grace, false) }) + + test('uses custom lock manager class', ({ assert }) => { + class CustomLocks extends Locks {} + + const lockManager = CustomLocks + const options = new BentoCacheOptions({ lockManager }) + + assert.strictEqual(options.lockManager, lockManager) + }) }) diff --git a/packages/bentocache/tests/cache/stampede_protection.spec.ts b/packages/bentocache/tests/cache/stampede_protection.spec.ts index d573e8e..54467c4 100644 --- a/packages/bentocache/tests/cache/stampede_protection.spec.ts +++ b/packages/bentocache/tests/cache/stampede_protection.spec.ts @@ -2,12 +2,59 @@ import { test } from '@japa/runner' import { sleep } from '@julr/utils/misc' import { errors } from '../../src/errors.js' +import { Locks } from '../../src/cache/locks.js' import { RedisDriver } from '../../src/drivers/redis.js' import { MemoryDriver } from '../../src/drivers/memory.js' import { CacheFactory } from '../../factories/cache_factory.js' import { REDIS_CREDENTIALS, throwingFactory } from '../helpers/index.js' +import type { LockManager, LockReleaser } from '../../src/types/main.js' test.group('Cache | Stampede protection', () => { + test('should use custom lock manager class', async ({ assert }) => { + class SpyLockManager implements LockManager { + calls = 0 + releases = 0 + #locks = new Locks() + + getOrCreateForKey(key: string, timeout?: number) { + this.calls++ + return this.#locks.getOrCreateForKey(key, timeout) + } + + release(key: string, releaser: LockReleaser) { + this.releases++ + this.#locks.release(key, releaser) + } + } + + const lockManager = new SpyLockManager() + class CustomLockManager implements LockManager { + getOrCreateForKey(key: string, timeout?: number) { + return lockManager.getOrCreateForKey(key, timeout) + } + + release(key: string, releaser: LockReleaser) { + return lockManager.release(key, releaser) + } + } + + const { cache } = new CacheFactory() + .merge({ + lockManager: CustomLockManager, + l1Driver: new MemoryDriver({ maxItems: 100, prefix: 'test' }), + }) + .create() + + const results = await Promise.all([ + cache.getOrSet({ key: 'key', factory: async () => 42 }), + cache.getOrSet({ key: 'key', factory: async () => 42 }), + ]) + + assert.deepEqual(results, [42, 42]) + assert.isTrue(lockManager.calls > 0) + assert.isTrue(lockManager.releases > 0) + }) + test('only one background factory should be executed if soft timeout is triggered', async ({ assert, }) => { diff --git a/packages/bentocache/tests/drivers/file.spec.ts b/packages/bentocache/tests/drivers/file.spec.ts index 46c58c6..ba48e2b 100644 --- a/packages/bentocache/tests/drivers/file.spec.ts +++ b/packages/bentocache/tests/drivers/file.spec.ts @@ -4,7 +4,9 @@ import { sleep } from '@julr/utils/misc' import { testLogger } from '@julr/utils/logger' import { BASE_URL } from '../helpers/index.js' +import { Locks } from '../../src/cache/locks.js' import { FileDriver } from '../../src/drivers/file/file.js' +import type { LockManager, LockReleaser } from '../../src/types/main.js' import { registerCacheDriverTestSuite } from '../helpers/driver_test_suite.js' test.group('File driver', (group) => { @@ -121,6 +123,45 @@ test.group('File Driver | Prune', () => { await assert.doesNotReject(() => driver.get('foo')) }) + test('uses custom lock manager for writes', async ({ cleanup, assert }) => { + class SpyLockManager implements LockManager { + calls = 0 + #locks = new Locks() + + getOrCreateForKey(key: string, timeout?: number) { + this.calls++ + return this.#locks.getOrCreateForKey(key, timeout) + } + + release(key: string, releaser: LockReleaser) { + this.#locks.release(key, releaser) + } + } + + const lockManager = new SpyLockManager() + class CustomLockManager implements LockManager { + getOrCreateForKey(key: string, timeout?: number) { + return lockManager.getOrCreateForKey(key, timeout) + } + + release(key: string, releaser: LockReleaser) { + return lockManager.release(key, releaser) + } + } + + const driver = new FileDriver({ + pruneInterval: false, + directory: fileURLToPath(BASE_URL), + lockManager: CustomLockManager, + }) + + cleanup(() => driver.disconnect()) + + await Promise.all([driver.set('foo', 'bar', 300), driver.set('foo', 'baz', 300)]) + + assert.isTrue(lockManager.calls > 0) + }) + test('prune manually using prune() method', async ({ assert, fs, cleanup }) => { const driver = new FileDriver({ directory: fileURLToPath(BASE_URL), diff --git a/packages/bentocache/tests/typings.spec.ts b/packages/bentocache/tests/typings.spec.ts index 0b12cc5..1a88d55 100644 --- a/packages/bentocache/tests/typings.spec.ts +++ b/packages/bentocache/tests/typings.spec.ts @@ -1,4 +1,5 @@ import { test } from '@japa/runner' +import { Mutex, withTimeout } from 'async-mutex' import { bentostore } from '../src/bento_store.js' import { BentoCache } from '../src/bento_cache.js' @@ -6,12 +7,26 @@ import { memoryDriver } from '../src/drivers/memory.js' import type { Duration } from '../src/types/helpers.js' import type { CacheEvents } from '../src/types/events.js' import { CacheFactory } from '../factories/cache_factory.js' +import type { LockManager, LockReleaser } from '../src/types/main.js' import { BentoCacheFactory } from '../factories/bentocache_factory.js' test.group('Typings', () => { test('named caches typings', async ({ expectTypeOf }) => { + class CustomLockManager implements LockManager { + #lock = new Mutex() + + getOrCreateForKey(_key: string, timeout?: number) { + return withTimeout(this.#lock, timeout ?? Infinity) + } + + release(_key: string, releaser: LockReleaser) { + releaser() + } + } + const bento = new BentoCache({ default: 'primary', + lockManager: CustomLockManager, stores: { primary: bentostore().useL1Layer(memoryDriver({ maxItems: 100 })), secondary: bentostore().useL1Layer(memoryDriver({ maxItems: 100 })),