[perf] Speed up doc_changes inserts: bindDefs, MAX_EXECUTE_MANY_ROWS, IGNORE_ROW_ON_DUPKEY_INDEX, cap 2-way concurrency; Fix bug 76277

Sergey Konovalov
2025-08-08 13:46:13 +03:00
parent 3327be2bd6
commit bab7659f37
5 changed files with 111 additions and 28 deletions
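For context, a minimal standalone sketch of the batching technique this commit applies to doc_changes inserts: node-oracledb's executeMany() with explicit bindDefs and a cap on rows per call. The demo table, its column layout, and the helper name below are illustrative assumptions, not the project's actual code.

    const oracledb = require('oracledb');

    // Illustrative cap, mirroring the commit's MAX_EXECUTE_MANY_ROWS.
    const MAX_ROWS_PER_BATCH = 2000;

    // Insert rows in bounded batches; rows is an array of arrays matching :1..:4.
    // The demo table and its column types are assumptions for this sketch only.
    async function insertInBatches(connection, rows) {
      const sql = 'INSERT INTO doc_changes_demo VALUES(:1,:2,:3,:4)';
      const options = {
        autoCommit: true,
        // Explicit bindDefs mean the driver does not scan each batch to infer
        // bind types and sizes, which matters for NVARCHAR2/NCLOB/TIMESTAMP columns.
        bindDefs: [
          { type: oracledb.DB_TYPE_NVARCHAR, maxSize: 255 }, // id
          { type: oracledb.DB_TYPE_NUMBER },                 // change_id
          { type: oracledb.DB_TYPE_NCLOB },                  // change_data
          { type: oracledb.DB_TYPE_TIMESTAMP }               // change_date
        ]
      };
      let affectedRows = 0;
      for (let i = 0; i < rows.length; i += MAX_ROWS_PER_BATCH) {
        const chunk = rows.slice(i, i + MAX_ROWS_PER_BATCH);
        const result = await connection.executeMany(sql, chunk, options);
        affectedRows += result.rowsAffected ?? 0;
      }
      return affectedRows;
    }

Supplying bindDefs also skips the per-call data scan that node-oracledb otherwise performs to work out bind metadata.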

View File

@@ -45,7 +45,7 @@ jobs:
- name: Creating schema
run: |
docker cp ./schema/oracle/createdb.sql oracle:/
-docker exec oracle sqlplus -s onlyoffice/onlyoffice@//localhost/onlyoffice @/createdb.sql
+docker exec oracle sqlplus -s onlyoffice/onlyoffice@//localhost/onlyoffice "@/createdb.sql"
- name: Run Jest
run: npm run "integration database tests"

View File

@@ -48,7 +48,7 @@
- multi-integer-range 5.2.0 ([MIT](https://raw.githubusercontent.com/smikitky/node-multi-integer-range/master/LICENSE))
- multiparty 4.2.3 ([MIT](https://raw.githubusercontent.com/pillarjs/multiparty/master/LICENSE))
- mysql2 3.13.0 ([MIT](https://raw.githubusercontent.com/sidorares/node-mysql2/master/License))
-- oracledb 6.8.0 ([(Apache-2.0 OR UPL-1.0)](https://raw.githubusercontent.com/oracle/node-oracledb/main/LICENSE.txt))
+- oracledb 6.9.0 ([(Apache-2.0 OR UPL-1.0)](https://raw.githubusercontent.com/oracle/node-oracledb/main/LICENSE.txt))
- pg 8.14.0 ([MIT](https://raw.githubusercontent.com/brianc/node-postgres/master/LICENSE))
- redis 4.7.0 ([MIT](https://raw.githubusercontent.com/redis/node-redis/master/LICENSE))
- retry 0.13.1 ([MIT](https://raw.githubusercontent.com/tim-kos/node-retry/master/License))

View File

@@ -1651,6 +1651,11 @@
"esprima": "^4.0.0"
}
},
"json-schema-traverse": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz",
"integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug=="
},
"json5": {
"version": "2.2.3",
"resolved": "https://registry.npmjs.org/json5/-/json5-2.2.3.tgz",
@@ -2058,9 +2063,9 @@
}
},
"oracledb": {
"version": "6.8.0",
"resolved": "https://registry.npmjs.org/oracledb/-/oracledb-6.8.0.tgz",
"integrity": "sha512-A4ds4n4xtjPTzk1gwrHWuMeWsEcrScF0GFgVebGrhNpHSAzn6eDwMKcSbakZODKfFcI099iqhmqWsgko8D+7Ww=="
"version": "6.9.0",
"resolved": "https://registry.npmjs.org/oracledb/-/oracledb-6.9.0.tgz",
"integrity": "sha512-NwPbIGPv6m0GTFSbyy4/5WEjsKMiiJRxztLmYUcfD3oyh/uXdmVmKOwEWr84wFwWJ/0wQrYQh4PjnzvShibRaA=="
},
"pako": {
"version": "1.0.11",
@@ -2336,11 +2341,6 @@
"resolved": "https://registry.npmjs.org/regenerator-runtime/-/regenerator-runtime-0.13.11.tgz",
"integrity": "sha512-kY1AZVr2Ra+t+piVaJ4gxaFaReZVH40AKNo7UCX6W+dEwBo/2oZJzqfuN1qLq1oL45o56cPaTXELwrTh8Fpggg=="
},
"json-schema-traverse": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz",
"integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug=="
},
"require-from-string": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz",

View File

@@ -32,7 +32,7 @@
"multi-integer-range": "5.2.0",
"multiparty": "4.2.3",
"mysql2": "3.13.0",
"oracledb": "6.8.0",
"oracledb": "6.9.0",
"pg": "8.14.0",
"redis": "4.7.0",
"retry": "0.13.1",

View File

@@ -42,6 +42,11 @@ const cfgTableResult = configSql.get('tableResult');
const cfgTableChanges = configSql.get('tableChanges');
const cfgMaxPacketSize = configSql.get('max_allowed_packet');
+// Limit rows per executeMany call to avoid large internal batches that can destabilize the server,
+// especially on Oracle 21c (21.3) with NCLOB columns and batched inserts.
+// Set high to reduce round-trips; each batch is still bounded by cfgMaxPacketSize.
+const MAX_EXECUTE_MANY_ROWS = 2000;
const connectionConfiguration = {
user: configSql.get('dbUser'),
password: configSql.get('dbPass'),
@@ -130,7 +135,20 @@ async function executeQuery(ctx, sqlCommand, values = [], noModifyRes = false, n
}
}
-async function executeBunch(ctx, sqlCommand, values = [], noLog = false) {
+/**
+ * Execute a batched DML statement using executeMany with optional bind options.
+ * Notes:
+ * - Accepts array-of-arrays for positional binds (recommended for :1..:N placeholders)
+ * - Options can include bindDefs, batchErrors, autoCommit, etc.
+ * - Logs a batchErrors summary when present to aid debugging while keeping the normal return shape
+ * @param {object} ctx - request context with logger
+ * @param {string} sqlCommand - SQL text with positional bind placeholders
+ * @param {Array<Array<any>>|Array<object>} values - rows to bind
+ * @param {object} [options] - executeMany options (e.g., { bindDefs: [...], batchErrors: true })
+ * @param {boolean} [noLog=false] - disable error logging
+ * @returns {Promise<{affectedRows:number}>} aggregate affected-rows count
+ */
+async function executeBunch(ctx, sqlCommand, values = [], options, noLog = false) {
let connection = null;
try {
if (!pool) {
@@ -139,17 +157,36 @@ async function executeBunch(ctx, sqlCommand, values = [], noLog = false) {
connection = await pool.getConnection();
-const result = await connection.executeMany(sqlCommand, values);
+const result = await connection.executeMany(sqlCommand, values, options);
+// Log batch errors if requested, without changing the public return shape
+if (options?.batchErrors && Array.isArray(result?.batchErrors) && result.batchErrors.length && !noLog) {
+  const allDup = result.batchErrors.every(e => e?.errorNum === 1); // ORA-00001
+  const logMessage = `executeMany() batchErrors for: ${sqlCommand} -> count=${result.batchErrors.length}${allDup ? ' (duplicates)' : ''}`;
+  if (allDup) {
+    ctx.logger.debug(logMessage);
+  } else {
+    ctx.logger.error(logMessage);
+  }
+}
return { affectedRows: result?.rowsAffected ?? 0 };
} catch (error) {
if (!noLog) {
-ctx.logger.error(`sqlQuery() error while executing query: ${sqlCommand}\n${error.stack}`);
+ctx.logger.error(`executeBunch() error while executing query: ${sqlCommand}\n${error.stack}`);
}
throw error;
} finally {
-connection?.close();
+if (connection) {
+  try {
+    await connection.close();
+  } catch (error) {
+    if (!noLog) {
+      ctx.logger.error(`connection.close() error while executing batched query: ${sqlCommand}\n${error.stack}`);
+    }
+  }
+}
}
}
@@ -287,7 +324,20 @@ function insertChanges(ctx, tableChanges, startIndex, objChanges, docId, index,
);
}
-async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, docId, index, user) {
+/**
+ * Insert a sequence of change records into the doc_changes table using executeMany.
+ * Uses explicit bindDefs and chunked batches (APPEND_VALUES hint removed) to reduce the risk of ORA-03106.
+ * @param {object} ctx - request context
+ * @param {string} tableChanges - table name
+ * @param {number} startIndex - start offset in objChanges
+ * @param {Array<{change:string,time:Date|number|string}>} objChanges - changes payload
+ * @param {string} docId - document id
+ * @param {number} index - starting change_id value
+ * @param {{id:string,idOriginal:string,username:string}} user - user info
+ * @param {boolean} [allowParallel=true] - allow one-level parallel execution for the next chunk
+ * @returns {Promise<{affectedRows:number}>}
+ */
+async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, docId, index, user, allowParallel = true) {
if (startIndex === objChanges.length) {
return { affectedRows: 0 };
}
@@ -295,7 +345,7 @@ async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, doc
const parametersCount = 8;
const maxPlaceholderLength = ':99'.length;
// (parametersCount - 1) - separator symbols length.
-const maxInsertStatementLength = `INSERT /*+ APPEND_VALUES*/INTO ${tableChanges} VALUES()`.length + maxPlaceholderLength * parametersCount + (parametersCount - 1);
+const maxInsertStatementLength = `INSERT INTO ${tableChanges} VALUES()`.length + maxPlaceholderLength * parametersCount + (parametersCount - 1);
let packetCapacityReached = false;
const values = [];
@@ -308,11 +358,16 @@ async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, doc
const lengthUtf8Row = maxInsertStatementLength + indexBytes + timeBytes
+ 4 * (ctx.tenant.length + docId.length + user.id.length + user.idOriginal.length + user.username.length + objChanges[currentIndex].change.length);
-if (lengthUtf8Row + lengthUtf8Current >= cfgMaxPacketSize && currentIndex > startIndex) {
+// Chunk by packet size and by max rows per batch
+if ((lengthUtf8Row + lengthUtf8Current >= cfgMaxPacketSize || values.length >= MAX_EXECUTE_MANY_ROWS) && currentIndex > startIndex) {
packetCapacityReached = true;
break;
}
+// Ensure TIMESTAMP bind is a valid JS Date
+const _t = objChanges[currentIndex].time;
+const changeTime = _t instanceof Date ? _t : new Date(_t);
const parameters = [
ctx.tenant,
docId,
@@ -321,28 +376,56 @@ async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, doc
user.idOriginal,
user.username,
objChanges[currentIndex].change,
-objChanges[currentIndex].time
+changeTime
];
-const rowValues = { ...parameters };
-values.push(rowValues);
+// Use positional binding (array-of-arrays) for the :1..:8 placeholders
+values.push(parameters);
lengthUtf8Current += lengthUtf8Row;
}
const placeholder = [];
-for (let i = 0; i < parametersCount; i++) {
+for (let i = 1; i <= parametersCount; i++) {
placeholder.push(`:${i}`);
}
-const sqlInsert = `INSERT /*+ APPEND_VALUES*/INTO ${tableChanges} VALUES(${placeholder.join(',')})`;
-const result = await executeBunch(ctx, sqlInsert, values);
+// Use IGNORE_ROW_ON_DUPKEY_INDEX to avoid duplicate-key errors on retries and speed up inserts
+const sqlInsert = `INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(${tableChanges} DOC_CHANGES_UNIQUE) */ INTO ${tableChanges} VALUES(${placeholder.join(',')})`;
+// Explicit bind definitions to avoid thin-driver type inference pitfalls on NVARCHAR2/NCLOB/TIMESTAMP
+const bindDefs = [
+  { type: oracledb.DB_TYPE_NVARCHAR, maxSize: 255 }, // tenant NVARCHAR2(255)
+  { type: oracledb.DB_TYPE_NVARCHAR, maxSize: 255 }, // id NVARCHAR2(255)
+  { type: oracledb.DB_TYPE_NUMBER }, // change_id NUMBER
+  { type: oracledb.DB_TYPE_NVARCHAR, maxSize: 255 }, // user_id NVARCHAR2(255)
+  { type: oracledb.DB_TYPE_NVARCHAR, maxSize: 255 }, // user_id_original NVARCHAR2(255)
+  { type: oracledb.DB_TYPE_NVARCHAR, maxSize: 255 }, // user_name NVARCHAR2(255)
+  { type: oracledb.DB_TYPE_NCLOB }, // change_data NCLOB
+  { type: oracledb.DB_TYPE_TIMESTAMP } // change_date TIMESTAMP
+];
+// With IGNORE_ROW_ON_DUPKEY_INDEX, duplicates are skipped server-side; disable batchErrors to reduce overhead
+const executeOptions = { bindDefs, batchErrors: false, autoCommit: true };
+// Execute the current batch and, if allowed, process the next chunk concurrently
+const p1 = executeBunch(ctx, sqlInsert, values, executeOptions);
if (packetCapacityReached) {
-const recursiveValue = await insertChangesAsync(ctx, tableChanges, currentIndex, objChanges, docId, index, user);
-result.affectedRows += recursiveValue.affectedRows;
+if (allowParallel) {
+  // Start processing the remaining chunks concurrently (single-level parallelism)
+  const p2 = insertChangesAsync(ctx, tableChanges, currentIndex, objChanges, docId, index, user, false);
+  const [r1, r2] = await Promise.all([p1, p2]);
+  r1.affectedRows += r2.affectedRows;
+  return r1;
+}
+// Parallelism not allowed: finish this batch, then continue sequentially
+const r1 = await p1;
+const r2 = await insertChangesAsync(ctx, tableChanges, currentIndex, objChanges, docId, index, user, false);
+r1.affectedRows += r2.affectedRows;
+return r1;
}
+const result = await p1;
return result;
}
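
As a side note, the control flow above caps concurrency at two in-flight batches: the current batch starts immediately, and if more rows remain, the tail is handed to a recursive call with allowParallel set to false, so the tail's own batches run strictly one after another. A simplified sketch of that pattern, with a hypothetical runBatch callback standing in for executeBunch:

    // Run batches with at most two in flight: the head batch plus one
    // sequential worker draining the tail. runBatch is a hypothetical callback
    // that executes one prepared batch and resolves to { affectedRows }.
    async function insertChunked(chunks, runBatch, allowParallel = true) {
      if (chunks.length === 0) {
        return { affectedRows: 0 };
      }
      const [head, ...rest] = chunks;
      const p1 = runBatch(head); // head batch starts immediately
      if (rest.length > 0 && allowParallel) {
        // The tail runs alongside the head, but with allowParallel=false its
        // batches execute sequentially, so at most two overlap at any moment.
        const p2 = insertChunked(rest, runBatch, false);
        const [r1, r2] = await Promise.all([p1, p2]);
        return { affectedRows: r1.affectedRows + r2.affectedRows };
      }
      const r1 = await p1;
      if (rest.length > 0) {
        const r2 = await insertChunked(rest, runBatch, false);
        r1.affectedRows += r2.affectedRows;
      }
      return r1;
    }

The IGNORE_ROW_ON_DUPKEY_INDEX hint complements this: if a retried batch re-inserts rows already covered by the DOC_CHANGES_UNIQUE index, Oracle skips them server-side instead of raising ORA-00001, which is why batchErrors can stay disabled.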