const fs = require('fs').promises;
const path = require('path');

const TableIndex = require('./TableIndex');
const TableHash = require('./TableHash');
const TableFlag = require('./TableFlag');

const utils = require('./utils');

//size of a delta file ('.1') above which the structure is re-dumped in full
const maxFileDumpSize = 2*1024*1024;//bytes

/**
 * Collects call arguments into one flat list: arrays are expanded one level,
 * everything else is taken as is.
 * @param {Array} args
 * @returns {Array}
 */
function flattenArgs(args) {
    const result = [];
    for (const arg of args) {
        if (!Array.isArray(arg)) {
            result.push(arg);
        } else {
            for (const item of arg)
                result.push(item);
        }
    }
    return result;
}

/**
 * Builds the pair of range predicates [numeric, string] used by the index*
 * reducers. String values are compared with localeCompare().
 * If `from` is undefined only the upper bound is checked; otherwise, if `to`
 * is undefined, only the lower bound is checked.
 * @param {*} from - lower bound
 * @param {*} to - upper bound
 * @param {boolean} inclFrom - lower bound is inclusive
 * @param {boolean} inclTo - upper bound is inclusive
 * @returns {Function[]} [numberCheck, stringCheck]
 */
function makeRangeChecks(from, to, inclFrom, inclTo) {
    const aboveNum = (inclFrom ? (value) => (value >= from) : (value) => (value > from));
    const belowNum = (inclTo ? (value) => (value <= to) : (value) => (value < to));
    const aboveStr = (inclFrom
        ? (value) => (value.localeCompare(from) >= 0)
        : (value) => (value.localeCompare(from) > 0)
    );
    const belowStr = (inclTo
        ? (value) => (value.localeCompare(to) <= 0)
        : (value) => (value.localeCompare(to) < 0)
    );

    if (from === undefined)
        return [belowNum, belowStr];
    if (to === undefined)
        return [aboveNum, aboveStr];

    return [
        (value) => (aboveNum(value) && belowNum(value)),
        (value) => (aboveStr(value) && belowStr(value)),
    ];
}

/**
 * Maintains the auxiliary lookup structures of a table (flags, hashes,
 * indexes), keeps them in sync with row updates and, unless the table is
 * in-memory, persists them under `tablePath`.
 *
 * On-disk layout, as produced by this class, per structure with base name X:
 *   X.0 - full dump: first byte '1' (plain JSON) or '2' (deflated JSON);
 *   X.1 - append-only delta log: starts with '0[' followed by 'record,' items;
 *   X.2 - temporary file, renamed to X.0 once completely written.
 * 'meta.0' holds the list of structure definitions.
 */
class TableReducer {
    /**
     * @param {boolean} inMemory - when truthy nothing is written to disk
     * @param {string} tablePath - directory of the table files
     * @param {number} [compressed] - passed to utils.deflate() for dumps; falsy means plain dumps
     * @param {object} rowsInterface - must provide getAllIds(), getAllIdsSize() and getRow(id)
     */
    constructor(inMemory, tablePath, compressed, rowsInterface) {
        this._compressed = compressed || 0;
        this._inMemory = inMemory;
        this._tablePath = tablePath;
        this._rowsInterface = rowsInterface;

        this._flag = new Map();
        this._index = new Map();
        this._hash = new Map();

        this._deltas = new Map();
        this._fd = {};//file descriptors
        this._loadCorrupted = false;
    }

    /**
     * Returns the pending delta for the given step, creating it on first use.
     * @throws {Error} for an in-memory table (deltas exist only for persisted tables)
     */
    _getDelta(deltaStep) {
        if (this._inMemory)
            throw new Error('TableReducer: sometinhg wrong');

        let delta = this._deltas.get(deltaStep);
        if (!this._deltas.has(deltaStep)) {
            delta = {
                flag: [],
                index: [],
                hash: [],
            };
            this._deltas.set(deltaStep, delta);
        }
        return delta;
    }

    /** Prefixes a file name with the table directory. */
    _getFullPath(fileName) {
        return `${this._tablePath}/${fileName}`;
    }

    /**
     * Finds the first `${prefix}${n}` (n = 1, 2, ...) for which neither the
     * '.0' nor the '.1' file exists yet.
     * @returns {Promise<string>} base file name without directory
     */
    async _getNotExistingFileName(prefix) {
        for (let i = 1; ; i++) {
            const fileName = `${this._tablePath}/${prefix}${i}`;
            const taken = await utils.pathExists(fileName + '.0')
                || await utils.pathExists(fileName + '.1');
            if (!taken)
                return path.basename(fileName);
        }
    }

