WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 95c6be6

Browse files
committed
chore: use diff named locks
1 parent 8b3f9e7 commit 95c6be6

File tree

4 files changed

+34
-15
lines changed

4 files changed

+34
-15
lines changed

.mdeprc.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,24 @@ module.exports = exports = {
1313
user: `${uid}:${uid}`,
1414
environment: {
1515
DB: '${DB}'
16-
}
16+
},
1717
}
1818
}
1919
}
2020

2121
switch (process.env.DB) {
2222
case 'cluster':
2323
exports.services.push('redisCluster');
24+
25+
exports.extras['redis-cluster'] = {
26+
healthcheck: {
27+
test: "redis-cli -p 7000 cluster info | grep cluster_state:ok > /dev/null && exit 0 || exit 1",
28+
interval: '1s',
29+
timeout: '5s',
30+
retries: 30,
31+
}
32+
}
33+
2434
break;
2535
case 'sentinel':
2636
default:

__tests__/integration.spec.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import sinon = require('sinon')
55
import { noop } from 'lodash'
66
import { DistributedCallbackQueue, MultiLockError, Semaphore } from '../src/distributed-callback-queue'
77
import { LockAcquisitionError } from '@microfleet/ioredis-lock'
8+
import { setTimeout } from 'timers/promises'
89

910
describe('integration tests', () => {
1011
jest.setTimeout(10000)
@@ -78,7 +79,7 @@ describe('integration tests', () => {
7879

7980
it('#push: job is performed only once', () => {
8081
const args = [null, 'completed']
81-
const job = sinon.spy((next) => setTimeout(next, 500, ...args))
82+
const job = sinon.spy((next) => global.setTimeout(next, 500, ...args))
8283
const onComplete = sinon.spy()
8384
const failedToQueue = sinon.spy()
8485
const unexpectedError = sinon.spy()
@@ -216,7 +217,7 @@ describe('integration tests', () => {
216217
it('#fanout: job is performed only once', () => {
217218
const args = ['completed']
218219
const job = sinon.spy(async () => {
219-
await Promise.delay(500)
220+
await setTimeout(500)
220221
return [...args]
221222
})
222223
const onComplete = sinon.spy()
@@ -273,7 +274,7 @@ describe('integration tests', () => {
273274

274275
it('#fanout: fails after timeout', async () => {
275276
const job = sinon.spy(async (_: any) => {
276-
await Promise.delay(3000)
277+
await setTimeout(3000)
277278
})
278279
const arg1 = 'arg1'
279280
const onComplete = sinon.spy()
@@ -305,7 +306,7 @@ describe('integration tests', () => {
305306

306307
it('#fanout: fails after timeout even if lock has not been acquired', async () => {
307308
const job = sinon.spy(async () => {
308-
await Promise.delay(3000)
309+
await setTimeout(3000)
309310
})
310311
const onComplete = sinon.spy()
311312
const timeoutError = sinon.spy()
@@ -383,7 +384,7 @@ describe('integration tests', () => {
383384
await Promise.map(queueManagers, async (queueManager) => {
384385
try {
385386
const lock = await queueManager.dlock.once('once')
386-
await Promise.delay(1500)
387+
await setTimeout(1500)
387388
job()
388389
await lock.release()
389390
} catch (err) {
@@ -438,10 +439,13 @@ describe('integration tests', () => {
438439
const failedToQueue = sinon.spy()
439440
const unexpectedError = sinon.spy()
440441

441-
await Promise.map(queueManagers, async (queueManager) => {
442+
await Promise.map(queueManagers, async (queueManager, idx) => {
442443
try {
443-
const lock = await queueManager.dlock.multi('1', '2', '3')
444-
await Promise.delay(1500)
444+
if (idx !== 0) {
445+
await setTimeout(100) // give a chance for first idx to acquire _all_ locks so that test isnt flaky
446+
}
447+
const lock = await queueManager.dlock.multi('5', '6', '7')
448+
await setTimeout(1500)
445449
await lock.release()
446450
job()
447451
} catch (err) {
@@ -453,7 +457,7 @@ describe('integration tests', () => {
453457
}
454458
})
455459

456-
assert(job.calledOnce, 'job was called more than once')
460+
assert.equal(job.callCount, 1)
457461
assert.strictEqual(failedToQueue.callCount, 9, 'unexpected error was raised')
458462
assert.strictEqual(unexpectedError.called, false, 'fatal error was raised')
459463
})
@@ -479,7 +483,7 @@ describe('integration tests', () => {
479483
// if it's possible for other contestants
480484
// to run out of semaphore lock - counter will
481485
// increase multiple times before resolving following promise
482-
await Promise.delay(10)
486+
await setTimeout(10)
483487

484488
// return the counter
485489
return counter - 1

src/callback-queue.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import P from 'pino'
2-
import { delay } from 'bluebird'
2+
import { setTimeout } from 'node:timers/promises'
33
import * as callbackQueue from '@microfleet/callback-queue'
44
import { serializeError, deserializeError } from 'serialize-error'
55
import Redis = require('ioredis')
@@ -12,6 +12,8 @@ export type RedisInstance = Redis.Redis | Redis.Cluster
1212
export type Publisher = (key: string, err?: Error | null, ...args: any[]) => Promise<void>
1313
export type Consumer = (channel: string, message: string) => void
1414

15+
const kError = new Error('callback called multiple times')
16+
1517
/**
1618
* Call functions stored in local queues
1719
* @param queueName
@@ -21,7 +23,7 @@ export type Consumer = (channel: string, message: string) => void
2123
function call(queueName: string, args: any[], logger: P.Logger): void {
2224
const callback = queue.get(queueName)
2325
if (!callback) {
24-
throw new Error('callback called multiple times')
26+
throw kError
2527
}
2628

2729
// these are async anyways - gonna schedule them
@@ -98,7 +100,7 @@ export function createConsumer(redis: RedisInstance, pubsubChannel: string, logg
98100
logger.info({ pubsubChannel }, 'Subscribed to channel')
99101
} catch (err) {
100102
logger.error({ err }, 'Failed to subsctibe to pubsub channel')
101-
await delay(250)
103+
await setTimeout(250)
102104
return connect()
103105
}
104106
}

src/distributed-callback-queue.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,10 @@ export class DistributedCallbackQueue {
121121
const { client, pubsub } = this
122122

123123
this.logger.info('disconnecting redis clients')
124-
await Promise.all([client.quit(), pubsub.quit()])
124+
await Promise.all([
125+
client.quit(),
126+
pubsub.quit()
127+
])
125128
}
126129

127130
static isCompatibleLogger(logger: unknown): logger is Logger {

0 commit comments

Comments
 (0)