From d08820469aa4dda4a7eaf24ef5cde4acfb4fbdaf Mon Sep 17 00:00:00 2001 From: Sergey Konovalov Date: Mon, 14 Apr 2025 17:41:56 +0300 Subject: [PATCH] [feature] Refactor downloadUrlPromise --- Common/config/default.json | 11 +- Common/sources/utils.js | 234 ++++++++++------- DocService/sources/canvasservice.js | 11 +- .../withServerInstance/request.tests.js | 248 ++++++++++++------ 4 files changed, 318 insertions(+), 186 deletions(-) diff --git a/Common/config/default.json b/Common/config/default.json index 7597e968..6462e77c 100644 --- a/Common/config/default.json +++ b/Common/config/default.json @@ -316,13 +316,12 @@ "forceSaveUsingButtonWithoutChanges": false }, "requestDefaults": { - "headers": { - "User-Agent": "Node.js/6.13", - "Connection": "Keep-Alive" - }, - "gzip": true, + "headers": { + "User-Agent": "Node.js/6.13", + "Connection": "Keep-Alive" + }, "rejectUnauthorized": true - }, + }, "autoAssembly": { "enable": false, "interval": "5m", diff --git a/Common/sources/utils.js b/Common/sources/utils.js index a55440ec..6e1ed5ab 100644 --- a/Common/sources/utils.js +++ b/Common/sources/utils.js @@ -36,6 +36,8 @@ require("tls").DEFAULT_ECDH_CURVE = "auto"; const { pipeline } = require('node:stream/promises'); +const { buffer } = require('node:stream/consumers'); +const { Transform } = require('stream'); var config = require('config'); var fs = require('fs'); var path = require('path'); @@ -108,10 +110,6 @@ function getIpFilterRule(address) { } const pemfileCache = new NodeCache({stdTTL: ms(cfgExpPemStdTtl) / 1000, checkperiod: ms(cfgExpPemCheckPeriod) / 1000, errorOnMissing: false, useClones: true}); -function getRequestFilterAgent(url, options) { - return url.startsWith("https") ? new RequestFilteringHttpsAgent(options) : new RequestFilteringHttpAgent(options); -} - exports.getConvertionTimeout = function(opt_ctx) { if (opt_ctx) { const tenVisibilityTimeout = opt_ctx.getCfg('queue.visibilityTimeout', cfgVisibilityTimeout); @@ -275,7 +273,8 @@ function raiseErrorObj(ro, error) { ro.emit('error', error); } function isRedirectResponse(response) { - return response && response.status >= 300 && response.status < 400 && Object.keys(response.headers).some(key => key.toLowerCase() === 'location'); + //All header names are lower cased and can be accessed using the bracket notation. + return response && response.status >= 300 && response.status < 400 && !!response.headers['location']; } function isAllowDirectRequest(ctx, uri, isInJwtToken) { @@ -294,9 +293,8 @@ function isAllowDirectRequest(ctx, uri, isInJwtToken) { } return res; } -function addExternalRequestOptions(ctx, uri, isInJwtToken, options) { +function addExternalRequestOptions(ctx, uri, isInJwtToken, options, httpAgentOptions, httpsAgentOptions) { let res = false; - const tenTenantRequestDefaults = ctx.getCfg('services.CoAuthoring.requestDefaults', cfgRequestDefaults); const tenExternalRequestAction = ctx.getCfg('externalRequest.action', cfgExternalRequestAction); const tenRequestFilteringAgent = ctx.getCfg('services.CoAuthoring.request-filtering-agent', cfgRequesFilteringAgent); if (isAllowDirectRequest(ctx, uri, isInJwtToken)) { @@ -304,34 +302,28 @@ function addExternalRequestOptions(ctx, uri, isInJwtToken, options) { } else if (tenExternalRequestAction.allow) { res = true; if (tenExternalRequestAction.blockPrivateIP) { - const agentOptions = { - ...https.globalAgent.options, + options.httpsAgent = new RequestFilteringHttpsAgent({ + ...httpsAgentOptions, ...tenRequestFilteringAgent - }; - - if (tenExternalRequestAction.proxyUrl) { - const proxyUrl = tenExternalRequestAction.proxyUrl; - const parsedProxyUrl = url.parse(proxyUrl); - - agentOptions.host = parsedProxyUrl.hostname; - agentOptions.port = parsedProxyUrl.port; - agentOptions.protocol = parsedProxyUrl.protocol; - } - - if (tenTenantRequestDefaults.forever !== undefined) { - agentOptions.keepAlive = !!tenTenantRequestDefaults.forever; - } - - if (uri.startsWith('https:')) { - options.httpsAgent = new RequestFilteringHttpsAgent(agentOptions); - } else { - options.httpAgent = new RequestFilteringHttpAgent(agentOptions); - } + }); + options.httpAgent = new RequestFilteringHttpAgent({ + ...httpAgentOptions, + ...tenRequestFilteringAgent + }); } + if (tenExternalRequestAction.proxyUrl) { + const proxyUrl = tenExternalRequestAction.proxyUrl; + const parsedProxyUrl = url.parse(proxyUrl); + + options.proxy.host = parsedProxyUrl.hostname; + options.proxy.port = parsedProxyUrl.port; + options.proxy.protocol = parsedProxyUrl.protocol; + } + if (tenExternalRequestAction.proxyUser?.username) { - const user = tenExternalRequestAction.proxyUser.username; - const pass = tenExternalRequestAction.proxyUser.password || ''; - options.headers['proxy-authorization'] = `${user}:${pass}`; + //This will set an `Proxy-Authorization` header, overwriting any existing + //`Proxy-Authorization` custom headers you have set using `headers`. + options.proxy.auth = tenExternalRequestAction.proxyUser; } if (tenExternalRequestAction.proxyHeaders) { options.headers = { @@ -342,37 +334,59 @@ function addExternalRequestOptions(ctx, uri, isInJwtToken, options) { } return res; } - -async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorization, opt_filterPrivate, opt_headers, opt_streamWriter) { +/* + * @param {object} options - The options object to modify. + */ +function changeOptionsForCompatibilityWithRequest(options, httpAgentOptions, httpsAgentOptions) { + if (false === options.followRedirect) { + options.maxRedirects = 0; + } + if (false === options.gzip) { + options.headers = { ...options.headers, 'Accept-Encoding': 'identity' }; + delete options.gzip; + } + if (options.forever !== undefined) { + httpAgentOptions.keepAlive = !!options.forever; + httpsAgentOptions.keepAlive = !!options.forever; + } +} +/* + * Download a URL and return the response. + * @param {operationContext.Context} ctx - The operation context. + * @param {string} uri - The URL to download. + * @param {object} optTimeout - Optional timeout configuration. + * @param {number} optLimit - Optional limit on the size of the response. + * @param {string} opt_Authorization - Optional authorization header. + * @param {boolean} opt_filterPrivate - Optional flag to filter private requests. + * @param {object} opt_headers - Optional headers to include in the request. + * @param {boolean} opt_returnStream - Optional flag to return stream. + * @returns {Promise<{response: axios.AxiosResponse, sha256: string|null, body: Buffer|null, stream: NodeJS.ReadableStream|null}>} - A promise that resolves to object containing response, sha256 hash, and body (null if opt_streamWriter is provided). + */ +async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorization, opt_filterPrivate, opt_headers, opt_returnStream) { const tenTenantRequestDefaults = ctx.getCfg('services.CoAuthoring.requestDefaults', cfgRequestDefaults); const tenTokenOutboxHeader = ctx.getCfg('services.CoAuthoring.token.outbox.header', cfgTokenOutboxHeader); const tenTokenOutboxPrefix = ctx.getCfg('services.CoAuthoring.token.outbox.prefix', cfgTokenOutboxPrefix); - const maxRedirects = (undefined !== tenTenantRequestDefaults.maxRedirects) ? tenTenantRequestDefaults.maxRedirects : 10; - const followRedirect = (undefined !== tenTenantRequestDefaults.followRedirect) ? tenTenantRequestDefaults.followRedirect : true; - const sizeLimit = optLimit || Number.MAX_VALUE; + let sizeLimit = optLimit || Number.MAX_VALUE; uri = URI.serialize(URI.parse(uri)); - const connectionAndInactivity = optTimeout?.connectionAndInactivity ? ms(optTimeout.connectionAndInactivity) : undefined; const options = config.util.cloneDeep(tenTenantRequestDefaults); - if (options.gzip !== undefined && !options.gzip) { - options.headers = options.headers || {}; - options.headers['Accept-Encoding'] = 'identity'; - delete options.gzip; + + //baseRequest creates new agent(win-ca injects in globalAgent) + const httpsAgentOptions = { ...https.globalAgent.options, ...options}; + const httpAgentOptions = { ...http.globalAgent.options, ...options}; + changeOptionsForCompatibilityWithRequest(options, httpAgentOptions, httpsAgentOptions); + + if (optTimeout.connectionAndInactivity) { + httpAgentOptions.timeout = ms(optTimeout.connectionAndInactivity); + httpsAgentOptions.timeout = ms(optTimeout.connectionAndInactivity); } - if (!exports.addExternalRequestOptions(ctx, uri, opt_filterPrivate, options)) { + + if (!addExternalRequestOptions(ctx, uri, opt_filterPrivate, options, httpAgentOptions, httpsAgentOptions)) { throw new Error('Block external request. See externalRequest config options'); } - const protocol = new URL(uri).protocol; - if (!options.httpsAgent && !options.httpAgent) { - const agentOptions = { ...https.globalAgent.options, rejectUnauthorized: tenTenantRequestDefaults.rejectUnauthorized === false? false : true}; - if (tenTenantRequestDefaults.forever !== undefined) { - agentOptions.keepAlive = !!tenTenantRequestDefaults.forever; - } - if (protocol === 'https:') { - options.httpsAgent = new https.Agent(agentOptions); - } else if (protocol === 'http:') { - options.httpAgent = new http.Agent(agentOptions); - } + if (!options.httpsAgent || !options.httpAgent) { + options.httpsAgent = new https.Agent(httpsAgentOptions); + options.httpAgent = new http.Agent(httpAgentOptions); } const headers = { ...options.headers }; @@ -389,18 +403,16 @@ async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorizat method: 'GET', responseType: 'stream', headers, - maxRedirects: followRedirect ? maxRedirects : 0, - timeout: connectionAndInactivity, validateStatus: (status) => status >= 200 && status < 300, - cancelToken: new axios.CancelToken(cancel => { - if (optTimeout?.wholeCycle) { - setTimeout(() => { - cancel(`ETIMEDOUT: ${optTimeout.wholeCycle}`); - }, ms(optTimeout.wholeCycle)); - } - }), + signal: optTimeout.wholeCycle && AbortSignal.timeout ? AbortSignal.timeout(ms(optTimeout.wholeCycle)) : undefined, + // cancelToken: new axios.CancelToken(cancel => { + // if (optTimeout?.wholeCycle) { + // setTimeout(() => { + // cancel(`ETIMEDOUT: ${optTimeout.wholeCycle}`); + // }, ms(optTimeout.wholeCycle)); + // } + // }), }; - try { const response = await axios(axiosConfig); const { status, headers } = response; @@ -413,34 +425,31 @@ async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorizat const contentLength = headers['content-length']; if (contentLength && parseInt(contentLength) > sizeLimit) { - throw new Error('EMSGSIZE: Error response: content-length:' + contentLength); - } - - return await processResponseStream(ctx, { - response, - sizeLimit, - uri, - opt_streamWriter, - contentLength, - timeout: optTimeout?.wholeCycle ? ms(optTimeout.wholeCycle) : null - }); - } catch (err) { - if (axios.isCancel(err)) { - const error = new Error(err.message); - error.code = 'ETIMEDOUT'; + // Close the stream to prevent downloading + const error = new Error('EMSGSIZE: Error response: content-length:' + contentLength); + error.code = 'EMSGSIZE'; + response.data.destroy(error); throw error; } - - if (err.response) { - if (opt_streamWriter && !isRedirectResponse(err.response)) { - delete err.response.headers['set-cookie']; - return processErrorResponseStream(err.response, { - sizeLimit, - opt_streamWriter, - timeout: optTimeout?.wholeCycle ? ms(optTimeout.wholeCycle) : null - }); - } + const limitedStream = new SizeLimitStream(optLimit); + if (opt_returnStream) { + // When returning a stream, we'll return the response for the caller to handle streaming + // The content-length check is already done above + return { response, sha256: null, body: null, stream: response.data.pipe(limitedStream) }; } + + const body = await pipeline(response.data, limitedStream, buffer); + const sha256 = crypto.createHash('sha256').update(body).digest('hex'); + return { response, sha256, body, stream: null }; + } catch (err) { + if('ERR_CANCELED' === err.code) { + err.code = 'ETIMEDOUT'; + } + // if (axios.isCancel(err)) { + // const error = new Error(err.message); + // error.code = 'ETIMEDOUT'; + // throw error; + // } throw err; } } @@ -551,12 +560,11 @@ async function postRequestPromise(ctx, uri, postData, postDataStream, postDataSi }); if (options.gzip !== undefined && !options.gzip) { - options.headers = options.headers || {}; - options.headers['Accept-Encoding'] = 'identity'; + options.headers = { ...options.headers, 'Accept-Encoding': 'identity' }; delete options.gzip; } - if (!addExternalRequestOptions(ctx, uri, opt_isInJwtToken, options)) { + if (!addExternalRequestOptions(ctx, uri, opt_isInJwtToken, options, http.globalAgent.options, https.globalAgent.options)) { throw new Error('Block external request. See externalRequest config options'); } const protocol = new URL(uri).protocol; @@ -632,7 +640,6 @@ async function postRequestPromise(ctx, uri, postData, postDataStream, postDataSi } exports.postRequestPromise = postRequestPromise; exports.downloadUrlPromise = downloadUrlPromise; -exports.addExternalRequestOptions = addExternalRequestOptions; exports.mapAscServerErrorToOldError = function(error) { var res = -1; switch (error) { @@ -1306,6 +1313,47 @@ function getMonthDiff(d1, d2) { return months; } exports.getMonthDiff = getMonthDiff; + +/** + * A Transform stream that limits the size of data passing through it. + * It will throw an EMSGSIZE error if the size exceeds the limit. + * + * @class SizeLimitStream + * @extends {Transform} + */ +class SizeLimitStream extends Transform { + /** + * Creates an instance of SizeLimitStream. + * @param {number} sizeLimit - Maximum size in bytes that can pass through the stream + * @memberof SizeLimitStream + */ + constructor(sizeLimit) { + super(); + this.sizeLimit = sizeLimit; + this.bytesReceived = 0; + } + + /** + * Transform implementation that tracks the bytes received and enforces the size limit + * + * @param {Buffer|string} chunk - The chunk of data to process + * @param {string} encoding - The encoding of the chunk if it's a string + * @param {Function} callback - Called when processing is complete + * @memberof SizeLimitStream + */ + _transform(chunk, encoding, callback) { + this.bytesReceived += chunk.length; + + if (this.sizeLimit && this.bytesReceived > this.sizeLimit) { + const error = new Error(`EMSGSIZE: Response too large: ${this.bytesReceived} bytes (limit: ${this.sizeLimit} bytes)`); + error.code = 'EMSGSIZE'; + callback(error); + return; + } + + callback(null, chunk); + } +} exports.getLicensePeriod = function(startDate, now) { startDate = new Date(startDate.getTime());//clone startDate.addMonths(getMonthDiff(startDate, now)); diff --git a/DocService/sources/canvasservice.js b/DocService/sources/canvasservice.js index 845dcab8..95fca8e0 100644 --- a/DocService/sources/canvasservice.js +++ b/DocService/sources/canvasservice.js @@ -34,6 +34,8 @@ const crypto = require('crypto'); var pathModule = require('path'); var urlModule = require('url'); +const { pipeline } = require('node:stream/promises'); +const { buffer } = require('node:stream/consumers'); var co = require('co'); const ms = require('ms'); const retry = require('retry'); @@ -1746,7 +1748,14 @@ exports.downloadFile = function(req, res) { headers['Range'] = req.get('Range'); } - yield utils.downloadUrlPromise(ctx, url, tenDownloadTimeout, tenDownloadMaxBytes, authorization, isInJwtToken, headers, res); + const { response, stream } = yield utils.downloadUrlPromise(ctx, url, tenDownloadTimeout, tenDownloadMaxBytes, authorization, isInJwtToken, headers, true); + //Set-Cookie resets browser session + delete response.headers['set-cookie']; + // Set the response headers to match the target response + res.set(response.headers); + + // Use pipeline to pipe the response data to the client + yield pipeline(stream, res); } if (clientStatsD) { diff --git a/tests/integration/withServerInstance/request.tests.js b/tests/integration/withServerInstance/request.tests.js index 0ff42804..c137f0c8 100644 --- a/tests/integration/withServerInstance/request.tests.js +++ b/tests/integration/withServerInstance/request.tests.js @@ -1,6 +1,8 @@ const { describe, test, expect, beforeAll, afterAll } = require('@jest/globals'); const { Writable, Readable } = require('stream'); +const { buffer } = require('node:stream/consumers'); const http = require('http'); +const https = require('https'); const express = require('express'); const operationContext = require('../../../Common/sources/operationContext'); const utils = require('../../../Common/sources/utils'); @@ -16,16 +18,6 @@ let testServer; const PORT = 3456; const BASE_URL = `http://localhost:${PORT}`; -// Helper to create a writable stream for testing -const createMockWriter = () => { - const chunks = []; - return new Writable({ - write(chunk, encoding, callback) { - chunks.push(chunk); - callback(); - } - }); -}; const getStatusCode = (response) => response.statusCode || response.status; @@ -101,14 +93,6 @@ describe('HTTP Request Integration Tests', () => { res.json({ success: true }); }); - // Endpoint that streams data - app.get('/api/stream', (req, res) => { - res.setHeader('content-type', 'application/octet-stream'); - res.setHeader('content-length', '1024'); - const buffer = Buffer.alloc(1024); - res.send(buffer); - }); - // Endpoint that simulates timeout app.get('/api/timeout', (req, res) => { // Never send response to trigger timeout @@ -144,17 +128,18 @@ describe('HTTP Request Integration Tests', () => { res.send(binaryData); }); - // Large file endpoint app.get('/api/large', (req, res) => { - res.setHeader('content-length', '2097152'); // 2MB res.setHeader('content-type', 'application/octet-stream'); - const buffer = Buffer.alloc(2097152); - res.send(buffer); + res.send(Buffer.alloc(2*1024*1024));//2MB }); - app.get('/api/headers', (req, res) => { - // Ensure you're only sending headers, which won't contain circular references - res.json({ headers: req.headers }); + + // Large file endpoint with truly no content-length header + app.get('/api/large-chunked', (req, res) => { + res.setHeader('content-type', 'application/octet-stream'); + res.setHeader('transfer-encoding', 'chunked'); + res.write(Buffer.alloc(2*1024*1024)); + res.end(); }); // Endpoint that returns connection header info @@ -180,7 +165,29 @@ describe('HTTP Request Integration Tests', () => { keepAlive: connectionHeader.toLowerCase() === 'keep-alive' }); }); - + + // Endpoint that mirrors whole request - handles any HTTP method + app.use('/api/mirror', express.json(), express.urlencoded({ extended: true }), (req, res) => { + // Create a mirror response object with all request details + const mirror = { + method: req.method, + url: req.url, + path: req.path, + query: req.query, + params: req.params, + headers: req.headers, + body: req.body, + protocol: req.protocol, + ip: req.ip, + hostname: req.hostname, + originalUrl: req.originalUrl, + xhr: req.xhr, + secure: req.secure + }; + + // Send the mirror response back + res.json(mirror); + }); // Start server server = http.createServer(app); @@ -210,47 +217,43 @@ describe('HTTP Request Integration Tests', () => { expect(JSON.parse(result.body.toString())).toEqual({ success: true }); }); - test('handles streaming with writer', async () => { - const mockStreamWriter = createMockWriter(); - - const result = await utils.downloadUrlPromise( - ctx, - `${BASE_URL}/api/stream`, - { wholeCycle: '5s', connectionAndInactivity: '3s' }, - 1024 * 1024, - null, - false, - null, - mockStreamWriter - ); - - expect(result).toBeUndefined(); - }); - test('throws error on timeout', async () => { - await expect(utils.downloadUrlPromise( - ctx, - `${BASE_URL}/api/timeout`, - { wholeCycle: '1s', connectionAndInactivity: '500ms' }, - 1024 * 1024, - null, - false, - null, - null - )).rejects.toThrow(/(?:ESOCKETTIMEDOUT|timeout of 500ms exceeded)/); + try { + await utils.downloadUrlPromise( + ctx, + `${BASE_URL}/api/timeout`, + { wholeCycle: '1s', connectionAndInactivity: '500ms' }, + 1024 * 1024, + null, + false, + null, + null + ); + fail('Expected an error to be thrown'); + } catch (error) { + expect(error.message).toContain('canceled'); + expect(error.code).toBe('ETIMEDOUT'); + } }); test('throws error on wholeCycle timeout', async () => { - await expect(utils.downloadUrlPromise( - ctx, - `${BASE_URL}/api/timeout`, - { wholeCycle: '1s', connectionAndInactivity: '5000ms' }, - 1024 * 1024, - null, - false, - null, - null - )).rejects.toThrow(/(?:ESOCKETTIMEDOUT|ETIMEDOUT: 1s|whole request cycle timeout: 1s)/); + try { + await utils.downloadUrlPromise( + ctx, + `${BASE_URL}/api/timeout`, + { wholeCycle: '1s', connectionAndInactivity: '5000ms' }, + 1024 * 1024, + null, + false, + null, + null + ); + fail('Expected an error to be thrown'); + } catch (error) { + expect(error.message).toContain('canceled'); + expect(error.code).toBe('ETIMEDOUT'); + } + }); test('follows redirects correctly', async () => { @@ -354,16 +357,74 @@ describe('HTTP Request Integration Tests', () => { }); test('throws error when content-length exceeds limit', async () => { - await expect(utils.downloadUrlPromise( - ctx, - `${BASE_URL}/api/large`, - { wholeCycle: '5s', connectionAndInactivity: '3s' }, - 1024 * 1024, - null, - false, - null, - null - )).rejects.toThrow('Error response: content-length:2097152'); + try { + await utils.downloadUrlPromise( + ctx, + `${BASE_URL}/api/large`, + { wholeCycle: '5s', connectionAndInactivity: '3s' }, + 1024 * 1024, + null, + false, + null + ); + fail('Expected an error to be thrown'); + } catch (error) { + expect(error.message).toContain('EMSGSIZE:'); + expect(error.code).toBe('EMSGSIZE'); + } + + try { + await utils.downloadUrlPromise( + ctx, + `${BASE_URL}/api/large-chunked`, + { wholeCycle: '5s', connectionAndInactivity: '3s' }, + 1024 * 1024, + null, + false, + null + ); + fail('Expected an error to be thrown'); + } catch (error) { + expect(error.message).toContain('EMSGSIZE:'); + expect(error.code).toBe('EMSGSIZE'); + } + }); + + test('throws error when content-length exceeds limit with stream', async () => { + try { + const {stream} = await utils.downloadUrlPromise( + ctx, + `${BASE_URL}/api/large`, + { wholeCycle: '5s', connectionAndInactivity: '3s' }, + 1024 * 1024, + null, + false, + null, + true + ); + const receivedData = await buffer(stream); + fail('Expected an error to be thrown'); + } catch (error) { + expect(error.message).toContain('EMSGSIZE:'); + expect(error.code).toBe('EMSGSIZE'); + } + try { + const {stream} = await utils.downloadUrlPromise( + ctx, + `${BASE_URL}/api/large-chunked`, + { wholeCycle: '5s', connectionAndInactivity: '3s' }, + 1024 * 1024, + null, + false, + null, + true + ); + const receivedData = await buffer(stream); + fail('Expected an error to be thrown'); + } catch (error) { + expect(error.message).toContain('EMSGSIZE:'); + expect(error.code).toBe('EMSGSIZE'); + } }); test('enables compression when gzip is true', async () => { @@ -522,6 +583,31 @@ describe('HTTP Request Integration Tests', () => { // so we're checking that keepAlive is false expect(responseData.keepAlive).toBe(false); }); + + test('test requestDefaults', async () => { + const defaultHeaders = {"user-agent": "Node.js/6.13"}; + const mockCtx = createMockContext({ + 'services.CoAuthoring.requestDefaults': { + headers: defaultHeaders + } + }); + let customHeaders = {"custom-header": "test-value", "set-cookie": ["cookie"]}; + let customQueryParams = {"custom-query-param": "value"}; + const result = await utils.downloadUrlPromise( + mockCtx, + `${BASE_URL}/api/mirror?${new URLSearchParams(customQueryParams).toString()}`, + { wholeCycle: '5s', connectionAndInactivity: '3s' }, + 1024 * 1024, + null, + false, + customHeaders + ); + expect(result).toBeDefined(); + expect(result.response.status).toBe(200); + const body = JSON.parse(result.body); + expect(body.headers).toMatchObject({...defaultHeaders, ...customHeaders}); + expect(body.query).toMatchObject(customQueryParams); + }); }); test('handles binary data correctly', async () => { @@ -551,15 +637,7 @@ describe('HTTP Request Integration Tests', () => { }); test('handles binary data with stream writer', async () => { - const chunks = []; - const mockStreamWriter = new Writable({ - write(chunk, encoding, callback) { - chunks.push(chunk); - callback(); - } - }); - - await utils.downloadUrlPromise( + const { stream } = await utils.downloadUrlPromise( ctx, `${BASE_URL}/api/binary`, { wholeCycle: '5s', connectionAndInactivity: '3s' }, @@ -567,11 +645,9 @@ describe('HTTP Request Integration Tests', () => { null, false, null, - mockStreamWriter + true ); - - // Combine chunks and verify - const receivedData = Buffer.concat(chunks); + const receivedData = await buffer(stream); const expectedData = Buffer.from([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]); expect(Buffer.isBuffer(receivedData)).toBe(true);