    /**
     * Allocates files for a new persisted structure and marks it for a full
     * dump (plus a meta dump) in the given delta step.
     * @param {string} prefix - file name prefix ('flag', 'hash' or 'index')
     * @param {string} dumpKey - delta property collecting structures to dump
     * @param {string} name - structure name (flag name or field name)
     * @param {object} opts - user supplied definition
     * @returns {Promise<object>} meta: copy of opts extended with `fileName`
     */
    async _allocateStorage(prefix, dumpKey, name, opts, deltaStep) {
        const fileName = await this._getNotExistingFileName(prefix);
        await this._openFd(this._getFullPath(fileName) + '.1');

        const delta = this._getDelta(deltaStep);
        if (!delta[dumpKey])
            delta[dumpKey] = new Map();
        delta[dumpKey].set(name, 1);
        delta.dumpMeta = true;

        return Object.assign({}, opts, {fileName});
    }

    /**
     * Schedules removal of a structure's files when the delta step is saved.
     * Files only ever exist with the '.0' (dump) and '.1' (delta log)
     * suffixes, so both are queued; the bare base name never exists on disk.
     * @param {object} meta - structure meta containing `fileName`
     */
    _scheduleFilesRemoval(meta, deltaStep) {
        const delta = this._getDelta(deltaStep);
        delta.dumpMeta = true;

        const fileName = this._getFullPath(meta.fileName);
        if (!delta.delFiles)
            delta.delFiles = [];
        delta.delFiles.push(`${fileName}.0`, `${fileName}.1`);
    }

    /**
     * Creates a flag and fills it from all existing rows.
     * @param {object} opts - {name, check}; `check` is handed to TableFlag
     * @param {boolean} quietIfExists - do not throw if the flag already exists
     * @throws {Error} if the flag exists and quietIfExists is falsy
     */
    async _addFlag(opts, quietIfExists, deltaStep) {
        const flagName = opts.name;

        if (this._flag.has(flagName)) {
            if (!quietIfExists)
                throw new Error(`Flag with name '${flagName}' already exists`);
            return;
        }

        const flag = new TableFlag(opts.check);
        for (const id of this._rowsInterface.getAllIds())
            flag.add(await this._rowsInterface.getRow(id));

        flag.meta = (this._inMemory
            ? opts
            : await this._allocateStorage('flag', 'dumpFlag', flagName, opts, deltaStep)
        );

        this._flag.set(flagName, flag);
    }

    /**
     * Removes a flag; its files are deleted when the delta step is saved.
     * @throws {Error} if the flag does not exist
     */
    async _delFlag(flagName, deltaStep) {
        if (!this._flag.has(flagName))
            throw new Error(`Flag with name '${flagName}' does not exist`);

        if (!this._inMemory)
            this._scheduleFilesRemoval(this._flag.get(flagName).meta, deltaStep);

        this._flag.delete(flagName);
    }

    /** @returns {object[]} meta of every flag */
    _listFlag() {
        return Array.from(this._flag.values(), (flag) => flag.meta);
    }

    /**
     * Creates a hash for `opts.field` and fills it from all existing rows.
     * @param {object} opts - handed to TableHash; `field` is the row field name
     * @param {boolean} quietIfExists - do not throw if the hash already exists
     * @throws {Error} if the hash exists and quietIfExists is falsy
     */
    async _addHash(opts, quietIfExists, deltaStep) {
        const fieldName = opts.field;

        if (this._hash.has(fieldName)) {
            if (!quietIfExists)
                throw new Error(`Hash for field '${fieldName}' already exists`);
            return;
        }

        const hash = new TableHash(opts);
        for (const id of this._rowsInterface.getAllIds()) {
            const row = await this._rowsInterface.getRow(id);
            hash.add(row[fieldName], id);
        }

        hash.meta = (this._inMemory
            ? opts
            : await this._allocateStorage('hash', 'dumpHash', fieldName, opts, deltaStep)
        );

        this._hash.set(fieldName, hash);
    }

