[ds] MS SQL connector fixes + insert()

This commit is contained in:
Georgii Petrov
2023-07-28 18:21:58 +03:00
committed by Georgii Petrov
parent 1fe19eea91
commit a7de4643e6

View File

@ -118,7 +118,7 @@ function convertPlaceholdersValues(values) {
function registerPlaceholderValues(values, statement) {
for (const key of Object.keys(values)) {
statement.input(`${placeholderPrefix}${key}`, dataType(values[key]));
statement.input(key, dataType(values[key]));
}
}
@ -169,7 +169,7 @@ async function executeSql(ctx, sqlCommand, values = {}, noModifyRes = false, noL
}
function addSqlParameterObjectBased(parameter, name, accumulatedObject) {
accumulatedObject[name] = parameter;
accumulatedObject[`${placeholderPrefix}${name}`] = parameter;
return `@${placeholderPrefix}${name}`;
}
@ -230,7 +230,6 @@ function upsert(ctx, task, opt_updateUserIndex) {
'target.callback', `'${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":'`, '(target.user_index + 1)', `',"callback":'`, parameter, `'}'`
);
executeSql(ctx, `select ${concatParams('NULL', `',"smth":'`, 'NULL', '@ph_callback')} as result;`, { callback: '" HaHAhAHAh "' }).then(result => ctx.logger.debug('!!!!!!!!!!!!!!!!!!!!!!!!!', result))
updateColumns += `, target.callback = ${concatenatedColumns}`;
}
@ -248,7 +247,7 @@ function upsert(ctx, task, opt_updateUserIndex) {
+ `WHEN MATCHED THEN UPDATE SET ${updateColumns} `
+ `WHEN NOT MATCHED THEN INSERT(${sourceColumns}) VALUES(${sourceValues}) `
+ `OUTPUT $ACTION as action, INSERTED.user_index as insertId;`;
return executeSql(ctx, sqlMerge, values, true).then(
result => {
const insertId = result.recordset[0].insertId;
@ -264,11 +263,50 @@ function insertChanges(ctx, tableChanges, startIndex, objChanges, docId, index,
return;
}
let capacityReached = false;
let currentIndex = startIndex;
let lengthUtf8Current = 'INSERT INTO SELECT 1 FROM DUAL'.length
let sqlInsert = `INSERT INTO ${tableChanges} VALUES`;
let sqlInsert = `INSERT INTO ${tableChanges} VALUES`
const values = {};
// MS SQL Server can handle only 1000 parameters in one query.
const msSqlParametersCapacity = 1000;
const parametersInQuery = 8;
const rowsLimit = Math.trunc(msSqlParametersCapacity / parametersInQuery);
let rowCounts = 1;
let currentIndex = startIndex;
for (; currentIndex < objChanges.length && rowCounts <= rowsLimit; ++currentIndex, ++index) {
if (rowCounts !== 1) {
sqlInsert += ',';
}
rowCounts++;
const valuesPlaceholder = [
addSqlParameterObjectBased(ctx.tenant, `tenant${currentIndex}`, values),
addSqlParameterObjectBased(docId, `docId${currentIndex}`, values),
addSqlParameterObjectBased(index, `index${currentIndex}`, values),
addSqlParameterObjectBased(user.id, `id${currentIndex}`, values),
addSqlParameterObjectBased(user.idOriginal, `idOriginal${currentIndex}`, values),
addSqlParameterObjectBased(user.username, `username${currentIndex}`, values),
addSqlParameterObjectBased(objChanges[currentIndex].change, `change${currentIndex}`, values),
addSqlParameterObjectBased(objChanges[currentIndex].time, `time${currentIndex}`, values)
];
sqlInsert += `(${valuesPlaceholder.join(',')})`;
}
sqlInsert += ';';
executeSql(ctx,sqlInsert, values).then(
result => {
if (currentIndex < objChanges.length) {
insertChanges(ctx, tableChanges, currentIndex, objChanges, docId, index, user, callback);
} else {
// TODO: Not actual result, only last inserts.
callback(null, result, true);
}
},
error => callback(error, null, true)
);
}
module.exports = {
@ -277,5 +315,5 @@ module.exports = {
concatParams,
getTableColumns,
upsert,
// insertChanges
insertChanges
};