[ds] Oracle base connector pt.4

This commit is contained in:
Georgii Petrov
2023-06-30 14:00:46 +03:00
parent f43db03f83
commit 5c8b3b4859
3 changed files with 64 additions and 36 deletions

View File

@ -48,6 +48,38 @@ const connectionConfiguration = {
};
let pool = null;
oracledb.fetchAsString = [ oracledb.NCLOB ];
oracledb.autoCommit = true;
function columnsToLowercase(rows) {
const formattedRows = [];
for (const row of rows) {
const newRow = {};
for (const column in row) {
if (row.hasOwnProperty(column)) {
newRow[column.toLowerCase()] = row[column];
}
}
formattedRows.push(newRow);
}
return formattedRows;
}
function reconfigureParametersBinding(parameters) {
if (!parameters) {
return {};
}
const objectConfiguration = {};
for (const index in parameters) {
objectConfiguration[`:${index}`] = parameters[index];
}
return objectConfiguration;
}
async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_noLog, opt_values) {
// Query must not have any ';' in oracle connector.
const correctedSql = sqlCommand.replace(/;/g, '');
@ -57,12 +89,12 @@ async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_
pool = await oracledb.createPool(connectionConfiguration);
}
const connection = await oracledb.getConnection();
const connection = await pool.getConnection();
const handler = (error, result) => {
if (error) {
if (!opt_noLog) {
ctx.logger.error('sqlQuery error sqlCommand: %s: %s', correctedSql.slice(0, 50), error.stack);
ctx.logger.error('sqlQuery error sqlCommand: %s: %s', correctedSql, error.stack);
}
connection.close();
@ -71,40 +103,23 @@ async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes, opt_
return;
}
let output = result;
if (!opt_noModifyRes) {
output = { rows: [], affectedRows: 0 };
let output = { rows: [], affectedRows: 0 };
if (result?.rowsAffected) {
output = { affectedRows: result.rowsAffected };
}
if (result?.rowsAffected) {
output = { affectedRows: result.rowsAffected };
}
if (result?.rows) {
output = { rows: result.rows };
}
if (result?.rows) {
output = !opt_noModifyRes ? columnsToLowercase(result.rows) : result.rows;
}
callbackFunction?.(error, output);
};
if (opt_values) {
connection.execute(correctedSql, opt_values, handler);
} else {
connection.execute(correctedSql, handler);
}
const bondedValues = reconfigureParametersBinding(opt_values);
const outputFormat = { outFormat: !opt_noModifyRes ? oracledb.OUT_FORMAT_OBJECT : oracledb.OUT_FORMAT_ARRAY };
connection.execute(correctedSql, bondedValues, outputFormat, handler);
// Transaction must be committed otherwise connector roll it back.
connection.execute('COMMIT', (error, result) => {
if (error) {
if (!opt_noLog) {
ctx.logger.error('sqlQuery error sqlCommand: %s: %s', correctedSql.slice(0, 50), error.stack);
}
callbackFunction?.(error);
}
connection.close();
});
connection.close();
} catch (error) {
if (!opt_noLog) {
ctx.logger.error('sqlQuery error while pool manipulation: %s', error.stack);
@ -119,8 +134,20 @@ function addSqlParameter(parameter, accumulatedArray) {
return `:${currentIndex}`;
}
function concatParams(val1, val2) {
return `COALESCE(${val1}, '') || COALESCE(${val2}, '')`;
function concatParams(firstParameter, secondParameter) {
return `${firstParameter} || ${secondParameter} || ''`;
}
function getTableColumns(ctx, tableName) {
return new Promise((resolve, reject) => {
sqlQuery(ctx, `SELECT LOWER(column_name) AS column_name FROM user_tab_columns WHERE table_name = '${tableName.toUpperCase()}'`, function (error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
});
});
}
function upsert(ctx, task, opt_updateUserIndex) {
@ -151,7 +178,7 @@ function upsert(ctx, task, opt_updateUserIndex) {
let callback = '';
if (task.callback) {
const parameter = addSqlParameter(JSON.stringify(task.callback), values);
callback = `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${parameter}, '}')`;
callback = `, callback = callback || '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' || (user_index + 1) || ',"callback":' || ${parameter} || '}'`;
}
let baseUrl = '';
@ -172,13 +199,13 @@ function upsert(ctx, task, opt_updateUserIndex) {
+ ` WHEN MATCHED THEN UPDATE SET ${updateQuery}`
+ ` WHEN NOT MATCHED THEN INSERT (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl) VALUES (${valuesPlaceholder.join(', ')})`;
exports.sqlQuery(ctx, mergeSqlCommand, function(error, result) {
sqlQuery(ctx, mergeSqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
resolve(result);
}
}, undefined, undefined, values);
}, false, false, values);
});
}
@ -186,5 +213,6 @@ module.exports = {
sqlQuery,
addSqlParameter,
concatParams,
getTableColumns,
upsert
}

View File

@ -1,6 +1,6 @@
-- You must be logged in as SYS(sysdba) user.
-- Here, "onlyoffice" is a PBD(service) name.
alter session set container = onlyoffice;
alter session set container = onlyoffice; --test name: xepdb1;
-- In tables creation section "onlyoffice" is a user name.
-- ----------------------------

View File

@ -1,6 +1,6 @@
-- You must be logged in as SYS(sysdba) user.
-- Here, "onlyoffice" is a PBD(service) name.
alter session set container = onlyoffice;
alter session set container = xepdb1;
DROP TABLE onlyoffice.doc_changes CASCADE CONSTRAINTS PURGE;
DROP TABLE onlyoffice.task_result CASCADE CONSTRAINTS PURGE;