    /**
     * Removes a hash; its files are deleted when the delta step is saved.
     * @throws {Error} if the hash does not exist
     */
    async _delHash(fieldName, deltaStep) {
        if (!this._hash.has(fieldName))
            throw new Error(`Hash for field '${fieldName}' does not exist`);

        if (!this._inMemory)
            this._scheduleFilesRemoval(this._hash.get(fieldName).meta, deltaStep);

        this._hash.delete(fieldName);
    }

    /** @returns {object[]} meta of every hash */
    _listHash() {
        return Array.from(this._hash.values(), (hash) => hash.meta);
    }

    /**
     * Creates an index for `opts.field` and fills it from all existing rows.
     * @param {object} opts - handed to TableIndex; `field` is the row field name
     * @param {boolean} quietIfExists - do not throw if the index already exists
     * @throws {Error} if the index exists and quietIfExists is falsy
     */
    async _addIndex(opts, quietIfExists, deltaStep) {
        const fieldName = opts.field;

        if (this._index.has(fieldName)) {
            if (!quietIfExists)
                throw new Error(`Index for field '${fieldName}' already exists`);
            return;
        }

        const index = new TableIndex(opts);
        for (const id of this._rowsInterface.getAllIds()) {
            const row = await this._rowsInterface.getRow(id);
            index.add(row[fieldName], id);
        }

        index.meta = (this._inMemory
            ? opts
            : await this._allocateStorage('index', 'dumpIndex', fieldName, opts, deltaStep)
        );

        this._index.set(fieldName, index);
    }

    /**
     * Removes an index; its files are deleted when the delta step is saved.
     * @throws {Error} if the index does not exist
     */
    async _delIndex(fieldName, deltaStep) {
        if (!this._index.has(fieldName))
            throw new Error(`Index for field '${fieldName}' does not exist`);

        if (!this._inMemory)
            this._scheduleFilesRemoval(this._index.get(fieldName).meta, deltaStep);

        this._index.delete(fieldName);
    }

    /** @returns {object[]} meta of every index */
    _listIndex() {
        return Array.from(this._index.values(), (index) => index.meta);
    }

    /**
     * Applies a batch of row changes to all flags, hashes and indexes.
     * oldRows[i] is replaced by newRows[i]; a row whose `id` is undefined
     * means "no row" (i.e. the pair is an insert or a delete).
     * For persisted tables the changes are also recorded in the delta of
     * `deltaStep`, but only after every structure was updated successfully.
     * On failure the in-memory structures are rolled back (best effort) and
     * the error is rethrown.
     * @param {object[]} oldRows
     * @param {object[]} newRows - same length as oldRows
     * @throws {Error} on inconsistent input or when a structure rejects a change
     */
    _update(oldRows, newRows, deltaStep) {
        if (!deltaStep && !this._inMemory)
            throw new Error('Something wrong: deltaStep is empty');

        //oldRows & newRows arrays have equal size
        if (oldRows.length !== newRows.length)
            throw new Error('Reducer update: old and new array lengths are not equal');

        //consistency
        const oldIds = new Set();
        const newIds = new Set();
        for (let i = 0; i < oldRows.length; i++) {
            const oldRow = oldRows[i];
            const newRow = newRows[i];

            if (oldRow.id !== undefined) {
                if (oldIds.has(oldRow.id))
                    throw new Error(`Reducer update: duplicate old_id:${oldRow.id} detected`);
                oldIds.add(oldRow.id);
            }

            if (newRow.id !== undefined) {
                if (newIds.has(newRow.id))
                    throw new Error(`Reducer update: duplicate new_id:${newRow.id} detected`);
                newIds.add(newRow.id);
            }

            if (oldRow.id !== undefined && newRow.id !== undefined && oldRow.id !== newRow.id)
                throw new Error(`Reducer update: old and new id's are not equal (${oldRow.id} !== ${newRow.id})`);
        }

        //update
        try {
            const delta = (this._inMemory ? null : this._getDelta(deltaStep));
            //records are collected here and committed to the delta only on success,
            //so a rolled back update leaves nothing behind to be persisted
            const pending = {flag: [], hash: [], index: []};

            //flags
            for (const [flagName, flag] of this._flag.entries()) {
                const flagDelta = [];
                for (let i = 0; i < oldRows.length; i++) {
                    const oldRow = oldRows[i];
                    const newRow = newRows[i];

                    if (oldRow.id !== undefined) {
                        flag.del(oldRow);
                        flagDelta.push([oldRow.id, 0]);
                    }
                    if (newRow.id !== undefined) {
                        const added = flag.add(newRow);
                        if (added)
                            flagDelta.push([newRow.id, 1]);
                    }
                }

                if (flagDelta.length)
                    pending.flag.push([flagName, flagDelta]);
            }

            //hashes
            for (const [fieldName, hash] of this._hash.entries()) {
                const hashDelta = [];
                for (let i = 0; i < oldRows.length; i++) {
                    const oldRow = oldRows[i];
                    const newRow = newRows[i];

                    //untouched field value: nothing to re-hash
                    if (oldRow[fieldName] === newRow[fieldName])
                        continue;

                    if (oldRow.id !== undefined) {
                        const value = hash.del(oldRow[fieldName], oldRow.id);
                        hashDelta.push([value, oldRow.id, 0]);
                    }
                    if (newRow.id !== undefined) {
                        const value = hash.add(newRow[fieldName], newRow.id);
                        hashDelta.push([value, newRow.id, 1]);
                    }
                }

                if (hashDelta.length)
                    pending.hash.push([fieldName, hashDelta]);
            }

            //indexes
            for (const [fieldName, index] of this._index.entries()) {
                const indexDelta = [];
                for (let i = 0; i < oldRows.length; i++) {
                    const oldRow = oldRows[i];
                    const newRow = newRows[i];

                    //untouched field value: nothing to re-index
                    if (oldRow[fieldName] === newRow[fieldName])
                        continue;

                    if (oldRow.id !== undefined) {
                        const value = index.del(oldRow[fieldName], oldRow.id);
                        indexDelta.push([value, oldRow.id, 0]);
                    }
                    if (newRow.id !== undefined) {
                        const value = index.add(newRow[fieldName], newRow.id);
                        indexDelta.push([value, newRow.id, 1]);
                    }
                }

                if (indexDelta.length)
                    pending.index.push([fieldName, indexDelta]);
            }

            if (delta) {
                delta.flag.push(...pending.flag);
                delta.hash.push(...pending.hash);
                delta.index.push(...pending.index);
            }
        } catch(e) {
            this._rollbackUpdate(oldRows, newRows);
            throw e;
        }
    }

