diff --git a/src/event-bus/event-consumer/azure-servicebus.ts b/src/event-bus/event-consumer/azure-servicebus.ts index f0e349b..af3c8d9 100644 --- a/src/event-bus/event-consumer/azure-servicebus.ts +++ b/src/event-bus/event-consumer/azure-servicebus.ts @@ -1,7 +1,7 @@ import * as AzureIden from "@azure/identity"; import { RetryMode, ServiceBusClient } from "@azure/service-bus"; import { EventConsumerBuilder } from "./interface"; -import { exponentialDelay } from "./utils"; +import { exponentialDelay, randomDelay } from "./utils"; /** * Azure ServiceBus supports @@ -77,6 +77,7 @@ export const AzureServiceBusConsumerBuilder: EventConsumerBuilder = async ( await receiver.completeMessage(msg); } else if (resp.statusCode === 429 || resp.statusCode === 409) { // rate-limited or lock-conflict + await randomDelay(); await receiver.abandonMessage(msg); } else if (resp.statusCode === 425) { // delayed message @@ -118,7 +119,7 @@ export const AzureServiceBusConsumerBuilder: EventConsumerBuilder = async ( ); instance.log.info( "Attached to Azure ServiceBus Subscription=" + - process.env.EVENT_SUBSCRIPTION, + process.env.EVENT_SUBSCRIPTION, ); return { close: async () => { diff --git a/src/event-bus/event-consumer/gcp-pubsub.ts b/src/event-bus/event-consumer/gcp-pubsub.ts index f5c0c81..c86600a 100644 --- a/src/event-bus/event-consumer/gcp-pubsub.ts +++ b/src/event-bus/event-consumer/gcp-pubsub.ts @@ -2,6 +2,7 @@ import * as timers from "node:timers/promises"; import { Message, PubSub, Subscription } from "@google-cloud/pubsub"; import { FastifyInstance } from "fastify"; import { EventConsumerBuilder } from "./interface"; +import { randomDelay } from "./utils"; /** * GCP Pub/Sub supports @@ -53,7 +54,7 @@ class Runner { public readonly instance: FastifyInstance, public readonly pubsub: PubSub, public readonly subName: string, - ) {} + ) { } async close(): Promise { this.ctrl.abort(); @@ -120,6 +121,7 @@ class Runner { msg.ack(); } else if (resp.statusCode === 429 || resp.statusCode === 409) { // rate-limited or lock-conflict + await randomDelay(); msg.nack(); } else if (resp.statusCode === 425 && attempt < 2) { const parsed = JSON.parse(resp.body); diff --git a/src/event-bus/event-consumer/rabbitmq.ts b/src/event-bus/event-consumer/rabbitmq.ts index 3a8aca6..bbe798b 100644 --- a/src/event-bus/event-consumer/rabbitmq.ts +++ b/src/event-bus/event-consumer/rabbitmq.ts @@ -4,6 +4,7 @@ import { ensureRabbitMqExchangesAndQueues, } from "../rabbitmq-utils"; import { EventConsumerBuilder } from "./interface"; +import { randomDelay } from "./utils"; /** * RabbitMq supports @@ -58,6 +59,7 @@ export const RabbitMqServiceBusConsumerBuilder: EventConsumerBuilder = async ( if (resp.statusCode >= 200 && resp.statusCode < 300) { return ConsumerStatus.ACK; } else if (resp.statusCode === 429 || resp.statusCode === 409) { + await randomDelay(); // rate-limited or lock-conflict return ConsumerStatus.REQUEUE; } else if (resp.statusCode === 425) { diff --git a/src/event-bus/event-consumer/utils.ts b/src/event-bus/event-consumer/utils.ts index 5b9c6a6..ebd4e75 100644 --- a/src/event-bus/event-consumer/utils.ts +++ b/src/event-bus/event-consumer/utils.ts @@ -8,6 +8,10 @@ const MAX_DELAY = Math.max( parseInt(process.env.EVENT_RETRY_MAX_DELAY ?? "0", 10) || 0, 60_000, ); +const RANDOMIZED_DELAY_MAX = Math.max( + parseInt(process.env.EVENT_RANDOMIZED_DELAY_MAX ?? "0", 10) || 0, + 10_000 +); export async function exponentialDelay( attempt: number, @@ -22,3 +26,8 @@ export async function exponentialDelay( } await timers.setTimeout(delay); } + +export async function randomDelay(max: number = RANDOMIZED_DELAY_MAX) { + const delay = Math.ceil(Math.random() * max); + await timers.setTimeout(delay); +} \ No newline at end of file