Skip to content

Commit 0dde0f8

Browse files
authored
fix: handle worker messages larger than 64 KB on Azure ACI (#3662)
1 parent 5549de1 commit 0dde0f8

File tree

2 files changed

+93
-2
lines changed
  • packages/artillery/lib/platform

2 files changed

+93
-2
lines changed

packages/artillery/lib/platform/aws-ecs/legacy/plugins/artillery-plugin-sqs-reporter/azure-aqs.js

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
// Non-evaluation use of Artillery on Azure requires a commercial license
55

66
const { QueueClient } = require('@azure/storage-queue');
7+
const { BlobServiceClient } = require('@azure/storage-blob');
78
const { DefaultAzureCredential } = require('@azure/identity');
9+
const { randomUUID } = require('node:crypto');
810

911
function getAQS() {
1012
return new QueueClient(
@@ -13,16 +15,65 @@ function getAQS() {
1315
);
1416
}
1517

16-
function sendMessage(queue, body, tags) {
18+
// Azure Queue Storage has a 64KB message limit
19+
// Use 60KB threshold to leave margin for encoding overhead
20+
const AQS_SIZE_LIMIT = 60 * 1024;
21+
22+
let blobContainerClient = null;
23+
24+
function getBlobClient() {
25+
if (!blobContainerClient) {
26+
const storageAccount = process.env.AZURE_STORAGE_ACCOUNT;
27+
const containerName = process.env.AZURE_STORAGE_BLOB_CONTAINER;
28+
if (!storageAccount || !containerName) {
29+
throw new Error(
30+
'AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_BLOB_CONTAINER must be set'
31+
);
32+
}
33+
const blobServiceClient = new BlobServiceClient(
34+
`https://${storageAccount}.blob.core.windows.net`,
35+
new DefaultAzureCredential()
36+
);
37+
blobContainerClient = blobServiceClient.getContainerClient(containerName);
38+
}
39+
return blobContainerClient;
40+
}
41+
42+
async function sendMessage(queue, body, tags) {
1743
const payload = JSON.stringify({
1844
payload: body,
19-
// attributes: this.tags
2045
attributes: tags.reduce((acc, tag) => {
2146
acc[tag.key] = tag.value;
2247
return acc;
2348
}, {})
2449
});
2550

51+
// Check if payload exceeds Azure Queue Storage limit
52+
if (Buffer.byteLength(payload, 'utf8') > AQS_SIZE_LIMIT) {
53+
// Upload to blob storage and send reference
54+
const testId = tags.find((t) => t.key === 'testId')?.value;
55+
const workerId = tags.find((t) => t.key === 'workerId')?.value;
56+
const messageId = randomUUID();
57+
const blobName = `tests/${testId}/overflow/${workerId}/${messageId}.json`;
58+
59+
const blobClient = getBlobClient().getBlockBlobClient(blobName);
60+
await blobClient.upload(payload, Buffer.byteLength(payload, 'utf8'));
61+
62+
// Send reference message
63+
const refPayload = JSON.stringify({
64+
payload: {
65+
_overflowRef: blobName,
66+
event: body.event
67+
},
68+
attributes: tags.reduce((acc, tag) => {
69+
acc[tag.key] = tag.value;
70+
return acc;
71+
}, {})
72+
});
73+
74+
return queue.sendMessage(refPayload);
75+
}
76+
2677
return queue.sendMessage(payload);
2778
}
2879

packages/artillery/lib/platform/az/aci.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,20 @@ const dotenv = require('dotenv');
2424
const fs = require('node:fs');
2525
const request = require('got');
2626

27+
// Helper to convert readable stream to string
28+
async function streamToString(readableStream) {
29+
return new Promise((resolve, reject) => {
30+
const chunks = [];
31+
readableStream.on('data', (data) => {
32+
chunks.push(data.toString());
33+
});
34+
readableStream.on('end', () => {
35+
resolve(chunks.join(''));
36+
});
37+
readableStream.on('error', reject);
38+
});
39+
}
40+
2741
class PlatformAzureACI {
2842
constructor(script, variablePayload, opts, platformOpts) {
2943
this.script = script;
@@ -257,6 +271,28 @@ class PlatformAzureACI {
257271
throw new Error('AQS message with an empty body');
258272
}
259273

274+
// Handle overflow messages stored in blob storage
275+
if (payload._overflowRef) {
276+
try {
277+
const blobClient =
278+
this.blobContainerClient.getBlockBlobClient(
279+
payload._overflowRef
280+
);
281+
const downloadResponse = await blobClient.download(0);
282+
const downloaded = await streamToString(
283+
downloadResponse.readableStreamBody
284+
);
285+
const fullMessage = JSON.parse(downloaded);
286+
payload = fullMessage.payload;
287+
attributes = fullMessage.attributes;
288+
} catch (blobErr) {
289+
console.error('Failed to fetch worker message:', blobErr);
290+
throw new Error(
291+
`Failed to fetch worker message: ${payload._overflowRef}`
292+
);
293+
}
294+
}
295+
260296
if (!attributes || !attributes.testId || !attributes.workerId) {
261297
throw new Error('AQS message with no testId or workerId');
262298
}
@@ -507,6 +543,10 @@ class PlatformAzureACI {
507543
name: 'AZURE_STORAGE_ACCOUNT',
508544
value: this.storageAccount
509545
},
546+
{
547+
name: 'AZURE_STORAGE_BLOB_CONTAINER',
548+
value: this.blobContainerName
549+
},
510550
{
511551
name: 'AZURE_SUBSCRIPTION_ID',
512552
secureValue: this.azureSubscriptionId

0 commit comments

Comments
 (0)