    /**
     * Best-effort inverse of _update(): removes the new rows from every
     * structure and puts the old ones back. Individual failures are ignored
     * on purpose, because the caller rethrows the error that caused the rollback.
     */
    _rollbackUpdate(oldRows, newRows) {
        const attempt = (action) => {
            try {
                action();
            } catch(e) {
                //ignored deliberately, see the method description
            }
        };

        //flags
        for (const flag of this._flag.values()) {
            for (let i = 0; i < oldRows.length; i++) {
                const oldRow = oldRows[i];
                const newRow = newRows[i];

                if (newRow.id !== undefined)
                    attempt(() => flag.del(newRow));
                if (oldRow.id !== undefined)
                    attempt(() => flag.add(oldRow));
            }
        }

        //hashes, then indexes: both expose the same del(value, id)/add(value, id) calls
        for (const structures of [this._hash, this._index]) {
            for (const [fieldName, struct] of structures.entries()) {
                for (let i = 0; i < oldRows.length; i++) {
                    const oldRow = oldRows[i];
                    const newRow = newRows[i];

                    if (oldRow[fieldName] === newRow[fieldName])
                        continue;

                    if (newRow.id !== undefined)
                        attempt(() => struct.del(newRow[fieldName], newRow.id));
                    if (oldRow.id !== undefined)
                        attempt(() => struct.add(oldRow[fieldName], oldRow.id));
                }
            }
        }
    }

    /** Closes the file handle registered under `name`, if there is one. */
    async _closeFd(name) {
        const fd = this._fd[name];
        if (fd) {
            await fd.close();
            this._fd[name] = null;
        }
    }

    /**
     * Opens `name` for appending and registers the handle; a no-op if it is
     * already open. A newly created file gets the '0[' header, which marks a
     * not finalized (delta) file for _loadFile().
     * @throws {Error} if name is empty
     */
    async _openFd(name) {
        if (!name)
            throw new Error('TableReducer: openFd name is empty');
        if (this._fd[name])
            return;

        const exists = await utils.pathExists(name);

        const fd = await fs.open(name, 'a');
        if (!exists)
            await fd.write('0[');

        this._fd[name] = fd;
    }

