[feature] Add binaryChanges config param

Sergey Konovalov
2022-10-27 14:20:13 +03:00
committed by Sergey Konovalov
parent a050a5df43
commit fcb9a57a7d
8 changed files with 207 additions and 23 deletions

View File

@@ -260,6 +260,7 @@
"attempts": 50,
"delay": "2s"
},
"binaryChanges": false,
"websocketMaxPayloadSize": "1.5MB",
"maxChangesSize": "0mb"
},
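
The flag defaults to false, preserving the existing JSON text pipeline. A minimal sketch of how the server side picks the setting up, assuming the node-config path used elsewhere in this diff ('services.CoAuthoring.editor'); the branch comments are illustrative, not the author's wording:

    const config = require('config');
    const cfgEditor = config.get('services.CoAuthoring.editor');
    if (cfgEditor['binaryChanges']) {
      // changes are stored and streamed as raw binary, prefixed with a version header
    } else {
      // changes are serialized as JSON text, one bracketed array per chunk
    }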

View File

@@ -1044,4 +1044,6 @@ exports.getFunctionArguments = function(func) {
exports.isUselesSfc = function(row, cmd) {
return !(row && commonDefines.FileStatus.SaveVersion === row.status && cmd.getStatusInfoIn() === row.status_info);
};
exports.getChangesFileHeader = function() {
return `CHANGES|${commonDefines.buildVersion};`;
};
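
The new helper stamps binary changes files with a format marker. With commonDefines.buildVersion hypothetically '7.2.0', it returns the string 'CHANGES|7.2.0;'; the rest of the commit prepends it to every binary changes file:

    // a minimal sketch; utils.getChangesFileHeader() is the helper added above
    let headerBuf = Buffer.from(utils.getChangesFileHeader(), 'utf-8');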

View File

@@ -1482,10 +1482,10 @@ exports.install = function(server, callbackFunction) {
_checkLicense(ctx, conn);
});
io.engine.on("connection_error", (err) => {
console.log(err.req); // the request object
console.log(err.code); // the error code, for example 1
console.log(err.message); // the error message, for example "Session ID unknown"
console.log(err.context); // some additional error context
operationContext.global.logger.warn(err.req); // the request object
operationContext.global.logger.warn(err.code); // the error code, for example 1
operationContext.global.logger.warn(err.message); // the error message, for example "Session ID unknown"
operationContext.global.logger.warn(err.context); // some additional error context
});
/**
*
@@ -2547,8 +2547,21 @@ exports.install = function(server, callbackFunction) {
do {
changes = yield sqlBase.getChangesPromise(ctx, docId, index, index + cfgMaxRequestChanges);
if (changes.length > 0) {
let buffers = changes.map(elem => elem.change_data);
let buffer = Buffer.concat(buffers);
let buffer;
if (cfgEditor['binaryChanges']) {
let buffers = changes.map(elem => elem.change_data);
buffers.unshift(Buffer.from(utils.getChangesFileHeader(), 'utf-8'));
buffer = Buffer.concat(buffers);
} else {
let changesJSON = indexChunk > 1 ? ',[' : '[';
changesJSON += changes[0].change_data;
for (let i = 1; i < changes.length; ++i) {
changesJSON += ',';
changesJSON += changes[i].change_data;
}
changesJSON += ']\r\n';
buffer = Buffer.from(changesJSON, 'utf8');
}
yield storage.putObject(ctx, changesPrefix + (indexChunk++).toString().padStart(3, '0'), buffer, buffer.length, cfgErrorFiles);
}
index += cfgMaxRequestChanges;
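
The else branch keeps the legacy text layout: each chunk is a bracketed JSON array, and chunks after the first start with ',' so the downloaded pieces can be joined and wrapped into a single JSON array by the consumer (an inference from the separators, not stated in the diff). A side-by-side sketch of the two layouts, with the loop factored into a standalone function for illustration:

    function buildChunk(changes, indexChunk, binaryChanges) {
      if (binaryChanges) {
        // binary: version header followed by the raw change buffers
        let buffers = changes.map(elem => elem.change_data);
        buffers.unshift(Buffer.from(utils.getChangesFileHeader(), 'utf-8'));
        return Buffer.concat(buffers);
      }
      // text: '[...]' for the first chunk, ',[...]' for the rest
      let changesJSON = (indexChunk > 1 ? ',[' : '[') + changes.map(elem => elem.change_data).join(',') + ']\r\n';
      return Buffer.from(changesJSON, 'utf8');
    }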
@@ -2808,7 +2821,7 @@ exports.install = function(server, callbackFunction) {
// Starting index for the changes being appended
const startIndex = puckerIndex;
const newChanges = data.changes;
const newChanges = cfgEditor['binaryChanges'] ? data.changes : JSON.parse(data.changes);
let newChangesLastDate = new Date();
newChangesLastDate.setMilliseconds(0);//remove milliseconds to avoid issues with MySQL datetime rounding
let newChangesLastTime = newChangesLastDate.getTime();
@@ -2819,7 +2832,8 @@ exports.install = function(server, callbackFunction) {
for (let i = 0; i < newChanges.length; ++i) {
oElement = newChanges[i];
arrNewDocumentChanges.push({docid: docId, change: oElement, time: newChangesLastDate,
let change = cfgEditor['binaryChanges'] ? oElement : JSON.stringify(oElement);
arrNewDocumentChanges.push({docid: docId, change: change, time: newChangesLastDate,
user: userId, useridoriginal: conn.user.idOriginal});
}
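
In binary mode the incoming payload and the stored rows keep their raw form; in text mode the payload arrives as one JSON string and each element is re-stringified for its own database row. A condensed sketch of that round trip (the input shapes are assumptions inferred from the diff):

    // text mode: dataChanges is a JSON string of an array of change objects
    // binary mode: dataChanges is already an array of binary changes
    function normalizeIncoming(dataChanges, binaryChanges) {
      const parsed = binaryChanges ? dataChanges : JSON.parse(dataChanges);
      return parsed.map(elem => (binaryChanges ? elem : JSON.stringify(elem)));
    }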

View File

@@ -40,6 +40,8 @@ const config = require('config');
var configSql = config.get('services.CoAuthoring.sql');
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
var pgPoolExtraOptions = configSql.get('pgPoolExtraOptions');
const cfgEditor = config.get('services.CoAuthoring.editor');
let connectionConfig = {
host: configSql.get('dbHost'),
port: configSql.get('dbPort'),
@@ -135,7 +137,7 @@ function getUpsertString(task, values) {
let p11 = addSqlParam(task.baseurl, values);
sqlCommand += `, baseurl = ${p11}`;
}
sqlCommand += ", user_index = ${cfgTableResult}.user_index + 1 RETURNING user_index as userindex;";
sqlCommand += `, user_index = ${cfgTableResult}.user_index + 1 RETURNING user_index as userindex;`;
return sqlCommand;
} else {
return `SELECT * FROM merge_db(${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8});`;
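
The replaced line was a genuine bug rather than a style tweak: ${} placeholders interpolate only inside backtick template literals, so the double-quoted version sent the literal text ${cfgTableResult} to the database. A two-line illustration:

    const cfgTableResult = 'task_result';
    const broken = "user_index = ${cfgTableResult}.user_index + 1"; // literal '${...}': invalid SQL
    const fixed = `user_index = ${cfgTableResult}.user_index + 1`;  // 'user_index = task_result.user_index + 1'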
@@ -183,7 +185,8 @@ exports.insertChanges = function(ctx, tableChanges, startIndex, objChanges, docI
let time = [];
//Postgres 9.4 multi-argument unnest
let sqlCommand = `INSERT INTO ${tableChanges} (tenant, id, change_id, user_id, user_id_original, user_name, change_data, change_date) `;
sqlCommand += "SELECT * FROM UNNEST ($1::text[], $2::text[], $3::int[], $4::text[], $5::text[], $6::text[], $7::bytea[], $8::timestamp[]);";
let changesType = cfgEditor['binaryChanges'] ? 'bytea' : 'text';
sqlCommand += `SELECT * FROM UNNEST ($1::text[], $2::text[], $3::int[], $4::text[], $5::text[], $6::text[], $7::${changesType}[], $8::timestamp[]);`;
let values = [tenant, id, changeId, userId, userIdOriginal, username, change, time];
let curLength = sqlCommand.length;
for (; i < objChanges.length; ++i) {
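
The multi-argument UNNEST turns eight parallel arrays into rows, so one statement inserts the whole batch; only the element type of the change_data array flips between text and bytea. A toy two-column illustration (table and values are hypothetical; node-postgres usage is standard):

    const { Pool } = require('pg');
    const pool = new Pool(); // connection settings assumed to come from the environment
    // inserts rows ('a', 1) and ('b', 2) in a single round trip
    const sql = 'INSERT INTO t (s, n) SELECT * FROM UNNEST ($1::text[], $2::int[]);';
    pool.query(sql, [['a', 'b'], [1, 2]]);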

View File

@@ -245,7 +245,7 @@ function addRandomKey(ctx, task, opt_prefix, opt_size) {
let p6 = addSqlParam(task.changeId, values);
let p7 = addSqlParam(task.callback, values);
let p8 = addSqlParam(task.baseurl, values);
let sqlCommand = 'INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)' +
let sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)` +
` VALUES (${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8});`;
sqlBase.baseConnector.sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {

View File

@@ -35,7 +35,6 @@ var os = require('os');
var path = require('path');
var fs = require('fs');
var url = require('url');
var childProcess = require('child_process');
var co = require('co');
var config = require('config');
var spawnAsync = require('@expo/spawn-async');
@@ -78,6 +77,7 @@ const cfgMaxRequestChanges = config.get('services.CoAuthoring.server.maxRequestC
const cfgForgottenFiles = config.get('services.CoAuthoring.server.forgottenfiles');
const cfgForgottenFilesName = config.get('services.CoAuthoring.server.forgottenfilesname');
const cfgNewFileTemplate = config.get('services.CoAuthoring.server.newFileTemplate');
const cfgEditor = config.get('services.CoAuthoring.editor');
//windows limit 512(2048) https://msdn.microsoft.com/en-us/library/6e3b887c.aspx
//Ubuntu 14.04 limit 4096 http://underyx.me/2015/05/18/raising-the-maximum-number-of-file-descriptors.html
@@ -416,7 +416,11 @@ function* processDownloadFromStorage(ctx, dataConvert, cmd, task, tempDirs, auth
yield* concatFiles(tempDirs.source);
}
if (task.getFromChanges()) {
res = yield* processChanges(ctx, tempDirs, task, cmd, authorProps);
if(cfgEditor['binaryChanges']) {
res = yield* processChangesBin(ctx, tempDirs, task, cmd, authorProps);
} else {
res = yield* processChangesBase64(ctx, tempDirs, task, cmd, authorProps);
}
}
//todo rework
if (!fs.existsSync(dataConvert.fileFrom)) {
@@ -458,8 +462,7 @@ function* concatFiles(source) {
}
}
}
function* processChanges(ctx, tempDirs, task, cmd, authorProps) {
function* processChangesBin(ctx, tempDirs, task, cmd, authorProps) {
let res = constants.NO_ERROR;
let changesDir = path.join(tempDirs.source, constants.CHANGES_NAME);
fs.mkdirSync(changesDir);
@@ -488,7 +491,8 @@ function* processChanges(ctx, tempDirs, task, cmd, authorProps) {
}];
}
let streamObj = yield* streamCreate(ctx, changesDir, indexFile++, {highWaterMark: cfgStreamWriterBufferSize});
let streamObj = yield* streamCreateBin(ctx, changesDir, indexFile++, {highWaterMark: cfgStreamWriterBufferSize});
yield* streamWriteBin(streamObj, Buffer.from(utils.getChangesFileHeader(), 'utf-8'));
let curIndexStart = 0;
let curIndexEnd = Math.min(curIndexStart + cfgMaxRequestChanges, forceSaveIndex);
while (curIndexStart < curIndexEnd || extChanges) {
@@ -509,6 +513,123 @@ function* processChanges(ctx, tempDirs, task, cmd, authorProps) {
changes = extChanges;
}
extChanges = undefined;
for (let i = 0; i < changes.length; ++i) {
let change = changes[i];
if (null === changesAuthor || changesAuthor !== change.user_id_original) {
if (null !== changesAuthor) {
yield* streamEndBin(streamObj);
streamObj = yield* streamCreateBin(ctx, changesDir, indexFile++);
yield* streamWriteBin(streamObj, Buffer.from(utils.getChangesFileHeader(), 'utf-8'));
}
let strDate = baseConnector.getDateTime(change.change_date);
changesHistory.changes.push({'created': strDate, 'user': {'id': change.user_id_original, 'name': change.user_name}});
}
changesAuthor = change.user_id_original;
changesAuthorUnique = change.user_id;
yield* streamWriteBin(streamObj, change.change_data);
streamObj.isNoChangesInFile = false;
}
if (changes.length > 0) {
authorProps.lastModifiedBy = changes[changes.length - 1].user_name;
authorProps.modified = changes[changes.length - 1].change_date.toISOString().slice(0, 19) + 'Z';
}
if (changes.length === curIndexEnd - curIndexStart) {
curIndexStart += cfgMaxRequestChanges;
curIndexEnd = Math.min(curIndexStart + cfgMaxRequestChanges, forceSaveIndex);
} else {
break;
}
}
yield* streamEndBin(streamObj);
if (streamObj.isNoChangesInFile) {
fs.unlinkSync(streamObj.filePath);
}
if (null !== changesAuthorUnique) {
changesIndex = utils.getIndexFromUserId(changesAuthorUnique, changesAuthor);
}
if (null == changesAuthor && null == changesIndex && forceSave && undefined !== forceSave.getAuthorUserId() &&
undefined !== forceSave.getAuthorUserIndex()) {
changesAuthor = forceSave.getAuthorUserId();
changesIndex = forceSave.getAuthorUserIndex();
}
cmd.setUserId(changesAuthor);
cmd.setUserIndex(changesIndex);
fs.writeFileSync(path.join(tempDirs.result, 'changesHistory.json'), JSON.stringify(changesHistory), 'utf8');
ctx.logger.debug('processChangesBin end');
return res;
}
function* streamCreateBin(ctx, changesDir, indexFile, opt_options) {
let fileName = constants.CHANGES_NAME + indexFile + '.bin';
let filePath = path.join(changesDir, fileName);
let writeStream = yield utils.promiseCreateWriteStream(filePath, opt_options);
writeStream.on('error', function(err) {
//todo integrate error handling into the main thread (probably: set a flag here and check it in the main thread)
ctx.logger.error('WriteStreamError %s', err.stack);
});
return {writeStream: writeStream, filePath: filePath, isNoChangesInFile: true};
}
function* streamWriteBin(streamObj, buf) {
if (!streamObj.writeStream.write(buf)) {
yield utils.promiseWaitDrain(streamObj.writeStream);
}
}
function* streamEndBin(streamObj) {
streamObj.writeStream.end();
yield utils.promiseWaitClose(streamObj.writeStream);
}
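
These generator helpers implement manual backpressure: write() returning false means the stream's internal buffer is full, so the caller yields until 'drain' before writing more. A usage sketch driven by co, which this file already requires (ctx, the path, and the payload are made up):

    const co = require('co');
    co(function* () {
      let streamObj = yield* streamCreateBin(ctx, '/tmp/changes', 0);
      yield* streamWriteBin(streamObj, Buffer.from(utils.getChangesFileHeader(), 'utf-8'));
      yield* streamWriteBin(streamObj, Buffer.from([0x01, 0x02])); // suspends on backpressure
      yield* streamEndBin(streamObj); // ends the stream and waits for 'close'
    });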
function* processChangesBase64(ctx, tempDirs, task, cmd, authorProps) {
let res = constants.NO_ERROR;
let changesDir = path.join(tempDirs.source, constants.CHANGES_NAME);
fs.mkdirSync(changesDir);
let indexFile = 0;
let changesAuthor = null;
let changesAuthorUnique = null;
let changesIndex = null;
let changesHistory = {
serverVersion: commonDefines.buildVersion,
changes: []
};
let forceSave = cmd.getForceSave();
let forceSaveTime;
let forceSaveIndex = Number.MAX_VALUE;
if (forceSave && undefined !== forceSave.getTime() && undefined !== forceSave.getIndex()) {
forceSaveTime = forceSave.getTime();
forceSaveIndex = forceSave.getIndex();
}
let extChangeInfo = cmd.getExternalChangeInfo();
let extChanges;
if (extChangeInfo) {
extChanges = [{
id: cmd.getDocId(), change_id: 0, change_data: "", user_id: extChangeInfo.user_id,
user_id_original: extChangeInfo.user_id_original, user_name: extChangeInfo.user_name,
change_date: new Date(extChangeInfo.change_date)
}];
}
let streamObj = yield* streamCreate(ctx, changesDir, indexFile++, {highWaterMark: cfgStreamWriterBufferSize});
let curIndexStart = 0;
let curIndexEnd = Math.min(curIndexStart + cfgMaxRequestChanges, forceSaveIndex);
while (curIndexStart < curIndexEnd || extChanges) {
let changes = [];
if (curIndexStart < curIndexEnd) {
changes = yield baseConnector.getChangesPromise(ctx, cmd.getDocId(), curIndexStart, curIndexEnd, forceSaveTime);
if (changes.length > 0 && changes[0].change_data.startsWith('ENCRYPTED;')) {
ctx.logger.warn('processChangesBase64 encrypted changes');
//todo sql request instead?
res = constants.EDITOR_CHANGES;
}
res = yield* isUselessConvertion(ctx, task, cmd);
if (constants.NO_ERROR !== res) {
break;
}
}
if (0 === changes.length && extChanges) {
changes = extChanges;
}
extChanges = undefined;
for (let i = 0; i < changes.length; ++i) {
let change = changes[i];
if (null === changesAuthor || changesAuthor !== change.user_id_original) {
@@ -518,6 +639,9 @@ function* processChanges(ctx, tempDirs, task, cmd, authorProps) {
}
let strDate = baseConnector.getDateTime(change.change_date);
changesHistory.changes.push({'created': strDate, 'user': {'id': change.user_id_original, 'name': change.user_name}});
yield* streamWrite(streamObj, '[');
} else {
yield* streamWrite(streamObj, ',');
}
changesAuthor = change.user_id_original;
changesAuthorUnique = change.user_id;
@@ -535,7 +659,7 @@ function* processChanges(ctx, tempDirs, task, cmd, authorProps) {
break;
}
}
yield* streamEnd(streamObj);
yield* streamEnd(streamObj, ']');
if (streamObj.isNoChangesInFile) {
fs.unlinkSync(streamObj.filePath);
}
@@ -555,7 +679,7 @@ function* processChanges(ctx, tempDirs, task, cmd, authorProps) {
}
function* streamCreate(ctx, changesDir, indexFile, opt_options) {
let fileName = constants.CHANGES_NAME + indexFile + '.bin';
let fileName = constants.CHANGES_NAME + indexFile + '.json';
let filePath = path.join(changesDir, fileName);
let writeStream = yield utils.promiseCreateWriteStream(filePath, opt_options);
writeStream.on('error', function(err) {
@@ -565,14 +689,14 @@ function* streamCreate(ctx, changesDir, indexFile, opt_options) {
return {writeStream: writeStream, filePath: filePath, isNoChangesInFile: true};
}
function* streamWrite(streamObj, buf) {
if (!streamObj.writeStream.write(buf)) {
function* streamWrite(streamObj, text) {
if (!streamObj.writeStream.write(text, 'utf8')) {
yield utils.promiseWaitDrain(streamObj.writeStream);
}
}
function* streamEnd(streamObj) {
streamObj.writeStream.end();
function* streamEnd(streamObj, text) {
streamObj.writeStream.end(text, 'utf8');
yield utils.promiseWaitClose(streamObj.writeStream);
}
function* processUploadToStorage(ctx, dir, storagePath) {
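
Net effect of the text path: every chunk the converter writes is now a complete JSON array ('[' on the first change after an author switch, ',' between changes, ']' appended by streamEnd), so a consumer can parse each file directly. A hypothetical check (file name and payloads made up):

    const fs = require('fs');
    // e.g. file content: ["chg0","chg1","chg2"]
    const changes = JSON.parse(fs.readFileSync('changes0.json', 'utf8'));
    console.log(changes.length); // 3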

View File

@@ -44,6 +44,30 @@ CREATE TABLE IF NOT EXISTS `doc_changes` (
/*!40000 ALTER TABLE `doc_changes` DISABLE KEYS */;
/*!40000 ALTER TABLE `doc_changes` ENABLE KEYS */;
--
-- Definition of table `doc_changes2`
--
CREATE TABLE IF NOT EXISTS `doc_changes2` (
`tenant` varchar(255) NOT NULL,
`id` varchar(255) NOT NULL,
`change_id` int(10) unsigned NOT NULL,
`user_id` varchar(255) NOT NULL,
`user_id_original` varchar(255) NOT NULL,
`user_name` varchar(255) NOT NULL,
`change_data` longblob NOT NULL,
`change_date` datetime NOT NULL,
PRIMARY KEY (`tenant`, `id`,`change_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--
-- Dumping data for table `doc_changes2`
--
/*!40000 ALTER TABLE `doc_changes2` DISABLE KEYS */;
/*!40000 ALTER TABLE `doc_changes2` ENABLE KEYS */;
--
-- Definition of table `task_result`
--

View File

@@ -20,6 +20,22 @@ PRIMARY KEY ("tenant", "id", "change_id")
)
WITH (OIDS=FALSE);
-- ----------------------------
-- Table structure for doc_changes2
-- ----------------------------
CREATE TABLE IF NOT EXISTS "public"."doc_changes2" (
"tenant" varchar(255) COLLATE "default" NOT NULL,
"id" varchar(255) COLLATE "default" NOT NULL,
"change_id" int4 NOT NULL,
"user_id" varchar(255) COLLATE "default" NOT NULL,
"user_id_original" varchar(255) COLLATE "default" NOT NULL,
"user_name" varchar(255) COLLATE "default" NOT NULL,
"change_data" bytea COLLATE "default" NOT NULL,
"change_date" timestamp without time zone NOT NULL,
PRIMARY KEY ("tenant", "id", "change_id")
)
WITH (OIDS=FALSE);
-- ----------------------------
-- Table structure for task_result
-- ----------------------------
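
With change_data declared as bytea, node-postgres binds a Buffer directly, with no base64 or escaping step in application code. A hypothetical insert against the new table (connection settings omitted):

    const { Pool } = require('pg');
    const pool = new Pool();
    pool.query(
      'INSERT INTO doc_changes2 (tenant, id, change_id, user_id, user_id_original, user_name, change_data, change_date)' +
      ' VALUES ($1, $2, $3, $4, $5, $6, $7, $8);',
      ['tenant1', 'doc1', 0, 'u1', 'u1', 'User', Buffer.from('raw change bytes'), new Date()]
    );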