diff --git a/build/webpack.base.config.js b/build/webpack.base.config.js index 19933ede..8605c532 100644 --- a/build/webpack.base.config.js +++ b/build/webpack.base.config.js @@ -30,7 +30,7 @@ module.exports = { }, { resourceQuery: /^\?vue/, - use: path.resolve('build/includer.js') + use: path.resolve(__dirname, 'includer.js') }, { test: /\.js$/, diff --git a/client/components/Reader/versionHistory.js b/client/components/Reader/versionHistory.js index 4846bde4..4c819c6c 100644 --- a/client/components/Reader/versionHistory.js +++ b/client/components/Reader/versionHistory.js @@ -1,4 +1,15 @@ export const versionHistory = [ +{ + showUntil: '2021-12-02', + header: '0.11.1 (2021-12-03)', + content: +` + +` +}, + { showUntil: '2021-11-17', header: '0.11.0 (2021-11-18)', diff --git a/package-lock.json b/package-lock.json index 8a508f3a..c54affb6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "Liberama", - "version": "0.11.0", + "version": "0.11.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "Liberama", - "version": "0.11.0", + "version": "0.11.1", "hasInstallScript": true, "license": "CC0-1.0", "dependencies": { diff --git a/package.json b/package.json index c63383cb..4db64e54 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "Liberama", - "version": "0.11.0", + "version": "0.11.1", "author": "Book Pauk ", "license": "CC0-1.0", "repository": "bookpauk/liberama", diff --git a/server/config/base.js b/server/config/base.js index 8211cad4..6e400a07 100644 --- a/server/config/base.js +++ b/server/config/base.js @@ -37,6 +37,14 @@ module.exports = { } ], + jembaDb: [ + { + dbName: 'reader-storage', + thread: true, + openAll: true, + } + ], + servers: [ { serverName: '1', diff --git a/server/controllers/ReaderController.js b/server/controllers/ReaderController.js index 87f69464..0dea8925 100644 --- a/server/controllers/ReaderController.js +++ b/server/controllers/ReaderController.js @@ -1,12 +1,12 @@ const BaseController = require('./BaseController'); const ReaderWorker = require('../core/Reader/ReaderWorker');//singleton -const ReaderStorage = require('../core/Reader/ReaderStorage');//singleton +const JembaReaderStorage = require('../core/Reader/JembaReaderStorage');//singleton const WorkerState = require('../core/WorkerState');//singleton class ReaderController extends BaseController { constructor(config) { super(config); - this.readerStorage = new ReaderStorage(); + this.readerStorage = new JembaReaderStorage(); this.readerWorker = new ReaderWorker(config); this.workerState = new WorkerState(); } diff --git a/server/controllers/WebSocketController.js b/server/controllers/WebSocketController.js index cb147288..1ef46135 100644 --- a/server/controllers/WebSocketController.js +++ b/server/controllers/WebSocketController.js @@ -2,7 +2,7 @@ const WebSocket = require ('ws'); const _ = require('lodash'); const ReaderWorker = require('../core/Reader/ReaderWorker');//singleton -const ReaderStorage = require('../core/Reader/ReaderStorage');//singleton +const JembaReaderStorage = require('../core/Reader/JembaReaderStorage');//singleton const WorkerState = require('../core/WorkerState');//singleton const log = new (require('../core/AppLogger'))().log;//singleton const utils = require('../core/utils'); @@ -15,7 +15,7 @@ class WebSocketController { this.config = config; this.isDevelopment = (config.branch == 'development'); - this.readerStorage = new ReaderStorage(); + this.readerStorage = new JembaReaderStorage(); this.readerWorker = new ReaderWorker(config); this.workerState = new WorkerState(); diff --git a/server/core/AppLogger.js b/server/core/AppLogger.js index af6799ca..68c8e1fb 100644 --- a/server/core/AppLogger.js +++ b/server/core/AppLogger.js @@ -7,10 +7,14 @@ let instance = null; class AppLogger { constructor() { if (!instance) { + this.inited = false; + this.logFileName = ''; + this.errLogFileName = ''; + this.fatalLogFileName = ''; + instance = this; } - this.inited = false; return instance; } @@ -22,11 +26,16 @@ class AppLogger { if (config.loggingEnabled) { await fs.ensureDir(config.logDir); + + this.logFileName = `${config.logDir}/${config.name}.log`; + this.errLogFileName = `${config.logDir}/${config.name}.err.log`; + this.fatalLogFileName = `${config.logDir}/${config.name}.fatal.log`; + loggerParams = [ {log: 'ConsoleLog'}, - {log: 'FileLog', fileName: `${config.logDir}/${config.name}.log`}, - {log: 'FileLog', fileName: `${config.logDir}/${config.name}.err.log`, exclude: [LM_OK, LM_INFO, LM_TOTAL]}, - {log: 'FileLog', fileName: `${config.logDir}/${config.name}.fatal.log`, exclude: [LM_OK, LM_INFO, LM_WARN, LM_ERR, LM_TOTAL]},//LM_FATAL only + {log: 'FileLog', fileName: this.logFileName}, + {log: 'FileLog', fileName: this.errLogFileName, exclude: [LM_OK, LM_INFO, LM_TOTAL]}, + {log: 'FileLog', fileName: this.fatalLogFileName, exclude: [LM_OK, LM_INFO, LM_WARN, LM_ERR, LM_TOTAL]},//LM_FATAL only ]; } diff --git a/server/core/AsyncExit.js b/server/core/AsyncExit.js new file mode 100644 index 00000000..d2b61594 --- /dev/null +++ b/server/core/AsyncExit.js @@ -0,0 +1,111 @@ +let instance = null; + +const defaultTimeout = 15*1000;//15 sec +const exitSignals = ['SIGINT', 'SIGTERM', 'SIGBREAK', 'SIGHUP', 'uncaughtException']; + +//singleton +class AsyncExit { + constructor() { + if (!instance) { + this.onSignalCallbacks = new Map(); + this.callbacks = new Map(); + this.afterCallbacks = new Map(); + this.exitTimeout = defaultTimeout; + this.inited = false; + instance = this; + } + + return instance; + } + + init(signals = null, codeOnSignal = 2) { + if (this.inited) + throw new Error('AsyncExit: initialized already'); + + if (!signals) + signals = exitSignals; + + const runSingalCallbacks = async(signal) => { + for (const signalCallback of this.onSignalCallbacks.keys()) { + try { + await signalCallback(signal); + } catch(e) { + console.error(e); + } + } + }; + + for (const signal of signals) { + process.once(signal, async() => { + await runSingalCallbacks(signal); + this.exit(codeOnSignal); + }); + } + + this.inited = true; + } + + onSignal(signalCallback) { + if (!this.onSignalCallbacks.has(signalCallback)) { + this.onSignalCallbacks.set(signalCallback, true); + } + } + + add(exitCallback) { + if (!this.callbacks.has(exitCallback)) { + this.callbacks.set(exitCallback, true); + } + } + + addAfter(exitCallback) { + if (!this.afterCallbacks.has(exitCallback)) { + this.afterCallbacks.set(exitCallback, true); + } + } + + remove(exitCallback) { + if (this.callbacks.has(exitCallback)) { + this.callbacks.delete(exitCallback); + } + if (this.afterCallbacks.has(exitCallback)) { + this.afterCallbacks.delete(exitCallback); + } + } + + setExitTimeout(timeout) { + this.exitTimeout = timeout; + } + + exit(code = 0) { + if (this.exiting) + return; + + this.exiting = true; + + const timer = setTimeout(() => { process.exit(code); }, this.exitTimeout); + + (async() => { + for (const exitCallback of this.callbacks.keys()) { + try { + await exitCallback(); + } catch(e) { + console.error(e); + } + } + + for (const exitCallback of this.afterCallbacks.keys()) { + try { + await exitCallback(); + } catch(e) { + console.error(e); + } + } + + clearTimeout(timer); + //console.log('Exited gracefully'); + process.exit(code); + })(); + } +} + +module.exports = AsyncExit; diff --git a/server/core/LibSharedStorage/MegaStorage.js b/server/core/LibSharedStorage/MegaStorage.js index 718534d3..08091f2b 100644 --- a/server/core/LibSharedStorage/MegaStorage.js +++ b/server/core/LibSharedStorage/MegaStorage.js @@ -25,7 +25,6 @@ class MegaStorage { this.debouncedSaveStats = _.debounce(() => { this.saveStats().catch((e) => { log(LM_ERR, `MegaStorage::saveStats ${e.message}`); - //process.exit(1); }); }, 5000, {'maxWait':6000}); diff --git a/server/core/Logger.js b/server/core/Logger.js index db5d122b..c095bdfa 100644 --- a/server/core/Logger.js +++ b/server/core/Logger.js @@ -2,6 +2,9 @@ Журналирование с буферизацией вывода */ const fs = require('fs-extra'); +const ayncExit = new (require('./AsyncExit'))(); + +const sleep = (ms) => { return new Promise(resolve => setTimeout(resolve, ms)) }; global.LM_OK = 0; global.LM_INFO = 1; @@ -46,12 +49,13 @@ class BaseLog { this.outputBuffer = []; await this.flushImpl(this.data) - .catch(e => { console.log(e); process.exit(1); } ); + .catch(e => { console.log(e); ayncExit.exit(1); } ); this.flushing = false; } log(msgType, message) { - if (this.closed) { console.log(`Logger fatal error: log was closed (message to log: ${message}})`); process.exit(1); } + if (this.closed) + return; if (!this.exclude.has(msgType)) { this.outputBuffer.push(message); @@ -73,7 +77,7 @@ class BaseLog { } } - close() { + async close() { if (this.closed) return; @@ -81,12 +85,13 @@ class BaseLog { clearInterval(this.iid); try { - if (this.flushing) - this.flushImplSync(this.data); - this.flushImplSync(this.outputBuffer); + while (this.outputBufferLength) { + await this.flush(); + await sleep(1); + } } catch(e) { console.log(e); - process.exit(1); + ayncExit.exit(1); } this.outputBufferLength = 0; this.outputBuffer = []; @@ -103,12 +108,14 @@ class FileLog extends BaseLog { this.rcid = 0; } - close() { + async close() { if (this.closed) return; - super.close(); - if (this.fd) - fs.closeSync(this.fd); + await super.close(); + if (this.fd) { + await fs.close(this.fd); + this.fd = null; + } if (this.rcid) clearTimeout(this.rcid); } @@ -151,23 +158,15 @@ class FileLog extends BaseLog { }, LOG_ROTATE_FILE_CHECK_INTERVAL); } - await fs.write(this.fd, Buffer.from(data.join(''))); + if (this.fd) + await fs.write(this.fd, Buffer.from(data.join(''))); } - - flushImplSync(data) { - fs.writeSync(this.fd, Buffer.from(data.join(''))); - } - } class ConsoleLog extends BaseLog { async flushImpl(data) { process.stdout.write(data.join('')); } - - flushImplSync(data) { - process.stdout.write(data.join('')); - } } //------------------------------------------------------------------ @@ -178,7 +177,7 @@ const factory = { class Logger { - constructor(params = null, cleanupCallback = null) { + constructor(params = null) { this.handlers = []; if (params) { params.forEach((logParams) => { @@ -187,12 +186,22 @@ class Logger { this.handlers.push(new loggerClass(logParams)); }); } - cleanupCallback = cleanupCallback || (() => {}); - this.cleanup(cleanupCallback); + + this.closed = false; + ayncExit.onSignal((signal) => { + this.log(LM_FATAL, `Signal ${signal} received, exiting...`); + }); + ayncExit.addAfter(this.close.bind(this)); + } + + formatDate(date) { + return `${date.getFullYear()}-${(date.getMonth() + 1).toString().padStart(2, '0')}-${date.getDate().toString().padStart(2, '0')} ` + + `${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}.` + + `${date.getMilliseconds().toString().padStart(3, '0')}`; } prepareMessage(msgType, message) { - return (new Date().toISOString()) + ` ${msgTypeToStr[msgType]}: ${message}\n`; + return this.formatDate(new Date()) + ` ${msgTypeToStr[msgType]}: ${message}\n`; } log(msgType, message) { @@ -203,47 +212,18 @@ class Logger { const mes = this.prepareMessage(msgType, message); - for (let i = 0; i < this.handlers.length; i++) - this.handlers[i].log(msgType, mes); + if (!this.closed) { + for (let i = 0; i < this.handlers.length; i++) + this.handlers[i].log(msgType, mes); + } else { + console.log(mes); + } } - close() { + async close() { for (let i = 0; i < this.handlers.length; i++) - this.handlers[i].close(); - } - - cleanup(callback) { - // attach user callback to the process event emitter - // if no callback, it will still exit gracefully on Ctrl-C - callback = callback || (() => {}); - process.on('cleanup', callback); - - // do app specific cleaning before exiting - process.on('exit', () => { - this.close(); - process.emit('cleanup'); - }); - - // catch ctrl+c event and exit normally - process.on('SIGINT', () => { - this.log(LM_FATAL, 'Ctrl-C pressed, exiting...'); - process.exit(2); - }); - - process.on('SIGTERM', () => { - this.log(LM_FATAL, 'Kill signal, exiting...'); - process.exit(2); - }); - - //catch uncaught exceptions, trace, then exit normally - process.on('uncaughtException', e => { - try { - this.log(LM_FATAL, e.stack); - } catch (e) { - console.log(e.stack); - } - process.exit(99); - }); + await this.handlers[i].close(); + this.closed = true; } } diff --git a/server/core/Reader/JembaReaderStorage.js b/server/core/Reader/JembaReaderStorage.js new file mode 100644 index 00000000..8e7a00e8 --- /dev/null +++ b/server/core/Reader/JembaReaderStorage.js @@ -0,0 +1,122 @@ +const _ = require('lodash'); + +const utils = require('../utils'); +const JembaConnManager = require('../../db/JembaConnManager');//singleton + +let instance = null; + +//singleton +class JembaReaderStorage { + constructor() { + if (!instance) { + this.connManager = new JembaConnManager(); + this.db = this.connManager.db['reader-storage']; + this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа + + instance = this; + } + + return instance; + } + + async doAction(act) { + if (!_.isObject(act.items)) + throw new Error('items is not an object'); + + let result = {}; + switch (act.action) { + case 'check': + result = await this.checkItems(act.items); + break; + case 'get': + result = await this.getItems(act.items); + break; + case 'set': + result = await this.setItems(act.items, act.force); + break; + default: + throw new Error('Unknown action'); + } + + return result; + } + + async checkItems(items) { + let result = {state: 'success', items: {}}; + + const db = this.db; + + for (const id of Object.keys(items)) { + if (this.cache[id]) { + result.items[id] = this.cache[id]; + } else { + const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}` + table: 'storage', + map: '(r) => ({rev: r.rev})', + where: `@@id(${db.esc(id)})` + }); + const rev = (rows.length && rows[0].rev ? rows[0].rev : 0); + result.items[id] = {rev}; + this.cache[id] = result.items[id]; + } + } + + return result; + } + + async getItems(items) { + let result = {state: 'success', items: {}}; + + const db = this.db; + + for (const id of Object.keys(items)) { + const rows = await db.select({//SQL`SELECT rev, data FROM storage WHERE id = ${id}`); + table: 'storage', + where: `@@id(${db.esc(id)})` + }); + const rev = (rows.length && rows[0].rev ? rows[0].rev : 0); + const data = (rows.length && rows[0].data ? rows[0].data : ''); + result.items[id] = {rev, data}; + } + + return result; + } + + async setItems(items, force) { + let check = await this.checkItems(items); + + //сначала проверим совпадение ревизий + for (const id of Object.keys(items)) { + if (!_.isString(items[id].data)) + throw new Error('items.data is not a string'); + + if (!force && check.items[id].rev + 1 !== items[id].rev) + return {state: 'reject', items: check.items}; + } + + const db = this.db; + const newRev = {}; + for (const id of Object.keys(items)) { + await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`); + table: 'storage', + replace: true, + rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}], + }); + newRev[id] = {rev: items[id].rev}; + } + + Object.assign(this.cache, newRev); + + return {state: 'success'}; + } + + periodicCleanCache(timeout) { + this.cache = {}; + + setTimeout(() => { + this.periodicCleanCache(timeout); + }, timeout); + } +} + +module.exports = JembaReaderStorage; \ No newline at end of file diff --git a/server/core/Reader/ReaderStorage.js b/server/core/Reader/ReaderStorage.js deleted file mode 100644 index d3828102..00000000 --- a/server/core/Reader/ReaderStorage.js +++ /dev/null @@ -1,126 +0,0 @@ -const SQL = require('sql-template-strings'); -const _ = require('lodash'); - -const ConnManager = require('../../db/ConnManager');//singleton - -let instance = null; - -//singleton -class ReaderStorage { - constructor() { - if (!instance) { - this.connManager = new ConnManager(); - this.storagePool = this.connManager.pool.readerStorage; - this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа - - instance = this; - } - - return instance; - } - - async doAction(act) { - if (!_.isObject(act.items)) - throw new Error('items is not an object'); - - let result = {}; - switch (act.action) { - case 'check': - result = await this.checkItems(act.items); - break; - case 'get': - result = await this.getItems(act.items); - break; - case 'set': - result = await this.setItems(act.items, act.force); - break; - default: - throw new Error('Unknown action'); - } - - return result; - } - - async checkItems(items) { - let result = {state: 'success', items: {}}; - - const dbh = await this.storagePool.get(); - try { - for (const id of Object.keys(items)) { - if (this.cache[id]) { - result.items[id] = this.cache[id]; - } else { - const rows = await dbh.all(SQL`SELECT rev FROM storage WHERE id = ${id}`); - const rev = (rows.length && rows[0].rev ? rows[0].rev : 0); - result.items[id] = {rev}; - this.cache[id] = result.items[id]; - } - } - } finally { - dbh.ret(); - } - - return result; - } - - async getItems(items) { - let result = {state: 'success', items: {}}; - - const dbh = await this.storagePool.get(); - try { - for (const id of Object.keys(items)) { - const rows = await dbh.all(SQL`SELECT rev, data FROM storage WHERE id = ${id}`); - const rev = (rows.length && rows[0].rev ? rows[0].rev : 0); - const data = (rows.length && rows[0].data ? rows[0].data : ''); - result.items[id] = {rev, data}; - } - } finally { - dbh.ret(); - } - - return result; - } - - async setItems(items, force) { - let check = await this.checkItems(items); - - //сначала проверим совпадение ревизий - for (const id of Object.keys(items)) { - if (!_.isString(items[id].data)) - throw new Error('items.data is not a string'); - - if (!force && check.items[id].rev + 1 !== items[id].rev) - return {state: 'reject', items: check.items}; - } - - const dbh = await this.storagePool.get(); - await dbh.run('BEGIN'); - try { - const newRev = {}; - for (const id of Object.keys(items)) { - await dbh.run(SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`); - newRev[id] = {rev: items[id].rev}; - } - await dbh.run('COMMIT'); - - Object.assign(this.cache, newRev); - } catch (e) { - await dbh.run('ROLLBACK'); - throw e; - } finally { - dbh.ret(); - } - - return {state: 'success'}; - } - - periodicCleanCache(timeout) { - this.cache = {}; - - setTimeout(() => { - this.periodicCleanCache(timeout); - }, timeout); - } -} - -module.exports = ReaderStorage; \ No newline at end of file diff --git a/server/core/utils.js b/server/core/utils.js index 587b72bf..18017223 100644 --- a/server/core/utils.js +++ b/server/core/utils.js @@ -38,6 +38,10 @@ function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } +function toUnixTime(time) { + return parseInt(time/1000); +} + function randomHexString(len) { return crypto.randomBytes(len).toString('hex') } @@ -126,6 +130,7 @@ module.exports = { bufferRemoveZeroes, getFileHash, sleep, + toUnixTime, randomHexString, touchFile, spawnProcess, diff --git a/server/db/ConnManager.js b/server/db/ConnManager.js index 0e8af17e..e31a601a 100644 --- a/server/db/ConnManager.js +++ b/server/db/ConnManager.js @@ -1,3 +1,4 @@ +//TODO: удалить модуль в 2023г const fs = require('fs-extra'); const SqliteConnectionPool = require('./SqliteConnectionPool'); diff --git a/server/db/Converter.js b/server/db/Converter.js new file mode 100644 index 00000000..8f68c8b9 --- /dev/null +++ b/server/db/Converter.js @@ -0,0 +1,42 @@ +//TODO: удалить модуль в 2023г +const fs = require('fs-extra'); +const log = new (require('../core/AppLogger'))().log;//singleton + +class Converter { + async run(config) { + log('Converter start'); + + try { + const connManager = new (require('./ConnManager'))();//singleton + const storagePool = connManager.pool.readerStorage; + + const jembaConnManager = new (require('./JembaConnManager'))();//singleton + const db = jembaConnManager.db['reader-storage']; + + const srcDbPath = `${config.dataDir}/reader-storage.sqlite`; + if (!await fs.pathExists(srcDbPath)) { + log(LM_WARN, ' Source DB does not exist, nothing to do'); + return; + } + + const rows = await db.select({table: 'storage', count: true}); + if (rows.length && rows[0].count != 0) { + log(LM_WARN, ` Destination table already exists (found ${rows[0].count} items), nothing to do`); + return; + } + + const dbSrc = await storagePool.get(); + try { + const rows = await dbSrc.all(`SELECT * FROM storage`); + await db.insert({table: 'storage', rows}); + log(` Inserted ${rows.length} items`); + } finally { + dbSrc.ret(); + } + } finally { + log('Converter finish'); + } + } +} + +module.exports = Converter; \ No newline at end of file diff --git a/server/db/JembaConnManager.js b/server/db/JembaConnManager.js new file mode 100644 index 00000000..d3a2dd57 --- /dev/null +++ b/server/db/JembaConnManager.js @@ -0,0 +1,181 @@ +const fs = require('fs-extra'); +const _ = require('lodash'); + +const ayncExit = new (require('../core/AsyncExit'))();//singleton +const { JembaDb, JembaDbThread } = require('./JembaDb'); +const log = new (require('../core/AppLogger'))().log;//singleton + +const jembaMigrations = require('./jembaMigrations'); + +let instance = null; + +//singleton +class JembaConnManager { + constructor() { + if (!instance) { + this.inited = false; + + instance = this; + } + + return instance; + } + + async init(config, forceAutoRepair = false, migs = jembaMigrations, undoLastMigration = false) { + if (this.inited) + throw new Error('JembaConnManager initialized already'); + + this.config = config; + this._db = {}; + + for (const dbConfig of this.config.jembaDb) { + const dbPath = `${this.config.dataDir}/db/${dbConfig.dbName}`; + + //бэкап + if (!dbConfig.noBak && await fs.pathExists(dbPath)) { + const bakFile = `${dbPath}.bak`; + await fs.remove(bakFile); + await fs.copy(dbPath, bakFile); + } + + let dbConn = null; + if (dbConfig.thread) { + dbConn = new JembaDbThread(); + } else { + dbConn = new JembaDb(); + } + + log(`Open "${dbConfig.dbName}" begin`); + await dbConn.openDb({dbPath, cacheSize: dbConfig.cacheSize, compressed: dbConfig.compressed, forceFileClosing: dbConfig.forceFileClosing}); + + if (dbConfig.openAll) { + try { + await dbConn.openAll(); + } catch(e) { + if ((forceAutoRepair || dbConfig.autoRepair) && + ( + e.message.indexOf('corrupted') >= 0 + || e.message.indexOf('Unexpected token') >= 0 + || e.message.indexOf('invalid stored block lengths') >= 0 + ) + ) { + log(LM_ERR, e); + log(`Open "${dbConfig.dbName}" with auto repair`); + await dbConn.openAll({autoRepair: true}); + } else { + throw e; + } + } + } + + log(`Open "${dbConfig.dbName}" end`); + + //миграции + const mig = migs[dbConfig.dbName]; + if (mig && mig.data) { + const applied = await this.migrate(dbConn, mig.data, mig.table, undoLastMigration); + if (applied.length) + log(`${applied.length} migrations applied to "${dbConfig.dbName}"`); + } + + this._db[dbConfig.dbName] = dbConn; + } + + ayncExit.add(this.close.bind(this)); + + this.inited = true; + } + + async close() { + if (!this.inited) + return; + + for (const dbConfig of this.config.jembaDb) { + await this._db[dbConfig.dbName].closeDb(); + } + + this._db = {}; + this.inited = false; + } + + async migrate(db, migs, table, undoLastMigration) { + const migrations = _.cloneDeep(migs).sort((a, b) => a.id - b.id); + + if (!migrations.length) { + throw new Error('No migration data'); + } + + migrations.map(migration => { + const data = migration.data; + if (!data.up || !data.down) { + throw new Error(`The ${migration.id}:${migration.name} does not contain 'up' or 'down' instructions`); + } else { + migration.up = data.up; + migration.down = data.down; + } + delete migration.data; + }); + + // Create a database table for migrations meta data if it doesn't exist + // id, name, up, down + await db.create({ + table, + quietIfExists: true, + }); + + // Get the list of already applied migrations + let dbMigrations = await db.select({ + table, + sort: '(a, b) => a.id - b.id' + }); + + const execUpDown = async(items) => { + for (const item of items) { + const action = item[0]; + await db[action](item[1]); + } + }; + + // Undo migrations that exist only in the database but not in migs, + // also undo the last migration if the undoLastMigration + const lastMigration = migrations[migrations.length - 1]; + for (const migration of dbMigrations.slice().sort((a, b) => b.id - a.id)) { + if (!migrations.some(x => x.id === migration.id) || + (undoLastMigration && migration.id === lastMigration.id)) { + await execUpDown(migration.down); + await db.delete({ + table, + where: `@@id(${db.esc(migration.id)})` + }); + dbMigrations = dbMigrations.filter(x => x.id !== migration.id); + } else { + break; + } + } + + // Apply pending migrations + let applied = []; + const lastMigrationId = dbMigrations.length ? dbMigrations[dbMigrations.length - 1].id : 0; + for (const migration of migrations) { + if (migration.id > lastMigrationId) { + await execUpDown(migration.up); + await db.insert({ + table, + rows: [migration], + }); + applied.push(migration.id); + } + } + + return applied; + } + + get db() { + if (!this.inited) + throw new Error('JembaConnManager not inited'); + + return this._db; + } +} + +module.exports = JembaConnManager; \ No newline at end of file diff --git a/server/db/JembaDb/JembaDb.js b/server/db/JembaDb/JembaDb.js new file mode 100644 index 00000000..004b3236 --- /dev/null +++ b/server/db/JembaDb/JembaDb.js @@ -0,0 +1,536 @@ +const fs = require('fs').promises; + +const Table = require('./Table'); +const utils = require('./utils'); + +/* API methods: +openDb +closeDb + +create +drop + +open +openAll +close +closeAll + +tableExists +getDbInfo +getDbSize + +select +insert +update +delete + +esc +*/ + +class JembaDb { + constructor() { + this.opened = false; + } + + /* + query = { + dbPath: String, + //table open defaults + inMemory: Boolean, false + cacheSize: Number, 5 + compressed: Number, {0..9}, 0 + recreate: Boolean, false, + autoRepair: Boolean, false, + forceFileClosing: Boolean, false, + lazyOpen: Boolean, false, + } + */ + async openDb(query = {}) { + if (this.opened) + throw new Error(`Database ${this.dbPath} has already been opened`); + + if (!query.dbPath) + throw new Error(`'query.dbPath' parameter is required`); + + this.dbPath = query.dbPath; + await fs.mkdir(this.dbPath, { recursive: true }); + + this.table = new Map(); + this.tableOpenDefaults = { + inMemory: query.inMemory, + cacheSize: query.cacheSize, + compressed: query.compressed, + recreate: query.recreate, + autoRepair: query.autoRepair, + forceFileClosing: query.forceFileClosing, + lazyOpen: query.lazyOpen, + }; + + this.opened = true; + } + + async closeDb() { + if (!this.opened) + return; + + await this.closeAll(); + this.opened = false; + + //console.log('closed'); + } + + checkOpened() { + if (!this.opened) + throw new Error('Database closed'); + } + + /* + query = { + table: 'tableName', + quietIfExists: Boolean, + inMemory: Boolean, false + cacheSize: Number, 5 + compressed: Number, {0..9}, 0 + recreate: Boolean, false, + autoRepair: Boolean, false, + forceFileClosing: Boolean, false, + lazyOpen: Boolean, false, + + in: 'tableName', + flag: Object || Array, {name: 'flag1', check: '(r) => r.id > 10'} + hash: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} + index: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} + } + result = {} + */ + async create(query = {}) { + this.checkOpened(); + + if ((!query.table && !query.in) || (query.table && query.in)) + throw new Error(`One of 'query.table' or 'query.in' parameters is required, but not both`); + + let table; + if (query.table) { + if (await this.tableExists({table: query.table})) { + if (!query.quietIfExists) + throw new Error(`Table '${query.table}' already exists`); + + table = this.table.get(query.table); + } else { + table = new Table(); + this.table.set(query.table, table); + + await this.open(query); + } + } else { + if (await this.tableExists({table: query.in})) { + table = this.table.get(query.in); + } else { + throw new Error(`Table '${query.in}' does not exist`); + } + } + + if (query.flag || query.hash || query.index) { + await table.create({ + quietIfExists: query.quietIfExists, + flag: query.flag, + hash: query.hash, + index: query.index, + }); + } + + return {}; + } + + /* + query = { + table: 'tableName', + + in: 'tableName', + flag: Object || Array, {name: 'flag1'} + hash: Object || Array, {field: 'field1'} + index: Object || Array, {field: 'field1'} + } + result = {} + */ + async drop(query = {}) { + this.checkOpened(); + + if ((!query.table && !query.in) || (query.table && query.in)) + throw new Error(`One of 'query.table' or 'query.in' parameters is required, but not both`); + + if (query.table) { + if (await this.tableExists({table: query.table})) { + const table = this.table.get(query.table); + if (table && table.opened) { + await table.close(); + } + + const basePath = `${this.dbPath}/${query.table}`; + await fs.rmdir(basePath, { recursive: true }); + + this.table.delete(query.table); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } else { + if (await this.tableExists({table: query.in})) { + const table = this.table.get(query.in); + + if (table) { + if (query.flag || query.hash || query.index) { + await table.drop({ + flag: query.flag, + hash: query.hash, + index: query.index, + }); + } + } else { + throw new Error(`Table '${query.in}' has not been opened yet`); + } + } else { + throw new Error(`Table '${query.in}' does not exist`); + } + } + + return {}; + } + + /* + query = { + (!) table: 'tableName', + inMemory: Boolean, false + cacheSize: Number, 5 + compressed: Number, {0..9}, 0 + recreate: Boolean, false, + autoRepair: Boolean, false, + forceFileClosing: Boolean, false, + lazyOpen: Boolean, false, + } + */ + async open(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + if (await this.tableExists({table: query.table})) { + let table = this.table.get(query.table); + + if (!table) { + table = new Table(); + } + + if (!table.opened) { + const opts = Object.assign({}, this.tableOpenDefaults, query); + opts.tablePath = `${this.dbPath}/${query.table}`; + await table.open(opts); + } + + this.table.set(query.table, table); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } + + + async _getTableList() { + const result = []; + const files = await fs.readdir(this.dbPath, { withFileTypes: true }); + + for (const file of files) { + if (file.isDirectory()) { + if (file.name.indexOf('___temporary_recreating') >= 0) + continue; + result.push(file.name); + } + } + + return result; + } + + /* + query = { + inMemory: Boolean, false + cacheSize: Number, 5 + compressed: Number, {0..9}, 0 + recreate: Boolean, false, + autoRepair: Boolean, false, + forceFileClosing: Boolean, false, + lazyOpen: Boolean, false, + } + */ + async openAll(query = {}) { + this.checkOpened(); + + const tables = await this._getTableList(); + + //sequentially + for (const table of tables) { + this.checkOpened(); + await this.open(Object.assign({}, query, {table})); + } + + /*const promises = []; + for (const table of tables) { + promises.push(this.open(Object.assign({}, query, {table}))); + } + await Promise.all(promises);*/ + } + + /* + query = { + (!) table: 'tableName', + } + */ + async close(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + if (await this.tableExists({table: query.table})) { + let table = this.table.get(query.table); + + if (table) { + await table.close(); + } + + this.table.delete(query.table); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } + + async closeAll() { + this.checkOpened(); + + const promises = []; + for (const table of this.table.keys()) { + promises.push(this.close({table})); + } + await Promise.all(promises); + } + + /* + query = { + (!) table: 'tableName' + }, + result = Boolean + */ + async tableExists(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + if (this.table.has(query.table)) + return true; + + if (await utils.pathExists(`${this.dbPath}/${query.table}`)) + return true; + + return false; + } + + /* + query = { + table: 'tableName' + }, + result = { + dbPath: String, + tableName1: {opened: Boolean, ...}, + tableName2: {opened: Boolean, ...}, + ... + } + */ + async getDbInfo(query = {}) { + this.checkOpened(); + + const tables = await this._getTableList(); + + const result = {dbPath: this.dbPath}; + for (const table of tables) { + if (!query.table || (query.table && table == query.table)) { + const tableInstance = this.table.get(table); + if (tableInstance && tableInstance.opened) { + result[table] = await tableInstance.getMeta(); + result[table].opened = true; + } else { + result[table] = {opened: false}; + } + } + } + return result; + } + + /* + result = { + total: Number, + tables: { + tableName1: Number, + tableName2: Number, + ... + } + } + */ + async getDbSize() { + this.checkOpened(); + + const dirs = await fs.readdir(this.dbPath, { withFileTypes: true }); + + const result = {total: 0, tables: {}}; + for (const dir of dirs) { + if (dir.isDirectory()) { + const table = dir.name; + const tablePath = `${this.dbPath}/${table}`; + const files = await fs.readdir(tablePath, { withFileTypes: true }); + + if (!result.tables[table]) + result.tables[table] = 0; + + for (const file of files) { + if (file.isFile()) { + let size = 0; + try { + size = (await fs.stat(`${tablePath}/${file.name}`)).size; + } catch(e) { + // + } + result.tables[table] += size; + result.total += size; + } + } + } + } + + return result; + } + + /* + query = { + (!) table: 'tableName', + distinct: 'fieldName' || Array, + count: Boolean, + map: '(r) => ({id1: r.id, ...})', + where: `@@index('field1', 10, 20)`, + sort: '(a, b) => a.id - b.id', + limit: 10, + offset: 10, + } + result = Array + */ + async select(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + const table = this.table.get(query.table); + if (table) { + return await table.select(query); + } else { + if (await this.tableExists({table: query.table})) { + throw new Error(`Table '${query.table}' has not been opened yet`); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } + } + + /* + query = { + (!) table: 'tableName', + replace: Boolean, + (!) rows: Array, + } + result = { + (!) inserted: Number, + (!) replaced: Number, + } + */ + async insert(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + const table = this.table.get(query.table); + if (table) { + return await table.insert(query); + } else { + if (await this.tableExists({table: query.table})) { + throw new Error(`Table '${query.table}' has not been opened yet`); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } + } + + /* + query = { + (!) table: 'tableName', + (!) mod: '(r) => r.count++', + where: `@@index('field1', 10, 20)`, + sort: '(a, b) => a.id - b.id', + limit: 10, + offset: 10, + } + result = { + (!) updated: Number, + } + */ + async update(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + const table = this.table.get(query.table); + if (table) { + return await table.update(query); + } else { + if (await this.tableExists({table: query.table})) { + throw new Error(`Table '${query.table}' has not been opened yet`); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } + } + + /* + query = { + (!) table: 'tableName', + where: `@@index('field1', 10, 20)`, + sort: '(a, b) => a.id - b.id', + limit: 10, + offset: 10, + } + result = { + (!) deleted: Number, + } + */ + async delete(query = {}) { + this.checkOpened(); + + if (!query.table) + throw new Error(`'query.table' parameter is required`); + + const table = this.table.get(query.table); + if (table) { + return await table.delete(query); + } else { + if (await this.tableExists({table: query.table})) { + throw new Error(`Table '${query.table}' has not been opened yet`); + } else { + throw new Error(`Table '${query.table}' does not exist`); + } + } + } + + esc(obj) { + return utils.esc(obj); + } +} + +module.exports = JembaDb; \ No newline at end of file diff --git a/server/db/JembaDb/JembaDbChild.js b/server/db/JembaDb/JembaDbChild.js new file mode 100644 index 00000000..b97ed98b --- /dev/null +++ b/server/db/JembaDb/JembaDbChild.js @@ -0,0 +1,49 @@ +const { parentPort } = require('worker_threads'); + +const JembaDb = require('./JembaDb'); + +const db = new JembaDb(); + +if (parentPort) { + parentPort.on('message', async(mes) => { + let result = {}; + try { + if (db[mes.action]) + result.result = await db[mes.action](mes.query); + else + result = {error: 'Action not found: ' + mes.action}; + } catch (e) { + result = {error: e.message}; + } + + result.requestId = mes.requestId; + parentPort.postMessage(result); + }); +} + +//This is for proper working of pkg (by zeit) and worker_threads +//just a copy of the above code as a string +module.exports = ` +const { parentPort } = require('worker_threads'); + +const JembaDb = require('./JembaDb'); + +const db = new JembaDb(); + +if (parentPort) { + parentPort.on('message', async(mes) => { + let result = {}; + try { + if (db[mes.action]) + result.result = await db[mes.action](mes.query); + else + result = {error: 'Action not found: ' + mes.action}; + } catch (e) { + result = {error: e.message}; + } + + result.requestId = mes.requestId; + parentPort.postMessage(result); + }); +} +`.replace('./JembaDb', `${__dirname.replace(/\\/g, '/')}/JembaDb`); \ No newline at end of file diff --git a/server/db/JembaDb/JembaDbThread.js b/server/db/JembaDb/JembaDbThread.js new file mode 100644 index 00000000..df3a9b7e --- /dev/null +++ b/server/db/JembaDb/JembaDbThread.js @@ -0,0 +1,119 @@ +const { Worker } = require('worker_threads'); +const utils = require('./utils'); +const JembaDbChild = require('./JembaDbChild'); +/* API methods: +openDb +closeDb + +create +drop + +open +openAll +close +closeAll + +tableExists +getInfo +getDbSize + +select +insert +update +delete + +esc +*/ + +class JembaDbThread { + constructor() { + this.worker = null; + this.listeners = new Map(); + this.requestId = 0; + + const apiMethods = [ + 'create', 'drop', 'open', 'openAll', 'close', 'closeAll', + 'tableExists', 'getDbInfo', 'getDbSize', 'select', 'insert', 'update', 'delete', 'dumpTables' + ]; + + for (const action of apiMethods) { + this[action] = async(query) => this._action(action, query); + } + } + + _terminate() { + if (this.worker) { + for (const listener of this.listeners.values()) { + listener({error: 'Worker terminated'}); + } + this.worker.terminate(); + } + this.worker = null; + } + + _runWoker() { + //const worker = new Worker(`${__dirname}/JembaDbChild.js`); + const worker = new Worker(JembaDbChild, {eval: true}); + + worker.on('message', (mes) => { + const listener = this.listeners.get(mes.requestId); + if (listener) + listener(mes); + }); + + worker.on('error', (err) => { + console.error(err); + }); + + worker.on('exit', () => { + this._terminate(); + }); + + this.worker = worker; + } + + _action(action, query) { + return new Promise((resolve, reject) => { + this.requestId++; + + const requestId = this.requestId; //!!! + this.listeners.set(requestId, (mes) => { + this.listeners.delete(requestId); + + if (mes.error) + reject(new Error(mes.error)); + else + resolve(mes.result); + }); + + if (this.worker) { + this.worker.postMessage({requestId: this.requestId, action, query}); + } else { + reject(new Error('Worker does not exist (database closed?)')); + } + }); + } + + async openDb(query = {}) { + if (!this.worker) { + this._runWoker(); + } else { + throw new Error('Worker has been created already'); + } + + return this._action('openDb', query); + } + + async closeDb() { + const result = await this._action('closeDb'); + this._terminate(); + //console.log('DB closed'); + return result; + } + + esc(obj) { + return utils.esc(obj); + } +} + +module.exports = JembaDbThread; \ No newline at end of file diff --git a/server/db/JembaDb/LockQueue.js b/server/db/JembaDb/LockQueue.js new file mode 100644 index 00000000..9ad6ff65 --- /dev/null +++ b/server/db/JembaDb/LockQueue.js @@ -0,0 +1,38 @@ +class LockQueue { + constructor(queueSize) { + this.queueSize = queueSize; + this.freed = true; + this.waitingQueue = []; + } + + ret() { + this.freed = true; + if (this.waitingQueue.length) { + this.waitingQueue.shift().onFreed(); + } + } + + get(take = true) { + return new Promise((resolve) => { + if (this.freed) { + if (take) + this.freed = false; + resolve(); + return; + } + + if (this.waitingQueue.length >= this.queueSize) + throw new Error('Lock queue is too long'); + + this.waitingQueue.push({ + onFreed: () => { + if (take) + this.freed = false; + resolve(); + }, + }); + }); + } +} + +module.exports = LockQueue; \ No newline at end of file diff --git a/server/db/JembaDb/Table.js b/server/db/JembaDb/Table.js new file mode 100644 index 00000000..599670c4 --- /dev/null +++ b/server/db/JembaDb/Table.js @@ -0,0 +1,852 @@ +const fs = require('fs').promises; +const utils = require('./utils'); + +const TableReducer = require('./TableReducer'); +const TableRowsMem = require('./TableRowsMem'); +const TableRowsFile = require('./TableRowsFile'); +const LockQueue = require('./LockQueue'); + +const maxChangesLength = 10; + +class Table { + constructor() { + this.rowsInterface = new TableRowsMem(); + + this.autoIncrement = 0; + this.fileError = ''; + + this.openingLock = new LockQueue(100); + this.lock = new LockQueue(100); + + this.opened = false; + this.closed = false; + this.deltaStep = 0; + this.changes = []; + + //table options defaults + this.inMemory = false; + this.compressed = 0; + this.cacheSize = 5; + this.compressed = 0; + this.recreate = false; + this.autoRepair = false; + this.forceFileClosing = false; + } + + checkErrors() { + if (this.fileError) + throw new Error(this.fileError); + + if (this.closed) + throw new Error('Table closed'); + + if (!this.opened) + throw new Error('Table has not been opened yet'); + } + + async waitForSaveChanges() { + if (this.changes.length > maxChangesLength) { + let i = this.changes.length - maxChangesLength; + while (i > 0 && this.changes.length > maxChangesLength) { + i--; + await utils.sleep(10); + } + } + } + + async recreateTable() { + const tempTablePath = `${this.tablePath}___temporary_recreating`; + await fs.rmdir(tempTablePath, { recursive: true }); + await fs.mkdir(tempTablePath, { recursive: true }); + + const tableRowsFileSrc = new TableRowsFile(this.tablePath, this.cacheSize); + + const tableRowsFileDest = new TableRowsFile(tempTablePath, this.cacheSize, this.compressed); + const reducerDest = new TableReducer(false, tempTablePath, this.compressed, tableRowsFileDest); + + try { + await tableRowsFileSrc.loadCorrupted(); + } catch (e) { + console.error(e); + } + try { + await reducerDest._load(true, `${this.tablePath}/meta.0`); + } catch (e) { + console.error(e); + } + + const putRows = async(rows) => { + const oldRows = []; + const newRows = []; + const newRowsStr = []; + //checks + for (const row of rows) { + if (!row) { + continue; + } + + const t = typeof(row.id); + if (t !== 'number' && t !== 'string') { + continue; + } + + const oldRow = await tableRowsFileDest.getRow(row.id); + + if (oldRow) { + continue; + } + + let str = ''; + try { + str = JSON.stringify(row);//because of stringify errors + } catch(e) { + continue; + } + + newRows.push(row); + oldRows.push({}); + newRowsStr.push(str); + } + + try { + //reducer + reducerDest._update(oldRows, newRows, 1); + + //insert + for (let i = 0; i < newRows.length; i++) { + const newRow = newRows[i]; + const newRowStr = newRowsStr[i]; + + tableRowsFileDest.setRow(newRow.id, newRow, newRowStr, 1); + } + + await tableRowsFileDest.saveDelta(1); + await reducerDest._saveDelta(1); + } catch(e) { + console.error(e); + } + }; + + let rows = []; + for (const id of tableRowsFileSrc.getAllIds()) { + if (this.closed) + throw new Error('Table closed'); + + let row = null; + try { + row = await tableRowsFileSrc.getRow(id); + } catch(e) { + console.error(e); + continue; + } + + rows.push(row); + if (rows.length > 1000) { + await putRows(rows); + rows = []; + } + } + if (rows.length) + await putRows(rows); + + await tableRowsFileDest.saveDelta(0); + + const delta = reducerDest._getDelta(0); + delta.dumpMeta = true; + await reducerDest._saveDelta(0); + + await tableRowsFileSrc.destroy(); + await reducerDest._destroy(); + await tableRowsFileDest.destroy(); + + await fs.writeFile(`${tempTablePath}/state`, '1'); + + await fs.rmdir(this.tablePath, { recursive: true }); + await fs.rename(tempTablePath, this.tablePath); + } + + /* + query: { + tablePath: String, + inMemory: Boolean, + cacheSize: Number, + compressed: Number, 0..9 + recreate: Boolean, false, + autoRepair: Boolean, false, + forceFileClosing: Boolean, false, + lazyOpen: Boolean, false, + } + */ + async _open(query = {}) { + if (this.opening) + return; + this.opening = true; + await this.openingLock.get(); + //console.log(query); + try { + if (this.opened) + throw new Error('Table has already been opened'); + if (this.closed) + throw new Error('Table instance has been destroyed. Please create a new one.'); + + this.inMemory = !!query.inMemory; + + if (this.inMemory) { + this.reducer = new TableReducer(this.inMemory, '', 0, this.rowsInterface); + } else { + if (!query.tablePath) + throw new Error(`'query.tablePath' parameter is required`); + + this.tablePath = query.tablePath; + this.cacheSize = query.cacheSize || 5; + this.compressed = query.compressed || 0; + this.recreate = query.recreate || false; + this.autoRepair = query.autoRepair || false; + this.forceFileClosing = query.forceFileClosing || false; + + await fs.mkdir(this.tablePath, { recursive: true }); + + this.tableRowsFile = new TableRowsFile(query.tablePath, this.cacheSize, this.compressed); + this.rowsInterface = this.tableRowsFile; + + this.reducer = new TableReducer(this.inMemory, this.tablePath, this.compressed, this.rowsInterface); + + const statePath = `${this.tablePath}/state`; + let state = null; + if (await utils.pathExists(statePath)) { + state = await fs.readFile(statePath, 'utf8'); + } + + if (state === null) {//check if other files exists + const files = await fs.readdir(this.tablePath); + if (files.length) + state = '0'; + } + + if (this.recreate) { + await this.recreateTable(); + state = '1'; + } + + if (state !== null) { + try { + if (state === '1') { + // load tableRowsFile & reducer + this.autoIncrement = await this.tableRowsFile.load(); + await this.reducer._load(); + } else { + throw new Error('Table corrupted') + } + } catch(e) { + if (this.autoRepair) { + console.error(e.message); + await this.recreateTable(); + } else { + throw e; + } + // load tableRowsFile & reducer + this.autoIncrement = await this.tableRowsFile.load(); + await this.reducer._load(); + } + } + } + + this.opened = true; + } catch(e) { + await this.close(); + const errMes = `Open table (${query.tablePath}): ${e.message}`; + if (!query.lazyOpen) + throw new Error(errMes); + else + this.fileError = errMes; + } finally { + this.openingLock.ret(); + this.opening = false; + } + } + + async open(query = {}) { + if (query.lazyOpen) { + this._open(query); + } else { + await this._open(query); + } + } + + async close() { + if (this.closed) + return; + + this.opened = false; + this.closed = true; + + if (!this.inMemory) { + while (this.savingChanges) { + await utils.sleep(10); + } + } + + //for GC + if (this.reducer) + await this.reducer._destroy(); + this.reducer = null; + + if (this.rowsInterface) + await this.rowsInterface.destroy(); + this.rowsInterface = null; + this.tableRowsFile = null; + } + + /* + query = { + quietIfExists: Boolean, + flag: Object || Array, {name: 'flag1', check: '(r) => r.id > 10'} + hash: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} + index: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} + } + result = {} + */ + async create(query) { + await this.openingLock.get(false); + this.checkErrors(); + + await this.lock.get(); + try { + this.deltaStep++; + try { + if (query.flag) { + for (const flag of utils.paramToArray(query.flag)) { + await this.reducer._addFlag(flag, query.quietIfExists, this.deltaStep); + } + } + + if (query.hash) { + for (const hash of utils.paramToArray(query.hash)) { + await this.reducer._addHash(hash, query.quietIfExists, this.deltaStep); + } + } + + if (query.index) { + for (const index of utils.paramToArray(query.index)) { + await this.reducer._addIndex(index, query.quietIfExists, this.deltaStep); + } + } + + this.changes.push([this.deltaStep, 1]); + } catch(e) { + this.changes.push([this.deltaStep, 0]); + throw e; + } + + return {}; + } finally { + this.saveChanges();//no await + this.lock.ret(); + } + } + + /* + query = { + flag: Object || Array, {name: 'flag1'} + hash: Object || Array, {field: 'field1'} + index: Object || Array, {field: 'field1'} + } + result = {} + */ + async drop(query) { + await this.openingLock.get(false); + this.checkErrors(); + + await this.lock.get(); + try { + this.deltaStep++; + try { + if (query.flag) { + for (const flag of utils.paramToArray(query.flag)) { + await this.reducer._delFlag(flag.name, this.deltaStep); + } + } + + if (query.hash) { + for (const hash of utils.paramToArray(query.hash)) { + await this.reducer._delHash(hash.field, this.deltaStep); + } + } + + if (query.index) { + for (const index of utils.paramToArray(query.index)) { + await this.reducer._delIndex(index.field, this.deltaStep); + } + } + + this.changes.push([this.deltaStep, 1]); + } catch(e) { + this.changes.push([this.deltaStep, 0]); + throw e; + } + + return {}; + } finally { + this.saveChanges();//no await + this.lock.ret(); + } + } + + /* + result = { + inMemory: Boolean, + flag: Array, [{name: 'flag1', check: '(r) => r.id > 10'}, ...] + hash: Array, [{field: 'field1', type: 'string', depth: 11, allowUndef: false}, ...] + index: Array, [{field: 'field1', type: 'string', depth: 11, allowUndef: false}, ...] + } + */ + async getMeta() { + this.checkErrors(); + + return { + inMemory: this.inMemory, + flag: this.reducer._listFlag(), + hash: this.reducer._listHash(), + index: this.reducer._listIndex(), + }; + } + + prepareWhere(where) { + if (typeof(where) !== 'string') + throw new Error('query.where must be a string'); + + return `async(__tr) => {${where.replace(/@@/g, 'return await __tr.').replace(/@/g, 'await __tr.')}}`; + } + + /* + query = { + distinct: 'fieldName' || Array, + count: Boolean, + map: '(r) => ({id1: r.id, ...})', + where: `@@index('field1', 10, 20)`, + sort: '(a, b) => a.id - b.id', + limit: 10, + offset: 10, + } + result = Array + */ + async select(query = {}) { + await this.openingLock.get(false); + this.checkErrors(); + + let ids;//iterator + if (query.where) { + const where = this.prepareWhere(query.where); + const whereFunc = new Function(`return ${where}`)(); + + ids = await whereFunc(this.reducer); + } else { + ids = this.rowsInterface.getAllIds(); + } + + let found = []; + + let distinct = () => true; + if (query.distinct) { + const distFields = (Array.isArray(query.distinct) ? query.distinct : [query.distinct]); + const dist = new Map(); + distinct = (row) => { + let uniq = ''; + for (const field of distFields) { + const value = row[field]; + uniq += `${(value === undefined ? '___' : '')}${field}:${value}`; + } + + if (dist.has(uniq)) + return false; + dist.set(uniq, true); + return true; + }; + } + + if (!query.where && !query.distinct && query.count) {//some optimization + found = [{count: this.rowsInterface.getAllIdsSize()}]; + } else {//full running + for (const id of ids) { + const row = await this.rowsInterface.getRow(id); + + if (row && distinct(row)) { + found.push(row); + } + } + + if (query.count) { + found = [{count: found.length}]; + } + } + + let result = []; + if (query.map) { + const mapFunc = new Function(`return ${query.map}`)(); + + for (const row of found) { + result.push(mapFunc(row)); + } + } else { + result = found; + } + + if (query.sort) { + const sortFunc = new Function(`return ${query.sort}`)(); + result.sort(sortFunc); + } + + if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) { + const offset = query.offset || 0; + const limit = (query.hasOwnProperty('limit') ? query.limit : result.length); + result = result.slice(offset, offset + limit); + } + + return utils.cloneDeep(result); + } + + /* + query = { + replace: Boolean, + (!) rows: Array, + } + result = { + (!) inserted: Number, + (!) replaced: Number, + } + */ + async insert(query = {}) { + await this.openingLock.get(false); + this.checkErrors(); + + await this.lock.get(); + try { + if (!Array.isArray(query.rows)) { + throw new Error('query.rows must be an array'); + } + + const newRows = utils.cloneDeep(query.rows); + const replace = query.replace; + + //autoIncrement correction + for (const newRow of newRows) { + if (typeof(newRow.id) === 'number' && newRow.id >= this.autoIncrement) + this.autoIncrement = newRow.id + 1; + } + + const oldRows = []; + const newRowsStr = []; + //checks + for (const newRow of newRows) { + if (newRow.hasOwnProperty('___meta')) + throw new Error(`Use of field with name '___meta' is forbidden`); + + if (newRow.id === undefined) { + newRow.id = this.autoIncrement; + this.autoIncrement++; + } + + const t = typeof(newRow.id); + if (t !== 'number' && t !== 'string') { + throw new Error(`Row id bad type, 'number' or 'string' expected, got ${t}`); + } + + const oldRow = await this.rowsInterface.getRow(newRow.id); + + if (!replace && oldRow) { + throw new Error(`Record id:${newRow.id} already exists`); + } + + oldRows.push((oldRow ? oldRow : {})); + newRowsStr.push(JSON.stringify(newRow));//because of stringify errors + } + + const result = {inserted: 0, replaced: 0}; + this.deltaStep++; + try { + //reducer + this.reducer._update(oldRows, newRows, this.deltaStep); + + //insert + for (let i = 0; i < newRows.length; i++) { + const newRow = newRows[i]; + const newRowStr = newRowsStr[i]; + const oldRow = oldRows[i]; + + this.rowsInterface.setRow(newRow.id, newRow, newRowStr, this.deltaStep); + + if (oldRow.id !== undefined) + result.replaced++; + else + result.inserted++; + } + + this.changes.push([this.deltaStep, 1]); + } catch(e) { + this.changes.push([this.deltaStep, 0]); + throw e; + } + + await this.waitForSaveChanges(); + return result; + } finally { + this.saveChanges();//no await + this.lock.ret(); + } + } + + /* + query = { + (!) mod: '(r) => r.count++', + where: `@@index('field1', 10, 20)`, + sort: '(a, b) => a.id - b.id', + limit: 10, + offset: 10, + } + result = { + (!) updated: Number, + } + */ + async update(query = {}) { + await this.openingLock.get(false); + this.checkErrors(); + + await this.lock.get(); + try { + if (typeof(query.mod) !== 'string') { + throw new Error('query.mod must be a string'); + } + const modFunc = new Function(`return ${query.mod}`)(); + + //where + let ids;//iterator + if (query.where) { + const where = this.prepareWhere(query.where); + const whereFunc = new Function(`return ${where}`)(); + + ids = await whereFunc(this.reducer); + } else { + ids = this.rowsInterface.getAllIds(); + } + + //oldRows + let oldRows = []; + for (const id of ids) { + const oldRow = await this.rowsInterface.getRow(id); + + if (oldRow) { + oldRows.push(oldRow); + } + } + + if (query.sort) { + const sortFunc = new Function(`return ${query.sort}`)(); + oldRows.sort(sortFunc); + } + let newRows = utils.cloneDeep(oldRows); + + if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) { + const offset = query.offset || 0; + const limit = (query.hasOwnProperty('limit') ? query.limit : newRows.length); + newRows = newRows.slice(offset, offset + limit); + oldRows = oldRows.slice(offset, offset + limit); + } + + //mod & checks + const context = {}; + const newRowsStr = []; + for (const newRow of newRows) { + modFunc(newRow, context); + + const t = typeof(newRow.id); + if (t !== 'number' && t !== 'string') { + throw new Error(`Row id bad type, 'number' or 'string' expected, got ${t}`); + } + + //autoIncrement correction + if (t === 'number' && newRow.id >= this.autoIncrement) + this.autoIncrement = newRow.id + 1; + + if (newRow.hasOwnProperty('___meta')) + throw new Error(`Use of field with name '___meta' is forbidden`); + + newRowsStr.push(JSON.stringify(newRow));//because of stringify errors + } + + this.deltaStep++; + const result = {updated: 0}; + try { + //reducer + this.reducer._update(oldRows, newRows, this.deltaStep); + + //replace + for (let i = 0; i < newRows.length; i++) { + const newRow = newRows[i]; + const newRowStr = newRowsStr[i]; + + // oldRow.id === newRow.id always here, so + this.rowsInterface.setRow(newRow.id, newRow, newRowStr, this.deltaStep); + + result.updated++; + } + + this.changes.push([this.deltaStep, 1]); + } catch(e) { + this.changes.push([this.deltaStep, 0]); + throw e; + } + + await this.waitForSaveChanges(); + return result; + } finally { + this.saveChanges();//no await + this.lock.ret(); + } + } + + /* + query = { + where: `@@index('field1', 10, 20)`, + sort: '(a, b) => a.id - b.id', + limit: 10, + offset: 10, + } + result = { + (!) deleted: Number, + } + */ + async delete(query = {}) { + await this.openingLock.get(false); + this.checkErrors(); + + await this.lock.get(); + try { + //where + let ids;//iterator + if (query.where) { + const where = this.prepareWhere(query.where); + const whereFunc = new Function(`return ${where}`)(); + + ids = await whereFunc(this.reducer); + } else { + ids = this.rowsInterface.getAllIds(); + } + + //oldRows + let oldRows = []; + let newRows = []; + for (const id of ids) { + const oldRow = await this.rowsInterface.getRow(id); + + if (oldRow) { + oldRows.push(oldRow); + newRows.push({}); + } + } + + if (query.sort) { + const sortFunc = new Function(`return ${query.sort}`)(); + oldRows.sort(sortFunc); + } + + if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) { + const offset = query.offset || 0; + const limit = (query.hasOwnProperty('limit') ? query.limit : newRows.length); + newRows = newRows.slice(offset, offset + limit); + oldRows = oldRows.slice(offset, offset + limit); + } + + this.deltaStep++; + const result = {deleted: 0}; + try { + //reducer + this.reducer._update(oldRows, newRows, this.deltaStep); + + //delete + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + + this.rowsInterface.deleteRow(oldRow.id, this.deltaStep); + + result.deleted++; + } + + this.changes.push([this.deltaStep, 1]); + } catch(e) { + this.changes.push([this.deltaStep, 0]); + throw e; + } + + await this.waitForSaveChanges(); + return result; + } finally { + this.saveChanges();//no await + this.lock.ret(); + } + } + + async saveState(state) { + await fs.writeFile(`${this.tablePath}/state`, state); + } + + async saveChanges() { + this.needSaveChanges = true; + if (this.savingChanges) + return; + + if (this.inMemory) { + this.changes = []; + return; + } + + try { + this.checkErrors(); + } catch(e) { + return; + } + + this.savingChanges = true; + try { + await utils.sleep(0); + + while (this.needSaveChanges) { + this.needSaveChanges = false; + + await this.saveState('0'); + while (this.changes.length) { + + const len = this.changes.length; + let i = 0; + while (i < len) { + const [deltaStep, isOk] = this.changes[i]; + i++; + + if (isOk) { + await this.tableRowsFile.saveDelta(deltaStep); + await this.reducer._saveDelta(deltaStep); + } else { + await this.tableRowsFile.cancelDelta(deltaStep); + await this.reducer._cancelDelta(deltaStep); + } + } + + this.changes = this.changes.slice(i); + } + await this.saveState('1'); + + if (this.forceFileClosing) { + await this.tableRowsFile.closeAllFiles(); + await this.reducer._closeAllFiles(); + } + } + } catch(e) { + console.error(e.message); + this.fileError = e.message; + } finally { + this.savingChanges = false; + } + } + +} + +module.exports = Table; \ No newline at end of file diff --git a/server/db/JembaDb/TableFlag.js b/server/db/JembaDb/TableFlag.js new file mode 100644 index 00000000..bc62058f --- /dev/null +++ b/server/db/JembaDb/TableFlag.js @@ -0,0 +1,22 @@ +class TableFlag { + constructor(checkCode) { + this.checkCode = checkCode; + this.checkFunc = eval(checkCode); + + this.flag = new Set(); + } + + add(row) { + if (this.checkFunc(row)) { + this.flag.add(row.id); + return true; + } + return false; + } + + del(row) { + this.flag.delete(row.id); + } +} + +module.exports = TableFlag; \ No newline at end of file diff --git a/server/db/JembaDb/TableHash.js b/server/db/JembaDb/TableHash.js new file mode 100644 index 00000000..e527953c --- /dev/null +++ b/server/db/JembaDb/TableHash.js @@ -0,0 +1,172 @@ +class TableHash { + //opts.type = 'string' || 'number' || 'number_as_string' + constructor(opts = {}) { + const type = opts.type || 'string'; + this.depth = opts.depth || 11; + this.allowUndef = opts.allowUndef || false; + this.unique = opts.unique || false; + + this.hash = new Map(); + + this.isNumber = (type === 'number' || type === 'number_as_string'); + this.numberAsString = (type === 'number_as_string'); + this.valueAsString = !this.isNumber || this.numberAsString; + } + + checkType(v) { + if (typeof(v) != 'number' && this.isNumber) + throw new Error(`Hashed value must be a number, got type:${typeof(v)}, value:${v}`); + + if (typeof(v) != 'string' && !this.isNumber) + throw new Error(`Hashed value must be a string, got type:${typeof(v)}, value:${v}`); + } + + prepareValue(v) { + let result = v; + if (this.numberAsString) { + result = v.toString().padStart(this.depth, '0'); + } + if (this.valueAsString && result.length > this.depth) + result = result.substring(0, this.depth); + return result; + } + + add(value, id) { + if (value === undefined && this.allowUndef) + return; + + this.checkType(value); + + value = this.prepareValue(value); + if (this.hash.has(value)) { + if (this.unique) { + const id_ = this.hash.get(value); + if (id_ !== id) { + throw new Error(`Collision for unique hash detected: value:${value}, id1:${id_}, id2:${id}`); + } + } else { + const ids = this.hash.get(value); + ids.add(id); + } + } else { + if (this.unique) { + this.hash.set(value, id); + } else { + const ids = new Set(); + this.hash.set(value, ids); + ids.add(id); + } + } + + return value; + } + + del(value, id) { + if (value === undefined && this.allowUndef) + return; + + this.checkType(value); + + value = this.prepareValue(value); + if (this.hash.has(value)) { + if (this.unique) { + const id_ = this.hash.get(value); + if (id_ === id) + this.hash.delete(value); + } else { + const ids = this.hash.get(value); + + ids.delete(id); + + if (!ids.size) { + this.hash.delete(value); + } + } + } + + return value; + } + + reduce(value) { + this.checkType(value); + + value = this.prepareValue(value); + let result; + if (this.hash.has(value)) { + if (this.unique) { + result = new Set(); + result.add(this.hash.get(value)); + } else { + result = this.hash.get(value); + } + } else { + result = new Set(); + } + + return result; + } + + min() { + let result = new Set(); + + let min = null; + let id = null; + for (const value of this.hash.keys()) { + if (value < min || min === null) { + min = value; + id = this.hash.get(min); + } + } + + if (id !== null) { + if (this.unique) + result.add(id); + else + result = id; + } + + return result; + } + + max() { + let result = new Set(); + + let max = null; + let id = null; + for (const value of this.hash.keys()) { + if (value > max || max === null) { + max = value; + id = this.hash.get(max); + } + } + + if (id !== null) { + if (this.unique) + result.add(id); + else + result = id; + } + + return result; + } + + iter(checkFunc) { + const result = new Set(); + for (const [value, ids] of this.hash.entries()) { + const checkResult = checkFunc(value); + if (checkResult === undefined) + break; + if (checkResult) { + if (this.unique) { + result.add(ids); + } else { + for (const id of ids) + result.add(id); + } + } + } + return result; + } +} + +module.exports = TableHash; \ No newline at end of file diff --git a/server/db/JembaDb/TableIndex.js b/server/db/JembaDb/TableIndex.js new file mode 100644 index 00000000..7419312e --- /dev/null +++ b/server/db/JembaDb/TableIndex.js @@ -0,0 +1,311 @@ +const utils = require('./utils'); + +class TableIndex { + //opts.type = 'string' || 'number' || 'number_as_string' + constructor(opts = {}) { + const type = opts.type || 'string'; + this.depth = opts.depth || 11; + this.allowUndef = opts.allowUndef || false; + this.unique = opts.unique || false; + + this.hash = new Map(); + this.sorted = [[]]; + this.delCount = 0; + + this.isNumber = (type === 'number' || type === 'number_as_string'); + this.numberAsString = (type === 'number_as_string'); + this.valueAsString = !this.isNumber || this.numberAsString; + + this.cmp = (a, b) => a.localeCompare(b); + if (type === 'number') { + this.cmp = (a, b) => a - b; + } else if (type === 'number_as_string') { + this.cmp = (a, b) => (a < b ? -1 : (a > b ? 1 : 0)); + } + } + + checkType(v) { + if (typeof(v) != 'number' && this.isNumber) + throw new Error(`Indexed value must be a number, got type:${typeof(v)}, value:${v}`); + + if (typeof(v) != 'string' && !this.isNumber) + throw new Error(`Indexed value must be a string, got type:${typeof(v)}, value:${v}`); + } + + prepareValue(v) { + let result = v; + if (this.numberAsString) { + result = v.toString().padStart(this.depth, '0'); + } + if (this.valueAsString && result.length > this.depth) + result = result.substring(0, this.depth); + return result; + } + + add(value, id) { + if (value === undefined && this.allowUndef) + return; + + this.checkType(value); + + value = this.prepareValue(value); + if (this.hash.has(value)) { + if (this.unique) { + const id_ = this.hash.get(value); + if (id_ !== id) { + throw new Error(`Collision for unique index detected: value:${value}, id1:${id_}, id2:${id}`); + } + } else { + const ids = this.hash.get(value); + ids.add(id); + } + } else { + if (this.unique) { + this.hash.set(value, id); + } else { + const ids = new Set(); + this.hash.set(value, ids); + ids.add(id); + } + + let s = this.sorted.length - 1; + const d = this.sorted[s]; + d.push(value); + + let i = d.length - 1; + //вставка + while (i > 0 && this.cmp(d[i], d[i - 1]) < 0) { + const v = d[i]; + d[i] = d[i - 1]; + d[i - 1] = v; + i--; + } + + if (d.length > 10) { + //слияние + while (s > 0 && this.sorted[s].length >= this.sorted[s - 1].length) { + const a = this.sorted.pop(); + const b = this.sorted.pop(); + const c = []; + let i = 0; + let j = 0; + while (i < a.length || j < b.length) { + if (i < a.length && (j === b.length || this.cmp(a[i], b[j]) <= 0)) { + c.push(a[i]); + i++; + } + if (j < b.length && (i === a.length || this.cmp(b[j], a[i]) <= 0)) { + c.push(b[j]); + j++; + } + } + this.sorted.push(c); + s--; + } + + this.sorted.push([]); + } + } + + return value; + } + + del(value, id, forceClean = false) { + if (value === undefined && this.allowUndef) + return; + + this.checkType(value); + + value = this.prepareValue(value); + if (this.hash.has(value)) { + if (this.unique) { + const id_ = this.hash.get(value); + if (id_ === id) { + this.hash.delete(value); + this.delCount++; + } + } else { + const ids = this.hash.get(value); + + ids.delete(id); + + if (!ids.size) { + this.hash.delete(value); + this.delCount++; + } + } + } + + if (this.delCount > (this.sorted[0].length >> 2) || forceClean) { + for (let s = 0; s < this.sorted.length; s++) { + const a = this.sorted[s]; + const b = []; + for (let i = 0; i < a.length; i++) { + if (this.hash.has(a[i])) + b.push(a[i]); + } + this.sorted[s] = b; + } + + this.sorted = this.sorted.filter(a => a.length); + if (!this.sorted.length) { + this.sorted = [[]] + } else { + this.sorted.sort((a, b) => b.length - a.length); + } + + this.delCount = 0; + } + + return value; + } + + reduce(from, to) { + const useFrom = (from !== undefined); + const useTo = (to !== undefined); + + if (useFrom) { + this.checkType(from); + from = this.prepareValue(from); + } + if (useTo) { + this.checkType(to); + to = this.prepareValue(to); + } + + const result = []; + for (let s = 0; s < this.sorted.length; s++) { + const a = this.sorted[s]; + if (!a.length) // на всякий случай + continue; + + let leftIndex = 0; + if (useFrom) { + //дихотомия + let left = 0; + let right = a.length - 1; + while (left < right) { + let mid = left + ((right - left) >> 1); + if (this.cmp(from, a[mid]) <= 0) + right = mid; + else + left = mid + 1; + } + + leftIndex = right; + if (this.cmp(from, a[right]) > 0) + leftIndex++; + } + + let rightIndex = a.length; + if (useTo) { + //дихотомия + let left = 0; + let right = a.length - 1; + while (left < right) { + let mid = right - ((right - left) >> 1); + if (this.cmp(to, a[mid]) >= 0) + left = mid; + else + right = mid - 1; + } + + rightIndex = left; + if (this.cmp(to, a[left]) >= 0) + rightIndex++; + } +//console.log(a, leftIndex, rightIndex); + if (this.unique) { + const ids = new Set(); + for (let i = leftIndex; i < rightIndex; i++) { + const value = a[i]; + if (this.hash.has(value)) { + ids.add(this.hash.get(value)); + } + } + result.push(ids); + } else { + for (let i = leftIndex; i < rightIndex; i++) { + const value = a[i]; + if (this.hash.has(value)) { + result.push(this.hash.get(value)); + } + } + } + } + + return utils.unionSet(result); + } + + min() { + let result = new Set(); + + let min = null; + let id = null; + for (let s = 0; s < this.sorted.length; s++) { + const a = this.sorted[s]; + if (!a.length) // на всякий случай + continue; + if (a[0] < min || min === null) { + min = a[0]; + id = this.hash.get(min); + } + } + + if (id !== null) { + if (this.unique) + result.add(id); + else + result = id; + } + + return result; + } + + max() { + let result = new Set(); + + let max = null; + let id = null; + for (let s = 0; s < this.sorted.length; s++) { + const a = this.sorted[s]; + if (!a.length) // на всякий случай + continue; + + const last = a.length - 1; + if (a[last] > max || max === null) { + max = a[last]; + id = this.hash.get(max); + } + } + + if (id !== null) { + if (this.unique) + result.add(id); + else + result = id; + } + + return result; + } + + iter(checkFunc) { + const result = new Set(); + for (const [value, ids] of this.hash.entries()) { + const checkResult = checkFunc(value); + if (checkResult === undefined) + break; + if (checkResult) { + if (this.unique) { + result.add(ids); + } else { + for (const id of ids) + result.add(id); + } + } + } + return result; + } +} + +module.exports = TableIndex; \ No newline at end of file diff --git a/server/db/JembaDb/TableReducer.js b/server/db/JembaDb/TableReducer.js new file mode 100644 index 00000000..41ef3444 --- /dev/null +++ b/server/db/JembaDb/TableReducer.js @@ -0,0 +1,1044 @@ +const fs = require('fs').promises; +const path = require('path'); + +const TableIndex = require('./TableIndex'); +const TableHash = require('./TableHash'); +const TableFlag = require('./TableFlag'); + +const utils = require('./utils'); + +const maxFileDumpSize = 2*1024*1024;//bytes + +class TableReducer { + constructor(inMemory, tablePath, compressed, rowsInterface) { + this._compressed = compressed || 0; + this._inMemory = inMemory; + this._tablePath = tablePath; + this._rowsInterface = rowsInterface; + + this._flag = new Map(); + this._index = new Map(); + this._hash = new Map(); + + this._deltas = new Map(); + this._fd = {};//file descriptors + } + + _getDelta(deltaStep) { + if (this._inMemory) + throw new Error('TableReducer: sometinhg wrong'); + + if (this._deltas.has(deltaStep)) { + return this._deltas.get(deltaStep); + } else { + const delta = { + flag: [], + index: [], + hash: [], + }; + this._deltas.set(deltaStep, delta); + return delta; + } + } + + _getFullPath(fileName) { + return `${this._tablePath}/${fileName}`; + } + + async _getNotExistingFileName(prefix) { + let i = 0; + while (1) {//eslint-disable-line no-constant-condition + i++; + const fileName = `${this._tablePath}/${prefix}${i}`; + if (!await utils.pathExists(fileName + '.0') && !await utils.pathExists(fileName + '.1')) + return path.basename(fileName); + } + } + + async _addFlag(opts, quietIfExists, deltaStep) { + const flagName = opts.name; + + if (!this._flag.has(flagName)) { + const flag = new TableFlag(opts.check); + for (const id of this._rowsInterface.getAllIds()) + flag.add(await this._rowsInterface.getRow(id)); + + if (this._inMemory) { + flag.meta = opts; + } else { + const fileName = await this._getNotExistingFileName('flag'); + await this._openFd(this._getFullPath(fileName) + '.1'); + flag.meta = Object.assign({}, opts, {fileName}); + + const delta = this._getDelta(deltaStep); + if (!delta.dumpFlag) + delta.dumpFlag = new Map(); + delta.dumpFlag.set(flagName, 1); + delta.dumpMeta = true; + } + + this._flag.set(flagName, flag); + } else { + if (!quietIfExists) + throw new Error(`Flag with name '${flagName}' already exists`); + } + } + + async _delFlag(flagName, deltaStep) { + if (this._flag.has(flagName)) { + if (!this._inMemory) { + const delta = this._getDelta(deltaStep); + delta.dumpMeta = true; + + const fileName = this._getFullPath((this._flag.get(flagName)).meta.fileName); + if (!delta.delFiles) + delta.delFiles = []; + delta.delFiles.push(fileName); + } + + this._flag.delete(flagName); + } else { + throw new Error(`Flag with name '${flagName}' does not exist`); + } + } + + _listFlag() { + const result = []; + for (const flag of this._flag.values()) { + result.push(flag.meta); + } + return result; + } + + async _addHash(opts, quietIfExists, deltaStep) { + const fieldName = opts.field; + + if (!this._hash.has(fieldName)) { + const hash = new TableHash(opts); + for (const id of this._rowsInterface.getAllIds()) { + const row = await this._rowsInterface.getRow(id); + hash.add(row[fieldName], id); + } + + if (this._inMemory) { + hash.meta = opts; + } else { + const fileName = await this._getNotExistingFileName('hash'); + await this._openFd(this._getFullPath(fileName) + '.1'); + hash.meta = Object.assign({}, opts, {fileName}); + + const delta = this._getDelta(deltaStep); + if (!delta.dumpHash) + delta.dumpHash = new Map(); + delta.dumpHash.set(fieldName, 1); + delta.dumpMeta = true; + } + + this._hash.set(fieldName, hash); + } else { + if (!quietIfExists) + throw new Error(`Hash for field '${fieldName}' already exists`); + } + } + + async _delHash(fieldName, deltaStep) { + if (this._hash.has(fieldName)) { + if (!this._inMemory) { + const delta = this._getDelta(deltaStep); + delta.dumpMeta = true; + + const fileName = this._getFullPath((this._hash.get(fieldName)).meta.fileName); + if (!delta.delFiles) + delta.delFiles = []; + delta.delFiles.push(fileName); + } + + this._hash.delete(fieldName); + } else { + throw new Error(`Hash for field '${fieldName}' does not exist`); + } + } + + _listHash() { + const result = []; + for (const hash of this._hash.values()) { + result.push(hash.meta); + } + return result; + } + + async _addIndex(opts, quietIfExists, deltaStep) { + const fieldName = opts.field; + + if (!this._index.has(fieldName)) { + const index = new TableIndex(opts); + for (const id of this._rowsInterface.getAllIds()) { + const row = await this._rowsInterface.getRow(id); + index.add(row[fieldName], id); + } + + if (this._inMemory) { + index.meta = opts; + } else { + const fileName = await this._getNotExistingFileName('index'); + await this._openFd(this._getFullPath(fileName) + '.1'); + index.meta = Object.assign({}, opts, {fileName}); + + const delta = this._getDelta(deltaStep); + if (!delta.dumpIndex) + delta.dumpIndex = new Map(); + delta.dumpIndex.set(fieldName, 1); + delta.dumpMeta = true; + } + + this._index.set(fieldName, index); + } else { + if (!quietIfExists) + throw new Error(`Index for field '${fieldName}' already exists`); + } + } + + async _delIndex(fieldName, deltaStep) { + if (this._index.has(fieldName)) { + if (!this._inMemory) { + const delta = this._getDelta(deltaStep); + delta.dumpMeta = true; + + const fileName = this._getFullPath((this._index.get(fieldName)).meta.fileName); + if (!delta.delFiles) + delta.delFiles = []; + delta.delFiles.push(fileName); + } + + this._index.delete(fieldName); + } else { + throw new Error(`Index for field '${fieldName}' does not exist`); + } + } + + _listIndex() { + const result = []; + for (const index of this._index.values()) { + result.push(index.meta); + } + return result; + } + + _update(oldRows, newRows, deltaStep) { + if (!deltaStep && !this._inMemory) + throw new Error('Something wrong: deltaStep is empty'); + + //oldRows & newRows arrays have equal size + if (oldRows.length != newRows.length) + throw new Error('Reducer update: old and new array lengths are not equal'); + + //consistency + const oldIds = new Map(); + const newIds = new Map(); + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (oldRow.id !== undefined) { + if (oldIds.has(oldRow.id)) { + throw new Error(`Reducer update: duplicate old_id:${oldRow.id} detected`); + } + oldIds.set(oldRow.id, true); + } + + if (newRow.id !== undefined) { + if (newIds.has(newRow.id)) { + throw new Error(`Reducer update: duplicate new_id:${newRow.id} detected`); + } + newIds.set(newRow.id, true); + } + + if (oldRow.id !== undefined && newRow.id !== undefined && oldRow.id !== newRow.id) + throw new Error(`Reducer update: old and new id's are not equal (${oldRow.id} !== ${newRow.id})`); + } + + //update + try { + let delta = (this._inMemory ? null : this._getDelta(deltaStep)); + + //flags + for (const [flagName, flag] of this._flag.entries()) { + const flagDelta = []; + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (oldRow.id !== undefined) { + flag.del(oldRow); + flagDelta.push([oldRow.id, 0]); + } + if (newRow.id !== undefined) { + const added = flag.add(newRow); + if (added) + flagDelta.push([newRow.id, 1]); + } + } + + if (delta && flagDelta.length) { + delta.flag.push([flagName, flagDelta]); + } + } + + //hashes + for (const [fieldName, hash] of this._hash.entries()) { + const hashDelta = []; + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (oldRow[fieldName] !== newRow[fieldName]) { + if (oldRow.id !== undefined) { + const value = hash.del(oldRow[fieldName], oldRow.id); + hashDelta.push([value, oldRow.id, 0]); + } + if (newRow.id !== undefined) { + const value = hash.add(newRow[fieldName], newRow.id); + hashDelta.push([value, newRow.id, 1]); + } + } + } + + if (delta && hashDelta.length) { + delta.hash.push([fieldName, hashDelta]); + } + } + + //indexes + for (const [fieldName, index] of this._index.entries()) { + const indexDelta = []; + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (oldRow[fieldName] !== newRow[fieldName]) { + if (oldRow.id !== undefined) { + const value = index.del(oldRow[fieldName], oldRow.id); + indexDelta.push([value, oldRow.id, 0]); + } + if (newRow.id !== undefined) { + const value = index.add(newRow[fieldName], newRow.id); + indexDelta.push([value, newRow.id, 1]); + } + } + } + + if (delta && indexDelta.length) { + delta.index.push([fieldName, indexDelta]); + } + } + } catch(e) { + //rollback + + //flags + for (const flag of this._flag.values()) { + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (newRow.id !== undefined) { + try { flag.del(newRow); } catch(e) {} // eslint-disable-line no-empty + } + if (oldRow.id !== undefined) { + try { flag.add(oldRow); } catch(e) {} // eslint-disable-line no-empty + } + } + } + + //hashes + for (const [fieldName, hash] of this._hash.entries()) { + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (oldRow[fieldName] !== newRow[fieldName]) { + if (newRow.id !== undefined) { + try { hash.del(newRow[fieldName], newRow.id); } catch(e) {} // eslint-disable-line no-empty + } + if (oldRow.id !== undefined) { + try { hash.add(oldRow[fieldName], oldRow.id); } catch(e) {} // eslint-disable-line no-empty + } + } + } + } + + //indexes + for (const [fieldName, index] of this._index.entries()) { + for (let i = 0; i < oldRows.length; i++) { + const oldRow = oldRows[i]; + const newRow = newRows[i]; + + if (oldRow[fieldName] !== newRow[fieldName]) { + if (newRow.id !== undefined) { + try { index.del(newRow[fieldName], newRow.id); } catch(e) {} // eslint-disable-line no-empty + } + if (oldRow.id !== undefined) { + try { index.add(oldRow[fieldName], oldRow.id); } catch(e) {} // eslint-disable-line no-empty + } + } + } + } + + throw e; + } + } + + async _closeFd(name) { + if (this._fd[name]) { + await this._fd[name].close(); + this._fd[name] = null; + } + } + + async _openFd(name) { + if (this._fd[name]) + return; + + if (!name) { + throw new Error('TableReducer: openFd name is empty'); + } + + const exists = await utils.pathExists(name); + + const fd = await fs.open(name, 'a'); + if (!exists) { + await fd.write('0['); + } + + this._fd[name] = fd; + } + + async _dumpMaps(delta) { + //dump flag + for (const [flagName, flag] of this._flag.entries()) { + const fileName = this._getFullPath(flag.meta.fileName); + const fileName1 = `${fileName}.1`; + + let size = 0; + if (this._fd[fileName1]) + size = (await this._fd[fileName1].stat()).size; + + if (size > maxFileDumpSize || (delta.dumpFlag && delta.dumpFlag.get(flagName))) { + const fileName0 = `${fileName}.0`; + const fileName2 = `${fileName}.2`; + + await this._writeFinal(fileName2, JSON.stringify([...flag.flag])); + + await fs.rename(fileName2, fileName0); + await this._closeFd(fileName1); + await fs.unlink(fileName1); + } + } + + //dump hash + for (const [fieldName, hash] of this._hash.entries()) { + const fileName = this._getFullPath(hash.meta.fileName); + const fileName1 = `${fileName}.1`; + + let size = 0; + if (this._fd[fileName1]) + size = (await this._fd[fileName1].stat()).size; + + if (size > maxFileDumpSize || (delta.dumpHash && delta.dumpHash.get(fieldName))) { + const fileName0 = `${fileName}.0`; + const fileName2 = `${fileName}.2`; + + if (hash.unique) { + await this._writeFinal(fileName2, JSON.stringify(Array.from(hash.hash))); + } else { + const buf = []; + for (const [key, keySet] of hash.hash) { + buf.push([key, [...keySet]]); + } + await this._writeFinal(fileName2, JSON.stringify(buf)); + } + + await fs.rename(fileName2, fileName0); + await this._closeFd(fileName1); + await fs.unlink(fileName1); + } + } + + //dump index + for (const [fieldName, index] of this._index.entries()) { + const fileName = this._getFullPath(index.meta.fileName); + const fileName1 = `${fileName}.1`; + + let size = 0; + if (this._fd[fileName1]) + size = (await this._fd[fileName1].stat()).size; + + if (size > maxFileDumpSize || (delta.dumpIndex && delta.dumpIndex.get(fieldName))) { + const fileName0 = `${fileName}.0`; + const fileName2 = `${fileName}.2`; + + const buf = {hash: [], sorted: index.sorted, delCount: index.delCount}; + if (index.unique) { + buf.hash = Array.from(index.hash); + } else { + for (const [key, keySet] of index.hash) { + buf.hash.push([key, [...keySet]]); + } + } + await this._writeFinal(fileName2, JSON.stringify(buf)); + + await fs.rename(fileName2, fileName0); + await this._closeFd(fileName1); + await fs.unlink(fileName1); + } + } + } + + async _dumpMeta() { + const fileName = this._getFullPath('meta'); + const fileName0 = `${fileName}.0`; + const fileName2 = `${fileName}.2`; + + await this._writeFinal(fileName2, JSON.stringify({ + flag: this._listFlag(), + hash: this._listHash(), + index: this._listIndex(), + })); + await fs.rename(fileName2, fileName0); + } + + async _saveDelta(deltaStep) { + //delta + const delta = this._getDelta(deltaStep); + + //save flag delta + for (const flagRec of delta.flag) { + const [flagName, flagDelta] = flagRec; + + const flag = this._flag.get(flagName); + const fileName = this._getFullPath(flag.meta.fileName) + '.1'; + + if (!this._fd[fileName]) + await this._openFd(fileName); + + const buf = []; + for (const deltaRec of flagDelta) { + buf.push(JSON.stringify(deltaRec)); + } + + if (buf.length) + await this._fd[fileName].write(buf.join(',') + ','); + } + + //save hash delta + for (const hashRec of delta.hash) { + const [hashName, hashDelta] = hashRec; + + const hash = this._hash.get(hashName); + const fileName = this._getFullPath(hash.meta.fileName) + '.1'; + + if (!this._fd[fileName]) + await this._openFd(fileName); + + const buf = []; + for (const deltaRec of hashDelta) { + buf.push(JSON.stringify(deltaRec)); + } + + if (buf.length) + await this._fd[fileName].write(buf.join(',') + ','); + } + + //save index delta + for (const indexRec of delta.index) { + const [indexName, indexDelta] = indexRec; + + const index = this._index.get(indexName); + const fileName = this._getFullPath(index.meta.fileName) + '.1'; + + if (!this._fd[fileName]) + await this._openFd(fileName); + + const buf = []; + for (const deltaRec of indexDelta) { + buf.push(JSON.stringify(deltaRec)); + } + + if (buf.length) + await this._fd[fileName].write(buf.join(',') + ','); + } + + //dumps + await this._dumpMaps(delta); + + //meta + if (delta.dumpMeta) + await this._dumpMeta(); + + //del files + if (delta.delFiles) { + for (const fileName of delta.delFiles) { + if (this._fd[fileName]) + this._closeFd(fileName); + + if (await utils.pathExists(fileName)) + await fs.unlink(fileName); + } + } + + this._deltas.delete(deltaStep); + } + + async _cancelDelta(deltaStep) { + this._deltas.delete(deltaStep); + } + + async _loadFile(filePath) { + let buf = await fs.readFile(filePath); + if (!buf.length) + throw new Error(`TableReducer: file ${filePath} is empty`); + + const flag = buf[0]; + if (flag === 50) {//flag '2' ~ finalized && compressed + const packed = Buffer.from(buf.buffer, buf.byteOffset + 1, buf.length - 1); + const data = await utils.inflate(packed); + buf = data.toString(); + } else if (flag === 49) {//flag '1' ~ finalized + buf[0] = 32;//' ' + buf = buf.toString(); + } else {//flag '0' ~ not finalized + buf[0] = 32;//' ' + const last = buf.length - 1; + if (buf[last] === 44) {//',' + buf[last] = 93;//']' + buf = buf.toString(); + } else {//corrupted or empty + buf = buf.toString(); + if (this._loadCorrupted) { + const lastComma = buf.lastIndexOf(','); + if (lastComma >= 0) + buf = buf.substring(0, lastComma); + } + buf += ']'; + } + } + + let result; + try { + result = JSON.parse(buf); + } catch(e) { + throw new Error(`load ${filePath} failed: ${e.message}`); + } + + return result; + } + + async _writeFinal(fileName, data) { + if (!this._compressed) { + await fs.writeFile(fileName, '1' + data); + } else { + let buf = Buffer.from(data); + buf = await utils.deflate(buf, this.compressed); + const fd = await fs.open(fileName, 'w'); + await fd.write('2'); + await fd.write(buf); + await fd.close(); + } + } + + async _load(corrupted = false, metaPath = '') { + if (corrupted) + this._loadCorrupted = true; + + const metaFileName = (metaPath ? metaPath : this._getFullPath('meta.0')); + if (!await utils.pathExists(metaFileName)) + return; + + const meta = await this._loadFile(metaFileName); + + //flag + this._flag.clear(); + for (const opts of meta.flag) { + const flag = new TableFlag(opts.check); + flag.meta = opts; + + if (!corrupted) { + const fileName = this._getFullPath(opts.fileName); + const fileName0 = `${fileName}.0`; + const fileName1 = `${fileName}.1`; + + //load dump + if (await utils.pathExists(fileName0)) { + const data = await this._loadFile(fileName0); + flag.flag = new Set(data); + } + + //load delta + if (await utils.pathExists(fileName1)) { + const flagDelta = await this._loadFile(fileName1); + for (const deltaRec of flagDelta) { + const [id, isAdd] = deltaRec; + if (isAdd) + flag.flag.add(id); + else + flag.flag.delete(id); + } + } + } + + this._flag.set(opts.name, flag); + } + + //hash + this._hash.clear(); + for (const opts of meta.hash) { + const hash = new TableHash(opts); + hash.meta = opts; + + if (!corrupted) { + const fileName = this._getFullPath(opts.fileName); + const fileName0 = `${fileName}.0`; + const fileName1 = `${fileName}.1`; + + //load dump + if (await utils.pathExists(fileName0)) { + const data = await this._loadFile(fileName0); + if (hash.unique) { + hash.hash = new Map(data); + } else { + for (const rec of data) { + const [key, keySet] = rec; + hash.hash.set(key, new Set(keySet)); + } + } + } + + //load delta + if (await utils.pathExists(fileName1)) { + const hashDelta = await this._loadFile(fileName1); + for (const deltaRec of hashDelta) { + const [value, id, isAdd] = deltaRec; + if (isAdd) + hash.add(value, id); + else + hash.del(value, id); + } + } + } + + this._hash.set(opts.field, hash); + } + + //index + this._index.clear(); + for (const opts of meta.index) { + const index = new TableIndex(opts); + index.meta = opts; + + if (!corrupted) { + const fileName = this._getFullPath(opts.fileName); + const fileName0 = `${fileName}.0`; + const fileName1 = `${fileName}.1`; + + //load dump + if (await utils.pathExists(fileName0)) { + const data = await this._loadFile(fileName0); + index.sorted = data.sorted; + index.delCount = data.delCount; + + if (index.unique) { + index.hash = new Map(data.hash); + } else { + for (const rec of data.hash) { + const [key, keySet] = rec; + index.hash.set(key, new Set(keySet)); + } + } + } + + //load delta + if (await utils.pathExists(fileName1)) { + const indexDelta = await this._loadFile(fileName1); + for (const deltaRec of indexDelta) { + const [value, id, isAdd] = deltaRec; + if (isAdd) + index.add(value, id); + else + index.del(value, id); + } + } + } + + this._index.set(opts.field, index); + } + } + + async _closeAllFiles() { + for (const name of Object.keys(this._fd)) { + await this._closeFd(name); + } + } + + async _destroy() { + await this._closeAllFiles(); + + //for GC + this._flag.clear(); + this._index.clear(); + this._hash.clear(); + this._deltas.clear(); + this._rowsInterface = null; + } + + //------------------------------------------------------------------------------------------ + //Reducer methods + async id() { + const result = new Set(); + for (const arg of arguments) { + if (!Array.isArray(arg)) + result.add(arg); + else { + for (const id of arg) { + result.add(id); + } + } + } + return result; + } + + async flag(flagName) { + if (this._flag.has(flagName)) { + return new Set(this._flag.get(flagName).flag); + } else { + throw new Error(`Flag with name '${flagName}' does not exist`); + } + } + + async hash(fieldName, value) { + if (this._hash.has(fieldName)) { + const hash = this._hash.get(fieldName); + + const result = new Set(); + if (!Array.isArray(value)) { + const ids = hash.reduce(value); + for (const id of ids) { + const row = await this._rowsInterface.getRow(id); + if (row[fieldName] === value) + result.add(id); + } + } else { + for (const v of value) { + const ids = hash.reduce(v); + for (const id of ids) { + const row = await this._rowsInterface.getRow(id); + if (row[fieldName] === v) + result.add(id); + } + } + } + + return result; + } else { + throw new Error(`Hash for field '${fieldName}' does not exist`); + } + } + + async hashMin(fieldName) { + if (this._hash.has(fieldName)) { + const hash = this._hash.get(fieldName); + return hash.min(); + } else { + throw new Error(`Hash for field '${fieldName}' does not exist`); + } + } + + async hashMax(fieldName) { + if (this._hash.has(fieldName)) { + const hash = this._hash.get(fieldName); + return hash.max(); + } else { + throw new Error(`Hash for field '${fieldName}' does not exist`); + } + } + + async hashIter(fieldName, checkFunc) { + if (this._hash.has(fieldName)) { + const hash = this._hash.get(fieldName); + return hash.iter(checkFunc); + } else { + throw new Error(`Hash for field '${fieldName}' does not exist`); + } + } + + async _indexReduce(fieldName, from, to, checkFuncs) { + if (this._index.has(fieldName)) { + const index = this._index.get(fieldName); + const ids = index.reduce(from, to); + + const check = (index.isNumber ? checkFuncs[0] : checkFuncs[1]); + const result = new Set(); + for (const id of ids) { + const row = await this._rowsInterface.getRow(id); + if (check(row[fieldName])) + result.add(id); + } + return result; + } else { + throw new Error(`Index for field '${fieldName}' does not exist`); + } + } + + async index(fieldName, from, to) { + let checkFuncs = [ + (value) => (value > from && value < to), + (value) => (value.localeCompare(from) > 0 && value.localeCompare(to) < 0), + ]; + if (from === undefined) { + checkFuncs = [ + (value) => (value < to), + (value) => (value.localeCompare(to) < 0), + ]; + } else if (to === undefined) { + checkFuncs = [ + (value) => (value > from), + (value) => (value.localeCompare(from) > 0), + ]; + } + return this._indexReduce(fieldName, from, to, checkFuncs); + } + + async indexL(fieldName, from, to) { + let checkFuncs = [ + (value) => (value >= from && value < to), + (value) => (value.localeCompare(from) >= 0 && value.localeCompare(to) < 0), + ]; + if (from === undefined) { + checkFuncs = [ + (value) => (value < to), + (value) => (value.localeCompare(to) < 0), + ]; + } else if (to === undefined) { + checkFuncs = [ + (value) => (value >= from), + (value) => (value.localeCompare(from) >= 0), + ]; + } + return this._indexReduce(fieldName, from, to, checkFuncs); + } + + async indexR(fieldName, from, to) { + let checkFuncs = [ + (value) => (value > from && value <= to), + (value) => (value.localeCompare(from) > 0 && value.localeCompare(to) <= 0), + ]; + if (from === undefined) { + checkFuncs = [ + (value) => (value <= to), + (value) => (value.localeCompare(to) <= 0), + ]; + } else if (to === undefined) { + checkFuncs = [ + (value) => (value > from), + (value) => (value.localeCompare(from) > 0), + ]; + } + return this._indexReduce(fieldName, from, to, checkFuncs); + } + + async indexLR(fieldName, from, to) { + let checkFuncs = [ + (value) => (value >= from && value <= to), + (value) => (value.localeCompare(from) >= 0 && value.localeCompare(to) <= 0), + ]; + if (from === undefined) { + checkFuncs = [ + (value) => (value <= to), + (value) => (value.localeCompare(to) <= 0), + ]; + } else if (to === undefined) { + checkFuncs = [ + (value) => (value >= from), + (value) => (value.localeCompare(from) >= 0), + ]; + } + return this._indexReduce(fieldName, from, to, checkFuncs); + } + + async indexMin(fieldName) { + if (this._index.has(fieldName)) { + const index = this._index.get(fieldName); + return index.min(); + } else { + throw new Error(`Index for field '${fieldName}' does not exist`); + } + } + + async indexMax(fieldName) { + if (this._index.has(fieldName)) { + const index = this._index.get(fieldName); + return index.max(); + } else { + throw new Error(`Index for field '${fieldName}' does not exist`); + } + } + + async indexIter(fieldName, checkFunc) { + if (this._index.has(fieldName)) { + const index = this._index.get(fieldName); + return index.iter(checkFunc); + } else { + throw new Error(`Index for field '${fieldName}' does not exist`); + } + } + + //returns iterator, not Set + async all() { + return this._rowsInterface.getAllIds(); + } + + async allSize() { + return this._rowsInterface.getAllIdsSize(); + } + + async iter(ids, checkFunc) { + const result = new Set(); + for (const id of ids) { + const row = await this._rowsInterface.getRow(id); + const checkResult = checkFunc(row); + if (checkResult === undefined) + break; + if (checkResult) + result.add(id); + } + return result; + } + + async and() { + const result = []; + for (const arg of arguments) { + if (!Array.isArray(arg)) { + result.push(arg); + } else { + for (const s of arg) { + result.push(s); + } + } + } + return utils.intersectSet(result); + } + + async or() { + const result = []; + for (const arg of arguments) { + if (!Array.isArray(arg)) + result.push(arg); + else { + for (const s of arg) { + result.push(s); + } + } + } + return utils.unionSet(result); + } +} + +module.exports = TableReducer; \ No newline at end of file diff --git a/server/db/JembaDb/TableRowsFile.js b/server/db/JembaDb/TableRowsFile.js new file mode 100644 index 00000000..b3ef203d --- /dev/null +++ b/server/db/JembaDb/TableRowsFile.js @@ -0,0 +1,646 @@ +const fs = require('fs').promises; +const path = require('path'); +const utils = require('./utils'); + +const maxBlockSize = 1024*1024;//bytes + +const minFileDumpSize = 100*1024;//bytes +const maxFileDumpSize = 50*1024*1024;//bytes +const defragAfter = 10; +const defragBlockCountAtOnce = 10;//better >= defragAfter + +class TableRowsFile { + constructor(tablePath, cacheSize, compressed) { + this.tablePath = tablePath; + this.loadedBlocksCount = cacheSize || 5; + this.loadedBlocksCount = (this.loadedBlocksCount <= 0 ? 0 : this.loadedBlocksCount); + this.compressed = compressed || 0; + + this.blockIndex = new Map(); + this.currentBlockIndex = 0; + this.lastSavedBlockIndex = 0; + this.blockList = new Map(); + this.blocksNotFinalized = new Map();//indexes of blocks + this.loadedBlocks = []; + this.deltas = new Map(); + + this.defragCounter = 0; + this.destroyed = false; + + this.blockindex0Size = 0; + this.blocklist0Size = 0; + + this.fd = { + blockIndex: null, + blockList: null, + blockRows: null, + blockRowsIndex: null,//not a file descriptor + }; + } + + //--- rows interface + async getRow(id) { + const block = this.blockList.get(this.blockIndex.get(id)); + + if (block) { + if (!block.rows) { + await this.loadBlock(block); + } + + this.unloadBlocksIfNeeded();//no await + return block.rows.get(id); + } + return; + } + + setRow(id, row, rowStr, deltaStep) { + const delta = this.getDelta(deltaStep); + + if (this.blockIndex.has(id)) { + this.deleteRow(id, deltaStep, delta); + } + + const index = this.addToCurrentBlock(id, row, rowStr, deltaStep, delta); + this.blockIndex.set(id, index); + delta.blockIndex.push([id, index]); + } + + deleteRow(id, deltaStep, delta) { + if (this.blockIndex.has(id)) { + if (!delta) + delta = this.getDelta(deltaStep); + + const block = this.blockList.get(this.blockIndex.get(id)); + if (block) { + block.delCount++; + delta.blockList.push([block.index, 1]); + } + + this.blockIndex.delete(id); + delta.blockIndex.push([id, 0]); + } + } + + getAllIds() { + return this.blockIndex.keys(); + } + + getAllIdsSize() { + return this.blockIndex.size; + } + //--- rows interface end + + getDelta(deltaStep) { + if (this.deltas.has(deltaStep)) { + return this.deltas.get(deltaStep); + } else { + const delta = { + blockIndex: [], + blockList: [], + blockRows: [], + }; + this.deltas.set(deltaStep, delta); + return delta; + } + } + + createNewBlock() { + this.currentBlockIndex++; + const block = { + index: this.currentBlockIndex, + delCount: 0, + addCount: 0, + size: 0, + rows: new Map(), + rowsLength: 0, + final: false, + }; + this.blockList.set(this.currentBlockIndex, block); + this.loadedBlocks.push(this.currentBlockIndex); + this.blocksNotFinalized.set(this.currentBlockIndex, 1); + + return block; + } + + addToCurrentBlock(id, row, rowStr, deltaStep, delta) { + if (!delta) + delta = this.getDelta(deltaStep); + + let block = this.blockList.get(this.currentBlockIndex); + if (!block) + block = this.createNewBlock(); + + if (block.size > maxBlockSize) + block = this.createNewBlock(); + + if (!block.rows) { + throw new Error('TableRowsFile: something has gone wrong'); + } + + block.rows.set(id, row); + + block.addCount++; + block.size += rowStr.length; + block.rowsLength = block.rows.size; + + delta.blockList.push([block.index, 1]); + delta.blockRows.push([block.index, id, row]); + + return block.index; + } + + async unloadBlocksIfNeeded() { + this.needUnload = true; + if (this.unloadingBlocks) + return; + + this.unloadingBlocks = true; + try { + while (this.needUnload) { + this.needUnload = false; + if (this.destroyed) + return; + + await utils.sleep(10); + + //check loaded + let missed = new Map(); + while (this.loadedBlocks.length >= this.loadedBlocksCount) { + const index = this.loadedBlocks.shift(); + if (index >= this.lastSavedBlockIndex) { + missed.set(index, 1); + continue; + } + const block = this.blockList.get(index); + + if (block) { + block.rows = null; +//console.log(`unloaded block ${block.index}`); + } + + if (this.destroyed) + return; + } + + this.loadedBlocks = this.loadedBlocks.concat(Array.from(missed.keys())); + } + } finally { + this.unloadingBlocks = false; + } + } + + async loadFile(filePath) { + let buf = await fs.readFile(filePath); + if (!buf.length) + throw new Error(`TableRowsFile: file ${filePath} is empty`); + + const flag = buf[0]; + if (flag === 50) {//flag '2' ~ finalized && compressed + const packed = Buffer.from(buf.buffer, buf.byteOffset + 1, buf.length - 1); + const data = await utils.inflate(packed); + buf = data.toString(); + } else if (flag === 49) {//flag '1' ~ finalized + buf[0] = 32;//' ' + buf = buf.toString(); + } else {//flag '0' ~ not finalized + buf[0] = 32;//' ' + const last = buf.length - 1; + if (buf[last] === 44) {//',' + buf[last] = 93;//']' + buf = buf.toString(); + } else {//corrupted or empty + buf = buf.toString(); + if (this.loadCorrupted) { + const lastComma = buf.lastIndexOf(','); + if (lastComma >= 0) + buf = buf.substring(0, lastComma); + } + buf += ']'; + } + } + + let result; + try { + result = JSON.parse(buf); + } catch(e) { + throw new Error(`load ${filePath} failed: ${e.message}`); + } + + return result; + } + + async writeFinal(fileName, data) { + if (!this.compressed) { + await fs.writeFile(fileName, '1' + data); + } else { + let buf = Buffer.from(data); + buf = await utils.deflate(buf, this.compressed); + const fd = await fs.open(fileName, 'w'); + await fd.write('2'); + await fd.write(buf); + await fd.close(); + } + } + + async loadBlock(block) { +//console.log(`start load block ${block.index}`); + if (!block.rows) { + const arr = await this.loadFile(this.blockRowsFilePath(block.index)); + + block.rows = new Map(arr); + + this.loadedBlocks.push(block.index); +//console.log(`loaded block ${block.index}`); + } + } + + async closeFd(name) { + if (this.fd[name]) { + await this.fd[name].close(); + this.fd[name] = null; + } + } + + async openFd(name, fileName = '') { + if (this.fd[name]) + return; + + if (!fileName) { + throw new Error('TableRowsFile: fileName is empty'); + } + + const exists = await utils.pathExists(fileName); + + const fd = await fs.open(fileName, 'a'); + if (!exists) { + await fd.write('0['); + } + + this.fd[name] = fd; + } + + blockRowsFilePath(index) { + if (index < 1000000) + return `${this.tablePath}/${index.toString().padStart(6, '0')}.jem`; + else + return `${this.tablePath}/${index.toString().padStart(12, '0')}.jem`; + } + + async finalizeBlocks() { +//console.log(this.blocksNotFinalized.size); + + for (const index of this.blocksNotFinalized.keys()) { + if (this.destroyed) + return; + + if (index >= this.lastSavedBlockIndex) + continue; + + const block = this.blockList.get(index); + + if (block) { + if (block.final) + throw new Error('finalizeBlocks: something wrong'); + + const blockPath = this.blockRowsFilePath(block.index); +//console.log(`start finalize block ${block.index}`); + const arr = await this.loadFile(blockPath); + const rows = new Map(arr); + + const finBlockPath = `${blockPath}.tmp`; + const rowsStr = JSON.stringify(Array.from(rows)); + await this.writeFinal(finBlockPath, rowsStr); + + await fs.rename(finBlockPath, blockPath); + + block.size = Buffer.byteLength(rowsStr, 'utf8') + 1; + block.rowsLength = rows.size;//insurance + block.final = true; + await this.fd.blockList.write(JSON.stringify(block) + ','); +//console.log(`finalized block ${block.index}`); + } + + this.blocksNotFinalized.delete(index); + } + } + + async dumpMaps() { + //dumping blockIndex + const blockindex1Size = (await this.fd.blockIndex.stat()).size; + if ((blockindex1Size > minFileDumpSize && blockindex1Size > this.blockindex0Size) || blockindex1Size > maxFileDumpSize) { + const blockindex0Path = `${this.tablePath}/blockindex.0`; + const blockindex2Path = `${this.tablePath}/blockindex.2`; + await this.writeFinal(blockindex2Path, JSON.stringify(Array.from(this.blockIndex))); + + await fs.rename(blockindex2Path, blockindex0Path); + await this.closeFd('blockIndex'); + await fs.unlink(`${this.tablePath}/blockindex.1`); + this.blockindex0Size = (await fs.stat(blockindex0Path)).size; + } + + //dumping blockList + const blocklist1Size = (await this.fd.blockList.stat()).size; + if ((blocklist1Size > minFileDumpSize && blocklist1Size > this.blocklist0Size) || blocklist1Size > maxFileDumpSize) { + const blocklist0Path = `${this.tablePath}/blocklist.0`; + const blocklist2Path = `${this.tablePath}/blocklist.2`; + await this.writeFinal(blocklist2Path, JSON.stringify(Array.from(this.blockList.values()))); + + await fs.rename(blocklist2Path, blocklist0Path); + await this.closeFd('blockList'); + await fs.unlink(`${this.tablePath}/blocklist.1`); + this.blocklist0Size = (await fs.stat(blocklist0Path)).size; + } + } + + async saveDelta(deltaStep) { + const delta = this.getDelta(deltaStep); + + //lastSavedBlockIndex + const len = delta.blockRows.length; + if (len) { + this.lastSavedBlockIndex = delta.blockRows[len - 1][0]; + } + + //check all blocks fragmentation + if (!this.defragCandidates) + this.defragCandidates = []; + + if (!this.defragCandidates.length) { + if (this.defragCounter >= defragAfter) { + for (const block of this.blockList.values()) { + if (!block.final) + continue; + + if (block.addCount - block.delCount < block.rowsLength/2 || block.size < maxBlockSize/2) { + this.defragCandidates.push(block); + } + } + + this.defragCounter = 0; + } else { + this.defragCounter++; + } + } + + let defragmented = 0; + while (this.defragCandidates.length) { + if (defragmented >= defragBlockCountAtOnce || this.destroyed) + break; + + const block = this.defragCandidates.shift(); + + if (!block.rows) { + await this.loadBlock(block); + } + + //move all active rows from fragmented block to current + for (const [id, row] of block.rows.entries()) { + if (this.blockIndex.get(id) === block.index) { + const newIndex = this.addToCurrentBlock(id, row, JSON.stringify(row), deltaStep, delta); + this.blockIndex.set(id, newIndex); + delta.blockIndex.push([id, newIndex]); + } + } + + this.blockList.delete(block.index); + delta.blockList.push([block.index, 0]); + + if (!delta.delFiles) + delta.delFiles = []; + delta.delFiles.push(this.blockRowsFilePath(block.index)); + + defragmented++; +//console.log(`defragmented block ${block.index}, size: ${block.size}, addCount: ${block.addCount}, delCount: ${block.delCount}, rowsLength: ${block.rowsLength}`); + } + + //blockIndex delta save + if (!this.fd.blockIndex) + await this.openFd('blockIndex', `${this.tablePath}/blockindex.1`); + + let buf = []; + for (const deltaRec of delta.blockIndex) { + buf.push(JSON.stringify(deltaRec)); + } + if (buf.length) + await this.fd.blockIndex.write(buf.join(',') + ','); + + //blockList delta save + if (!this.fd.blockList) + await this.openFd('blockList', `${this.tablePath}/blocklist.1`); + + let lastSaved = 0; + buf = []; + for (const deltaRec of delta.blockList) { + const index = deltaRec[0]; + const exists = deltaRec[1]; + + if (exists) { + if (lastSaved !== index) {//optimization + const block = this.blockList.get(index); + if (block)//might be defragmented already + buf.push(JSON.stringify(block)); + lastSaved = index; + } + } else { + buf.push(JSON.stringify({index, deleted: 1})); + } + } + if (buf.length) + await this.fd.blockList.write(buf.join(',') + ','); + + //blockRows delta save + buf = []; + for (const deltaRec of delta.blockRows) { + const [index, id, row] = deltaRec; + + if (this.fd.blockRowsIndex !== index) { + if (buf.length) + await this.fd.blockRows.write(buf.join(',') + ','); + buf = []; + await this.closeFd('blockRows'); + this.fd.blockRowsIndex = null; + } + + if (!this.fd.blockRows) { + const blockPath = this.blockRowsFilePath(index); + + await this.openFd('blockRows', blockPath); + this.fd.blockRowsIndex = index; + } + + buf.push(JSON.stringify([id, row])); + } + if (buf.length) + await this.fd.blockRows.write(buf.join(',') + ','); + + //blocks finalization + await this.finalizeBlocks(); + this.unloadBlocksIfNeeded();//no await + + //dumps if needed + await this.dumpMaps(); + + //delete files if needed + if (delta.delFiles) { + for (const fileName of delta.delFiles) { +//console.log(`delete ${fileName}`); + if (await utils.pathExists(fileName)) + await fs.unlink(fileName); + } + } + + this.deltas.delete(deltaStep); + } + + async cancelDelta(deltaStep) { + this.deltas.delete(deltaStep); + } + + async load() { + let autoIncrement = 0; + + const loadBlockIndex = (fileNum, data) => { + if (fileNum === 0) {//dumped data + this.blockIndex = new Map(data);//much faster + for (const id of this.blockIndex.keys()) { + if (typeof(id) === 'number' && id >= autoIncrement) + autoIncrement = id + 1; + } + } else { + for (const rec of data) { + const [id, index] = rec; + if (index > 0) { + this.blockIndex.set(id, index); + if (typeof(id) === 'number' && id >= autoIncrement) + autoIncrement = id + 1; + } else + this.blockIndex.delete(id); + } + } + } + + const loadBlockList = (data) => { + for (const rec of data) { + const block = rec; + if (block.deleted) { + this.blockList.delete(block.index); + } else { + block.rows = null; + this.blockList.set(block.index, block); + if (block.index > this.currentBlockIndex) + this.currentBlockIndex = block.index; + } + } + + } + + this.blockIndex.clear(); + for (let i = 0; i < 2; i++) { + const dataPath = `${this.tablePath}/blockindex.${i}`; + + if (await utils.pathExists(dataPath)) { + const data = await this.loadFile(dataPath); + loadBlockIndex(i, data); + } + } + const blockindex0Path = `${this.tablePath}/blockindex.0`; + if (await utils.pathExists(blockindex0Path)) + this.blockindex0Size = (await fs.stat(blockindex0Path)).size; + + this.currentBlockIndex = 0; + this.blockList.clear(); + for (let i = 0; i < 2; i++) { + const dataPath = `${this.tablePath}/blocklist.${i}`; + + if (await utils.pathExists(dataPath)) { + const data = await this.loadFile(dataPath); + loadBlockList(data); + } + } + const blocklist0Path = `${this.tablePath}/blocklist.0`; + if (await utils.pathExists(blocklist0Path)) + this.blocklist0Size = (await fs.stat(blocklist0Path)).size; + + this.lastSavedBlockIndex = this.currentBlockIndex; + const currentBlock = this.blockList.get(this.currentBlockIndex); + if (currentBlock) + await this.loadBlock(currentBlock); + + this.blocksNotFinalized = new Map(); + for (const block of this.blockList.values()) { + if (!block.final) + this.blocksNotFinalized.set(block.index, 1); + } + + return autoIncrement; + } + + async loadCorrupted() { + this.loadCorrupted = true; + + const loadBlockIndex = (fileNum, data) => { + if (fileNum === 0) {//dumped data + this.blockIndex = new Map(data);//much faster + } else { + for (const rec of data) { + const [id, index] = rec; + if (index > 0) + this.blockIndex.set(id, index); + else + this.blockIndex.delete(id); + } + } + } + + this.blockIndex.clear(); + for (let i = 0; i < 2; i++) { + const dataPath = `${this.tablePath}/blockindex.${i}`; + + if (await utils.pathExists(dataPath)) { + try { + const data = await this.loadFile(dataPath); + loadBlockIndex(i, data); + } catch(e) { + console.error(e); + } + } + } + + const files = await fs.readdir(this.tablePath, { withFileTypes: true }); + + this.blockList.clear(); + for (const file of files) { + if (file.isFile() && path.extname(file.name) == '.jem') { + const numStr = path.basename(file.name, '.jem'); + const index = parseInt(numStr, 10); + if (!isNaN(index)) { + const block = { + index, + delCount: 0, + addCount: 0, + size: 0, + rows: null, + rowsLength: 0, + final: false, + }; + this.blockList.set(block.index, block); + //console.log(index); + } + } + } + } + + async closeAllFiles() { + await this.closeFd('blockIndex'); + await this.closeFd('blockList'); + await this.closeFd('blockRows'); + } + + async destroy() { + await this.closeAllFiles(); + + this.destroyed = true; + } +} + +module.exports = TableRowsFile; \ No newline at end of file diff --git a/server/db/JembaDb/TableRowsMem.js b/server/db/JembaDb/TableRowsMem.js new file mode 100644 index 00000000..91b16222 --- /dev/null +++ b/server/db/JembaDb/TableRowsMem.js @@ -0,0 +1,34 @@ +class TableRowsMem { + constructor() { + this.rows = new Map(); + } + + //--- rows interface + async getRow(id) { + return this.rows.get(id); + } + + setRow(id, row) { + this.rows.set(id, row); + } + + deleteRow(id) { + this.rows.delete(id); + } + + getAllIds() { + return this.rows.keys(); + } + + getAllIdsSize() { + return this.rows.size; + } + //--- rows interface end + + async destroy() { + //for GC + this.rows = null; + } +} + +module.exports = TableRowsMem; \ No newline at end of file diff --git a/server/db/JembaDb/index.js b/server/db/JembaDb/index.js new file mode 100644 index 00000000..c97f50a7 --- /dev/null +++ b/server/db/JembaDb/index.js @@ -0,0 +1,7 @@ +const JembaDb = require('./JembaDb'); +const JembaDbThread = require('./JembaDbThread'); + +module.exports = { + JembaDb, + JembaDbThread, +}; \ No newline at end of file diff --git a/server/db/JembaDb/utils.js b/server/db/JembaDb/utils.js new file mode 100644 index 00000000..7cf05b05 --- /dev/null +++ b/server/db/JembaDb/utils.js @@ -0,0 +1,152 @@ +const fsCB = require('fs'); +const fs = fsCB.promises; +const zlib = require('zlib'); + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +function sleepWithStop(ms, cb = () => {}) { + return new Promise(resolve => { + const timer = setTimeout(resolve, ms); + cb(() => { clearTimeout(timer); resolve(); }); + }); +} + +function unionSet(arrSet) { + if (!arrSet.length) + return new Set(); + + let max = 0; + let size = arrSet[0].size; + for (let i = 1; i < arrSet.length; i++) { + if (arrSet[i].size > size) { + max = i; + size = arrSet[i].size; + } + } + + const result = new Set(arrSet[max]); + for (let i = 0; i < arrSet.length; i++) { + if (i === max) + continue; + + for (const elem of arrSet[i]) { + result.add(elem); + } + } + + return result; +} + +function intersectSet(arrSet) { + if (!arrSet.length) + return new Set(); + + let min = 0; + let size = arrSet[0].size; + for (let i = 1; i < arrSet.length; i++) { + if (arrSet[i].size < size) { + min = i; + size = arrSet[i].size; + } + } + + const result = new Set(); + for (const elem of arrSet[min]) { + let inAll = true; + for (let i = 0; i < arrSet.length; i++) { + if (i === min) + continue; + if (!arrSet[i].has(elem)) { + inAll = false; + break; + } + } + + if (inAll) + result.add(elem); + } + + + return result; +} + +async function pathExists(path) { + try { + await fs.access(path); + return true; + } catch(e) { + return false; + } +} + +async function appendFileToFile(nameFrom, nameTo) { + return new Promise((resolve, reject) => { + const readStream = fsCB.createReadStream(nameFrom); + readStream.on('error', (err) => { + reject(err); + }); + + const writeStream = fsCB.createWriteStream(nameTo, {flags: 'a'}); + + writeStream.on('error', (err) => { + reject(err); + }); + + writeStream.on('close', () => { + resolve(); + }); + + readStream.pipe(writeStream); + }); +} + +function esc(obj) { + return JSON.stringify(obj).replace(/@/g, '\\x40'); +} + +function paramToArray(param) { + return (Array.isArray(param) ? param : [param]); +} + +function cloneDeep(obj) { + return JSON.parse(JSON.stringify(obj)); +} + +//async +function deflate(buf, compressionLevel) { + return new Promise((resolve, reject) => { + zlib.deflateRaw(buf, {level: compressionLevel}, (err, b) => { + if (err) + reject(err); + resolve(b); + }); + }); +} + +//async +function inflate(buf) { + return new Promise((resolve, reject) => { + zlib.inflateRaw(buf, (err, b) => { + if (err) + reject(err); + resolve(b); + }); + }); +} + + +module.exports = { + sleep, + sleepWithStop, + unionSet, + intersectSet, + pathExists, + appendFileToFile, + esc, + paramToArray, + cloneDeep, + deflate, + inflate, +}; \ No newline at end of file diff --git a/server/db/SqliteConnectionPool.js b/server/db/SqliteConnectionPool.js index 39d54151..fa297644 100644 --- a/server/db/SqliteConnectionPool.js +++ b/server/db/SqliteConnectionPool.js @@ -1,3 +1,4 @@ +//TODO: удалить модуль в 2023г const sqlite3 = require('sqlite3'); const sqlite = require('sqlite'); diff --git a/server/db/jembaMigrations/index.js b/server/db/jembaMigrations/index.js new file mode 100644 index 00000000..4769618c --- /dev/null +++ b/server/db/jembaMigrations/index.js @@ -0,0 +1,4 @@ +module.exports = { + //'app': require('./jembaMigrations/app'), + 'reader-storage': require('./reader-storage'), +}; diff --git a/server/db/jembaMigrations/reader-storage/001-create.js b/server/db/jembaMigrations/reader-storage/001-create.js new file mode 100644 index 00000000..bb14583b --- /dev/null +++ b/server/db/jembaMigrations/reader-storage/001-create.js @@ -0,0 +1,13 @@ +module.exports = { + up: [ + //CREATE TABLE storage (id TEXT PRIMARY KEY, rev INTEGER, time INTEGER, data TEXT); + ['create', { + table: 'storage' + }], + ], + down: [ + ['drop', { + table: 'storage' + }], + ] +}; diff --git a/server/db/jembaMigrations/reader-storage/index.js b/server/db/jembaMigrations/reader-storage/index.js new file mode 100644 index 00000000..85941af0 --- /dev/null +++ b/server/db/jembaMigrations/reader-storage/index.js @@ -0,0 +1,6 @@ +module.exports = { + table: 'migration1', + data: [ + {id: 1, name: 'create', data: require('./001-create')} + ] +} \ No newline at end of file diff --git a/server/index.js b/server/index.js index 4990a86c..884c147f 100644 --- a/server/index.js +++ b/server/index.js @@ -7,6 +7,11 @@ const compression = require('compression'); const http = require('http'); const WebSocket = require ('ws'); +const ayncExit = new (require('./core/AsyncExit'))(); +ayncExit.init(); + +let log = null; + async function init() { //config const configManager = new (require('./config'))();//singleton @@ -18,7 +23,7 @@ async function init() { //logger const appLogger = new (require('./core/AppLogger'))();//singleton await appLogger.init(config); - const log = appLogger.log; + log = appLogger.log; //dirs log(`${config.name} v${config.version}, Node.js ${process.version}`); @@ -41,6 +46,13 @@ async function init() { //connections const connManager = new (require('./db/ConnManager'))();//singleton await connManager.init(config); + + const jembaConnManager = new (require('./db/JembaConnManager'))();//singleton + await jembaConnManager.init(config, argv['auto-repair']); + + //converter SQLITE => JembaDb + const converter = new (require('./db/Converter'))(); + await converter.run(config); } async function main() { @@ -96,13 +108,15 @@ async function main() { } } - (async() => { try { await init(); await main(); } catch (e) { - console.error(e); - process.exit(1); + if (log) + log(LM_FATAL, e.stack); + else + console.error(e.stack); + ayncExit.exit(1); } -})(); \ No newline at end of file +})();