    /**
     * Tells whether a structure must be dumped in full: either it was
     * explicitly requested or its open delta file outgrew maxFileDumpSize.
     * @param {string} fileName - full base path of the structure
     * @param {boolean} forced - dump was requested by the delta
     */
    async _isDumpNeeded(fileName, forced) {
        const fd = this._fd[`${fileName}.1`];
        const size = (fd ? (await fd.stat()).size : 0);
        return size > maxFileDumpSize || forced;
    }

    /**
     * Replaces the dump of a structure: writes '.2', renames it over '.0',
     * then closes and removes the '.1' delta file, which is now obsolete.
     * @param {string} fileName - full base path of the structure
     * @param {string} data - serialized structure
     */
    async _replaceDump(fileName, data) {
        const fileName0 = `${fileName}.0`;
        const fileName1 = `${fileName}.1`;
        const fileName2 = `${fileName}.2`;

        await this._writeFinal(fileName2, data);
        await fs.rename(fileName2, fileName0);

        await this._closeFd(fileName1);
        await fs.unlink(fileName1);
    }

    /**
     * Writes full dumps of the structures that need it (see _isDumpNeeded()).
     * @param {object} delta - delta being saved; dumpFlag/dumpHash/dumpIndex force dumps
     */
    async _dumpMaps(delta) {
        //dump flag
        for (const [flagName, flag] of this._flag.entries()) {
            const fileName = this._getFullPath(flag.meta.fileName);
            const forced = !!(delta.dumpFlag && delta.dumpFlag.get(flagName));

            if (await this._isDumpNeeded(fileName, forced))
                await this._replaceDump(fileName, JSON.stringify([...flag.flag]));
        }

        //dump hash
        for (const [fieldName, hash] of this._hash.entries()) {
            const fileName = this._getFullPath(hash.meta.fileName);
            const forced = !!(delta.dumpHash && delta.dumpHash.get(fieldName));

            if (await this._isDumpNeeded(fileName, forced)) {
                let buf;
                if (hash.unique) {
                    buf = Array.from(hash.hash);
                } else {
                    //Sets are not JSON-serializable, store them as arrays
                    buf = [];
                    for (const [key, keySet] of hash.hash)
                        buf.push([key, [...keySet]]);
                }
                await this._replaceDump(fileName, JSON.stringify(buf));
            }
        }

        //dump index
        for (const [fieldName, index] of this._index.entries()) {
            const fileName = this._getFullPath(index.meta.fileName);
            const forced = !!(delta.dumpIndex && delta.dumpIndex.get(fieldName));

            if (await this._isDumpNeeded(fileName, forced)) {
                const buf = {hash: [], sorted: index.sorted, delCount: index.delCount};
                if (index.unique) {
                    buf.hash = Array.from(index.hash);
                } else {
                    for (const [key, keySet] of index.hash)
                        buf.hash.push([key, [...keySet]]);
                }
                await this._replaceDump(fileName, JSON.stringify(buf));
            }
        }
    }

    /** Writes the definitions of all structures to 'meta.0' (via 'meta.2' + rename). */
    async _dumpMeta() {
        const fileName = this._getFullPath('meta');
        const fileName0 = `${fileName}.0`;
        const fileName2 = `${fileName}.2`;

        await this._writeFinal(fileName2, JSON.stringify({
            flag: this._listFlag(),
            hash: this._listHash(),
            index: this._listIndex(),
        }));
        await fs.rename(fileName2, fileName0);
    }

    /**
     * Appends delta records to the '.1' file of a structure, opening it if needed.
     * Every record is written as JSON followed by a comma.
     * @param {object} meta - structure meta containing `fileName`
     * @param {Array[]} records
     */
    async _appendDelta(meta, records) {
        const fileName = this._getFullPath(meta.fileName) + '.1';
        if (!this._fd[fileName])
            await this._openFd(fileName);

        if (records.length) {
            const buf = records.map((rec) => JSON.stringify(rec));
            await this._fd[fileName].write(buf.join(',') + ',');
        }
    }

