信号量 (Semaphore)
一种用于限制并发操作的同步原语
bytes
since v12.6.0
用法
信号量是一种同步原语,它允许有限数量的并发操作进行。
import { Semaphore } from 'radashi'
const semaphore = new Semaphore(2)
const permit = await semaphore.acquire()
permit.release()当信号量的容量达到上限时,后续调用 semaphore.acquire() 将会阻塞,直到有许可被释放。
import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)
const permit = await semaphore.acquire() // 获取唯一的许可
// 下一个 acquire 调用将会阻塞,直到第一个许可被释放const blockingAcquire = semaphore.acquire()
// ... 稍后 ...permit.release() // 释放许可后,blockingAcquire 得以继续执行你可以使用特定的 weight(权重)来获取许可。信号量的容量会减去这个权重值。如果剩余容量小于请求的权重,获取操作将会阻塞。
import { Semaphore } from 'radashi'
const semaphore = new Semaphore(4)
// 获取一个权重为 2 的许可const permit = await semaphore.acquire({ weight: 2 })
// 当前容量为 2。可以获取权重为 1 或 2 的许可。await semaphore.acquire({ weight: 1 })
// 现在获取权重为 2 的许可会阻塞,因为当前容量仅为 1。一个许可只能被释放一次。对同一个许可实例多次调用 permit.release() 将不会产生任何效果。
import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)const permit = await semaphore.acquire()
permit.release() // 释放许可permit.release() // 无效果信号量的构造函数要求 maxCapacity(最大容量)必须大于 0。获取许可时要求的 weight(权重)也必须大于 0 且不能超过信号量的 maxCapacity。无效的选项将导致错误。
import { Semaphore } from 'radashi'
// 无效的构造函数// new Semaphore(0) // 抛出错误: maxCapacity 必须 > 0
const semaphore = new Semaphore(2)
// 无效的 acquire 选项// await semaphore.acquire({ weight: 0 }) // 抛出错误: weight 必须 > 0// await semaphore.acquire({ weight: 3 }) // 抛出错误: weight 必须 ≤ maxCapacity你可以使用 AbortController 及其 signal(信号)来中止一个待定的获取操作。如果在获取到许可之前信号被中止,acquire 的 Promise 将会拒绝并抛出一个 AbortError。
import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)await semaphore.acquire() // 占用唯一的许可
const controller = new AbortController()const acquirePromise = semaphore.acquire({ signal: controller.signal })
// 在获取操作完成之前中止它controller.abort()
// acquirePromise 现在将会拒绝await expect(acquirePromise).rejects.toThrow('This operation was aborted')你可以通过调用 semaphore.reject() 来拒绝所有待定的和未来的获取请求。所有由待定的和未来的 acquire 调用返回的 Promise 都将拒绝,并抛出提供的错误。
import { Semaphore } from 'radashi'
const semaphore = new Semaphore(1)await semaphore.acquire() // 占用唯一的许可
const acquirePromise = semaphore.acquire() // 这将阻塞
const rejectionError = new Error('操作已取消')semaphore.reject(rejectionError)
// acquirePromise 现在将抛出指定的错误await expect(acquirePromise).rejects.toThrow('操作已取消')
// 未来的获取请求也将被拒绝await expect(() => semaphore.acquire()).rejects.toThrow('操作已取消')