Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/event-bus/event-consumer/azure-servicebus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as AzureIden from "@azure/identity";
import { RetryMode, ServiceBusClient } from "@azure/service-bus";
import { EventConsumerBuilder } from "./interface";
import { exponentialDelay } from "./utils";
import { exponentialDelay, randomDelay } from "./utils";

/**
* Azure ServiceBus supports
Expand Down Expand Up @@ -77,6 +77,7 @@ export const AzureServiceBusConsumerBuilder: EventConsumerBuilder = async (
await receiver.completeMessage(msg);
} else if (resp.statusCode === 429 || resp.statusCode === 409) {
// rate-limited or lock-conflict
await randomDelay();
await receiver.abandonMessage(msg);
} else if (resp.statusCode === 425) {
// delayed message
Expand Down Expand Up @@ -118,7 +119,7 @@ export const AzureServiceBusConsumerBuilder: EventConsumerBuilder = async (
);
instance.log.info(
"Attached to Azure ServiceBus Subscription=" +
process.env.EVENT_SUBSCRIPTION,
process.env.EVENT_SUBSCRIPTION,
);
return {
close: async () => {
Expand Down
4 changes: 3 additions & 1 deletion src/event-bus/event-consumer/gcp-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as timers from "node:timers/promises";
import { Message, PubSub, Subscription } from "@google-cloud/pubsub";
import { FastifyInstance } from "fastify";
import { EventConsumerBuilder } from "./interface";
import { randomDelay } from "./utils";

/**
* GCP Pub/Sub supports
Expand Down Expand Up @@ -53,7 +54,7 @@ class Runner {
public readonly instance: FastifyInstance,
public readonly pubsub: PubSub,
public readonly subName: string,
) {}
) { }

async close(): Promise<void> {
this.ctrl.abort();
Expand Down Expand Up @@ -120,6 +121,7 @@ class Runner {
msg.ack();
} else if (resp.statusCode === 429 || resp.statusCode === 409) {
// rate-limited or lock-conflict
await randomDelay();
msg.nack();
} else if (resp.statusCode === 425 && attempt < 2) {
const parsed = JSON.parse(resp.body);
Expand Down
2 changes: 2 additions & 0 deletions src/event-bus/event-consumer/rabbitmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ensureRabbitMqExchangesAndQueues,
} from "../rabbitmq-utils";
import { EventConsumerBuilder } from "./interface";
import { randomDelay } from "./utils";

/**
* RabbitMq supports
Expand Down Expand Up @@ -58,6 +59,7 @@ export const RabbitMqServiceBusConsumerBuilder: EventConsumerBuilder = async (
if (resp.statusCode >= 200 && resp.statusCode < 300) {
return ConsumerStatus.ACK;
} else if (resp.statusCode === 429 || resp.statusCode === 409) {
await randomDelay();
// rate-limited or lock-conflict
return ConsumerStatus.REQUEUE;
} else if (resp.statusCode === 425) {
Expand Down
9 changes: 9 additions & 0 deletions src/event-bus/event-consumer/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ const MAX_DELAY = Math.max(
parseInt(process.env.EVENT_RETRY_MAX_DELAY ?? "0", 10) || 0,
60_000,
);
const RANDOMIZED_DELAY_MAX = Math.max(
parseInt(process.env.EVENT_RANDOMIZED_DELAY_MAX ?? "0", 10) || 0,
10_000
);

export async function exponentialDelay(
attempt: number,
Expand All @@ -22,3 +26,8 @@ export async function exponentialDelay(
}
await timers.setTimeout(delay);
}

export async function randomDelay(max: number = RANDOMIZED_DELAY_MAX) {
const delay = Math.ceil(Math.random() * max);
await timers.setTimeout(delay);
}