Skip to content

Commit

Permalink
chore: Add semaphore.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shakeskeyboarde committed Nov 16, 2023
1 parent 021573f commit e745eb5
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Completely free TypeScript one-file source code snippets _with tests_, which can
- [read-stream](read-stream) - Read all data from a stream.
- [retry](retry) - Retry asynchronous actions on error (rejection).
- [schema](schema) - Composable type predicates for runtime type checking.
- [semaphore](semaphore) - Limit the concurrency of asynchronous tasks.
- [sorted-list](sorted-list) - Always sorted list with binary searching.
- [subject](subject) - Values you can subscribe to.
- [subject-selector](subject-selector) - Computed values you can subscribe to.
Expand Down
24 changes: 24 additions & 0 deletions semaphore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Semaphore

Limit the concurrency of asynchronous tasks.

Create a semaphore with a size of 3. This semaphore will allow up to 3 locks to be acquired at any given time.

```tsx
const semaphore = new Semaphore(3);
```

The following for loop runs 10 asynchronous tasks, but only the first 3 will run immediately, and only 3 will ever be running simultaneously. The 4th iteration will wait until one of the first 3 iterations releases a lock, and the 5th will wait until another one of the first 4 iterations releases a lock, and so on.

```tsx
const promises: Promise<void>[] = [];

for (let i = 0; i < 10; ++i) {
const lock = await semaphore.acquire();
const promise = doAsyncTask().finally(lock.release);

promises.push(promise);
}

await Promise.all(promises);
```
82 changes: 82 additions & 0 deletions semaphore/semaphore.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Semaphore } from './semaphore.js';

describe('Semaphore', () => {
test('size is truncated to an integer >= 1', () => {
expect(new Semaphore(0).size).toBe(1);
expect(new Semaphore(-1).size).toBe(1);
expect(new Semaphore(2.2).size).toBe(2);
expect(new Semaphore(2.8).size).toBe(2);
});

test('available and waiting counts are correct', async () => {
const semaphore = new Semaphore(1);
expect(semaphore.available).toBe(1);
expect(semaphore.waiting).toBe(0);

const p0 = semaphore.acquire();

expect(semaphore.available).toBe(0);
expect(semaphore.waiting).toBe(0);

const p1 = semaphore.acquire();

expect(semaphore.available).toBe(0);
expect(semaphore.waiting).toBe(1);

const p2 = semaphore.acquire();

expect(semaphore.available).toBe(0);
expect(semaphore.waiting).toBe(2);

(await p0).release();

expect(semaphore.available).toBe(0);
expect(semaphore.waiting).toBe(1);

(await p1).release();

expect(semaphore.available).toBe(0);
expect(semaphore.waiting).toBe(0);

(await p2).release();

expect(semaphore.available).toBe(1);
expect(semaphore.waiting).toBe(0);
});

test('parallelism is limited by the semaphore size', async () => {
const semaphore = new Semaphore(3);
const promises: Promise<number>[] = [];

let maxConcurrency = 0;
let concurrency = 0;

for (let i = 0; i < 10; ++i) {
const index = i;
const lock = await semaphore.acquire();
const promise = new Promise<number>((resolve) => {
concurrency += 1;
maxConcurrency = Math.max(maxConcurrency, concurrency);
setTimeout(() => {
concurrency -= 1;
resolve(index);
});
}).finally(lock.release);

promises.push(promise);
}

expect(maxConcurrency).toBe(3);
expect(await Promise.all(promises)).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
});

test('release() is a no-op when called twice', async () => {
const semaphore = new Semaphore(1);
const lock = await semaphore.acquire();

lock.release();
lock.release();

expect(semaphore.available).toBe(1);
});
});
63 changes: 63 additions & 0 deletions semaphore/semaphore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
class Lock {
#released = false;
#onRelease: () => void;

constructor(onRelease: () => void) {
this.#onRelease = onRelease;
this.release = this.release.bind(this);
}

release(): void {
if (this.#released) return;

this.#released = true;
this.#onRelease();
}
}

export class Semaphore {
readonly #size: number;
#available: number;
#waiting: (() => void)[] = [];

#next(): void {
if (this.#available <= 0) return;

this.#waiting.shift()?.();
}

get size(): number {
return this.#size;
}

get available(): number {
return this.#available;
}

get waiting(): number {
return this.#waiting.length;
}

constructor(size: number) {
this.#size = Math.max(1, Math.trunc(size));
this.#available = this.#size;
this.acquire = this.acquire.bind(this);
}

acquire(): Promise<Lock> {
return new Promise((resolve) => {
this.#waiting.push(() => {
this.#available -= 1;

resolve(
new Lock(() => {
this.#available += 1;
this.#next();
}),
);
});

this.#next();
});
}
}

0 comments on commit e745eb5

Please sign in to comment.