    /**
     * Persists everything accumulated for `deltaStep`: delta records, full
     * dumps, meta and scheduled file removals; then forgets the delta.
     */
    async _saveDelta(deltaStep) {
        //delta
        const delta = this._getDelta(deltaStep);

        const groups = [
            [delta.flag, this._flag],
            [delta.hash, this._hash],
            [delta.index, this._index],
        ];

        //save flag, hash and index deltas
        for (const [records, structures] of groups) {
            for (const [name, structDelta] of records) {
                const struct = structures.get(name);
                //the structure may have been removed later within the same step
                if (!struct)
                    continue;

                await this._appendDelta(struct.meta, structDelta);
            }
        }

        //dumps
        await this._dumpMaps(delta);

        //meta
        if (delta.dumpMeta)
            await this._dumpMeta();

        //del files
        if (delta.delFiles) {
            for (const fileName of delta.delFiles) {
                //the handle must be closed before the file is removed
                await this._closeFd(fileName);

                if (await utils.pathExists(fileName))
                    await fs.unlink(fileName);
            }
        }

        this._deltas.delete(deltaStep);
    }

    /** Discards the pending delta of `deltaStep` without writing anything. */
    async _cancelDelta(deltaStep) {
        this._deltas.delete(deltaStep);
    }

    /**
     * Reads and parses one of the files written by this class. The first
     * byte selects the format:
     *   '2' - finalized and compressed: the rest is inflated, then parsed;
     *   '1' - finalized: the rest is JSON;
     *   else - not finalized delta log: '[' + 'record,'... which is closed
     *          with ']' before parsing. If it does not end with a comma and
     *          corrupted loading is enabled, the tail after the last comma
     *          is dropped.
     * @param {string} filePath
     * @returns {Promise<*>} parsed content
     * @throws {Error} if the file is empty or is not valid JSON
     */
    async _loadFile(filePath) {
        let buf = await fs.readFile(filePath);
        if (!buf.length)
            throw new Error(`TableReducer: file ${filePath} is empty`);

        const flag = buf[0];
        if (flag === 50) {//flag '2' ~ finalized && compressed
            const packed = Buffer.from(buf.buffer, buf.byteOffset + 1, buf.length - 1);
            const data = await utils.inflate(packed);
            buf = data.toString();
        } else if (flag === 49) {//flag '1' ~ finalized
            buf[0] = 32;//' '
            buf = buf.toString();
        } else {//flag '0' ~ not finalized
            buf[0] = 32;//' '
            const last = buf.length - 1;
            if (buf[last] === 44) {//','
                buf[last] = 93;//']'
                buf = buf.toString();
            } else {//corrupted or empty
                buf = buf.toString();
                if (this._loadCorrupted) {
                    const lastComma = buf.lastIndexOf(',');
                    if (lastComma >= 0)
                        buf = buf.substring(0, lastComma);
                }
                buf += ']';
            }
        }

        try {
            return JSON.parse(buf);
        } catch(e) {
            throw new Error(`load ${filePath} failed: ${e.message}`);
        }
    }

    /**
     * Writes a finalized file: '1' + data, or '2' + deflated data when
     * compression is configured.
     * @param {string} fileName
     * @param {string} data - serialized content
     */
    async _writeFinal(fileName, data) {
        if (!this._compressed) {
            await fs.writeFile(fileName, '1' + data);
            return;
        }

        //the configured value lives in _compressed
        const packed = await utils.deflate(Buffer.from(data), this._compressed);

        const fd = await fs.open(fileName, 'w');
        try {
            await fd.write('2');
            await fd.write(packed);
        } finally {
            //do not leak the handle if a write fails
            await fd.close();
        }
    }

