[feature] Refactor downloadUrlPromise

This commit is contained in:
Sergey Konovalov
2025-04-14 17:41:56 +03:00
parent 47399233bb
commit 4d735703d7
4 changed files with 318 additions and 186 deletions

View File

@ -316,13 +316,12 @@
"forceSaveUsingButtonWithoutChanges": false
},
"requestDefaults": {
"headers": {
"User-Agent": "Node.js/6.13",
"Connection": "Keep-Alive"
},
"gzip": true,
"headers": {
"User-Agent": "Node.js/6.13",
"Connection": "Keep-Alive"
},
"rejectUnauthorized": true
},
},
"autoAssembly": {
"enable": false,
"interval": "5m",

View File

@ -36,6 +36,8 @@
require("tls").DEFAULT_ECDH_CURVE = "auto";
const { pipeline } = require('node:stream/promises');
const { buffer } = require('node:stream/consumers');
const { Transform } = require('stream');
var config = require('config');
var fs = require('fs');
var path = require('path');
@ -108,10 +110,6 @@ function getIpFilterRule(address) {
}
const pemfileCache = new NodeCache({stdTTL: ms(cfgExpPemStdTtl) / 1000, checkperiod: ms(cfgExpPemCheckPeriod) / 1000, errorOnMissing: false, useClones: true});
function getRequestFilterAgent(url, options) {
return url.startsWith("https") ? new RequestFilteringHttpsAgent(options) : new RequestFilteringHttpAgent(options);
}
exports.getConvertionTimeout = function(opt_ctx) {
if (opt_ctx) {
const tenVisibilityTimeout = opt_ctx.getCfg('queue.visibilityTimeout', cfgVisibilityTimeout);
@ -275,7 +273,8 @@ function raiseErrorObj(ro, error) {
ro.emit('error', error);
}
function isRedirectResponse(response) {
return response && response.status >= 300 && response.status < 400 && Object.keys(response.headers).some(key => key.toLowerCase() === 'location');
//All header names are lower cased and can be accessed using the bracket notation.
return response && response.status >= 300 && response.status < 400 && !!response.headers['location'];
}
function isAllowDirectRequest(ctx, uri, isInJwtToken) {
@ -294,9 +293,8 @@ function isAllowDirectRequest(ctx, uri, isInJwtToken) {
}
return res;
}
function addExternalRequestOptions(ctx, uri, isInJwtToken, options) {
function addExternalRequestOptions(ctx, uri, isInJwtToken, options, httpAgentOptions, httpsAgentOptions) {
let res = false;
const tenTenantRequestDefaults = ctx.getCfg('services.CoAuthoring.requestDefaults', cfgRequestDefaults);
const tenExternalRequestAction = ctx.getCfg('externalRequest.action', cfgExternalRequestAction);
const tenRequestFilteringAgent = ctx.getCfg('services.CoAuthoring.request-filtering-agent', cfgRequesFilteringAgent);
if (isAllowDirectRequest(ctx, uri, isInJwtToken)) {
@ -304,34 +302,28 @@ function addExternalRequestOptions(ctx, uri, isInJwtToken, options) {
} else if (tenExternalRequestAction.allow) {
res = true;
if (tenExternalRequestAction.blockPrivateIP) {
const agentOptions = {
...https.globalAgent.options,
options.httpsAgent = new RequestFilteringHttpsAgent({
...httpsAgentOptions,
...tenRequestFilteringAgent
};
if (tenExternalRequestAction.proxyUrl) {
const proxyUrl = tenExternalRequestAction.proxyUrl;
const parsedProxyUrl = url.parse(proxyUrl);
agentOptions.host = parsedProxyUrl.hostname;
agentOptions.port = parsedProxyUrl.port;
agentOptions.protocol = parsedProxyUrl.protocol;
}
if (tenTenantRequestDefaults.forever !== undefined) {
agentOptions.keepAlive = !!tenTenantRequestDefaults.forever;
}
if (uri.startsWith('https:')) {
options.httpsAgent = new RequestFilteringHttpsAgent(agentOptions);
} else {
options.httpAgent = new RequestFilteringHttpAgent(agentOptions);
}
});
options.httpAgent = new RequestFilteringHttpAgent({
...httpAgentOptions,
...tenRequestFilteringAgent
});
}
if (tenExternalRequestAction.proxyUrl) {
const proxyUrl = tenExternalRequestAction.proxyUrl;
const parsedProxyUrl = url.parse(proxyUrl);
options.proxy.host = parsedProxyUrl.hostname;
options.proxy.port = parsedProxyUrl.port;
options.proxy.protocol = parsedProxyUrl.protocol;
}
if (tenExternalRequestAction.proxyUser?.username) {
const user = tenExternalRequestAction.proxyUser.username;
const pass = tenExternalRequestAction.proxyUser.password || '';
options.headers['proxy-authorization'] = `${user}:${pass}`;
//This will set an `Proxy-Authorization` header, overwriting any existing
//`Proxy-Authorization` custom headers you have set using `headers`.
options.proxy.auth = tenExternalRequestAction.proxyUser;
}
if (tenExternalRequestAction.proxyHeaders) {
options.headers = {
@ -342,37 +334,59 @@ function addExternalRequestOptions(ctx, uri, isInJwtToken, options) {
}
return res;
}
async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorization, opt_filterPrivate, opt_headers, opt_streamWriter) {
/*
* @param {object} options - The options object to modify.
*/
function changeOptionsForCompatibilityWithRequest(options, httpAgentOptions, httpsAgentOptions) {
if (false === options.followRedirect) {
options.maxRedirects = 0;
}
if (false === options.gzip) {
options.headers = { ...options.headers, 'Accept-Encoding': 'identity' };
delete options.gzip;
}
if (options.forever !== undefined) {
httpAgentOptions.keepAlive = !!options.forever;
httpsAgentOptions.keepAlive = !!options.forever;
}
}
/*
* Download a URL and return the response.
* @param {operationContext.Context} ctx - The operation context.
* @param {string} uri - The URL to download.
* @param {object} optTimeout - Optional timeout configuration.
* @param {number} optLimit - Optional limit on the size of the response.
* @param {string} opt_Authorization - Optional authorization header.
* @param {boolean} opt_filterPrivate - Optional flag to filter private requests.
* @param {object} opt_headers - Optional headers to include in the request.
* @param {boolean} opt_returnStream - Optional flag to return stream.
* @returns {Promise<{response: axios.AxiosResponse, sha256: string|null, body: Buffer|null, stream: NodeJS.ReadableStream|null}>} - A promise that resolves to object containing response, sha256 hash, and body (null if opt_streamWriter is provided).
*/
async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorization, opt_filterPrivate, opt_headers, opt_returnStream) {
const tenTenantRequestDefaults = ctx.getCfg('services.CoAuthoring.requestDefaults', cfgRequestDefaults);
const tenTokenOutboxHeader = ctx.getCfg('services.CoAuthoring.token.outbox.header', cfgTokenOutboxHeader);
const tenTokenOutboxPrefix = ctx.getCfg('services.CoAuthoring.token.outbox.prefix', cfgTokenOutboxPrefix);
const maxRedirects = (undefined !== tenTenantRequestDefaults.maxRedirects) ? tenTenantRequestDefaults.maxRedirects : 10;
const followRedirect = (undefined !== tenTenantRequestDefaults.followRedirect) ? tenTenantRequestDefaults.followRedirect : true;
const sizeLimit = optLimit || Number.MAX_VALUE;
let sizeLimit = optLimit || Number.MAX_VALUE;
uri = URI.serialize(URI.parse(uri));
const connectionAndInactivity = optTimeout?.connectionAndInactivity ? ms(optTimeout.connectionAndInactivity) : undefined;
const options = config.util.cloneDeep(tenTenantRequestDefaults);
if (options.gzip !== undefined && !options.gzip) {
options.headers = options.headers || {};
options.headers['Accept-Encoding'] = 'identity';
delete options.gzip;
//baseRequest creates new agent(win-ca injects in globalAgent)
const httpsAgentOptions = { ...https.globalAgent.options, ...options};
const httpAgentOptions = { ...http.globalAgent.options, ...options};
changeOptionsForCompatibilityWithRequest(options, httpAgentOptions, httpsAgentOptions);
if (optTimeout.connectionAndInactivity) {
httpAgentOptions.timeout = ms(optTimeout.connectionAndInactivity);
httpsAgentOptions.timeout = ms(optTimeout.connectionAndInactivity);
}
if (!exports.addExternalRequestOptions(ctx, uri, opt_filterPrivate, options)) {
if (!addExternalRequestOptions(ctx, uri, opt_filterPrivate, options, httpAgentOptions, httpsAgentOptions)) {
throw new Error('Block external request. See externalRequest config options');
}
const protocol = new URL(uri).protocol;
if (!options.httpsAgent && !options.httpAgent) {
const agentOptions = { ...https.globalAgent.options, rejectUnauthorized: tenTenantRequestDefaults.rejectUnauthorized === false? false : true};
if (tenTenantRequestDefaults.forever !== undefined) {
agentOptions.keepAlive = !!tenTenantRequestDefaults.forever;
}
if (protocol === 'https:') {
options.httpsAgent = new https.Agent(agentOptions);
} else if (protocol === 'http:') {
options.httpAgent = new http.Agent(agentOptions);
}
if (!options.httpsAgent || !options.httpAgent) {
options.httpsAgent = new https.Agent(httpsAgentOptions);
options.httpAgent = new http.Agent(httpAgentOptions);
}
const headers = { ...options.headers };
@ -389,18 +403,16 @@ async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorizat
method: 'GET',
responseType: 'stream',
headers,
maxRedirects: followRedirect ? maxRedirects : 0,
timeout: connectionAndInactivity,
validateStatus: (status) => status >= 200 && status < 300,
cancelToken: new axios.CancelToken(cancel => {
if (optTimeout?.wholeCycle) {
setTimeout(() => {
cancel(`ETIMEDOUT: ${optTimeout.wholeCycle}`);
}, ms(optTimeout.wholeCycle));
}
}),
signal: optTimeout.wholeCycle && AbortSignal.timeout ? AbortSignal.timeout(ms(optTimeout.wholeCycle)) : undefined,
// cancelToken: new axios.CancelToken(cancel => {
// if (optTimeout?.wholeCycle) {
// setTimeout(() => {
// cancel(`ETIMEDOUT: ${optTimeout.wholeCycle}`);
// }, ms(optTimeout.wholeCycle));
// }
// }),
};
try {
const response = await axios(axiosConfig);
const { status, headers } = response;
@ -413,34 +425,31 @@ async function downloadUrlPromise(ctx, uri, optTimeout, optLimit, opt_Authorizat
const contentLength = headers['content-length'];
if (contentLength && parseInt(contentLength) > sizeLimit) {
throw new Error('EMSGSIZE: Error response: content-length:' + contentLength);
}
return await processResponseStream(ctx, {
response,
sizeLimit,
uri,
opt_streamWriter,
contentLength,
timeout: optTimeout?.wholeCycle ? ms(optTimeout.wholeCycle) : null
});
} catch (err) {
if (axios.isCancel(err)) {
const error = new Error(err.message);
error.code = 'ETIMEDOUT';
// Close the stream to prevent downloading
const error = new Error('EMSGSIZE: Error response: content-length:' + contentLength);
error.code = 'EMSGSIZE';
response.data.destroy(error);
throw error;
}
if (err.response) {
if (opt_streamWriter && !isRedirectResponse(err.response)) {
delete err.response.headers['set-cookie'];
return processErrorResponseStream(err.response, {
sizeLimit,
opt_streamWriter,
timeout: optTimeout?.wholeCycle ? ms(optTimeout.wholeCycle) : null
});
}
const limitedStream = new SizeLimitStream(optLimit);
if (opt_returnStream) {
// When returning a stream, we'll return the response for the caller to handle streaming
// The content-length check is already done above
return { response, sha256: null, body: null, stream: response.data.pipe(limitedStream) };
}
const body = await pipeline(response.data, limitedStream, buffer);
const sha256 = crypto.createHash('sha256').update(body).digest('hex');
return { response, sha256, body, stream: null };
} catch (err) {
if('ERR_CANCELED' === err.code) {
err.code = 'ETIMEDOUT';
}
// if (axios.isCancel(err)) {
// const error = new Error(err.message);
// error.code = 'ETIMEDOUT';
// throw error;
// }
throw err;
}
}
@ -551,12 +560,11 @@ async function postRequestPromise(ctx, uri, postData, postDataStream, postDataSi
});
if (options.gzip !== undefined && !options.gzip) {
options.headers = options.headers || {};
options.headers['Accept-Encoding'] = 'identity';
options.headers = { ...options.headers, 'Accept-Encoding': 'identity' };
delete options.gzip;
}
if (!addExternalRequestOptions(ctx, uri, opt_isInJwtToken, options)) {
if (!addExternalRequestOptions(ctx, uri, opt_isInJwtToken, options, http.globalAgent.options, https.globalAgent.options)) {
throw new Error('Block external request. See externalRequest config options');
}
const protocol = new URL(uri).protocol;
@ -632,7 +640,6 @@ async function postRequestPromise(ctx, uri, postData, postDataStream, postDataSi
}
exports.postRequestPromise = postRequestPromise;
exports.downloadUrlPromise = downloadUrlPromise;
exports.addExternalRequestOptions = addExternalRequestOptions;
exports.mapAscServerErrorToOldError = function(error) {
var res = -1;
switch (error) {
@ -1306,6 +1313,47 @@ function getMonthDiff(d1, d2) {
return months;
}
exports.getMonthDiff = getMonthDiff;
/**
* A Transform stream that limits the size of data passing through it.
* It will throw an EMSGSIZE error if the size exceeds the limit.
*
* @class SizeLimitStream
* @extends {Transform}
*/
class SizeLimitStream extends Transform {
/**
* Creates an instance of SizeLimitStream.
* @param {number} sizeLimit - Maximum size in bytes that can pass through the stream
* @memberof SizeLimitStream
*/
constructor(sizeLimit) {
super();
this.sizeLimit = sizeLimit;
this.bytesReceived = 0;
}
/**
* Transform implementation that tracks the bytes received and enforces the size limit
*
* @param {Buffer|string} chunk - The chunk of data to process
* @param {string} encoding - The encoding of the chunk if it's a string
* @param {Function} callback - Called when processing is complete
* @memberof SizeLimitStream
*/
_transform(chunk, encoding, callback) {
this.bytesReceived += chunk.length;
if (this.sizeLimit && this.bytesReceived > this.sizeLimit) {
const error = new Error(`EMSGSIZE: Response too large: ${this.bytesReceived} bytes (limit: ${this.sizeLimit} bytes)`);
error.code = 'EMSGSIZE';
callback(error);
return;
}
callback(null, chunk);
}
}
exports.getLicensePeriod = function(startDate, now) {
startDate = new Date(startDate.getTime());//clone
startDate.addMonths(getMonthDiff(startDate, now));

View File

@ -34,6 +34,8 @@
const crypto = require('crypto');
var pathModule = require('path');
var urlModule = require('url');
const { pipeline } = require('node:stream/promises');
const { buffer } = require('node:stream/consumers');
var co = require('co');
const ms = require('ms');
const retry = require('retry');
@ -1746,7 +1748,14 @@ exports.downloadFile = function(req, res) {
headers['Range'] = req.get('Range');
}
yield utils.downloadUrlPromise(ctx, url, tenDownloadTimeout, tenDownloadMaxBytes, authorization, isInJwtToken, headers, res);
const { response, stream } = yield utils.downloadUrlPromise(ctx, url, tenDownloadTimeout, tenDownloadMaxBytes, authorization, isInJwtToken, headers, true);
//Set-Cookie resets browser session
delete response.headers['set-cookie'];
// Set the response headers to match the target response
res.set(response.headers);
// Use pipeline to pipe the response data to the client
yield pipeline(stream, res);
}
if (clientStatsD) {

View File

@ -1,6 +1,8 @@
const { describe, test, expect, beforeAll, afterAll } = require('@jest/globals');
const { Writable, Readable } = require('stream');
const { buffer } = require('node:stream/consumers');
const http = require('http');
const https = require('https');
const express = require('express');
const operationContext = require('../../../Common/sources/operationContext');
const utils = require('../../../Common/sources/utils');
@ -16,16 +18,6 @@ let testServer;
const PORT = 3456;
const BASE_URL = `http://localhost:${PORT}`;
// Helper to create a writable stream for testing
const createMockWriter = () => {
const chunks = [];
return new Writable({
write(chunk, encoding, callback) {
chunks.push(chunk);
callback();
}
});
};
const getStatusCode = (response) => response.statusCode || response.status;
@ -101,14 +93,6 @@ describe('HTTP Request Integration Tests', () => {
res.json({ success: true });
});
// Endpoint that streams data
app.get('/api/stream', (req, res) => {
res.setHeader('content-type', 'application/octet-stream');
res.setHeader('content-length', '1024');
const buffer = Buffer.alloc(1024);
res.send(buffer);
});
// Endpoint that simulates timeout
app.get('/api/timeout', (req, res) => {
// Never send response to trigger timeout
@ -144,17 +128,18 @@ describe('HTTP Request Integration Tests', () => {
res.send(binaryData);
});
// Large file endpoint
app.get('/api/large', (req, res) => {
res.setHeader('content-length', '2097152'); // 2MB
res.setHeader('content-type', 'application/octet-stream');
const buffer = Buffer.alloc(2097152);
res.send(buffer);
res.send(Buffer.alloc(2*1024*1024));//2MB
});
app.get('/api/headers', (req, res) => {
// Ensure you're only sending headers, which won't contain circular references
res.json({ headers: req.headers });
// Large file endpoint with truly no content-length header
app.get('/api/large-chunked', (req, res) => {
res.setHeader('content-type', 'application/octet-stream');
res.setHeader('transfer-encoding', 'chunked');
res.write(Buffer.alloc(2*1024*1024));
res.end();
});
// Endpoint that returns connection header info
@ -180,7 +165,29 @@ describe('HTTP Request Integration Tests', () => {
keepAlive: connectionHeader.toLowerCase() === 'keep-alive'
});
});
// Endpoint that mirrors whole request - handles any HTTP method
app.use('/api/mirror', express.json(), express.urlencoded({ extended: true }), (req, res) => {
// Create a mirror response object with all request details
const mirror = {
method: req.method,
url: req.url,
path: req.path,
query: req.query,
params: req.params,
headers: req.headers,
body: req.body,
protocol: req.protocol,
ip: req.ip,
hostname: req.hostname,
originalUrl: req.originalUrl,
xhr: req.xhr,
secure: req.secure
};
// Send the mirror response back
res.json(mirror);
});
// Start server
server = http.createServer(app);
@ -210,47 +217,43 @@ describe('HTTP Request Integration Tests', () => {
expect(JSON.parse(result.body.toString())).toEqual({ success: true });
});
test('handles streaming with writer', async () => {
const mockStreamWriter = createMockWriter();
const result = await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/stream`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
null,
mockStreamWriter
);
expect(result).toBeUndefined();
});
test('throws error on timeout', async () => {
await expect(utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/timeout`,
{ wholeCycle: '1s', connectionAndInactivity: '500ms' },
1024 * 1024,
null,
false,
null,
null
)).rejects.toThrow(/(?:ESOCKETTIMEDOUT|timeout of 500ms exceeded)/);
try {
await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/timeout`,
{ wholeCycle: '1s', connectionAndInactivity: '500ms' },
1024 * 1024,
null,
false,
null,
null
);
fail('Expected an error to be thrown');
} catch (error) {
expect(error.message).toContain('canceled');
expect(error.code).toBe('ETIMEDOUT');
}
});
test('throws error on wholeCycle timeout', async () => {
await expect(utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/timeout`,
{ wholeCycle: '1s', connectionAndInactivity: '5000ms' },
1024 * 1024,
null,
false,
null,
null
)).rejects.toThrow(/(?:ESOCKETTIMEDOUT|ETIMEDOUT: 1s|whole request cycle timeout: 1s)/);
try {
await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/timeout`,
{ wholeCycle: '1s', connectionAndInactivity: '5000ms' },
1024 * 1024,
null,
false,
null,
null
);
fail('Expected an error to be thrown');
} catch (error) {
expect(error.message).toContain('canceled');
expect(error.code).toBe('ETIMEDOUT');
}
});
test('follows redirects correctly', async () => {
@ -354,16 +357,74 @@ describe('HTTP Request Integration Tests', () => {
});
test('throws error when content-length exceeds limit', async () => {
await expect(utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/large`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
null,
null
)).rejects.toThrow('Error response: content-length:2097152');
try {
await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/large`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
null
);
fail('Expected an error to be thrown');
} catch (error) {
expect(error.message).toContain('EMSGSIZE:');
expect(error.code).toBe('EMSGSIZE');
}
try {
await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/large-chunked`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
null
);
fail('Expected an error to be thrown');
} catch (error) {
expect(error.message).toContain('EMSGSIZE:');
expect(error.code).toBe('EMSGSIZE');
}
});
test('throws error when content-length exceeds limit with stream', async () => {
try {
const {stream} = await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/large`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
null,
true
);
const receivedData = await buffer(stream);
fail('Expected an error to be thrown');
} catch (error) {
expect(error.message).toContain('EMSGSIZE:');
expect(error.code).toBe('EMSGSIZE');
}
try {
const {stream} = await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/large-chunked`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
null,
true
);
const receivedData = await buffer(stream);
fail('Expected an error to be thrown');
} catch (error) {
expect(error.message).toContain('EMSGSIZE:');
expect(error.code).toBe('EMSGSIZE');
}
});
test('enables compression when gzip is true', async () => {
@ -522,6 +583,31 @@ describe('HTTP Request Integration Tests', () => {
// so we're checking that keepAlive is false
expect(responseData.keepAlive).toBe(false);
});
test('test requestDefaults', async () => {
const defaultHeaders = {"user-agent": "Node.js/6.13"};
const mockCtx = createMockContext({
'services.CoAuthoring.requestDefaults': {
headers: defaultHeaders
}
});
let customHeaders = {"custom-header": "test-value", "set-cookie": ["cookie"]};
let customQueryParams = {"custom-query-param": "value"};
const result = await utils.downloadUrlPromise(
mockCtx,
`${BASE_URL}/api/mirror?${new URLSearchParams(customQueryParams).toString()}`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
1024 * 1024,
null,
false,
customHeaders
);
expect(result).toBeDefined();
expect(result.response.status).toBe(200);
const body = JSON.parse(result.body);
expect(body.headers).toMatchObject({...defaultHeaders, ...customHeaders});
expect(body.query).toMatchObject(customQueryParams);
});
});
test('handles binary data correctly', async () => {
@ -551,15 +637,7 @@ describe('HTTP Request Integration Tests', () => {
});
test('handles binary data with stream writer', async () => {
const chunks = [];
const mockStreamWriter = new Writable({
write(chunk, encoding, callback) {
chunks.push(chunk);
callback();
}
});
await utils.downloadUrlPromise(
const { stream } = await utils.downloadUrlPromise(
ctx,
`${BASE_URL}/api/binary`,
{ wholeCycle: '5s', connectionAndInactivity: '3s' },
@ -567,11 +645,9 @@ describe('HTTP Request Integration Tests', () => {
null,
false,
null,
mockStreamWriter
true
);
// Combine chunks and verify
const receivedData = Buffer.concat(chunks);
const receivedData = await buffer(stream);
const expectedData = Buffer.from([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]);
expect(Buffer.isBuffer(receivedData)).toBe(true);