Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/content/docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,39 @@ The maximum amount of time (in milliseconds) that the in-memory lock for [stampe

This is usually not needed, but can provide an extra layer of protection against theoretical deadlocks.

### `lockManager`

Default: built-in in-memory lock manager.

Levels: `global`

A custom lock manager class.

This lock manager is used for stampede protection and file driver writes.

`LockManager` must return a lock compatible with `MutexInterface` from `async-mutex`. Bentocache exports `LockHandle` and `LockReleaser` as aliases for `MutexInterface` and `MutexInterface.Releaser`.

```ts
import { Mutex, withTimeout } from 'async-mutex'

class CustomLockManager {
#lock = new Mutex()

getOrCreateForKey(key, timeout) {
return withTimeout(this.#lock, timeout ?? Infinity)
}

release(key, releaser) {
releaser()
}
}

const bento = new BentoCache({
lockManager: CustomLockManager,
// ...
})
```

### `onFactoryError`

Default: `undefined`
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/stampede_protection.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,11 @@ Well, in truth, this is not really a problem. Indeed, there will be more than on
Given the same scenario with the 10k users. Imagine that your application is running in cluster mode with PM2 and you have 10 instances of your app. Also, imagine that the 10k requests are distributed equally across the 10 instances.

This results in 1k requests per instance. And so, it will lead to **10 queries to the database instead of 10k**, with the help of our protection.

## Shared locks

BentoCache allows you to use custom lock manager class so you can implement **shared** locks.

For example, you can implement lock manager which uses patterns like [Redis distributed locks](https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/) where locks are created in L2 storage and shared across all application instances and don’t have process instance limitations.

In multi-instance applications scenario, this will lead to **1 query to database instead of 10k**.
2 changes: 2 additions & 0 deletions packages/bentocache/src/bento_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ export class BentoCache<KnownCaches extends Record<string, BentoStore>> implemen
l1Driver: entry.l1?.factory({
prefix: driverItemOptions.prefix,
logger: driverItemOptions.logger,
lockManager: driverItemOptions.lockManager,
...entry.l1.options,
}),
l2Driver: entry.l2?.factory({
prefix: driverItemOptions.prefix,
logger: driverItemOptions.logger,
lockManager: driverItemOptions.lockManager,
...entry.l2.options,
}),
busDriver: entry.bus?.factory(entry.bus?.options),
Expand Down
15 changes: 14 additions & 1 deletion packages/bentocache/src/bento_cache_options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@ import { ms } from '@julr/utils/string/ms'
import { noopLogger } from '@julr/utils/logger'

import { Logger } from './logger.js'
import { Locks } from './cache/locks.js'
import { resolveTtl } from './helpers.js'
import type { FactoryError } from './errors.js'
import { JsonSerializer } from './serializers/json.js'
import type { CacheSerializer, Duration, Emitter, RawBentoCacheOptions } from './types/main.js'
import type {
CacheSerializer,
Duration,
Emitter,
LockManagerConstructor,
RawBentoCacheOptions,
} from './types/main.js'

const defaultSerializer = new JsonSerializer()

Expand Down Expand Up @@ -68,6 +75,11 @@ export class BentoCacheOptions {
*/
lockTimeout?: Duration = null

/**
* The lock manager class used throughout the library
*/
lockManager: LockManagerConstructor

/**
* Duration for the circuit breaker to stay open
* if l2 cache fails
Expand All @@ -90,6 +102,7 @@ export class BentoCacheOptions {
this.hardTimeout = this.#options.hardTimeout
this.suppressL2Errors = this.#options.suppressL2Errors
this.lockTimeout = this.#options.lockTimeout
this.lockManager = this.#options.lockManager ?? Locks
this.grace = this.#options.grace!
this.graceBackoff = this.#options.graceBackoff!

Expand Down
11 changes: 5 additions & 6 deletions packages/bentocache/src/cache/factory_runner.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import pTimeout from 'p-timeout'
import { tryAsync } from '@julr/utils/functions'
import type { MutexInterface } from 'async-mutex'

import { errors } from '../errors.js'
import type { Locks } from './locks.js'
import type { CacheStack } from './cache_stack.js'
import { cacheOperation } from '../tracing_channels.js'
import type { GetSetFactory } from '../types/helpers.js'
import type { LockManager, LockReleaser } from '../types/main.js'
import type { GetCacheValueReturn } from '../types/internals/index.js'
import type { CacheOperationMessage } from '../types/tracing_channels.js'
import type { CacheEntryOptions } from './cache_entry/cache_entry_options.js'
Expand All @@ -15,7 +14,7 @@ interface RunFactoryParameters {
key: string
factory: GetSetFactory
options: CacheEntryOptions
lockReleaser: MutexInterface.Releaser
lockReleaser: LockReleaser
isBackground?: boolean
gracedValue?: GetCacheValueReturn
}
Expand All @@ -24,11 +23,11 @@ interface RunFactoryParameters {
* Factory Runner is responsible for executing factories
*/
export class FactoryRunner {
#locks: Locks
#locks: LockManager
#stack: CacheStack
#skipSymbol = Symbol('bentocache.skip')

constructor(stack: CacheStack, locks: Locks) {
constructor(stack: CacheStack, locks: LockManager) {
this.#stack = stack
this.#locks = locks
}
Expand Down Expand Up @@ -119,7 +118,7 @@ export class FactoryRunner {
factory: GetSetFactory,
gracedValue: GetCacheValueReturn | undefined,
options: CacheEntryOptions,
lockReleaser: MutexInterface.Releaser,
lockReleaser: LockReleaser,
) {
const hasGracedValue = !!gracedValue
const timeout = options.factoryTimeout(hasGracedValue)
Expand Down
9 changes: 4 additions & 5 deletions packages/bentocache/src/cache/get_set/single_tier_handler.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import type { MutexInterface } from 'async-mutex'

import { Locks } from '../locks.js'
import { errors } from '../../errors.js'
import type { CacheStack } from '../cache_stack.js'
import { FactoryRunner } from '../factory_runner.js'
import type { Factory } from '../../types/helpers.js'
import type { CacheEvent } from '../../types/events.js'
import { cacheEvents } from '../../events/cache_events.js'
import { cacheOperation } from '../../tracing_channels.js'
import type { LockManager, LockReleaser } from '../../types/main.js'
import type { GetCacheValueReturn } from '../../types/internals/index.js'
import type { CacheOperationMessage } from '../../types/tracing_channels.js'
import type { CacheEntryOptions } from '../cache_entry/cache_entry_options.js'
Expand All @@ -16,10 +14,11 @@ export class SingleTierHandler {
/**
* A map that will hold active locks for each key
*/
#locks = new Locks()
#locks: LockManager
#factoryRunner: FactoryRunner

constructor(protected stack: CacheStack) {
this.#locks = new this.stack.options.lockManager()
this.#factoryRunner = new FactoryRunner(this.stack, this.#locks)
}

Expand Down Expand Up @@ -140,7 +139,7 @@ export class SingleTierHandler {
* If nothing is found in the remote cache, or if forceFresh is true,
* we try to acquire a lock to run the factory
*/
let releaser: MutexInterface.Releaser
let releaser: LockReleaser
try {
releaser = await this.#acquireLock(key, !!remoteItem, options)
} catch (err) {
Expand Down
9 changes: 4 additions & 5 deletions packages/bentocache/src/cache/get_set/two_tier_handler.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import type { MutexInterface } from 'async-mutex'

import { Locks } from '../locks.js'
import { errors } from '../../errors.js'
import type { CacheStack } from '../cache_stack.js'
import { FactoryRunner } from '../factory_runner.js'
import type { Factory } from '../../types/helpers.js'
import type { CacheEvent } from '../../types/events.js'
import { cacheEvents } from '../../events/cache_events.js'
import { cacheOperation } from '../../tracing_channels.js'
import type { LockManager, LockReleaser } from '../../types/main.js'
import type { GetCacheValueReturn } from '../../types/internals/index.js'
import type { CacheOperationMessage } from '../../types/tracing_channels.js'
import type { CacheEntryOptions } from '../cache_entry/cache_entry_options.js'
Expand All @@ -16,10 +14,11 @@ export class TwoTierHandler {
/**
* A map that will hold active locks for each key
*/
#locks = new Locks()
#locks: LockManager
#factoryRunner: FactoryRunner

constructor(protected stack: CacheStack) {
this.#locks = new this.stack.options.lockManager()
this.#factoryRunner = new FactoryRunner(this.stack, this.#locks)
}

Expand Down Expand Up @@ -130,7 +129,7 @@ export class TwoTierHandler {
*
* We acquire a lock to prevent a cache stampede.
*/
let releaser: MutexInterface.Releaser
let releaser: LockReleaser
try {
this.logger.trace({ key, cache: this.stack.name, opId: options.id }, 'acquiring lock...')
releaser = await this.#acquireLock(key, !!localItem, options)
Expand Down
12 changes: 7 additions & 5 deletions packages/bentocache/src/cache/locks.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { is } from '@julr/utils/is'
import { Mutex, withTimeout, type MutexInterface } from 'async-mutex'
import { Mutex, withTimeout } from 'async-mutex'

export class Locks {
import type { LockHandle, LockManager, LockReleaser } from '../types/main.js'

export class Locks implements LockManager {
/**
* A map that will hold active locks for each key
*/
#locks = new Map<string, MutexInterface>()
#locks = new Map<string, LockHandle>()

/**
* For a given key, get or create a new lock
*
* @param key Key to get or create a lock for
* @param timeout Time to wait to acquire the lock
*/
getOrCreateForKey(key: string, timeout?: number) {
getOrCreateForKey(key: string, timeout?: number): LockHandle {
let lock = this.#locks.get(key)
if (!lock) {
lock = new Mutex()
Expand All @@ -23,7 +25,7 @@ export class Locks {
return is.number(timeout) ? withTimeout(lock, timeout) : lock
}

release(key: string, releaser: MutexInterface.Releaser) {
release(key: string, releaser: LockReleaser) {
releaser()
this.#locks.delete(key)
}
Expand Down
6 changes: 4 additions & 2 deletions packages/bentocache/src/drivers/file/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
CreateDriverResult,
DriverCommonInternalOptions,
FileConfig,
LockManager,
} from '../../types/main.js'

/**
Expand Down Expand Up @@ -42,13 +43,14 @@ export class FileDriver extends BaseDriver implements CacheDriver {
* Worker thread that will clean up the expired files
*/
#cleanerWorker?: Worker
#locks = new Locks()
#locks: LockManager

declare config: FileConfig & DriverCommonInternalOptions

constructor(config: FileConfig, isNamespace: boolean = false) {
constructor(config: FileConfig & DriverCommonInternalOptions, isNamespace: boolean = false) {
super(config)

this.#locks = new (config.lockManager ?? Locks)()
this.#directory = this.#sanitizePath(join(config.directory, config.prefix || ''))

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/bentocache/src/types/options/drivers_options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {

import type { Logger } from '../../logger.js'
import type { Duration } from '../helpers.js'
import type { LockManagerConstructor } from './options.js'

/**
* Options that are common to all drivers
Expand All @@ -24,6 +25,7 @@ export type DriverCommonOptions = {

export type DriverCommonInternalOptions = {
logger?: Logger
lockManager?: LockManagerConstructor
}

/**
Expand Down
17 changes: 17 additions & 0 deletions packages/bentocache/src/types/options/options.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { MutexInterface } from 'async-mutex'

import type { FactoryError } from '../../errors.js'
import type { CacheSerializer, Duration, Emitter, Logger } from '../main.js'

Expand Down Expand Up @@ -95,6 +97,16 @@ export type RawCommonOptions = {
*/
export type InternalOperationWrapper = <T>(fn: () => T) => T

export type LockHandle = MutexInterface
export type LockReleaser = MutexInterface.Releaser

export interface LockManager {
getOrCreateForKey(key: string, timeout?: number): LockHandle
release(key: string, releaser: LockReleaser): void
}

export type LockManagerConstructor = new () => LockManager

export type RawBentoCacheOptions = {
prefix?: string

Expand Down Expand Up @@ -125,6 +137,11 @@ export type RawBentoCacheOptions = {
* suppress or customize instrumentation.
*/
internalOperationWrapper?: InternalOperationWrapper

/**
* Custom lock manager class used for stampede protection and file driver writes.
*/
lockManager?: LockManagerConstructor
} & Omit<RawCommonOptions, 'tags' | 'skipBusNotify' | 'skipL2Write'>

/**
Expand Down
11 changes: 11 additions & 0 deletions packages/bentocache/tests/bento_cache_options.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { test } from '@japa/runner'
import { ms } from '@julr/utils/string/ms'

import { Locks } from '../src/cache/locks.js'
import { BentoCacheOptions } from '../src/bento_cache_options.js'

test.group('Bento Cache Options', () => {
Expand All @@ -9,6 +10,7 @@ test.group('Bento Cache Options', () => {

assert.deepEqual(options.ttl, ms.parse('30m'))
assert.deepEqual(options.prefix, 'bentocache')
assert.strictEqual(options.lockManager, Locks)
})

test('override defaults', ({ assert }) => {
Expand All @@ -25,4 +27,13 @@ test.group('Bento Cache Options', () => {
assert.deepEqual(options.prefix, 'foo')
assert.deepEqual(options.grace, false)
})

test('uses custom lock manager class', ({ assert }) => {
class CustomLocks extends Locks {}

const lockManager = CustomLocks
const options = new BentoCacheOptions({ lockManager })

assert.strictEqual(options.lockManager, lockManager)
})
})
Loading