    /**
     * Restores all structures from disk. Does nothing if the meta file is absent.
     * @param {boolean} [corrupted=false] - tolerate truncated files; in this mode
     *   only the definitions are restored and the structures are left empty
     * @param {string} [metaPath=''] - alternative meta file, default is '<tablePath>/meta.0'
     */
    async _load(corrupted = false, metaPath = '') {
        if (corrupted)
            this._loadCorrupted = true;

        const metaFileName = (metaPath ? metaPath : this._getFullPath('meta.0'));
        if (!await utils.pathExists(metaFileName))
            return;

        const meta = await this._loadFile(metaFileName);

        //flag
        this._flag.clear();
        for (const opts of meta.flag) {
            const flag = new TableFlag(opts.check);
            flag.meta = opts;

            if (!corrupted) {
                const fileName = this._getFullPath(opts.fileName);
                const fileName0 = `${fileName}.0`;
                const fileName1 = `${fileName}.1`;

                //load dump
                if (await utils.pathExists(fileName0)) {
                    const data = await this._loadFile(fileName0);
                    flag.flag = new Set(data);
                }

                //load delta
                if (await utils.pathExists(fileName1)) {
                    const flagDelta = await this._loadFile(fileName1);
                    for (const [id, isAdd] of flagDelta) {
                        if (isAdd)
                            flag.flag.add(id);
                        else
                            flag.flag.delete(id);
                    }
                }
            }

            this._flag.set(opts.name, flag);
        }

        //hash
        this._hash.clear();
        for (const opts of meta.hash) {
            const hash = new TableHash(opts);
            hash.meta = opts;

            if (!corrupted) {
                const fileName = this._getFullPath(opts.fileName);
                const fileName0 = `${fileName}.0`;
                const fileName1 = `${fileName}.1`;

                //load dump
                if (await utils.pathExists(fileName0)) {
                    const data = await this._loadFile(fileName0);
                    if (hash.unique) {
                        hash.hash = new Map(data);
                    } else {
                        for (const [key, keySet] of data)
                            hash.hash.set(key, new Set(keySet));
                    }
                }

                //load delta
                if (await utils.pathExists(fileName1)) {
                    const hashDelta = await this._loadFile(fileName1);
                    for (const [value, id, isAdd] of hashDelta) {
                        if (isAdd)
                            hash.add(value, id);
                        else
                            hash.del(value, id);
                    }
                }
            }

            this._hash.set(opts.field, hash);
        }

        //index
        this._index.clear();
        for (const opts of meta.index) {
            const index = new TableIndex(opts);
            index.meta = opts;

            if (!corrupted) {
                const fileName = this._getFullPath(opts.fileName);
                const fileName0 = `${fileName}.0`;
                const fileName1 = `${fileName}.1`;

                //load dump
                if (await utils.pathExists(fileName0)) {
                    const data = await this._loadFile(fileName0);
                    index.sorted = data.sorted;
                    index.delCount = data.delCount;

                    if (index.unique) {
                        index.hash = new Map(data.hash);
                    } else {
                        for (const [key, keySet] of data.hash)
                            index.hash.set(key, new Set(keySet));
                    }
                }

                //load delta
                if (await utils.pathExists(fileName1)) {
                    const indexDelta = await this._loadFile(fileName1);
                    for (const [value, id, isAdd] of indexDelta) {
                        if (isAdd)
                            index.add(value, id);
                        else
                            index.del(value, id);
                    }
                }
            }

            this._index.set(opts.field, index);
        }
    }

    /** Closes every file handle that is still open. */
    async _closeAllFiles() {
        for (const name of Object.keys(this._fd))
            await this._closeFd(name);
    }

    /** Closes files and drops all references held by the reducer. */
    async _destroy() {
        await this._closeAllFiles();

        //for GC
        this._flag.clear();
        this._index.clear();
        this._hash.clear();
        this._deltas.clear();
        this._rowsInterface = null;
    }

    /** @returns {TableFlag} @throws {Error} if the flag does not exist */
    _flagOrThrow(flagName) {
        if (!this._flag.has(flagName))
            throw new Error(`Flag with name '${flagName}' does not exist`);
        return this._flag.get(flagName);
    }

    /** @returns {TableHash} @throws {Error} if the hash does not exist */
    _hashOrThrow(fieldName) {
        if (!this._hash.has(fieldName))
            throw new Error(`Hash for field '${fieldName}' does not exist`);
        return this._hash.get(fieldName);
    }

    /** @returns {TableIndex} @throws {Error} if the index does not exist */
    _indexOrThrow(fieldName) {
        if (!this._index.has(fieldName))
            throw new Error(`Index for field '${fieldName}' does not exist`);
        return this._index.get(fieldName);
    }

    //------------------------------------------------------------------------------------------
    //Reducer methods

    /**
     * Builds a Set of ids from ids and/or arrays of ids.
     * @returns {Promise<Set>}
     */
    async id(...args) {
        return new Set(flattenArgs(args));
    }

    /**
     * @returns {Promise<Set>} copy of the ids held by the flag
     * @throws {Error} if the flag does not exist
     */
    async flag(flagName) {
        return new Set(this._flagOrThrow(flagName).flag);
    }

