diff --git a/.eslintrc b/.eslintrc index f7aaa550..2025d7b5 100644 --- a/.eslintrc +++ b/.eslintrc @@ -12,6 +12,7 @@ "@babel" ], "env": { + "es6": true, "browser": true, "node": true }, diff --git a/build/webpack.base.config.js b/build/webpack.base.config.js index 8605c532..747c506f 100644 --- a/build/webpack.base.config.js +++ b/build/webpack.base.config.js @@ -62,34 +62,6 @@ module.exports = { filename: 'fonts/[name]-[hash:6][ext]' }, }, - /*{ - test: /\.gif$/, - loader: "url-loader", - options: { - name: "images/[name]-[hash:6].[ext]" - } - }, - { - test: /\.png$/, - loader: "url-loader", - options: { - name: "images/[name]-[hash:6].[ext]" - } - }, - { - test: /\.jpg$/, - loader: "file-loader", - options: { - name: "images/[name]-[hash:6].[ext]" - } - }, - { - test: /\.(ttf|eot|woff|woff2)$/, - loader: "file-loader", - options: { - name: "fonts/[name]-[hash:6].[ext]" - } - },*/ ] }, diff --git a/client/components/Reader/Reader.vue b/client/components/Reader/Reader.vue index 198c1455..e6ea41cc 100644 --- a/client/components/Reader/Reader.vue +++ b/client/components/Reader/Reader.vue @@ -585,7 +585,20 @@ class Reader { //сохранение в serverStorage if (value) { await utils.sleep(500); - await this.$refs.serverStorage.saveRecent(value); + + let timer = setTimeout(() => { + if (!this.offlineModeActive) + this.$root.notify.error('Таймаут соединения'); + }, 10000); + + try { + await this.$refs.serverStorage.saveRecent(value); + } catch (e) { + if (!this.offlineModeActive) + this.$root.notify.error(e.message); + } finally { + clearTimeout(timer); + } } } } diff --git a/client/components/Reader/ServerStorage/ServerStorage.vue b/client/components/Reader/ServerStorage/ServerStorage.vue index af8c29d3..c0a525cc 100644 --- a/client/components/Reader/ServerStorage/ServerStorage.vue +++ b/client/components/Reader/ServerStorage/ServerStorage.vue @@ -576,7 +576,7 @@ class ServerStorage { newRecentPatch.rev++; newRecentPatch.data[itemKey] = _.cloneDeep(bm.recent[itemKey]); - let applyMod = this.cachedRecentMod.data; + const applyMod = this.cachedRecentMod.data; if (applyMod && applyMod.key && newRecentPatch.data[applyMod.key]) newRecentPatch.data[applyMod.key] = utils.applyObjDiff(newRecentPatch.data[applyMod.key], applyMod.mod, {isAddChanged: true}); @@ -627,7 +627,7 @@ class ServerStorage { this.warning(`Последние изменения отменены. Данные синхронизированы с сервером.`); if (!recurse && itemKey) { this.savingRecent = false; - this.saveRecent(itemKey, true); + await this.saveRecent(itemKey, true); return; } } else if (result.state == 'success') { diff --git a/client/components/Reader/share/bookManager.js b/client/components/Reader/share/bookManager.js index 0c93379e..ad07c0d0 100644 --- a/client/components/Reader/share/bookManager.js +++ b/client/components/Reader/share/bookManager.js @@ -63,48 +63,6 @@ class BookManager { } await this.cleanRecentBooks(); - - //TODO: убрать после 06.2021, когда bmRecentStoreOld устареет - { - await this.convertFileToDiskPrefix(); - if (this.recentRev > 10) - await bmRecentStoreOld.clear(); - } - } else {//TODO: убрать после 06.2021, когда bmRecentStoreOld устареет - this.recentLast = await bmRecentStoreOld.getItem('recent-last'); - if (this.recentLast) { - this.recent[this.recentLast.key] = this.recentLast; - const meta = await bmMetaStore.getItem(`bmMeta-${this.recentLast.key}`); - if (_.isObject(meta)) { - this.books[meta.key] = meta; - } - } - - let key = null; - const len = await bmRecentStoreOld.length(); - for (let i = len - 1; i >= 0; i--) { - key = await bmRecentStoreOld.key(i); - if (key) { - let r = await bmRecentStoreOld.getItem(key); - if (_.isObject(r) && r.key) { - this.recent[r.key] = r; - } - } else { - await bmRecentStoreOld.removeItem(key); - } - } - - //размножение для дебага - /*if (key) { - for (let i = 0; i < 1000; i++) { - const k = this.keyFromUrl(i.toString()); - this.recent[k] = Object.assign({}, _.cloneDeep(this.recent[key]), {key: k, touchTime: Date.now() - 1000000, url: utils.randomHexString(300)}); - } - }*/ - - await bmRecentStoreNew.setItem('recent', this.recent); - this.recentRev = 1; - await bmRecentStoreNew.setItem('rev', this.recentRev); } this.recentChanged = true; @@ -374,7 +332,7 @@ class BookManager { //-- recent -------------------------------------------------------------- async recentSetItem(item = null, skipCheck = false) { const rev = await bmRecentStoreNew.getItem('rev'); - if (rev != this.recentRev && !skipCheck) { + if (rev != this.recentRev && !skipCheck) {//если изменение произошло в другой вкладке барузера const newRecent = await bmRecentStoreNew.getItem('recent'); Object.assign(this.recent, newRecent); this.recentItem = await bmRecentStoreNew.getItem('recent-item'); @@ -455,33 +413,6 @@ class BookManager { return isDel; } - async convertFileToDiskPrefix() { - let isConverted = false; - - const newRecent = {}; - for (let key of Object.keys(this.recent)) { - let newKey = key; - let newUrl = this.recent[key].url; - - if (newKey.indexOf('66696c65') == 0) { - newKey = newKey.replace(/^66696c65/, '6469736b'); - if (newUrl) - newUrl = newUrl.replace(/^file/, 'disk'); - isConverted = true; - } - - newRecent[newKey] = this.recent[key]; - newRecent[newKey].key = newKey; - if (newUrl) - newRecent[newKey].url = newUrl; - } - if (isConverted) { - this.recent = newRecent; - await this.recentSetItem(null, true); - } - return isConverted; - } - mostRecentBook() { if (this.recentLastKey) { return this.recent[this.recentLastKey]; diff --git a/client/components/Reader/versionHistory.js b/client/components/Reader/versionHistory.js index 4c819c6c..9a6ba551 100644 --- a/client/components/Reader/versionHistory.js +++ b/client/components/Reader/versionHistory.js @@ -1,4 +1,15 @@ export const versionHistory = [ +{ + showUntil: '2022-01-10', + header: '0.11.2 (2022-01-11)', + content: +` + +` +}, + { showUntil: '2021-12-02', header: '0.11.1 (2021-12-03)', diff --git a/client/quasar.js b/client/quasar.js index 075c80e9..0994aa2a 100644 --- a/client/quasar.js +++ b/client/quasar.js @@ -86,7 +86,6 @@ const plugins = { import '@quasar/extras/line-awesome/line-awesome.css'; import lineAwesome from 'quasar/icon-set/line-awesome.js' -//const q: {Quasar, QuasarOptions: { config, components, directives, plugins }}; export default { quasar: Quasar, options: { config, components, directives, plugins }, diff --git a/docs/liberama.top/liberama b/docs/liberama.top/liberama index ec288a99..7a4bc2ec 100644 --- a/docs/liberama.top/liberama +++ b/docs/liberama.top/liberama @@ -35,6 +35,7 @@ server { proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; + proxy_read_timeout 600s; } location / { diff --git a/docs/omnireader.ru/omnireader b/docs/omnireader.ru/omnireader index 1939d80f..5ba759bd 100644 --- a/docs/omnireader.ru/omnireader +++ b/docs/omnireader.ru/omnireader @@ -24,6 +24,7 @@ server { proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; + proxy_read_timeout 600s; } location / { diff --git a/package-lock.json b/package-lock.json index 530df793..7865b34a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "got": "^11.8.2", "he": "^1.2.0", "iconv-lite": "^0.6.3", + "jembadb": "^1.3.0", "localforage": "^1.10.0", "lodash": "^4.17.21", "minimist": "^1.2.5", @@ -6377,6 +6378,14 @@ "node": "*" } }, + "node_modules/jembadb": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/jembadb/-/jembadb-1.3.0.tgz", + "integrity": "sha512-zMJ1GyXmqvniWToaZTzc3JPHK+SfvcynFHYsZAx8bJWlgVdQd6cqYpIEXJFP+3OZqxPTzMYG5OBGclxTsoOqtg==", + "engines": { + "node": ">=14.4.0" + } + }, "node_modules/jest-worker": { "version": "27.3.1", "resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-27.3.1.tgz", @@ -16219,6 +16228,11 @@ "minimatch": "^3.0.4" } }, + "jembadb": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/jembadb/-/jembadb-1.3.0.tgz", + "integrity": "sha512-zMJ1GyXmqvniWToaZTzc3JPHK+SfvcynFHYsZAx8bJWlgVdQd6cqYpIEXJFP+3OZqxPTzMYG5OBGclxTsoOqtg==" + }, "jest-worker": { "version": "27.3.1", "resolved": "https://registry.npmjs.org/jest-worker/-/jest-worker-27.3.1.tgz", diff --git a/package.json b/package.json index 617ab320..dd35260f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "Liberama", - "version": "0.11.1", + "version": "0.11.2", "author": "Book Pauk ", "license": "CC0-1.0", "repository": "bookpauk/liberama", @@ -10,8 +10,8 @@ "scripts": { "dev": "nodemon --inspect --ignore server/public --ignore server/data --ignore client --exec 'node server'", "build:client": "webpack --config build/webpack.prod.config.js", - "build:linux": "npm run build:client && node build/linux && pkg -t node14-linux-x64 -o dist/linux/liberama .", - "build:win": "npm run build:client && node build/win && pkg -t node14-win-x64 -o dist/win/liberama .", + "build:linux": "npm run build:client && node build/linux && pkg -t node14-linux-x64 -C GZip -o dist/linux/liberama .", + "build:win": "npm run build:client && node build/win && pkg -t node14-win-x64 -C GZip -o dist/win/liberama .", "lint": "eslint --ext=.js,.vue client server", "build:client-dev": "webpack --config build/webpack.dev.config.js", "postinstall": "npm run build:client-dev && node build/linux" @@ -60,6 +60,7 @@ "got": "^11.8.2", "he": "^1.2.0", "iconv-lite": "^0.6.3", + "jembadb": "^1.3.0", "localforage": "^1.10.0", "lodash": "^4.17.21", "minimist": "^1.2.5", diff --git a/server/core/AsyncExit.js b/server/core/AsyncExit.js index d2b61594..10921f1b 100644 --- a/server/core/AsyncExit.js +++ b/server/core/AsyncExit.js @@ -1,7 +1,7 @@ let instance = null; const defaultTimeout = 15*1000;//15 sec -const exitSignals = ['SIGINT', 'SIGTERM', 'SIGBREAK', 'SIGHUP', 'uncaughtException']; +const exitSignals = ['SIGINT', 'SIGTERM', 'SIGBREAK', 'SIGHUP', 'uncaughtException', 'SIGUSR2']; //singleton class AsyncExit { @@ -18,13 +18,10 @@ class AsyncExit { return instance; } - init(signals = null, codeOnSignal = 2) { + init(signals = exitSignals, codeOnSignal = 2) { if (this.inited) throw new Error('AsyncExit: initialized already'); - if (!signals) - signals = exitSignals; - const runSingalCallbacks = async(signal) => { for (const signalCallback of this.onSignalCallbacks.keys()) { try { diff --git a/server/db/JembaConnManager.js b/server/db/JembaConnManager.js index d3a2dd57..d91e3d44 100644 --- a/server/db/JembaConnManager.js +++ b/server/db/JembaConnManager.js @@ -2,7 +2,7 @@ const fs = require('fs-extra'); const _ = require('lodash'); const ayncExit = new (require('../core/AsyncExit'))();//singleton -const { JembaDb, JembaDbThread } = require('./JembaDb'); +const { JembaDb, JembaDbThread } = require('jembadb'); const log = new (require('../core/AppLogger'))().log;//singleton const jembaMigrations = require('./jembaMigrations'); @@ -46,7 +46,13 @@ class JembaConnManager { } log(`Open "${dbConfig.dbName}" begin`); - await dbConn.openDb({dbPath, cacheSize: dbConfig.cacheSize, compressed: dbConfig.compressed, forceFileClosing: dbConfig.forceFileClosing}); + await dbConn.openDb({ + dbPath, + create: true, + cacheSize: dbConfig.cacheSize, + compressed: dbConfig.compressed, + forceFileClosing: dbConfig.forceFileClosing + }); if (dbConfig.openAll) { try { diff --git a/server/db/JembaDb/JembaDb.js b/server/db/JembaDb/JembaDb.js deleted file mode 100644 index 004b3236..00000000 --- a/server/db/JembaDb/JembaDb.js +++ /dev/null @@ -1,536 +0,0 @@ -const fs = require('fs').promises; - -const Table = require('./Table'); -const utils = require('./utils'); - -/* API methods: -openDb -closeDb - -create -drop - -open -openAll -close -closeAll - -tableExists -getDbInfo -getDbSize - -select -insert -update -delete - -esc -*/ - -class JembaDb { - constructor() { - this.opened = false; - } - - /* - query = { - dbPath: String, - //table open defaults - inMemory: Boolean, false - cacheSize: Number, 5 - compressed: Number, {0..9}, 0 - recreate: Boolean, false, - autoRepair: Boolean, false, - forceFileClosing: Boolean, false, - lazyOpen: Boolean, false, - } - */ - async openDb(query = {}) { - if (this.opened) - throw new Error(`Database ${this.dbPath} has already been opened`); - - if (!query.dbPath) - throw new Error(`'query.dbPath' parameter is required`); - - this.dbPath = query.dbPath; - await fs.mkdir(this.dbPath, { recursive: true }); - - this.table = new Map(); - this.tableOpenDefaults = { - inMemory: query.inMemory, - cacheSize: query.cacheSize, - compressed: query.compressed, - recreate: query.recreate, - autoRepair: query.autoRepair, - forceFileClosing: query.forceFileClosing, - lazyOpen: query.lazyOpen, - }; - - this.opened = true; - } - - async closeDb() { - if (!this.opened) - return; - - await this.closeAll(); - this.opened = false; - - //console.log('closed'); - } - - checkOpened() { - if (!this.opened) - throw new Error('Database closed'); - } - - /* - query = { - table: 'tableName', - quietIfExists: Boolean, - inMemory: Boolean, false - cacheSize: Number, 5 - compressed: Number, {0..9}, 0 - recreate: Boolean, false, - autoRepair: Boolean, false, - forceFileClosing: Boolean, false, - lazyOpen: Boolean, false, - - in: 'tableName', - flag: Object || Array, {name: 'flag1', check: '(r) => r.id > 10'} - hash: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} - index: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} - } - result = {} - */ - async create(query = {}) { - this.checkOpened(); - - if ((!query.table && !query.in) || (query.table && query.in)) - throw new Error(`One of 'query.table' or 'query.in' parameters is required, but not both`); - - let table; - if (query.table) { - if (await this.tableExists({table: query.table})) { - if (!query.quietIfExists) - throw new Error(`Table '${query.table}' already exists`); - - table = this.table.get(query.table); - } else { - table = new Table(); - this.table.set(query.table, table); - - await this.open(query); - } - } else { - if (await this.tableExists({table: query.in})) { - table = this.table.get(query.in); - } else { - throw new Error(`Table '${query.in}' does not exist`); - } - } - - if (query.flag || query.hash || query.index) { - await table.create({ - quietIfExists: query.quietIfExists, - flag: query.flag, - hash: query.hash, - index: query.index, - }); - } - - return {}; - } - - /* - query = { - table: 'tableName', - - in: 'tableName', - flag: Object || Array, {name: 'flag1'} - hash: Object || Array, {field: 'field1'} - index: Object || Array, {field: 'field1'} - } - result = {} - */ - async drop(query = {}) { - this.checkOpened(); - - if ((!query.table && !query.in) || (query.table && query.in)) - throw new Error(`One of 'query.table' or 'query.in' parameters is required, but not both`); - - if (query.table) { - if (await this.tableExists({table: query.table})) { - const table = this.table.get(query.table); - if (table && table.opened) { - await table.close(); - } - - const basePath = `${this.dbPath}/${query.table}`; - await fs.rmdir(basePath, { recursive: true }); - - this.table.delete(query.table); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } else { - if (await this.tableExists({table: query.in})) { - const table = this.table.get(query.in); - - if (table) { - if (query.flag || query.hash || query.index) { - await table.drop({ - flag: query.flag, - hash: query.hash, - index: query.index, - }); - } - } else { - throw new Error(`Table '${query.in}' has not been opened yet`); - } - } else { - throw new Error(`Table '${query.in}' does not exist`); - } - } - - return {}; - } - - /* - query = { - (!) table: 'tableName', - inMemory: Boolean, false - cacheSize: Number, 5 - compressed: Number, {0..9}, 0 - recreate: Boolean, false, - autoRepair: Boolean, false, - forceFileClosing: Boolean, false, - lazyOpen: Boolean, false, - } - */ - async open(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - if (await this.tableExists({table: query.table})) { - let table = this.table.get(query.table); - - if (!table) { - table = new Table(); - } - - if (!table.opened) { - const opts = Object.assign({}, this.tableOpenDefaults, query); - opts.tablePath = `${this.dbPath}/${query.table}`; - await table.open(opts); - } - - this.table.set(query.table, table); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } - - - async _getTableList() { - const result = []; - const files = await fs.readdir(this.dbPath, { withFileTypes: true }); - - for (const file of files) { - if (file.isDirectory()) { - if (file.name.indexOf('___temporary_recreating') >= 0) - continue; - result.push(file.name); - } - } - - return result; - } - - /* - query = { - inMemory: Boolean, false - cacheSize: Number, 5 - compressed: Number, {0..9}, 0 - recreate: Boolean, false, - autoRepair: Boolean, false, - forceFileClosing: Boolean, false, - lazyOpen: Boolean, false, - } - */ - async openAll(query = {}) { - this.checkOpened(); - - const tables = await this._getTableList(); - - //sequentially - for (const table of tables) { - this.checkOpened(); - await this.open(Object.assign({}, query, {table})); - } - - /*const promises = []; - for (const table of tables) { - promises.push(this.open(Object.assign({}, query, {table}))); - } - await Promise.all(promises);*/ - } - - /* - query = { - (!) table: 'tableName', - } - */ - async close(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - if (await this.tableExists({table: query.table})) { - let table = this.table.get(query.table); - - if (table) { - await table.close(); - } - - this.table.delete(query.table); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } - - async closeAll() { - this.checkOpened(); - - const promises = []; - for (const table of this.table.keys()) { - promises.push(this.close({table})); - } - await Promise.all(promises); - } - - /* - query = { - (!) table: 'tableName' - }, - result = Boolean - */ - async tableExists(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - if (this.table.has(query.table)) - return true; - - if (await utils.pathExists(`${this.dbPath}/${query.table}`)) - return true; - - return false; - } - - /* - query = { - table: 'tableName' - }, - result = { - dbPath: String, - tableName1: {opened: Boolean, ...}, - tableName2: {opened: Boolean, ...}, - ... - } - */ - async getDbInfo(query = {}) { - this.checkOpened(); - - const tables = await this._getTableList(); - - const result = {dbPath: this.dbPath}; - for (const table of tables) { - if (!query.table || (query.table && table == query.table)) { - const tableInstance = this.table.get(table); - if (tableInstance && tableInstance.opened) { - result[table] = await tableInstance.getMeta(); - result[table].opened = true; - } else { - result[table] = {opened: false}; - } - } - } - return result; - } - - /* - result = { - total: Number, - tables: { - tableName1: Number, - tableName2: Number, - ... - } - } - */ - async getDbSize() { - this.checkOpened(); - - const dirs = await fs.readdir(this.dbPath, { withFileTypes: true }); - - const result = {total: 0, tables: {}}; - for (const dir of dirs) { - if (dir.isDirectory()) { - const table = dir.name; - const tablePath = `${this.dbPath}/${table}`; - const files = await fs.readdir(tablePath, { withFileTypes: true }); - - if (!result.tables[table]) - result.tables[table] = 0; - - for (const file of files) { - if (file.isFile()) { - let size = 0; - try { - size = (await fs.stat(`${tablePath}/${file.name}`)).size; - } catch(e) { - // - } - result.tables[table] += size; - result.total += size; - } - } - } - } - - return result; - } - - /* - query = { - (!) table: 'tableName', - distinct: 'fieldName' || Array, - count: Boolean, - map: '(r) => ({id1: r.id, ...})', - where: `@@index('field1', 10, 20)`, - sort: '(a, b) => a.id - b.id', - limit: 10, - offset: 10, - } - result = Array - */ - async select(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - const table = this.table.get(query.table); - if (table) { - return await table.select(query); - } else { - if (await this.tableExists({table: query.table})) { - throw new Error(`Table '${query.table}' has not been opened yet`); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } - } - - /* - query = { - (!) table: 'tableName', - replace: Boolean, - (!) rows: Array, - } - result = { - (!) inserted: Number, - (!) replaced: Number, - } - */ - async insert(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - const table = this.table.get(query.table); - if (table) { - return await table.insert(query); - } else { - if (await this.tableExists({table: query.table})) { - throw new Error(`Table '${query.table}' has not been opened yet`); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } - } - - /* - query = { - (!) table: 'tableName', - (!) mod: '(r) => r.count++', - where: `@@index('field1', 10, 20)`, - sort: '(a, b) => a.id - b.id', - limit: 10, - offset: 10, - } - result = { - (!) updated: Number, - } - */ - async update(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - const table = this.table.get(query.table); - if (table) { - return await table.update(query); - } else { - if (await this.tableExists({table: query.table})) { - throw new Error(`Table '${query.table}' has not been opened yet`); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } - } - - /* - query = { - (!) table: 'tableName', - where: `@@index('field1', 10, 20)`, - sort: '(a, b) => a.id - b.id', - limit: 10, - offset: 10, - } - result = { - (!) deleted: Number, - } - */ - async delete(query = {}) { - this.checkOpened(); - - if (!query.table) - throw new Error(`'query.table' parameter is required`); - - const table = this.table.get(query.table); - if (table) { - return await table.delete(query); - } else { - if (await this.tableExists({table: query.table})) { - throw new Error(`Table '${query.table}' has not been opened yet`); - } else { - throw new Error(`Table '${query.table}' does not exist`); - } - } - } - - esc(obj) { - return utils.esc(obj); - } -} - -module.exports = JembaDb; \ No newline at end of file diff --git a/server/db/JembaDb/JembaDbChild.js b/server/db/JembaDb/JembaDbChild.js deleted file mode 100644 index b97ed98b..00000000 --- a/server/db/JembaDb/JembaDbChild.js +++ /dev/null @@ -1,49 +0,0 @@ -const { parentPort } = require('worker_threads'); - -const JembaDb = require('./JembaDb'); - -const db = new JembaDb(); - -if (parentPort) { - parentPort.on('message', async(mes) => { - let result = {}; - try { - if (db[mes.action]) - result.result = await db[mes.action](mes.query); - else - result = {error: 'Action not found: ' + mes.action}; - } catch (e) { - result = {error: e.message}; - } - - result.requestId = mes.requestId; - parentPort.postMessage(result); - }); -} - -//This is for proper working of pkg (by zeit) and worker_threads -//just a copy of the above code as a string -module.exports = ` -const { parentPort } = require('worker_threads'); - -const JembaDb = require('./JembaDb'); - -const db = new JembaDb(); - -if (parentPort) { - parentPort.on('message', async(mes) => { - let result = {}; - try { - if (db[mes.action]) - result.result = await db[mes.action](mes.query); - else - result = {error: 'Action not found: ' + mes.action}; - } catch (e) { - result = {error: e.message}; - } - - result.requestId = mes.requestId; - parentPort.postMessage(result); - }); -} -`.replace('./JembaDb', `${__dirname.replace(/\\/g, '/')}/JembaDb`); \ No newline at end of file diff --git a/server/db/JembaDb/JembaDbThread.js b/server/db/JembaDb/JembaDbThread.js deleted file mode 100644 index df3a9b7e..00000000 --- a/server/db/JembaDb/JembaDbThread.js +++ /dev/null @@ -1,119 +0,0 @@ -const { Worker } = require('worker_threads'); -const utils = require('./utils'); -const JembaDbChild = require('./JembaDbChild'); -/* API methods: -openDb -closeDb - -create -drop - -open -openAll -close -closeAll - -tableExists -getInfo -getDbSize - -select -insert -update -delete - -esc -*/ - -class JembaDbThread { - constructor() { - this.worker = null; - this.listeners = new Map(); - this.requestId = 0; - - const apiMethods = [ - 'create', 'drop', 'open', 'openAll', 'close', 'closeAll', - 'tableExists', 'getDbInfo', 'getDbSize', 'select', 'insert', 'update', 'delete', 'dumpTables' - ]; - - for (const action of apiMethods) { - this[action] = async(query) => this._action(action, query); - } - } - - _terminate() { - if (this.worker) { - for (const listener of this.listeners.values()) { - listener({error: 'Worker terminated'}); - } - this.worker.terminate(); - } - this.worker = null; - } - - _runWoker() { - //const worker = new Worker(`${__dirname}/JembaDbChild.js`); - const worker = new Worker(JembaDbChild, {eval: true}); - - worker.on('message', (mes) => { - const listener = this.listeners.get(mes.requestId); - if (listener) - listener(mes); - }); - - worker.on('error', (err) => { - console.error(err); - }); - - worker.on('exit', () => { - this._terminate(); - }); - - this.worker = worker; - } - - _action(action, query) { - return new Promise((resolve, reject) => { - this.requestId++; - - const requestId = this.requestId; //!!! - this.listeners.set(requestId, (mes) => { - this.listeners.delete(requestId); - - if (mes.error) - reject(new Error(mes.error)); - else - resolve(mes.result); - }); - - if (this.worker) { - this.worker.postMessage({requestId: this.requestId, action, query}); - } else { - reject(new Error('Worker does not exist (database closed?)')); - } - }); - } - - async openDb(query = {}) { - if (!this.worker) { - this._runWoker(); - } else { - throw new Error('Worker has been created already'); - } - - return this._action('openDb', query); - } - - async closeDb() { - const result = await this._action('closeDb'); - this._terminate(); - //console.log('DB closed'); - return result; - } - - esc(obj) { - return utils.esc(obj); - } -} - -module.exports = JembaDbThread; \ No newline at end of file diff --git a/server/db/JembaDb/LockQueue.js b/server/db/JembaDb/LockQueue.js deleted file mode 100644 index 9ad6ff65..00000000 --- a/server/db/JembaDb/LockQueue.js +++ /dev/null @@ -1,38 +0,0 @@ -class LockQueue { - constructor(queueSize) { - this.queueSize = queueSize; - this.freed = true; - this.waitingQueue = []; - } - - ret() { - this.freed = true; - if (this.waitingQueue.length) { - this.waitingQueue.shift().onFreed(); - } - } - - get(take = true) { - return new Promise((resolve) => { - if (this.freed) { - if (take) - this.freed = false; - resolve(); - return; - } - - if (this.waitingQueue.length >= this.queueSize) - throw new Error('Lock queue is too long'); - - this.waitingQueue.push({ - onFreed: () => { - if (take) - this.freed = false; - resolve(); - }, - }); - }); - } -} - -module.exports = LockQueue; \ No newline at end of file diff --git a/server/db/JembaDb/Table.js b/server/db/JembaDb/Table.js deleted file mode 100644 index 599670c4..00000000 --- a/server/db/JembaDb/Table.js +++ /dev/null @@ -1,852 +0,0 @@ -const fs = require('fs').promises; -const utils = require('./utils'); - -const TableReducer = require('./TableReducer'); -const TableRowsMem = require('./TableRowsMem'); -const TableRowsFile = require('./TableRowsFile'); -const LockQueue = require('./LockQueue'); - -const maxChangesLength = 10; - -class Table { - constructor() { - this.rowsInterface = new TableRowsMem(); - - this.autoIncrement = 0; - this.fileError = ''; - - this.openingLock = new LockQueue(100); - this.lock = new LockQueue(100); - - this.opened = false; - this.closed = false; - this.deltaStep = 0; - this.changes = []; - - //table options defaults - this.inMemory = false; - this.compressed = 0; - this.cacheSize = 5; - this.compressed = 0; - this.recreate = false; - this.autoRepair = false; - this.forceFileClosing = false; - } - - checkErrors() { - if (this.fileError) - throw new Error(this.fileError); - - if (this.closed) - throw new Error('Table closed'); - - if (!this.opened) - throw new Error('Table has not been opened yet'); - } - - async waitForSaveChanges() { - if (this.changes.length > maxChangesLength) { - let i = this.changes.length - maxChangesLength; - while (i > 0 && this.changes.length > maxChangesLength) { - i--; - await utils.sleep(10); - } - } - } - - async recreateTable() { - const tempTablePath = `${this.tablePath}___temporary_recreating`; - await fs.rmdir(tempTablePath, { recursive: true }); - await fs.mkdir(tempTablePath, { recursive: true }); - - const tableRowsFileSrc = new TableRowsFile(this.tablePath, this.cacheSize); - - const tableRowsFileDest = new TableRowsFile(tempTablePath, this.cacheSize, this.compressed); - const reducerDest = new TableReducer(false, tempTablePath, this.compressed, tableRowsFileDest); - - try { - await tableRowsFileSrc.loadCorrupted(); - } catch (e) { - console.error(e); - } - try { - await reducerDest._load(true, `${this.tablePath}/meta.0`); - } catch (e) { - console.error(e); - } - - const putRows = async(rows) => { - const oldRows = []; - const newRows = []; - const newRowsStr = []; - //checks - for (const row of rows) { - if (!row) { - continue; - } - - const t = typeof(row.id); - if (t !== 'number' && t !== 'string') { - continue; - } - - const oldRow = await tableRowsFileDest.getRow(row.id); - - if (oldRow) { - continue; - } - - let str = ''; - try { - str = JSON.stringify(row);//because of stringify errors - } catch(e) { - continue; - } - - newRows.push(row); - oldRows.push({}); - newRowsStr.push(str); - } - - try { - //reducer - reducerDest._update(oldRows, newRows, 1); - - //insert - for (let i = 0; i < newRows.length; i++) { - const newRow = newRows[i]; - const newRowStr = newRowsStr[i]; - - tableRowsFileDest.setRow(newRow.id, newRow, newRowStr, 1); - } - - await tableRowsFileDest.saveDelta(1); - await reducerDest._saveDelta(1); - } catch(e) { - console.error(e); - } - }; - - let rows = []; - for (const id of tableRowsFileSrc.getAllIds()) { - if (this.closed) - throw new Error('Table closed'); - - let row = null; - try { - row = await tableRowsFileSrc.getRow(id); - } catch(e) { - console.error(e); - continue; - } - - rows.push(row); - if (rows.length > 1000) { - await putRows(rows); - rows = []; - } - } - if (rows.length) - await putRows(rows); - - await tableRowsFileDest.saveDelta(0); - - const delta = reducerDest._getDelta(0); - delta.dumpMeta = true; - await reducerDest._saveDelta(0); - - await tableRowsFileSrc.destroy(); - await reducerDest._destroy(); - await tableRowsFileDest.destroy(); - - await fs.writeFile(`${tempTablePath}/state`, '1'); - - await fs.rmdir(this.tablePath, { recursive: true }); - await fs.rename(tempTablePath, this.tablePath); - } - - /* - query: { - tablePath: String, - inMemory: Boolean, - cacheSize: Number, - compressed: Number, 0..9 - recreate: Boolean, false, - autoRepair: Boolean, false, - forceFileClosing: Boolean, false, - lazyOpen: Boolean, false, - } - */ - async _open(query = {}) { - if (this.opening) - return; - this.opening = true; - await this.openingLock.get(); - //console.log(query); - try { - if (this.opened) - throw new Error('Table has already been opened'); - if (this.closed) - throw new Error('Table instance has been destroyed. Please create a new one.'); - - this.inMemory = !!query.inMemory; - - if (this.inMemory) { - this.reducer = new TableReducer(this.inMemory, '', 0, this.rowsInterface); - } else { - if (!query.tablePath) - throw new Error(`'query.tablePath' parameter is required`); - - this.tablePath = query.tablePath; - this.cacheSize = query.cacheSize || 5; - this.compressed = query.compressed || 0; - this.recreate = query.recreate || false; - this.autoRepair = query.autoRepair || false; - this.forceFileClosing = query.forceFileClosing || false; - - await fs.mkdir(this.tablePath, { recursive: true }); - - this.tableRowsFile = new TableRowsFile(query.tablePath, this.cacheSize, this.compressed); - this.rowsInterface = this.tableRowsFile; - - this.reducer = new TableReducer(this.inMemory, this.tablePath, this.compressed, this.rowsInterface); - - const statePath = `${this.tablePath}/state`; - let state = null; - if (await utils.pathExists(statePath)) { - state = await fs.readFile(statePath, 'utf8'); - } - - if (state === null) {//check if other files exists - const files = await fs.readdir(this.tablePath); - if (files.length) - state = '0'; - } - - if (this.recreate) { - await this.recreateTable(); - state = '1'; - } - - if (state !== null) { - try { - if (state === '1') { - // load tableRowsFile & reducer - this.autoIncrement = await this.tableRowsFile.load(); - await this.reducer._load(); - } else { - throw new Error('Table corrupted') - } - } catch(e) { - if (this.autoRepair) { - console.error(e.message); - await this.recreateTable(); - } else { - throw e; - } - // load tableRowsFile & reducer - this.autoIncrement = await this.tableRowsFile.load(); - await this.reducer._load(); - } - } - } - - this.opened = true; - } catch(e) { - await this.close(); - const errMes = `Open table (${query.tablePath}): ${e.message}`; - if (!query.lazyOpen) - throw new Error(errMes); - else - this.fileError = errMes; - } finally { - this.openingLock.ret(); - this.opening = false; - } - } - - async open(query = {}) { - if (query.lazyOpen) { - this._open(query); - } else { - await this._open(query); - } - } - - async close() { - if (this.closed) - return; - - this.opened = false; - this.closed = true; - - if (!this.inMemory) { - while (this.savingChanges) { - await utils.sleep(10); - } - } - - //for GC - if (this.reducer) - await this.reducer._destroy(); - this.reducer = null; - - if (this.rowsInterface) - await this.rowsInterface.destroy(); - this.rowsInterface = null; - this.tableRowsFile = null; - } - - /* - query = { - quietIfExists: Boolean, - flag: Object || Array, {name: 'flag1', check: '(r) => r.id > 10'} - hash: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} - index: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false} - } - result = {} - */ - async create(query) { - await this.openingLock.get(false); - this.checkErrors(); - - await this.lock.get(); - try { - this.deltaStep++; - try { - if (query.flag) { - for (const flag of utils.paramToArray(query.flag)) { - await this.reducer._addFlag(flag, query.quietIfExists, this.deltaStep); - } - } - - if (query.hash) { - for (const hash of utils.paramToArray(query.hash)) { - await this.reducer._addHash(hash, query.quietIfExists, this.deltaStep); - } - } - - if (query.index) { - for (const index of utils.paramToArray(query.index)) { - await this.reducer._addIndex(index, query.quietIfExists, this.deltaStep); - } - } - - this.changes.push([this.deltaStep, 1]); - } catch(e) { - this.changes.push([this.deltaStep, 0]); - throw e; - } - - return {}; - } finally { - this.saveChanges();//no await - this.lock.ret(); - } - } - - /* - query = { - flag: Object || Array, {name: 'flag1'} - hash: Object || Array, {field: 'field1'} - index: Object || Array, {field: 'field1'} - } - result = {} - */ - async drop(query) { - await this.openingLock.get(false); - this.checkErrors(); - - await this.lock.get(); - try { - this.deltaStep++; - try { - if (query.flag) { - for (const flag of utils.paramToArray(query.flag)) { - await this.reducer._delFlag(flag.name, this.deltaStep); - } - } - - if (query.hash) { - for (const hash of utils.paramToArray(query.hash)) { - await this.reducer._delHash(hash.field, this.deltaStep); - } - } - - if (query.index) { - for (const index of utils.paramToArray(query.index)) { - await this.reducer._delIndex(index.field, this.deltaStep); - } - } - - this.changes.push([this.deltaStep, 1]); - } catch(e) { - this.changes.push([this.deltaStep, 0]); - throw e; - } - - return {}; - } finally { - this.saveChanges();//no await - this.lock.ret(); - } - } - - /* - result = { - inMemory: Boolean, - flag: Array, [{name: 'flag1', check: '(r) => r.id > 10'}, ...] - hash: Array, [{field: 'field1', type: 'string', depth: 11, allowUndef: false}, ...] - index: Array, [{field: 'field1', type: 'string', depth: 11, allowUndef: false}, ...] - } - */ - async getMeta() { - this.checkErrors(); - - return { - inMemory: this.inMemory, - flag: this.reducer._listFlag(), - hash: this.reducer._listHash(), - index: this.reducer._listIndex(), - }; - } - - prepareWhere(where) { - if (typeof(where) !== 'string') - throw new Error('query.where must be a string'); - - return `async(__tr) => {${where.replace(/@@/g, 'return await __tr.').replace(/@/g, 'await __tr.')}}`; - } - - /* - query = { - distinct: 'fieldName' || Array, - count: Boolean, - map: '(r) => ({id1: r.id, ...})', - where: `@@index('field1', 10, 20)`, - sort: '(a, b) => a.id - b.id', - limit: 10, - offset: 10, - } - result = Array - */ - async select(query = {}) { - await this.openingLock.get(false); - this.checkErrors(); - - let ids;//iterator - if (query.where) { - const where = this.prepareWhere(query.where); - const whereFunc = new Function(`return ${where}`)(); - - ids = await whereFunc(this.reducer); - } else { - ids = this.rowsInterface.getAllIds(); - } - - let found = []; - - let distinct = () => true; - if (query.distinct) { - const distFields = (Array.isArray(query.distinct) ? query.distinct : [query.distinct]); - const dist = new Map(); - distinct = (row) => { - let uniq = ''; - for (const field of distFields) { - const value = row[field]; - uniq += `${(value === undefined ? '___' : '')}${field}:${value}`; - } - - if (dist.has(uniq)) - return false; - dist.set(uniq, true); - return true; - }; - } - - if (!query.where && !query.distinct && query.count) {//some optimization - found = [{count: this.rowsInterface.getAllIdsSize()}]; - } else {//full running - for (const id of ids) { - const row = await this.rowsInterface.getRow(id); - - if (row && distinct(row)) { - found.push(row); - } - } - - if (query.count) { - found = [{count: found.length}]; - } - } - - let result = []; - if (query.map) { - const mapFunc = new Function(`return ${query.map}`)(); - - for (const row of found) { - result.push(mapFunc(row)); - } - } else { - result = found; - } - - if (query.sort) { - const sortFunc = new Function(`return ${query.sort}`)(); - result.sort(sortFunc); - } - - if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) { - const offset = query.offset || 0; - const limit = (query.hasOwnProperty('limit') ? query.limit : result.length); - result = result.slice(offset, offset + limit); - } - - return utils.cloneDeep(result); - } - - /* - query = { - replace: Boolean, - (!) rows: Array, - } - result = { - (!) inserted: Number, - (!) replaced: Number, - } - */ - async insert(query = {}) { - await this.openingLock.get(false); - this.checkErrors(); - - await this.lock.get(); - try { - if (!Array.isArray(query.rows)) { - throw new Error('query.rows must be an array'); - } - - const newRows = utils.cloneDeep(query.rows); - const replace = query.replace; - - //autoIncrement correction - for (const newRow of newRows) { - if (typeof(newRow.id) === 'number' && newRow.id >= this.autoIncrement) - this.autoIncrement = newRow.id + 1; - } - - const oldRows = []; - const newRowsStr = []; - //checks - for (const newRow of newRows) { - if (newRow.hasOwnProperty('___meta')) - throw new Error(`Use of field with name '___meta' is forbidden`); - - if (newRow.id === undefined) { - newRow.id = this.autoIncrement; - this.autoIncrement++; - } - - const t = typeof(newRow.id); - if (t !== 'number' && t !== 'string') { - throw new Error(`Row id bad type, 'number' or 'string' expected, got ${t}`); - } - - const oldRow = await this.rowsInterface.getRow(newRow.id); - - if (!replace && oldRow) { - throw new Error(`Record id:${newRow.id} already exists`); - } - - oldRows.push((oldRow ? oldRow : {})); - newRowsStr.push(JSON.stringify(newRow));//because of stringify errors - } - - const result = {inserted: 0, replaced: 0}; - this.deltaStep++; - try { - //reducer - this.reducer._update(oldRows, newRows, this.deltaStep); - - //insert - for (let i = 0; i < newRows.length; i++) { - const newRow = newRows[i]; - const newRowStr = newRowsStr[i]; - const oldRow = oldRows[i]; - - this.rowsInterface.setRow(newRow.id, newRow, newRowStr, this.deltaStep); - - if (oldRow.id !== undefined) - result.replaced++; - else - result.inserted++; - } - - this.changes.push([this.deltaStep, 1]); - } catch(e) { - this.changes.push([this.deltaStep, 0]); - throw e; - } - - await this.waitForSaveChanges(); - return result; - } finally { - this.saveChanges();//no await - this.lock.ret(); - } - } - - /* - query = { - (!) mod: '(r) => r.count++', - where: `@@index('field1', 10, 20)`, - sort: '(a, b) => a.id - b.id', - limit: 10, - offset: 10, - } - result = { - (!) updated: Number, - } - */ - async update(query = {}) { - await this.openingLock.get(false); - this.checkErrors(); - - await this.lock.get(); - try { - if (typeof(query.mod) !== 'string') { - throw new Error('query.mod must be a string'); - } - const modFunc = new Function(`return ${query.mod}`)(); - - //where - let ids;//iterator - if (query.where) { - const where = this.prepareWhere(query.where); - const whereFunc = new Function(`return ${where}`)(); - - ids = await whereFunc(this.reducer); - } else { - ids = this.rowsInterface.getAllIds(); - } - - //oldRows - let oldRows = []; - for (const id of ids) { - const oldRow = await this.rowsInterface.getRow(id); - - if (oldRow) { - oldRows.push(oldRow); - } - } - - if (query.sort) { - const sortFunc = new Function(`return ${query.sort}`)(); - oldRows.sort(sortFunc); - } - let newRows = utils.cloneDeep(oldRows); - - if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) { - const offset = query.offset || 0; - const limit = (query.hasOwnProperty('limit') ? query.limit : newRows.length); - newRows = newRows.slice(offset, offset + limit); - oldRows = oldRows.slice(offset, offset + limit); - } - - //mod & checks - const context = {}; - const newRowsStr = []; - for (const newRow of newRows) { - modFunc(newRow, context); - - const t = typeof(newRow.id); - if (t !== 'number' && t !== 'string') { - throw new Error(`Row id bad type, 'number' or 'string' expected, got ${t}`); - } - - //autoIncrement correction - if (t === 'number' && newRow.id >= this.autoIncrement) - this.autoIncrement = newRow.id + 1; - - if (newRow.hasOwnProperty('___meta')) - throw new Error(`Use of field with name '___meta' is forbidden`); - - newRowsStr.push(JSON.stringify(newRow));//because of stringify errors - } - - this.deltaStep++; - const result = {updated: 0}; - try { - //reducer - this.reducer._update(oldRows, newRows, this.deltaStep); - - //replace - for (let i = 0; i < newRows.length; i++) { - const newRow = newRows[i]; - const newRowStr = newRowsStr[i]; - - // oldRow.id === newRow.id always here, so - this.rowsInterface.setRow(newRow.id, newRow, newRowStr, this.deltaStep); - - result.updated++; - } - - this.changes.push([this.deltaStep, 1]); - } catch(e) { - this.changes.push([this.deltaStep, 0]); - throw e; - } - - await this.waitForSaveChanges(); - return result; - } finally { - this.saveChanges();//no await - this.lock.ret(); - } - } - - /* - query = { - where: `@@index('field1', 10, 20)`, - sort: '(a, b) => a.id - b.id', - limit: 10, - offset: 10, - } - result = { - (!) deleted: Number, - } - */ - async delete(query = {}) { - await this.openingLock.get(false); - this.checkErrors(); - - await this.lock.get(); - try { - //where - let ids;//iterator - if (query.where) { - const where = this.prepareWhere(query.where); - const whereFunc = new Function(`return ${where}`)(); - - ids = await whereFunc(this.reducer); - } else { - ids = this.rowsInterface.getAllIds(); - } - - //oldRows - let oldRows = []; - let newRows = []; - for (const id of ids) { - const oldRow = await this.rowsInterface.getRow(id); - - if (oldRow) { - oldRows.push(oldRow); - newRows.push({}); - } - } - - if (query.sort) { - const sortFunc = new Function(`return ${query.sort}`)(); - oldRows.sort(sortFunc); - } - - if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) { - const offset = query.offset || 0; - const limit = (query.hasOwnProperty('limit') ? query.limit : newRows.length); - newRows = newRows.slice(offset, offset + limit); - oldRows = oldRows.slice(offset, offset + limit); - } - - this.deltaStep++; - const result = {deleted: 0}; - try { - //reducer - this.reducer._update(oldRows, newRows, this.deltaStep); - - //delete - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - - this.rowsInterface.deleteRow(oldRow.id, this.deltaStep); - - result.deleted++; - } - - this.changes.push([this.deltaStep, 1]); - } catch(e) { - this.changes.push([this.deltaStep, 0]); - throw e; - } - - await this.waitForSaveChanges(); - return result; - } finally { - this.saveChanges();//no await - this.lock.ret(); - } - } - - async saveState(state) { - await fs.writeFile(`${this.tablePath}/state`, state); - } - - async saveChanges() { - this.needSaveChanges = true; - if (this.savingChanges) - return; - - if (this.inMemory) { - this.changes = []; - return; - } - - try { - this.checkErrors(); - } catch(e) { - return; - } - - this.savingChanges = true; - try { - await utils.sleep(0); - - while (this.needSaveChanges) { - this.needSaveChanges = false; - - await this.saveState('0'); - while (this.changes.length) { - - const len = this.changes.length; - let i = 0; - while (i < len) { - const [deltaStep, isOk] = this.changes[i]; - i++; - - if (isOk) { - await this.tableRowsFile.saveDelta(deltaStep); - await this.reducer._saveDelta(deltaStep); - } else { - await this.tableRowsFile.cancelDelta(deltaStep); - await this.reducer._cancelDelta(deltaStep); - } - } - - this.changes = this.changes.slice(i); - } - await this.saveState('1'); - - if (this.forceFileClosing) { - await this.tableRowsFile.closeAllFiles(); - await this.reducer._closeAllFiles(); - } - } - } catch(e) { - console.error(e.message); - this.fileError = e.message; - } finally { - this.savingChanges = false; - } - } - -} - -module.exports = Table; \ No newline at end of file diff --git a/server/db/JembaDb/TableFlag.js b/server/db/JembaDb/TableFlag.js deleted file mode 100644 index bc62058f..00000000 --- a/server/db/JembaDb/TableFlag.js +++ /dev/null @@ -1,22 +0,0 @@ -class TableFlag { - constructor(checkCode) { - this.checkCode = checkCode; - this.checkFunc = eval(checkCode); - - this.flag = new Set(); - } - - add(row) { - if (this.checkFunc(row)) { - this.flag.add(row.id); - return true; - } - return false; - } - - del(row) { - this.flag.delete(row.id); - } -} - -module.exports = TableFlag; \ No newline at end of file diff --git a/server/db/JembaDb/TableHash.js b/server/db/JembaDb/TableHash.js deleted file mode 100644 index e527953c..00000000 --- a/server/db/JembaDb/TableHash.js +++ /dev/null @@ -1,172 +0,0 @@ -class TableHash { - //opts.type = 'string' || 'number' || 'number_as_string' - constructor(opts = {}) { - const type = opts.type || 'string'; - this.depth = opts.depth || 11; - this.allowUndef = opts.allowUndef || false; - this.unique = opts.unique || false; - - this.hash = new Map(); - - this.isNumber = (type === 'number' || type === 'number_as_string'); - this.numberAsString = (type === 'number_as_string'); - this.valueAsString = !this.isNumber || this.numberAsString; - } - - checkType(v) { - if (typeof(v) != 'number' && this.isNumber) - throw new Error(`Hashed value must be a number, got type:${typeof(v)}, value:${v}`); - - if (typeof(v) != 'string' && !this.isNumber) - throw new Error(`Hashed value must be a string, got type:${typeof(v)}, value:${v}`); - } - - prepareValue(v) { - let result = v; - if (this.numberAsString) { - result = v.toString().padStart(this.depth, '0'); - } - if (this.valueAsString && result.length > this.depth) - result = result.substring(0, this.depth); - return result; - } - - add(value, id) { - if (value === undefined && this.allowUndef) - return; - - this.checkType(value); - - value = this.prepareValue(value); - if (this.hash.has(value)) { - if (this.unique) { - const id_ = this.hash.get(value); - if (id_ !== id) { - throw new Error(`Collision for unique hash detected: value:${value}, id1:${id_}, id2:${id}`); - } - } else { - const ids = this.hash.get(value); - ids.add(id); - } - } else { - if (this.unique) { - this.hash.set(value, id); - } else { - const ids = new Set(); - this.hash.set(value, ids); - ids.add(id); - } - } - - return value; - } - - del(value, id) { - if (value === undefined && this.allowUndef) - return; - - this.checkType(value); - - value = this.prepareValue(value); - if (this.hash.has(value)) { - if (this.unique) { - const id_ = this.hash.get(value); - if (id_ === id) - this.hash.delete(value); - } else { - const ids = this.hash.get(value); - - ids.delete(id); - - if (!ids.size) { - this.hash.delete(value); - } - } - } - - return value; - } - - reduce(value) { - this.checkType(value); - - value = this.prepareValue(value); - let result; - if (this.hash.has(value)) { - if (this.unique) { - result = new Set(); - result.add(this.hash.get(value)); - } else { - result = this.hash.get(value); - } - } else { - result = new Set(); - } - - return result; - } - - min() { - let result = new Set(); - - let min = null; - let id = null; - for (const value of this.hash.keys()) { - if (value < min || min === null) { - min = value; - id = this.hash.get(min); - } - } - - if (id !== null) { - if (this.unique) - result.add(id); - else - result = id; - } - - return result; - } - - max() { - let result = new Set(); - - let max = null; - let id = null; - for (const value of this.hash.keys()) { - if (value > max || max === null) { - max = value; - id = this.hash.get(max); - } - } - - if (id !== null) { - if (this.unique) - result.add(id); - else - result = id; - } - - return result; - } - - iter(checkFunc) { - const result = new Set(); - for (const [value, ids] of this.hash.entries()) { - const checkResult = checkFunc(value); - if (checkResult === undefined) - break; - if (checkResult) { - if (this.unique) { - result.add(ids); - } else { - for (const id of ids) - result.add(id); - } - } - } - return result; - } -} - -module.exports = TableHash; \ No newline at end of file diff --git a/server/db/JembaDb/TableIndex.js b/server/db/JembaDb/TableIndex.js deleted file mode 100644 index 7419312e..00000000 --- a/server/db/JembaDb/TableIndex.js +++ /dev/null @@ -1,311 +0,0 @@ -const utils = require('./utils'); - -class TableIndex { - //opts.type = 'string' || 'number' || 'number_as_string' - constructor(opts = {}) { - const type = opts.type || 'string'; - this.depth = opts.depth || 11; - this.allowUndef = opts.allowUndef || false; - this.unique = opts.unique || false; - - this.hash = new Map(); - this.sorted = [[]]; - this.delCount = 0; - - this.isNumber = (type === 'number' || type === 'number_as_string'); - this.numberAsString = (type === 'number_as_string'); - this.valueAsString = !this.isNumber || this.numberAsString; - - this.cmp = (a, b) => a.localeCompare(b); - if (type === 'number') { - this.cmp = (a, b) => a - b; - } else if (type === 'number_as_string') { - this.cmp = (a, b) => (a < b ? -1 : (a > b ? 1 : 0)); - } - } - - checkType(v) { - if (typeof(v) != 'number' && this.isNumber) - throw new Error(`Indexed value must be a number, got type:${typeof(v)}, value:${v}`); - - if (typeof(v) != 'string' && !this.isNumber) - throw new Error(`Indexed value must be a string, got type:${typeof(v)}, value:${v}`); - } - - prepareValue(v) { - let result = v; - if (this.numberAsString) { - result = v.toString().padStart(this.depth, '0'); - } - if (this.valueAsString && result.length > this.depth) - result = result.substring(0, this.depth); - return result; - } - - add(value, id) { - if (value === undefined && this.allowUndef) - return; - - this.checkType(value); - - value = this.prepareValue(value); - if (this.hash.has(value)) { - if (this.unique) { - const id_ = this.hash.get(value); - if (id_ !== id) { - throw new Error(`Collision for unique index detected: value:${value}, id1:${id_}, id2:${id}`); - } - } else { - const ids = this.hash.get(value); - ids.add(id); - } - } else { - if (this.unique) { - this.hash.set(value, id); - } else { - const ids = new Set(); - this.hash.set(value, ids); - ids.add(id); - } - - let s = this.sorted.length - 1; - const d = this.sorted[s]; - d.push(value); - - let i = d.length - 1; - //вставка - while (i > 0 && this.cmp(d[i], d[i - 1]) < 0) { - const v = d[i]; - d[i] = d[i - 1]; - d[i - 1] = v; - i--; - } - - if (d.length > 10) { - //слияние - while (s > 0 && this.sorted[s].length >= this.sorted[s - 1].length) { - const a = this.sorted.pop(); - const b = this.sorted.pop(); - const c = []; - let i = 0; - let j = 0; - while (i < a.length || j < b.length) { - if (i < a.length && (j === b.length || this.cmp(a[i], b[j]) <= 0)) { - c.push(a[i]); - i++; - } - if (j < b.length && (i === a.length || this.cmp(b[j], a[i]) <= 0)) { - c.push(b[j]); - j++; - } - } - this.sorted.push(c); - s--; - } - - this.sorted.push([]); - } - } - - return value; - } - - del(value, id, forceClean = false) { - if (value === undefined && this.allowUndef) - return; - - this.checkType(value); - - value = this.prepareValue(value); - if (this.hash.has(value)) { - if (this.unique) { - const id_ = this.hash.get(value); - if (id_ === id) { - this.hash.delete(value); - this.delCount++; - } - } else { - const ids = this.hash.get(value); - - ids.delete(id); - - if (!ids.size) { - this.hash.delete(value); - this.delCount++; - } - } - } - - if (this.delCount > (this.sorted[0].length >> 2) || forceClean) { - for (let s = 0; s < this.sorted.length; s++) { - const a = this.sorted[s]; - const b = []; - for (let i = 0; i < a.length; i++) { - if (this.hash.has(a[i])) - b.push(a[i]); - } - this.sorted[s] = b; - } - - this.sorted = this.sorted.filter(a => a.length); - if (!this.sorted.length) { - this.sorted = [[]] - } else { - this.sorted.sort((a, b) => b.length - a.length); - } - - this.delCount = 0; - } - - return value; - } - - reduce(from, to) { - const useFrom = (from !== undefined); - const useTo = (to !== undefined); - - if (useFrom) { - this.checkType(from); - from = this.prepareValue(from); - } - if (useTo) { - this.checkType(to); - to = this.prepareValue(to); - } - - const result = []; - for (let s = 0; s < this.sorted.length; s++) { - const a = this.sorted[s]; - if (!a.length) // на всякий случай - continue; - - let leftIndex = 0; - if (useFrom) { - //дихотомия - let left = 0; - let right = a.length - 1; - while (left < right) { - let mid = left + ((right - left) >> 1); - if (this.cmp(from, a[mid]) <= 0) - right = mid; - else - left = mid + 1; - } - - leftIndex = right; - if (this.cmp(from, a[right]) > 0) - leftIndex++; - } - - let rightIndex = a.length; - if (useTo) { - //дихотомия - let left = 0; - let right = a.length - 1; - while (left < right) { - let mid = right - ((right - left) >> 1); - if (this.cmp(to, a[mid]) >= 0) - left = mid; - else - right = mid - 1; - } - - rightIndex = left; - if (this.cmp(to, a[left]) >= 0) - rightIndex++; - } -//console.log(a, leftIndex, rightIndex); - if (this.unique) { - const ids = new Set(); - for (let i = leftIndex; i < rightIndex; i++) { - const value = a[i]; - if (this.hash.has(value)) { - ids.add(this.hash.get(value)); - } - } - result.push(ids); - } else { - for (let i = leftIndex; i < rightIndex; i++) { - const value = a[i]; - if (this.hash.has(value)) { - result.push(this.hash.get(value)); - } - } - } - } - - return utils.unionSet(result); - } - - min() { - let result = new Set(); - - let min = null; - let id = null; - for (let s = 0; s < this.sorted.length; s++) { - const a = this.sorted[s]; - if (!a.length) // на всякий случай - continue; - if (a[0] < min || min === null) { - min = a[0]; - id = this.hash.get(min); - } - } - - if (id !== null) { - if (this.unique) - result.add(id); - else - result = id; - } - - return result; - } - - max() { - let result = new Set(); - - let max = null; - let id = null; - for (let s = 0; s < this.sorted.length; s++) { - const a = this.sorted[s]; - if (!a.length) // на всякий случай - continue; - - const last = a.length - 1; - if (a[last] > max || max === null) { - max = a[last]; - id = this.hash.get(max); - } - } - - if (id !== null) { - if (this.unique) - result.add(id); - else - result = id; - } - - return result; - } - - iter(checkFunc) { - const result = new Set(); - for (const [value, ids] of this.hash.entries()) { - const checkResult = checkFunc(value); - if (checkResult === undefined) - break; - if (checkResult) { - if (this.unique) { - result.add(ids); - } else { - for (const id of ids) - result.add(id); - } - } - } - return result; - } -} - -module.exports = TableIndex; \ No newline at end of file diff --git a/server/db/JembaDb/TableReducer.js b/server/db/JembaDb/TableReducer.js deleted file mode 100644 index 41ef3444..00000000 --- a/server/db/JembaDb/TableReducer.js +++ /dev/null @@ -1,1044 +0,0 @@ -const fs = require('fs').promises; -const path = require('path'); - -const TableIndex = require('./TableIndex'); -const TableHash = require('./TableHash'); -const TableFlag = require('./TableFlag'); - -const utils = require('./utils'); - -const maxFileDumpSize = 2*1024*1024;//bytes - -class TableReducer { - constructor(inMemory, tablePath, compressed, rowsInterface) { - this._compressed = compressed || 0; - this._inMemory = inMemory; - this._tablePath = tablePath; - this._rowsInterface = rowsInterface; - - this._flag = new Map(); - this._index = new Map(); - this._hash = new Map(); - - this._deltas = new Map(); - this._fd = {};//file descriptors - } - - _getDelta(deltaStep) { - if (this._inMemory) - throw new Error('TableReducer: sometinhg wrong'); - - if (this._deltas.has(deltaStep)) { - return this._deltas.get(deltaStep); - } else { - const delta = { - flag: [], - index: [], - hash: [], - }; - this._deltas.set(deltaStep, delta); - return delta; - } - } - - _getFullPath(fileName) { - return `${this._tablePath}/${fileName}`; - } - - async _getNotExistingFileName(prefix) { - let i = 0; - while (1) {//eslint-disable-line no-constant-condition - i++; - const fileName = `${this._tablePath}/${prefix}${i}`; - if (!await utils.pathExists(fileName + '.0') && !await utils.pathExists(fileName + '.1')) - return path.basename(fileName); - } - } - - async _addFlag(opts, quietIfExists, deltaStep) { - const flagName = opts.name; - - if (!this._flag.has(flagName)) { - const flag = new TableFlag(opts.check); - for (const id of this._rowsInterface.getAllIds()) - flag.add(await this._rowsInterface.getRow(id)); - - if (this._inMemory) { - flag.meta = opts; - } else { - const fileName = await this._getNotExistingFileName('flag'); - await this._openFd(this._getFullPath(fileName) + '.1'); - flag.meta = Object.assign({}, opts, {fileName}); - - const delta = this._getDelta(deltaStep); - if (!delta.dumpFlag) - delta.dumpFlag = new Map(); - delta.dumpFlag.set(flagName, 1); - delta.dumpMeta = true; - } - - this._flag.set(flagName, flag); - } else { - if (!quietIfExists) - throw new Error(`Flag with name '${flagName}' already exists`); - } - } - - async _delFlag(flagName, deltaStep) { - if (this._flag.has(flagName)) { - if (!this._inMemory) { - const delta = this._getDelta(deltaStep); - delta.dumpMeta = true; - - const fileName = this._getFullPath((this._flag.get(flagName)).meta.fileName); - if (!delta.delFiles) - delta.delFiles = []; - delta.delFiles.push(fileName); - } - - this._flag.delete(flagName); - } else { - throw new Error(`Flag with name '${flagName}' does not exist`); - } - } - - _listFlag() { - const result = []; - for (const flag of this._flag.values()) { - result.push(flag.meta); - } - return result; - } - - async _addHash(opts, quietIfExists, deltaStep) { - const fieldName = opts.field; - - if (!this._hash.has(fieldName)) { - const hash = new TableHash(opts); - for (const id of this._rowsInterface.getAllIds()) { - const row = await this._rowsInterface.getRow(id); - hash.add(row[fieldName], id); - } - - if (this._inMemory) { - hash.meta = opts; - } else { - const fileName = await this._getNotExistingFileName('hash'); - await this._openFd(this._getFullPath(fileName) + '.1'); - hash.meta = Object.assign({}, opts, {fileName}); - - const delta = this._getDelta(deltaStep); - if (!delta.dumpHash) - delta.dumpHash = new Map(); - delta.dumpHash.set(fieldName, 1); - delta.dumpMeta = true; - } - - this._hash.set(fieldName, hash); - } else { - if (!quietIfExists) - throw new Error(`Hash for field '${fieldName}' already exists`); - } - } - - async _delHash(fieldName, deltaStep) { - if (this._hash.has(fieldName)) { - if (!this._inMemory) { - const delta = this._getDelta(deltaStep); - delta.dumpMeta = true; - - const fileName = this._getFullPath((this._hash.get(fieldName)).meta.fileName); - if (!delta.delFiles) - delta.delFiles = []; - delta.delFiles.push(fileName); - } - - this._hash.delete(fieldName); - } else { - throw new Error(`Hash for field '${fieldName}' does not exist`); - } - } - - _listHash() { - const result = []; - for (const hash of this._hash.values()) { - result.push(hash.meta); - } - return result; - } - - async _addIndex(opts, quietIfExists, deltaStep) { - const fieldName = opts.field; - - if (!this._index.has(fieldName)) { - const index = new TableIndex(opts); - for (const id of this._rowsInterface.getAllIds()) { - const row = await this._rowsInterface.getRow(id); - index.add(row[fieldName], id); - } - - if (this._inMemory) { - index.meta = opts; - } else { - const fileName = await this._getNotExistingFileName('index'); - await this._openFd(this._getFullPath(fileName) + '.1'); - index.meta = Object.assign({}, opts, {fileName}); - - const delta = this._getDelta(deltaStep); - if (!delta.dumpIndex) - delta.dumpIndex = new Map(); - delta.dumpIndex.set(fieldName, 1); - delta.dumpMeta = true; - } - - this._index.set(fieldName, index); - } else { - if (!quietIfExists) - throw new Error(`Index for field '${fieldName}' already exists`); - } - } - - async _delIndex(fieldName, deltaStep) { - if (this._index.has(fieldName)) { - if (!this._inMemory) { - const delta = this._getDelta(deltaStep); - delta.dumpMeta = true; - - const fileName = this._getFullPath((this._index.get(fieldName)).meta.fileName); - if (!delta.delFiles) - delta.delFiles = []; - delta.delFiles.push(fileName); - } - - this._index.delete(fieldName); - } else { - throw new Error(`Index for field '${fieldName}' does not exist`); - } - } - - _listIndex() { - const result = []; - for (const index of this._index.values()) { - result.push(index.meta); - } - return result; - } - - _update(oldRows, newRows, deltaStep) { - if (!deltaStep && !this._inMemory) - throw new Error('Something wrong: deltaStep is empty'); - - //oldRows & newRows arrays have equal size - if (oldRows.length != newRows.length) - throw new Error('Reducer update: old and new array lengths are not equal'); - - //consistency - const oldIds = new Map(); - const newIds = new Map(); - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (oldRow.id !== undefined) { - if (oldIds.has(oldRow.id)) { - throw new Error(`Reducer update: duplicate old_id:${oldRow.id} detected`); - } - oldIds.set(oldRow.id, true); - } - - if (newRow.id !== undefined) { - if (newIds.has(newRow.id)) { - throw new Error(`Reducer update: duplicate new_id:${newRow.id} detected`); - } - newIds.set(newRow.id, true); - } - - if (oldRow.id !== undefined && newRow.id !== undefined && oldRow.id !== newRow.id) - throw new Error(`Reducer update: old and new id's are not equal (${oldRow.id} !== ${newRow.id})`); - } - - //update - try { - let delta = (this._inMemory ? null : this._getDelta(deltaStep)); - - //flags - for (const [flagName, flag] of this._flag.entries()) { - const flagDelta = []; - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (oldRow.id !== undefined) { - flag.del(oldRow); - flagDelta.push([oldRow.id, 0]); - } - if (newRow.id !== undefined) { - const added = flag.add(newRow); - if (added) - flagDelta.push([newRow.id, 1]); - } - } - - if (delta && flagDelta.length) { - delta.flag.push([flagName, flagDelta]); - } - } - - //hashes - for (const [fieldName, hash] of this._hash.entries()) { - const hashDelta = []; - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (oldRow[fieldName] !== newRow[fieldName]) { - if (oldRow.id !== undefined) { - const value = hash.del(oldRow[fieldName], oldRow.id); - hashDelta.push([value, oldRow.id, 0]); - } - if (newRow.id !== undefined) { - const value = hash.add(newRow[fieldName], newRow.id); - hashDelta.push([value, newRow.id, 1]); - } - } - } - - if (delta && hashDelta.length) { - delta.hash.push([fieldName, hashDelta]); - } - } - - //indexes - for (const [fieldName, index] of this._index.entries()) { - const indexDelta = []; - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (oldRow[fieldName] !== newRow[fieldName]) { - if (oldRow.id !== undefined) { - const value = index.del(oldRow[fieldName], oldRow.id); - indexDelta.push([value, oldRow.id, 0]); - } - if (newRow.id !== undefined) { - const value = index.add(newRow[fieldName], newRow.id); - indexDelta.push([value, newRow.id, 1]); - } - } - } - - if (delta && indexDelta.length) { - delta.index.push([fieldName, indexDelta]); - } - } - } catch(e) { - //rollback - - //flags - for (const flag of this._flag.values()) { - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (newRow.id !== undefined) { - try { flag.del(newRow); } catch(e) {} // eslint-disable-line no-empty - } - if (oldRow.id !== undefined) { - try { flag.add(oldRow); } catch(e) {} // eslint-disable-line no-empty - } - } - } - - //hashes - for (const [fieldName, hash] of this._hash.entries()) { - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (oldRow[fieldName] !== newRow[fieldName]) { - if (newRow.id !== undefined) { - try { hash.del(newRow[fieldName], newRow.id); } catch(e) {} // eslint-disable-line no-empty - } - if (oldRow.id !== undefined) { - try { hash.add(oldRow[fieldName], oldRow.id); } catch(e) {} // eslint-disable-line no-empty - } - } - } - } - - //indexes - for (const [fieldName, index] of this._index.entries()) { - for (let i = 0; i < oldRows.length; i++) { - const oldRow = oldRows[i]; - const newRow = newRows[i]; - - if (oldRow[fieldName] !== newRow[fieldName]) { - if (newRow.id !== undefined) { - try { index.del(newRow[fieldName], newRow.id); } catch(e) {} // eslint-disable-line no-empty - } - if (oldRow.id !== undefined) { - try { index.add(oldRow[fieldName], oldRow.id); } catch(e) {} // eslint-disable-line no-empty - } - } - } - } - - throw e; - } - } - - async _closeFd(name) { - if (this._fd[name]) { - await this._fd[name].close(); - this._fd[name] = null; - } - } - - async _openFd(name) { - if (this._fd[name]) - return; - - if (!name) { - throw new Error('TableReducer: openFd name is empty'); - } - - const exists = await utils.pathExists(name); - - const fd = await fs.open(name, 'a'); - if (!exists) { - await fd.write('0['); - } - - this._fd[name] = fd; - } - - async _dumpMaps(delta) { - //dump flag - for (const [flagName, flag] of this._flag.entries()) { - const fileName = this._getFullPath(flag.meta.fileName); - const fileName1 = `${fileName}.1`; - - let size = 0; - if (this._fd[fileName1]) - size = (await this._fd[fileName1].stat()).size; - - if (size > maxFileDumpSize || (delta.dumpFlag && delta.dumpFlag.get(flagName))) { - const fileName0 = `${fileName}.0`; - const fileName2 = `${fileName}.2`; - - await this._writeFinal(fileName2, JSON.stringify([...flag.flag])); - - await fs.rename(fileName2, fileName0); - await this._closeFd(fileName1); - await fs.unlink(fileName1); - } - } - - //dump hash - for (const [fieldName, hash] of this._hash.entries()) { - const fileName = this._getFullPath(hash.meta.fileName); - const fileName1 = `${fileName}.1`; - - let size = 0; - if (this._fd[fileName1]) - size = (await this._fd[fileName1].stat()).size; - - if (size > maxFileDumpSize || (delta.dumpHash && delta.dumpHash.get(fieldName))) { - const fileName0 = `${fileName}.0`; - const fileName2 = `${fileName}.2`; - - if (hash.unique) { - await this._writeFinal(fileName2, JSON.stringify(Array.from(hash.hash))); - } else { - const buf = []; - for (const [key, keySet] of hash.hash) { - buf.push([key, [...keySet]]); - } - await this._writeFinal(fileName2, JSON.stringify(buf)); - } - - await fs.rename(fileName2, fileName0); - await this._closeFd(fileName1); - await fs.unlink(fileName1); - } - } - - //dump index - for (const [fieldName, index] of this._index.entries()) { - const fileName = this._getFullPath(index.meta.fileName); - const fileName1 = `${fileName}.1`; - - let size = 0; - if (this._fd[fileName1]) - size = (await this._fd[fileName1].stat()).size; - - if (size > maxFileDumpSize || (delta.dumpIndex && delta.dumpIndex.get(fieldName))) { - const fileName0 = `${fileName}.0`; - const fileName2 = `${fileName}.2`; - - const buf = {hash: [], sorted: index.sorted, delCount: index.delCount}; - if (index.unique) { - buf.hash = Array.from(index.hash); - } else { - for (const [key, keySet] of index.hash) { - buf.hash.push([key, [...keySet]]); - } - } - await this._writeFinal(fileName2, JSON.stringify(buf)); - - await fs.rename(fileName2, fileName0); - await this._closeFd(fileName1); - await fs.unlink(fileName1); - } - } - } - - async _dumpMeta() { - const fileName = this._getFullPath('meta'); - const fileName0 = `${fileName}.0`; - const fileName2 = `${fileName}.2`; - - await this._writeFinal(fileName2, JSON.stringify({ - flag: this._listFlag(), - hash: this._listHash(), - index: this._listIndex(), - })); - await fs.rename(fileName2, fileName0); - } - - async _saveDelta(deltaStep) { - //delta - const delta = this._getDelta(deltaStep); - - //save flag delta - for (const flagRec of delta.flag) { - const [flagName, flagDelta] = flagRec; - - const flag = this._flag.get(flagName); - const fileName = this._getFullPath(flag.meta.fileName) + '.1'; - - if (!this._fd[fileName]) - await this._openFd(fileName); - - const buf = []; - for (const deltaRec of flagDelta) { - buf.push(JSON.stringify(deltaRec)); - } - - if (buf.length) - await this._fd[fileName].write(buf.join(',') + ','); - } - - //save hash delta - for (const hashRec of delta.hash) { - const [hashName, hashDelta] = hashRec; - - const hash = this._hash.get(hashName); - const fileName = this._getFullPath(hash.meta.fileName) + '.1'; - - if (!this._fd[fileName]) - await this._openFd(fileName); - - const buf = []; - for (const deltaRec of hashDelta) { - buf.push(JSON.stringify(deltaRec)); - } - - if (buf.length) - await this._fd[fileName].write(buf.join(',') + ','); - } - - //save index delta - for (const indexRec of delta.index) { - const [indexName, indexDelta] = indexRec; - - const index = this._index.get(indexName); - const fileName = this._getFullPath(index.meta.fileName) + '.1'; - - if (!this._fd[fileName]) - await this._openFd(fileName); - - const buf = []; - for (const deltaRec of indexDelta) { - buf.push(JSON.stringify(deltaRec)); - } - - if (buf.length) - await this._fd[fileName].write(buf.join(',') + ','); - } - - //dumps - await this._dumpMaps(delta); - - //meta - if (delta.dumpMeta) - await this._dumpMeta(); - - //del files - if (delta.delFiles) { - for (const fileName of delta.delFiles) { - if (this._fd[fileName]) - this._closeFd(fileName); - - if (await utils.pathExists(fileName)) - await fs.unlink(fileName); - } - } - - this._deltas.delete(deltaStep); - } - - async _cancelDelta(deltaStep) { - this._deltas.delete(deltaStep); - } - - async _loadFile(filePath) { - let buf = await fs.readFile(filePath); - if (!buf.length) - throw new Error(`TableReducer: file ${filePath} is empty`); - - const flag = buf[0]; - if (flag === 50) {//flag '2' ~ finalized && compressed - const packed = Buffer.from(buf.buffer, buf.byteOffset + 1, buf.length - 1); - const data = await utils.inflate(packed); - buf = data.toString(); - } else if (flag === 49) {//flag '1' ~ finalized - buf[0] = 32;//' ' - buf = buf.toString(); - } else {//flag '0' ~ not finalized - buf[0] = 32;//' ' - const last = buf.length - 1; - if (buf[last] === 44) {//',' - buf[last] = 93;//']' - buf = buf.toString(); - } else {//corrupted or empty - buf = buf.toString(); - if (this._loadCorrupted) { - const lastComma = buf.lastIndexOf(','); - if (lastComma >= 0) - buf = buf.substring(0, lastComma); - } - buf += ']'; - } - } - - let result; - try { - result = JSON.parse(buf); - } catch(e) { - throw new Error(`load ${filePath} failed: ${e.message}`); - } - - return result; - } - - async _writeFinal(fileName, data) { - if (!this._compressed) { - await fs.writeFile(fileName, '1' + data); - } else { - let buf = Buffer.from(data); - buf = await utils.deflate(buf, this.compressed); - const fd = await fs.open(fileName, 'w'); - await fd.write('2'); - await fd.write(buf); - await fd.close(); - } - } - - async _load(corrupted = false, metaPath = '') { - if (corrupted) - this._loadCorrupted = true; - - const metaFileName = (metaPath ? metaPath : this._getFullPath('meta.0')); - if (!await utils.pathExists(metaFileName)) - return; - - const meta = await this._loadFile(metaFileName); - - //flag - this._flag.clear(); - for (const opts of meta.flag) { - const flag = new TableFlag(opts.check); - flag.meta = opts; - - if (!corrupted) { - const fileName = this._getFullPath(opts.fileName); - const fileName0 = `${fileName}.0`; - const fileName1 = `${fileName}.1`; - - //load dump - if (await utils.pathExists(fileName0)) { - const data = await this._loadFile(fileName0); - flag.flag = new Set(data); - } - - //load delta - if (await utils.pathExists(fileName1)) { - const flagDelta = await this._loadFile(fileName1); - for (const deltaRec of flagDelta) { - const [id, isAdd] = deltaRec; - if (isAdd) - flag.flag.add(id); - else - flag.flag.delete(id); - } - } - } - - this._flag.set(opts.name, flag); - } - - //hash - this._hash.clear(); - for (const opts of meta.hash) { - const hash = new TableHash(opts); - hash.meta = opts; - - if (!corrupted) { - const fileName = this._getFullPath(opts.fileName); - const fileName0 = `${fileName}.0`; - const fileName1 = `${fileName}.1`; - - //load dump - if (await utils.pathExists(fileName0)) { - const data = await this._loadFile(fileName0); - if (hash.unique) { - hash.hash = new Map(data); - } else { - for (const rec of data) { - const [key, keySet] = rec; - hash.hash.set(key, new Set(keySet)); - } - } - } - - //load delta - if (await utils.pathExists(fileName1)) { - const hashDelta = await this._loadFile(fileName1); - for (const deltaRec of hashDelta) { - const [value, id, isAdd] = deltaRec; - if (isAdd) - hash.add(value, id); - else - hash.del(value, id); - } - } - } - - this._hash.set(opts.field, hash); - } - - //index - this._index.clear(); - for (const opts of meta.index) { - const index = new TableIndex(opts); - index.meta = opts; - - if (!corrupted) { - const fileName = this._getFullPath(opts.fileName); - const fileName0 = `${fileName}.0`; - const fileName1 = `${fileName}.1`; - - //load dump - if (await utils.pathExists(fileName0)) { - const data = await this._loadFile(fileName0); - index.sorted = data.sorted; - index.delCount = data.delCount; - - if (index.unique) { - index.hash = new Map(data.hash); - } else { - for (const rec of data.hash) { - const [key, keySet] = rec; - index.hash.set(key, new Set(keySet)); - } - } - } - - //load delta - if (await utils.pathExists(fileName1)) { - const indexDelta = await this._loadFile(fileName1); - for (const deltaRec of indexDelta) { - const [value, id, isAdd] = deltaRec; - if (isAdd) - index.add(value, id); - else - index.del(value, id); - } - } - } - - this._index.set(opts.field, index); - } - } - - async _closeAllFiles() { - for (const name of Object.keys(this._fd)) { - await this._closeFd(name); - } - } - - async _destroy() { - await this._closeAllFiles(); - - //for GC - this._flag.clear(); - this._index.clear(); - this._hash.clear(); - this._deltas.clear(); - this._rowsInterface = null; - } - - //------------------------------------------------------------------------------------------ - //Reducer methods - async id() { - const result = new Set(); - for (const arg of arguments) { - if (!Array.isArray(arg)) - result.add(arg); - else { - for (const id of arg) { - result.add(id); - } - } - } - return result; - } - - async flag(flagName) { - if (this._flag.has(flagName)) { - return new Set(this._flag.get(flagName).flag); - } else { - throw new Error(`Flag with name '${flagName}' does not exist`); - } - } - - async hash(fieldName, value) { - if (this._hash.has(fieldName)) { - const hash = this._hash.get(fieldName); - - const result = new Set(); - if (!Array.isArray(value)) { - const ids = hash.reduce(value); - for (const id of ids) { - const row = await this._rowsInterface.getRow(id); - if (row[fieldName] === value) - result.add(id); - } - } else { - for (const v of value) { - const ids = hash.reduce(v); - for (const id of ids) { - const row = await this._rowsInterface.getRow(id); - if (row[fieldName] === v) - result.add(id); - } - } - } - - return result; - } else { - throw new Error(`Hash for field '${fieldName}' does not exist`); - } - } - - async hashMin(fieldName) { - if (this._hash.has(fieldName)) { - const hash = this._hash.get(fieldName); - return hash.min(); - } else { - throw new Error(`Hash for field '${fieldName}' does not exist`); - } - } - - async hashMax(fieldName) { - if (this._hash.has(fieldName)) { - const hash = this._hash.get(fieldName); - return hash.max(); - } else { - throw new Error(`Hash for field '${fieldName}' does not exist`); - } - } - - async hashIter(fieldName, checkFunc) { - if (this._hash.has(fieldName)) { - const hash = this._hash.get(fieldName); - return hash.iter(checkFunc); - } else { - throw new Error(`Hash for field '${fieldName}' does not exist`); - } - } - - async _indexReduce(fieldName, from, to, checkFuncs) { - if (this._index.has(fieldName)) { - const index = this._index.get(fieldName); - const ids = index.reduce(from, to); - - const check = (index.isNumber ? checkFuncs[0] : checkFuncs[1]); - const result = new Set(); - for (const id of ids) { - const row = await this._rowsInterface.getRow(id); - if (check(row[fieldName])) - result.add(id); - } - return result; - } else { - throw new Error(`Index for field '${fieldName}' does not exist`); - } - } - - async index(fieldName, from, to) { - let checkFuncs = [ - (value) => (value > from && value < to), - (value) => (value.localeCompare(from) > 0 && value.localeCompare(to) < 0), - ]; - if (from === undefined) { - checkFuncs = [ - (value) => (value < to), - (value) => (value.localeCompare(to) < 0), - ]; - } else if (to === undefined) { - checkFuncs = [ - (value) => (value > from), - (value) => (value.localeCompare(from) > 0), - ]; - } - return this._indexReduce(fieldName, from, to, checkFuncs); - } - - async indexL(fieldName, from, to) { - let checkFuncs = [ - (value) => (value >= from && value < to), - (value) => (value.localeCompare(from) >= 0 && value.localeCompare(to) < 0), - ]; - if (from === undefined) { - checkFuncs = [ - (value) => (value < to), - (value) => (value.localeCompare(to) < 0), - ]; - } else if (to === undefined) { - checkFuncs = [ - (value) => (value >= from), - (value) => (value.localeCompare(from) >= 0), - ]; - } - return this._indexReduce(fieldName, from, to, checkFuncs); - } - - async indexR(fieldName, from, to) { - let checkFuncs = [ - (value) => (value > from && value <= to), - (value) => (value.localeCompare(from) > 0 && value.localeCompare(to) <= 0), - ]; - if (from === undefined) { - checkFuncs = [ - (value) => (value <= to), - (value) => (value.localeCompare(to) <= 0), - ]; - } else if (to === undefined) { - checkFuncs = [ - (value) => (value > from), - (value) => (value.localeCompare(from) > 0), - ]; - } - return this._indexReduce(fieldName, from, to, checkFuncs); - } - - async indexLR(fieldName, from, to) { - let checkFuncs = [ - (value) => (value >= from && value <= to), - (value) => (value.localeCompare(from) >= 0 && value.localeCompare(to) <= 0), - ]; - if (from === undefined) { - checkFuncs = [ - (value) => (value <= to), - (value) => (value.localeCompare(to) <= 0), - ]; - } else if (to === undefined) { - checkFuncs = [ - (value) => (value >= from), - (value) => (value.localeCompare(from) >= 0), - ]; - } - return this._indexReduce(fieldName, from, to, checkFuncs); - } - - async indexMin(fieldName) { - if (this._index.has(fieldName)) { - const index = this._index.get(fieldName); - return index.min(); - } else { - throw new Error(`Index for field '${fieldName}' does not exist`); - } - } - - async indexMax(fieldName) { - if (this._index.has(fieldName)) { - const index = this._index.get(fieldName); - return index.max(); - } else { - throw new Error(`Index for field '${fieldName}' does not exist`); - } - } - - async indexIter(fieldName, checkFunc) { - if (this._index.has(fieldName)) { - const index = this._index.get(fieldName); - return index.iter(checkFunc); - } else { - throw new Error(`Index for field '${fieldName}' does not exist`); - } - } - - //returns iterator, not Set - async all() { - return this._rowsInterface.getAllIds(); - } - - async allSize() { - return this._rowsInterface.getAllIdsSize(); - } - - async iter(ids, checkFunc) { - const result = new Set(); - for (const id of ids) { - const row = await this._rowsInterface.getRow(id); - const checkResult = checkFunc(row); - if (checkResult === undefined) - break; - if (checkResult) - result.add(id); - } - return result; - } - - async and() { - const result = []; - for (const arg of arguments) { - if (!Array.isArray(arg)) { - result.push(arg); - } else { - for (const s of arg) { - result.push(s); - } - } - } - return utils.intersectSet(result); - } - - async or() { - const result = []; - for (const arg of arguments) { - if (!Array.isArray(arg)) - result.push(arg); - else { - for (const s of arg) { - result.push(s); - } - } - } - return utils.unionSet(result); - } -} - -module.exports = TableReducer; \ No newline at end of file diff --git a/server/db/JembaDb/TableRowsFile.js b/server/db/JembaDb/TableRowsFile.js deleted file mode 100644 index b3ef203d..00000000 --- a/server/db/JembaDb/TableRowsFile.js +++ /dev/null @@ -1,646 +0,0 @@ -const fs = require('fs').promises; -const path = require('path'); -const utils = require('./utils'); - -const maxBlockSize = 1024*1024;//bytes - -const minFileDumpSize = 100*1024;//bytes -const maxFileDumpSize = 50*1024*1024;//bytes -const defragAfter = 10; -const defragBlockCountAtOnce = 10;//better >= defragAfter - -class TableRowsFile { - constructor(tablePath, cacheSize, compressed) { - this.tablePath = tablePath; - this.loadedBlocksCount = cacheSize || 5; - this.loadedBlocksCount = (this.loadedBlocksCount <= 0 ? 0 : this.loadedBlocksCount); - this.compressed = compressed || 0; - - this.blockIndex = new Map(); - this.currentBlockIndex = 0; - this.lastSavedBlockIndex = 0; - this.blockList = new Map(); - this.blocksNotFinalized = new Map();//indexes of blocks - this.loadedBlocks = []; - this.deltas = new Map(); - - this.defragCounter = 0; - this.destroyed = false; - - this.blockindex0Size = 0; - this.blocklist0Size = 0; - - this.fd = { - blockIndex: null, - blockList: null, - blockRows: null, - blockRowsIndex: null,//not a file descriptor - }; - } - - //--- rows interface - async getRow(id) { - const block = this.blockList.get(this.blockIndex.get(id)); - - if (block) { - if (!block.rows) { - await this.loadBlock(block); - } - - this.unloadBlocksIfNeeded();//no await - return block.rows.get(id); - } - return; - } - - setRow(id, row, rowStr, deltaStep) { - const delta = this.getDelta(deltaStep); - - if (this.blockIndex.has(id)) { - this.deleteRow(id, deltaStep, delta); - } - - const index = this.addToCurrentBlock(id, row, rowStr, deltaStep, delta); - this.blockIndex.set(id, index); - delta.blockIndex.push([id, index]); - } - - deleteRow(id, deltaStep, delta) { - if (this.blockIndex.has(id)) { - if (!delta) - delta = this.getDelta(deltaStep); - - const block = this.blockList.get(this.blockIndex.get(id)); - if (block) { - block.delCount++; - delta.blockList.push([block.index, 1]); - } - - this.blockIndex.delete(id); - delta.blockIndex.push([id, 0]); - } - } - - getAllIds() { - return this.blockIndex.keys(); - } - - getAllIdsSize() { - return this.blockIndex.size; - } - //--- rows interface end - - getDelta(deltaStep) { - if (this.deltas.has(deltaStep)) { - return this.deltas.get(deltaStep); - } else { - const delta = { - blockIndex: [], - blockList: [], - blockRows: [], - }; - this.deltas.set(deltaStep, delta); - return delta; - } - } - - createNewBlock() { - this.currentBlockIndex++; - const block = { - index: this.currentBlockIndex, - delCount: 0, - addCount: 0, - size: 0, - rows: new Map(), - rowsLength: 0, - final: false, - }; - this.blockList.set(this.currentBlockIndex, block); - this.loadedBlocks.push(this.currentBlockIndex); - this.blocksNotFinalized.set(this.currentBlockIndex, 1); - - return block; - } - - addToCurrentBlock(id, row, rowStr, deltaStep, delta) { - if (!delta) - delta = this.getDelta(deltaStep); - - let block = this.blockList.get(this.currentBlockIndex); - if (!block) - block = this.createNewBlock(); - - if (block.size > maxBlockSize) - block = this.createNewBlock(); - - if (!block.rows) { - throw new Error('TableRowsFile: something has gone wrong'); - } - - block.rows.set(id, row); - - block.addCount++; - block.size += rowStr.length; - block.rowsLength = block.rows.size; - - delta.blockList.push([block.index, 1]); - delta.blockRows.push([block.index, id, row]); - - return block.index; - } - - async unloadBlocksIfNeeded() { - this.needUnload = true; - if (this.unloadingBlocks) - return; - - this.unloadingBlocks = true; - try { - while (this.needUnload) { - this.needUnload = false; - if (this.destroyed) - return; - - await utils.sleep(10); - - //check loaded - let missed = new Map(); - while (this.loadedBlocks.length >= this.loadedBlocksCount) { - const index = this.loadedBlocks.shift(); - if (index >= this.lastSavedBlockIndex) { - missed.set(index, 1); - continue; - } - const block = this.blockList.get(index); - - if (block) { - block.rows = null; -//console.log(`unloaded block ${block.index}`); - } - - if (this.destroyed) - return; - } - - this.loadedBlocks = this.loadedBlocks.concat(Array.from(missed.keys())); - } - } finally { - this.unloadingBlocks = false; - } - } - - async loadFile(filePath) { - let buf = await fs.readFile(filePath); - if (!buf.length) - throw new Error(`TableRowsFile: file ${filePath} is empty`); - - const flag = buf[0]; - if (flag === 50) {//flag '2' ~ finalized && compressed - const packed = Buffer.from(buf.buffer, buf.byteOffset + 1, buf.length - 1); - const data = await utils.inflate(packed); - buf = data.toString(); - } else if (flag === 49) {//flag '1' ~ finalized - buf[0] = 32;//' ' - buf = buf.toString(); - } else {//flag '0' ~ not finalized - buf[0] = 32;//' ' - const last = buf.length - 1; - if (buf[last] === 44) {//',' - buf[last] = 93;//']' - buf = buf.toString(); - } else {//corrupted or empty - buf = buf.toString(); - if (this.loadCorrupted) { - const lastComma = buf.lastIndexOf(','); - if (lastComma >= 0) - buf = buf.substring(0, lastComma); - } - buf += ']'; - } - } - - let result; - try { - result = JSON.parse(buf); - } catch(e) { - throw new Error(`load ${filePath} failed: ${e.message}`); - } - - return result; - } - - async writeFinal(fileName, data) { - if (!this.compressed) { - await fs.writeFile(fileName, '1' + data); - } else { - let buf = Buffer.from(data); - buf = await utils.deflate(buf, this.compressed); - const fd = await fs.open(fileName, 'w'); - await fd.write('2'); - await fd.write(buf); - await fd.close(); - } - } - - async loadBlock(block) { -//console.log(`start load block ${block.index}`); - if (!block.rows) { - const arr = await this.loadFile(this.blockRowsFilePath(block.index)); - - block.rows = new Map(arr); - - this.loadedBlocks.push(block.index); -//console.log(`loaded block ${block.index}`); - } - } - - async closeFd(name) { - if (this.fd[name]) { - await this.fd[name].close(); - this.fd[name] = null; - } - } - - async openFd(name, fileName = '') { - if (this.fd[name]) - return; - - if (!fileName) { - throw new Error('TableRowsFile: fileName is empty'); - } - - const exists = await utils.pathExists(fileName); - - const fd = await fs.open(fileName, 'a'); - if (!exists) { - await fd.write('0['); - } - - this.fd[name] = fd; - } - - blockRowsFilePath(index) { - if (index < 1000000) - return `${this.tablePath}/${index.toString().padStart(6, '0')}.jem`; - else - return `${this.tablePath}/${index.toString().padStart(12, '0')}.jem`; - } - - async finalizeBlocks() { -//console.log(this.blocksNotFinalized.size); - - for (const index of this.blocksNotFinalized.keys()) { - if (this.destroyed) - return; - - if (index >= this.lastSavedBlockIndex) - continue; - - const block = this.blockList.get(index); - - if (block) { - if (block.final) - throw new Error('finalizeBlocks: something wrong'); - - const blockPath = this.blockRowsFilePath(block.index); -//console.log(`start finalize block ${block.index}`); - const arr = await this.loadFile(blockPath); - const rows = new Map(arr); - - const finBlockPath = `${blockPath}.tmp`; - const rowsStr = JSON.stringify(Array.from(rows)); - await this.writeFinal(finBlockPath, rowsStr); - - await fs.rename(finBlockPath, blockPath); - - block.size = Buffer.byteLength(rowsStr, 'utf8') + 1; - block.rowsLength = rows.size;//insurance - block.final = true; - await this.fd.blockList.write(JSON.stringify(block) + ','); -//console.log(`finalized block ${block.index}`); - } - - this.blocksNotFinalized.delete(index); - } - } - - async dumpMaps() { - //dumping blockIndex - const blockindex1Size = (await this.fd.blockIndex.stat()).size; - if ((blockindex1Size > minFileDumpSize && blockindex1Size > this.blockindex0Size) || blockindex1Size > maxFileDumpSize) { - const blockindex0Path = `${this.tablePath}/blockindex.0`; - const blockindex2Path = `${this.tablePath}/blockindex.2`; - await this.writeFinal(blockindex2Path, JSON.stringify(Array.from(this.blockIndex))); - - await fs.rename(blockindex2Path, blockindex0Path); - await this.closeFd('blockIndex'); - await fs.unlink(`${this.tablePath}/blockindex.1`); - this.blockindex0Size = (await fs.stat(blockindex0Path)).size; - } - - //dumping blockList - const blocklist1Size = (await this.fd.blockList.stat()).size; - if ((blocklist1Size > minFileDumpSize && blocklist1Size > this.blocklist0Size) || blocklist1Size > maxFileDumpSize) { - const blocklist0Path = `${this.tablePath}/blocklist.0`; - const blocklist2Path = `${this.tablePath}/blocklist.2`; - await this.writeFinal(blocklist2Path, JSON.stringify(Array.from(this.blockList.values()))); - - await fs.rename(blocklist2Path, blocklist0Path); - await this.closeFd('blockList'); - await fs.unlink(`${this.tablePath}/blocklist.1`); - this.blocklist0Size = (await fs.stat(blocklist0Path)).size; - } - } - - async saveDelta(deltaStep) { - const delta = this.getDelta(deltaStep); - - //lastSavedBlockIndex - const len = delta.blockRows.length; - if (len) { - this.lastSavedBlockIndex = delta.blockRows[len - 1][0]; - } - - //check all blocks fragmentation - if (!this.defragCandidates) - this.defragCandidates = []; - - if (!this.defragCandidates.length) { - if (this.defragCounter >= defragAfter) { - for (const block of this.blockList.values()) { - if (!block.final) - continue; - - if (block.addCount - block.delCount < block.rowsLength/2 || block.size < maxBlockSize/2) { - this.defragCandidates.push(block); - } - } - - this.defragCounter = 0; - } else { - this.defragCounter++; - } - } - - let defragmented = 0; - while (this.defragCandidates.length) { - if (defragmented >= defragBlockCountAtOnce || this.destroyed) - break; - - const block = this.defragCandidates.shift(); - - if (!block.rows) { - await this.loadBlock(block); - } - - //move all active rows from fragmented block to current - for (const [id, row] of block.rows.entries()) { - if (this.blockIndex.get(id) === block.index) { - const newIndex = this.addToCurrentBlock(id, row, JSON.stringify(row), deltaStep, delta); - this.blockIndex.set(id, newIndex); - delta.blockIndex.push([id, newIndex]); - } - } - - this.blockList.delete(block.index); - delta.blockList.push([block.index, 0]); - - if (!delta.delFiles) - delta.delFiles = []; - delta.delFiles.push(this.blockRowsFilePath(block.index)); - - defragmented++; -//console.log(`defragmented block ${block.index}, size: ${block.size}, addCount: ${block.addCount}, delCount: ${block.delCount}, rowsLength: ${block.rowsLength}`); - } - - //blockIndex delta save - if (!this.fd.blockIndex) - await this.openFd('blockIndex', `${this.tablePath}/blockindex.1`); - - let buf = []; - for (const deltaRec of delta.blockIndex) { - buf.push(JSON.stringify(deltaRec)); - } - if (buf.length) - await this.fd.blockIndex.write(buf.join(',') + ','); - - //blockList delta save - if (!this.fd.blockList) - await this.openFd('blockList', `${this.tablePath}/blocklist.1`); - - let lastSaved = 0; - buf = []; - for (const deltaRec of delta.blockList) { - const index = deltaRec[0]; - const exists = deltaRec[1]; - - if (exists) { - if (lastSaved !== index) {//optimization - const block = this.blockList.get(index); - if (block)//might be defragmented already - buf.push(JSON.stringify(block)); - lastSaved = index; - } - } else { - buf.push(JSON.stringify({index, deleted: 1})); - } - } - if (buf.length) - await this.fd.blockList.write(buf.join(',') + ','); - - //blockRows delta save - buf = []; - for (const deltaRec of delta.blockRows) { - const [index, id, row] = deltaRec; - - if (this.fd.blockRowsIndex !== index) { - if (buf.length) - await this.fd.blockRows.write(buf.join(',') + ','); - buf = []; - await this.closeFd('blockRows'); - this.fd.blockRowsIndex = null; - } - - if (!this.fd.blockRows) { - const blockPath = this.blockRowsFilePath(index); - - await this.openFd('blockRows', blockPath); - this.fd.blockRowsIndex = index; - } - - buf.push(JSON.stringify([id, row])); - } - if (buf.length) - await this.fd.blockRows.write(buf.join(',') + ','); - - //blocks finalization - await this.finalizeBlocks(); - this.unloadBlocksIfNeeded();//no await - - //dumps if needed - await this.dumpMaps(); - - //delete files if needed - if (delta.delFiles) { - for (const fileName of delta.delFiles) { -//console.log(`delete ${fileName}`); - if (await utils.pathExists(fileName)) - await fs.unlink(fileName); - } - } - - this.deltas.delete(deltaStep); - } - - async cancelDelta(deltaStep) { - this.deltas.delete(deltaStep); - } - - async load() { - let autoIncrement = 0; - - const loadBlockIndex = (fileNum, data) => { - if (fileNum === 0) {//dumped data - this.blockIndex = new Map(data);//much faster - for (const id of this.blockIndex.keys()) { - if (typeof(id) === 'number' && id >= autoIncrement) - autoIncrement = id + 1; - } - } else { - for (const rec of data) { - const [id, index] = rec; - if (index > 0) { - this.blockIndex.set(id, index); - if (typeof(id) === 'number' && id >= autoIncrement) - autoIncrement = id + 1; - } else - this.blockIndex.delete(id); - } - } - } - - const loadBlockList = (data) => { - for (const rec of data) { - const block = rec; - if (block.deleted) { - this.blockList.delete(block.index); - } else { - block.rows = null; - this.blockList.set(block.index, block); - if (block.index > this.currentBlockIndex) - this.currentBlockIndex = block.index; - } - } - - } - - this.blockIndex.clear(); - for (let i = 0; i < 2; i++) { - const dataPath = `${this.tablePath}/blockindex.${i}`; - - if (await utils.pathExists(dataPath)) { - const data = await this.loadFile(dataPath); - loadBlockIndex(i, data); - } - } - const blockindex0Path = `${this.tablePath}/blockindex.0`; - if (await utils.pathExists(blockindex0Path)) - this.blockindex0Size = (await fs.stat(blockindex0Path)).size; - - this.currentBlockIndex = 0; - this.blockList.clear(); - for (let i = 0; i < 2; i++) { - const dataPath = `${this.tablePath}/blocklist.${i}`; - - if (await utils.pathExists(dataPath)) { - const data = await this.loadFile(dataPath); - loadBlockList(data); - } - } - const blocklist0Path = `${this.tablePath}/blocklist.0`; - if (await utils.pathExists(blocklist0Path)) - this.blocklist0Size = (await fs.stat(blocklist0Path)).size; - - this.lastSavedBlockIndex = this.currentBlockIndex; - const currentBlock = this.blockList.get(this.currentBlockIndex); - if (currentBlock) - await this.loadBlock(currentBlock); - - this.blocksNotFinalized = new Map(); - for (const block of this.blockList.values()) { - if (!block.final) - this.blocksNotFinalized.set(block.index, 1); - } - - return autoIncrement; - } - - async loadCorrupted() { - this.loadCorrupted = true; - - const loadBlockIndex = (fileNum, data) => { - if (fileNum === 0) {//dumped data - this.blockIndex = new Map(data);//much faster - } else { - for (const rec of data) { - const [id, index] = rec; - if (index > 0) - this.blockIndex.set(id, index); - else - this.blockIndex.delete(id); - } - } - } - - this.blockIndex.clear(); - for (let i = 0; i < 2; i++) { - const dataPath = `${this.tablePath}/blockindex.${i}`; - - if (await utils.pathExists(dataPath)) { - try { - const data = await this.loadFile(dataPath); - loadBlockIndex(i, data); - } catch(e) { - console.error(e); - } - } - } - - const files = await fs.readdir(this.tablePath, { withFileTypes: true }); - - this.blockList.clear(); - for (const file of files) { - if (file.isFile() && path.extname(file.name) == '.jem') { - const numStr = path.basename(file.name, '.jem'); - const index = parseInt(numStr, 10); - if (!isNaN(index)) { - const block = { - index, - delCount: 0, - addCount: 0, - size: 0, - rows: null, - rowsLength: 0, - final: false, - }; - this.blockList.set(block.index, block); - //console.log(index); - } - } - } - } - - async closeAllFiles() { - await this.closeFd('blockIndex'); - await this.closeFd('blockList'); - await this.closeFd('blockRows'); - } - - async destroy() { - await this.closeAllFiles(); - - this.destroyed = true; - } -} - -module.exports = TableRowsFile; \ No newline at end of file diff --git a/server/db/JembaDb/TableRowsMem.js b/server/db/JembaDb/TableRowsMem.js deleted file mode 100644 index 91b16222..00000000 --- a/server/db/JembaDb/TableRowsMem.js +++ /dev/null @@ -1,34 +0,0 @@ -class TableRowsMem { - constructor() { - this.rows = new Map(); - } - - //--- rows interface - async getRow(id) { - return this.rows.get(id); - } - - setRow(id, row) { - this.rows.set(id, row); - } - - deleteRow(id) { - this.rows.delete(id); - } - - getAllIds() { - return this.rows.keys(); - } - - getAllIdsSize() { - return this.rows.size; - } - //--- rows interface end - - async destroy() { - //for GC - this.rows = null; - } -} - -module.exports = TableRowsMem; \ No newline at end of file diff --git a/server/db/JembaDb/index.js b/server/db/JembaDb/index.js deleted file mode 100644 index c97f50a7..00000000 --- a/server/db/JembaDb/index.js +++ /dev/null @@ -1,7 +0,0 @@ -const JembaDb = require('./JembaDb'); -const JembaDbThread = require('./JembaDbThread'); - -module.exports = { - JembaDb, - JembaDbThread, -}; \ No newline at end of file diff --git a/server/db/JembaDb/utils.js b/server/db/JembaDb/utils.js deleted file mode 100644 index 7cf05b05..00000000 --- a/server/db/JembaDb/utils.js +++ /dev/null @@ -1,152 +0,0 @@ -const fsCB = require('fs'); -const fs = fsCB.promises; -const zlib = require('zlib'); - -function sleep(ms) { - return new Promise(resolve => setTimeout(resolve, ms)); -} - -function sleepWithStop(ms, cb = () => {}) { - return new Promise(resolve => { - const timer = setTimeout(resolve, ms); - cb(() => { clearTimeout(timer); resolve(); }); - }); -} - -function unionSet(arrSet) { - if (!arrSet.length) - return new Set(); - - let max = 0; - let size = arrSet[0].size; - for (let i = 1; i < arrSet.length; i++) { - if (arrSet[i].size > size) { - max = i; - size = arrSet[i].size; - } - } - - const result = new Set(arrSet[max]); - for (let i = 0; i < arrSet.length; i++) { - if (i === max) - continue; - - for (const elem of arrSet[i]) { - result.add(elem); - } - } - - return result; -} - -function intersectSet(arrSet) { - if (!arrSet.length) - return new Set(); - - let min = 0; - let size = arrSet[0].size; - for (let i = 1; i < arrSet.length; i++) { - if (arrSet[i].size < size) { - min = i; - size = arrSet[i].size; - } - } - - const result = new Set(); - for (const elem of arrSet[min]) { - let inAll = true; - for (let i = 0; i < arrSet.length; i++) { - if (i === min) - continue; - if (!arrSet[i].has(elem)) { - inAll = false; - break; - } - } - - if (inAll) - result.add(elem); - } - - - return result; -} - -async function pathExists(path) { - try { - await fs.access(path); - return true; - } catch(e) { - return false; - } -} - -async function appendFileToFile(nameFrom, nameTo) { - return new Promise((resolve, reject) => { - const readStream = fsCB.createReadStream(nameFrom); - readStream.on('error', (err) => { - reject(err); - }); - - const writeStream = fsCB.createWriteStream(nameTo, {flags: 'a'}); - - writeStream.on('error', (err) => { - reject(err); - }); - - writeStream.on('close', () => { - resolve(); - }); - - readStream.pipe(writeStream); - }); -} - -function esc(obj) { - return JSON.stringify(obj).replace(/@/g, '\\x40'); -} - -function paramToArray(param) { - return (Array.isArray(param) ? param : [param]); -} - -function cloneDeep(obj) { - return JSON.parse(JSON.stringify(obj)); -} - -//async -function deflate(buf, compressionLevel) { - return new Promise((resolve, reject) => { - zlib.deflateRaw(buf, {level: compressionLevel}, (err, b) => { - if (err) - reject(err); - resolve(b); - }); - }); -} - -//async -function inflate(buf) { - return new Promise((resolve, reject) => { - zlib.inflateRaw(buf, (err, b) => { - if (err) - reject(err); - resolve(b); - }); - }); -} - - -module.exports = { - sleep, - sleepWithStop, - unionSet, - intersectSet, - pathExists, - appendFileToFile, - esc, - paramToArray, - cloneDeep, - deflate, - inflate, -}; \ No newline at end of file