[ds] Oracle base connector pt.5

This commit is contained in:
Georgii Petrov
2023-07-04 05:28:32 +03:00
parent 5c8b3b4859
commit fff79d9413
4 changed files with 103 additions and 49 deletions

View File

@ -113,7 +113,6 @@ exports.insertChangesPromise = function (ctx, objChanges, docId, index, user) {
} else { } else {
return exports.insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user); return exports.insertChangesPromiseCompatibility(ctx, objChanges, docId, index, user);
} }
}; };
function _getDateTime2(oDate) { function _getDateTime2(oDate) {
return oDate.toISOString().slice(0, 19).replace('T', ' '); return oDate.toISOString().slice(0, 19).replace('T', ' ');
@ -127,10 +126,12 @@ function _insertChangesCallback (ctx, startIndex, objChanges, docId, index, user
if (i === l) if (i === l)
return; return;
const indexBytes = 4;
const timeBytes = 8;
for (; i < l; ++i, ++index) { for (; i < l; ++i, ++index) {
//44 - length of "($1001,... $1007)," //49 - length of "($1001,... $1008),"
//4 is max utf8 bytes per symbol //4 is max utf8 bytes per symbol
lengthUtf8Row = 44 + 4 * (docId.length + user.id.length + user.idOriginal.length + user.username.length + objChanges[i].change.length) + 4 + 8; lengthUtf8Row = 49 + 4 * (ctx.tenant.length + docId.length + user.id.length + user.idOriginal.length + user.username.length + objChanges[i].change.length) + indexBytes + timeBytes;
if (lengthUtf8Row + lengthUtf8Current >= maxPacketSize && i > startIndex) { if (lengthUtf8Row + lengthUtf8Current >= maxPacketSize && i > startIndex) {
sqlCommand += ';'; sqlCommand += ';';
(function(tmpStart, tmpIndex) { (function(tmpStart, tmpIndex) {

View File

@ -37,7 +37,8 @@ const config = require('config');
const connectorUtilities = require("./connectorUtilities"); const connectorUtilities = require("./connectorUtilities");
const configSql = config.get('services.CoAuthoring.sql'); const configSql = config.get('services.CoAuthoring.sql');
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult'); const cfgTableResult = configSql.get('tableResult');
const cfgMaxPacketSize = configSql.get('max_allowed_packet');
const connectionConfiguration = { const connectionConfiguration = {
user: configSql.get('dbUser'), user: configSql.get('dbUser'),
@ -48,7 +49,7 @@ const connectionConfiguration = {
}; };
let pool = null; let pool = null;
oracledb.fetchAsString = [ oracledb.NCLOB ]; oracledb.fetchAsString = [ oracledb.NCLOB, oracledb.CLOB ];
oracledb.autoCommit = true; oracledb.autoCommit = true;
function columnsToLowercase(rows) { function columnsToLowercase(rows) {
@ -67,19 +68,6 @@ function columnsToLowercase(rows) {
return formattedRows; return formattedRows;
} }
function reconfigureParametersBinding(parameters) {
if (!parameters) {
return {};
}
const objectConfiguration = {};
for (const index in parameters) {
objectConfiguration[`:${index}`] = parameters[index];
}
return objectConfiguration;
}
async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_noLog, opt_values) { async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_noLog, opt_values) {
// Query must not have any ';' in oracle connector. // Query must not have any ';' in oracle connector.
const correctedSql = sqlCommand.replace(/;/g, ''); const correctedSql = sqlCommand.replace(/;/g, '');
@ -97,29 +85,32 @@ async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_
ctx.logger.error('sqlQuery error sqlCommand: %s: %s', correctedSql, error.stack); ctx.logger.error('sqlQuery error sqlCommand: %s: %s', correctedSql, error.stack);
} }
connection.close();
callbackFunction?.(error); callbackFunction?.(error);
return; return;
} }
let output = { rows: [], affectedRows: 0 }; connection.close();
if (result?.rowsAffected) {
output = { affectedRows: result.rowsAffected };
}
if (result?.rows) { let output = { rows: [], affectedRows: 0 };
output = !opt_noModifyRes ? columnsToLowercase(result.rows) : result.rows; if (!opt_noModifyRes) {
if (result?.rowsAffected) {
output = { affectedRows: result.rowsAffected };
}
if (result?.rows) {
output = columnsToLowercase(result.rows);
}
} else {
output = result;
} }
callbackFunction?.(error, output); callbackFunction?.(error, output);
}; };
const bondedValues = reconfigureParametersBinding(opt_values); const bondedValues = opt_values ?? [];
const outputFormat = { outFormat: !opt_noModifyRes ? oracledb.OUT_FORMAT_OBJECT : oracledb.OUT_FORMAT_ARRAY }; const outputFormat = { outFormat: !opt_noModifyRes ? oracledb.OUT_FORMAT_OBJECT : oracledb.OUT_FORMAT_ARRAY };
connection.execute(correctedSql, bondedValues, outputFormat, handler); connection.execute(correctedSql, bondedValues, outputFormat, handler);
connection.close();
} catch (error) { } catch (error) {
if (!opt_noLog) { if (!opt_noLog) {
ctx.logger.error('sqlQuery error while pool manipulation: %s', error.stack); ctx.logger.error('sqlQuery error while pool manipulation: %s', error.stack);
@ -162,18 +153,11 @@ function upsert(ctx, task, opt_updateUserIndex) {
} }
const dateNow = new Date(); const dateNow = new Date();
const values = []; const values = [];
const valuesPlaceholder = [ const tenant = addSqlParameter(task.tenant, values);
addSqlParameter(task.tenant, values), const id = addSqlParameter(task.key, values);
addSqlParameter(task.key, values), const lastOpenDate = addSqlParameter(dateNow, values);
addSqlParameter(task.status, values),
addSqlParameter(task.statusInfo, values),
addSqlParameter(dateNow, values),
addSqlParameter(task.userIndex, values),
addSqlParameter(task.changeId, values),
addSqlParameter(cbInsert, values),
addSqlParameter(task.baseurl, values)
];
let callback = ''; let callback = '';
if (task.callback) { if (task.callback) {
@ -192,12 +176,25 @@ function upsert(ctx, task, opt_updateUserIndex) {
userIndex = ', user_index = user_index + 1'; userIndex = ', user_index = user_index + 1';
} }
const updateQuery = `last_open_date = ${addSqlParameter(dateNow, values)}${callback}${baseUrl}${userIndex}` const updateQuery = `last_open_date = ${lastOpenDate}${callback}${baseUrl}${userIndex}`
const condition = `tenant = ${valuesPlaceholder[0]} AND id = ${valuesPlaceholder[1]}` const condition = `tenant = ${tenant} AND id = ${id}`
let mergeSqlCommand = `MERGE INTO ${cfgTableResult} USING DUAL ON (${condition})` let mergeSqlCommand = `MERGE INTO ${cfgTableResult} USING DUAL ON (${condition})`
+ ` WHEN MATCHED THEN UPDATE SET ${updateQuery}` + ` WHEN MATCHED THEN UPDATE SET ${updateQuery}`;
+ ` WHEN NOT MATCHED THEN INSERT (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl) VALUES (${valuesPlaceholder.join(', ')})`;
const valuesPlaceholder = [
addSqlParameter(task.tenant, values),
addSqlParameter(task.key, values),
addSqlParameter(task.status, values),
addSqlParameter(task.statusInfo, values),
addSqlParameter(dateNow, values),
addSqlParameter(task.userIndex, values),
addSqlParameter(task.changeId, values),
addSqlParameter(cbInsert, values),
addSqlParameter(task.baseurl, values)
];
mergeSqlCommand += ` WHEN NOT MATCHED THEN INSERT (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl) VALUES (${valuesPlaceholder.join(', ')})`;
sqlQuery(ctx, mergeSqlCommand, function(error, result) { sqlQuery(ctx, mergeSqlCommand, function(error, result) {
if (error) { if (error) {
@ -209,10 +206,66 @@ function upsert(ctx, task, opt_updateUserIndex) {
}); });
} }
async function insertChanges(ctx, tableChanges, startIndex, objChanges, docId, index, user, callback) {
if (startIndex === objChanges.length) {
return;
}
let packetCapacityReached = false;
let currentIndex = startIndex;
let lengthUtf8Current = 'INSERT ALL SELECT 1 FROM DUAL'.length;
let insertAllSqlCommand = 'INSERT ALL ';
const values = [];
const maxInsertionClauseLength = `INTO ${tableChanges} VALUES(:9991,:9992,:9993,:9994,:9995,:9996,:9997,:9998) `.length;
const indexBytes = 4;
const timeBytes = 8;
for (; currentIndex < objChanges.length; ++currentIndex, ++index) {
// 4 bytes is maximum for utf8 symbol.
const lengthUtf8Row = maxInsertionClauseLength + indexBytes + timeBytes
+ 4 * (ctx.tenant.length + docId.length + user.id.length + user.idOriginal.length + user.username.length + objChanges[currentIndex].change.length);
if (lengthUtf8Row + lengthUtf8Current >= cfgMaxPacketSize && currentIndex > startIndex) {
packetCapacityReached = true;
break;
}
const valuesPlaceholder= [
addSqlParameter(ctx.tenant, values),
addSqlParameter(docId, values),
addSqlParameter(index, values),
addSqlParameter(user.id, values),
addSqlParameter(user.idOriginal, values),
addSqlParameter(user.username, values),
addSqlParameter(objChanges[currentIndex].change, values),
addSqlParameter(objChanges[currentIndex].time, values)
];
insertAllSqlCommand += `INTO ${tableChanges} VALUES(${valuesPlaceholder.join(',')}) `;
}
insertAllSqlCommand += 'SELECT 1 FROM DUAL';
await sqlQuery(ctx, insertAllSqlCommand, function (error, result) {
if (error) {
callback(error, null, true);
return;
}
if (packetCapacityReached) {
insertChanges(ctx, tableChanges, currentIndex, objChanges, docId, index, user, callback);
} else {
callback(error, result, true);
}
}, false, false, values);
}
module.exports = { module.exports = {
sqlQuery, sqlQuery,
addSqlParameter, addSqlParameter,
concatParams, concatParams,
getTableColumns, getTableColumns,
upsert upsert,
insertChanges
} }

View File

@ -1,6 +1,6 @@
-- You must be logged in as SYS(sysdba) user. -- You must be logged in as SYS(sysdba) user.
-- Here, "onlyoffice" is a PBD(service) name. -- Here, "onlyoffice" is a PBD(service) name.
alter session set container = onlyoffice; --test name: xepdb1; alter session set container = onlyoffice;
-- In tables creation section "onlyoffice" is a user name. -- In tables creation section "onlyoffice" is a user name.
-- ---------------------------- -- ----------------------------
@ -28,12 +28,12 @@ CREATE TABLE onlyoffice.task_result (
id NVARCHAR2(255) NOT NULL, id NVARCHAR2(255) NOT NULL,
status NUMBER NOT NULL, status NUMBER NOT NULL,
status_info NUMBER NOT NULL, status_info NUMBER NOT NULL,
created_at TIMESTAMP DEFAULT SYSDATE, -- check format created_at TIMESTAMP DEFAULT SYSDATE NOT NULL,
last_open_date TIMESTAMP NOT NULL, last_open_date TIMESTAMP NOT NULL,
user_index NUMBER DEFAULT 1 NOT NULL, user_index NUMBER DEFAULT 1 NOT NULL,
change_id NUMBER DEFAULT 0 NOT NULL, change_id NUMBER DEFAULT 0 NOT NULL,
callback NCLOB NOT NULL, callback NCLOB, -- codebase uses '' as default values here, but Oracle treat '' as NULL, so NULL permitted for this value.
baseurl NCLOB NOT NULL, baseurl NCLOB, -- codebase uses '' as default values here, but Oracle treat '' as NULL, so NULL permitted for this value.
password NCLOB NULL, password NCLOB NULL,
additional NCLOB NULL, additional NCLOB NULL,
CONSTRAINT task_result_unsigned_int CHECK (user_index BETWEEN 0 AND 4294967295 AND change_id BETWEEN 0 AND 4294967295) CONSTRAINT task_result_unsigned_int CHECK (user_index BETWEEN 0 AND 4294967295 AND change_id BETWEEN 0 AND 4294967295)

View File

@ -1,6 +1,6 @@
-- You must be logged in as SYS(sysdba) user. -- You must be logged in as SYS(sysdba) user.
-- Here, "onlyoffice" is a PBD(service) name. -- Here, "onlyoffice" is a PBD(service) name.
alter session set container = xepdb1; alter session set container = onlyoffice;
DROP TABLE onlyoffice.doc_changes CASCADE CONSTRAINTS PURGE; DROP TABLE onlyoffice.doc_changes CASCADE CONSTRAINTS PURGE;
DROP TABLE onlyoffice.task_result CASCADE CONSTRAINTS PURGE; DROP TABLE onlyoffice.task_result CASCADE CONSTRAINTS PURGE;