    /**
     * Ids of the rows whose field equals `value` (or any of the values, if an
     * array is given). Every candidate returned by the hash is re-checked
     * against the actual row with strict equality.
     * @returns {Promise<Set>}
     * @throws {Error} if the hash does not exist
     */
    async hash(fieldName, value) {
        const hash = this._hashOrThrow(fieldName);
        const values = (Array.isArray(value) ? value : [value]);

        const result = new Set();
        for (const v of values) {
            for (const id of hash.reduce(v)) {
                const row = await this._rowsInterface.getRow(id);
                if (row[fieldName] === v)
                    result.add(id);
            }
        }
        return result;
    }

    /** Result of the hash's min(). @throws {Error} if the hash does not exist */
    async hashMin(fieldName) {
        return this._hashOrThrow(fieldName).min();
    }

    /** Result of the hash's max(). @throws {Error} if the hash does not exist */
    async hashMax(fieldName) {
        return this._hashOrThrow(fieldName).max();
    }

    /** Result of the hash's iter(checkFunc). @throws {Error} if the hash does not exist */
    async hashIter(fieldName, checkFunc) {
        return this._hashOrThrow(fieldName).iter(checkFunc);
    }

    /**
     * Takes the candidate ids from index.reduce(from, to) and keeps those
     * whose actual row value passes the check.
     * @param {Function[]} checkFuncs - [numberCheck, stringCheck], chosen by index.isNumber
     * @returns {Promise<Set>}
     * @throws {Error} if the index does not exist
     */
    async _indexReduce(fieldName, from, to, checkFuncs) {
        const index = this._indexOrThrow(fieldName);
        const ids = index.reduce(from, to);
        const check = (index.isNumber ? checkFuncs[0] : checkFuncs[1]);

        const result = new Set();
        for (const id of ids) {
            const row = await this._rowsInterface.getRow(id);
            if (check(row[fieldName]))
                result.add(id);
        }
        return result;
    }

    /** Ids with from < value < to; an undefined bound is not checked. */
    async index(fieldName, from, to) {
        return this._indexReduce(fieldName, from, to, makeRangeChecks(from, to, false, false));
    }

    /** Ids with from <= value < to; an undefined bound is not checked. */
    async indexL(fieldName, from, to) {
        return this._indexReduce(fieldName, from, to, makeRangeChecks(from, to, true, false));
    }

    /** Ids with from < value <= to; an undefined bound is not checked. */
    async indexR(fieldName, from, to) {
        return this._indexReduce(fieldName, from, to, makeRangeChecks(from, to, false, true));
    }

    /** Ids with from <= value <= to; an undefined bound is not checked. */
    async indexLR(fieldName, from, to) {
        return this._indexReduce(fieldName, from, to, makeRangeChecks(from, to, true, true));
    }

    /** Result of the index's min(). @throws {Error} if the index does not exist */
    async indexMin(fieldName) {
        return this._indexOrThrow(fieldName).min();
    }

    /** Result of the index's max(). @throws {Error} if the index does not exist */
    async indexMax(fieldName) {
        return this._indexOrThrow(fieldName).max();
    }

    /** Result of the index's iter(checkFunc). @throws {Error} if the index does not exist */
    async indexIter(fieldName, checkFunc) {
        return this._indexOrThrow(fieldName).iter(checkFunc);
    }

    //returns iterator, not Set
    async all() {
        return this._rowsInterface.getAllIds();
    }

    /** Result of rowsInterface.getAllIdsSize(). */
    async allSize() {
        return this._rowsInterface.getAllIdsSize();
    }

    /**
     * Filters ids by a row predicate. Iteration stops as soon as checkFunc
     * returns undefined.
     * @param {Iterable} ids
     * @param {Function} checkFunc - (row) => truthy to keep, falsy to skip, undefined to stop
     * @returns {Promise<Set>}
     */
    async iter(ids, checkFunc) {
        const result = new Set();
        for (const id of ids) {
            const row = await this._rowsInterface.getRow(id);
            const checkResult = checkFunc(row);
            if (checkResult === undefined)
                break;
            if (checkResult)
                result.add(id);
        }
        return result;
    }

    /** Intersection (utils.intersectSet) of the given sets and/or arrays of sets. */
    async and(...args) {
        return utils.intersectSet(flattenArgs(args));
    }

    /** Union (utils.unionSet) of the given sets and/or arrays of sets. */
    async or(...args) {
        return utils.unionSet(flattenArgs(args));
    }
}

module.exports = TableReducer;