Skip to content

信号量 (Semaphore)

一种用于限制并发操作的同步原语

bytes
since v12.6.0

用法

信号量是一种同步原语,它允许有限数量的并发操作进行。

import { Semaphore } from 'radashi'
const semaphore = new Semaphore(2)
const permit = await semaphore.acquire()
permit.release()

当信号量的容量达到上限时,后续调用 semaphore.acquire() 将会阻塞,直到有许可被释放。

import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)
const permit = await semaphore.acquire() // 获取唯一的许可
// 下一个 acquire 调用将会阻塞,直到第一个许可被释放
const blockingAcquire = semaphore.acquire()
// ... 稍后 ...
permit.release() // 释放许可后,blockingAcquire 得以继续执行

你可以使用特定的 weight(权重)来获取许可。信号量的容量会减去这个权重值。如果剩余容量小于请求的权重,获取操作将会阻塞。

import { Semaphore } from 'radashi'
const semaphore = new Semaphore(4)
// 获取一个权重为 2 的许可
const permit = await semaphore.acquire({ weight: 2 })
// 当前容量为 2。可以获取权重为 1 或 2 的许可。
await semaphore.acquire({ weight: 1 })
// 现在获取权重为 2 的许可会阻塞,因为当前容量仅为 1。

一个许可只能被释放一次。对同一个许可实例多次调用 permit.release() 将不会产生任何效果。

import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)
const permit = await semaphore.acquire()
permit.release() // 释放许可
permit.release() // 无效果

信号量的构造函数要求 maxCapacity(最大容量)必须大于 0。获取许可时要求的 weight(权重)也必须大于 0 且不能超过信号量的 maxCapacity。无效的选项将导致错误。

import { Semaphore } from 'radashi'
// 无效的构造函数
// new Semaphore(0) // 抛出错误: maxCapacity 必须 > 0
const semaphore = new Semaphore(2)
// 无效的 acquire 选项
// await semaphore.acquire({ weight: 0 }) // 抛出错误: weight 必须 > 0
// await semaphore.acquire({ weight: 3 }) // 抛出错误: weight 必须 ≤ maxCapacity

你可以使用 AbortController 及其 signal(信号)来中止一个待定的获取操作。如果在获取到许可之前信号被中止,acquire 的 Promise 将会拒绝并抛出一个 AbortError

import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)
await semaphore.acquire() // 占用唯一的许可
const controller = new AbortController()
const acquirePromise = semaphore.acquire({ signal: controller.signal })
// 在获取操作完成之前中止它
controller.abort()
// acquirePromise 现在将会拒绝
await expect(acquirePromise).rejects.toThrow('This operation was aborted')

你可以通过调用 semaphore.reject() 来拒绝所有待定的和未来的获取请求。所有由待定的和未来的 acquire 调用返回的 Promise 都将拒绝,并抛出提供的错误。

import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)
await semaphore.acquire() // 占用唯一的许可
const acquirePromise = semaphore.acquire() // 这将阻塞
const rejectionError = new Error('操作已取消')
semaphore.reject(rejectionError)
// acquirePromise 现在将抛出指定的错误
await expect(acquirePromise).rejects.toThrow('操作已取消')
// 未来的获取请求也将被拒绝
await expect(() => semaphore.acquire()).rejects.toThrow('操作已取消')