Files
liberama/server/db/JembaDb/Table.js
2021-11-24 14:15:09 +07:00

852 lines
26 KiB
JavaScript

const fs = require('fs').promises;
const utils = require('./utils');
const TableReducer = require('./TableReducer');
const TableRowsMem = require('./TableRowsMem');
const TableRowsFile = require('./TableRowsFile');
const LockQueue = require('./LockQueue');
const maxChangesLength = 10;
class Table {
constructor() {
this.rowsInterface = new TableRowsMem();
this.autoIncrement = 0;
this.fileError = '';
this.openingLock = new LockQueue(100);
this.lock = new LockQueue(100);
this.opened = false;
this.closed = false;
this.deltaStep = 0;
this.changes = [];
//table options defaults
this.inMemory = false;
this.compressed = 0;
this.cacheSize = 5;
this.compressed = 0;
this.recreate = false;
this.autoRepair = false;
this.forceFileClosing = false;
}
checkErrors() {
if (this.fileError)
throw new Error(this.fileError);
if (this.closed)
throw new Error('Table closed');
if (!this.opened)
throw new Error('Table has not been opened yet');
}
async waitForSaveChanges() {
if (this.changes.length > maxChangesLength) {
let i = this.changes.length - maxChangesLength;
while (i > 0 && this.changes.length > maxChangesLength) {
i--;
await utils.sleep(10);
}
}
}
async recreateTable() {
const tempTablePath = `${this.tablePath}___temporary_recreating`;
await fs.rmdir(tempTablePath, { recursive: true });
await fs.mkdir(tempTablePath, { recursive: true });
const tableRowsFileSrc = new TableRowsFile(this.tablePath, this.cacheSize);
const tableRowsFileDest = new TableRowsFile(tempTablePath, this.cacheSize, this.compressed);
const reducerDest = new TableReducer(false, tempTablePath, this.compressed, tableRowsFileDest);
try {
await tableRowsFileSrc.loadCorrupted();
} catch (e) {
console.error(e);
}
try {
await reducerDest._load(true, `${this.tablePath}/meta.0`);
} catch (e) {
console.error(e);
}
const putRows = async(rows) => {
const oldRows = [];
const newRows = [];
const newRowsStr = [];
//checks
for (const row of rows) {
if (!row) {
continue;
}
const t = typeof(row.id);
if (t !== 'number' && t !== 'string') {
continue;
}
const oldRow = await tableRowsFileDest.getRow(row.id);
if (oldRow) {
continue;
}
let str = '';
try {
str = JSON.stringify(row);//because of stringify errors
} catch(e) {
continue;
}
newRows.push(row);
oldRows.push({});
newRowsStr.push(str);
}
try {
//reducer
reducerDest._update(oldRows, newRows, 1);
//insert
for (let i = 0; i < newRows.length; i++) {
const newRow = newRows[i];
const newRowStr = newRowsStr[i];
tableRowsFileDest.setRow(newRow.id, newRow, newRowStr, 1);
}
await tableRowsFileDest.saveDelta(1);
await reducerDest._saveDelta(1);
} catch(e) {
console.error(e);
}
};
let rows = [];
for (const id of tableRowsFileSrc.getAllIds()) {
if (this.closed)
throw new Error('Table closed');
let row = null;
try {
row = await tableRowsFileSrc.getRow(id);
} catch(e) {
console.error(e);
continue;
}
rows.push(row);
if (rows.length > 1000) {
await putRows(rows);
rows = [];
}
}
if (rows.length)
await putRows(rows);
await tableRowsFileDest.saveDelta(0);
const delta = reducerDest._getDelta(0);
delta.dumpMeta = true;
await reducerDest._saveDelta(0);
await tableRowsFileSrc.destroy();
await reducerDest._destroy();
await tableRowsFileDest.destroy();
await fs.writeFile(`${tempTablePath}/state`, '1');
await fs.rmdir(this.tablePath, { recursive: true });
await fs.rename(tempTablePath, this.tablePath);
}
/*
query: {
tablePath: String,
inMemory: Boolean,
cacheSize: Number,
compressed: Number, 0..9
recreate: Boolean, false,
autoRepair: Boolean, false,
forceFileClosing: Boolean, false,
lazyOpen: Boolean, false,
}
*/
async _open(query = {}) {
if (this.opening)
return;
this.opening = true;
await this.openingLock.get();
//console.log(query);
try {
if (this.opened)
throw new Error('Table has already been opened');
if (this.closed)
throw new Error('Table instance has been destroyed. Please create a new one.');
this.inMemory = !!query.inMemory;
if (this.inMemory) {
this.reducer = new TableReducer(this.inMemory, '', 0, this.rowsInterface);
} else {
if (!query.tablePath)
throw new Error(`'query.tablePath' parameter is required`);
this.tablePath = query.tablePath;
this.cacheSize = query.cacheSize || 5;
this.compressed = query.compressed || 0;
this.recreate = query.recreate || false;
this.autoRepair = query.autoRepair || false;
this.forceFileClosing = query.forceFileClosing || false;
await fs.mkdir(this.tablePath, { recursive: true });
this.tableRowsFile = new TableRowsFile(query.tablePath, this.cacheSize, this.compressed);
this.rowsInterface = this.tableRowsFile;
this.reducer = new TableReducer(this.inMemory, this.tablePath, this.compressed, this.rowsInterface);
const statePath = `${this.tablePath}/state`;
let state = null;
if (await utils.pathExists(statePath)) {
state = await fs.readFile(statePath, 'utf8');
}
if (state === null) {//check if other files exists
const files = await fs.readdir(this.tablePath);
if (files.length)
state = '0';
}
if (this.recreate) {
await this.recreateTable();
state = '1';
}
if (state !== null) {
try {
if (state === '1') {
// load tableRowsFile & reducer
this.autoIncrement = await this.tableRowsFile.load();
await this.reducer._load();
} else {
throw new Error('Table corrupted')
}
} catch(e) {
if (this.autoRepair) {
console.error(e.message);
await this.recreateTable();
} else {
throw e;
}
// load tableRowsFile & reducer
this.autoIncrement = await this.tableRowsFile.load();
await this.reducer._load();
}
}
}
this.opened = true;
} catch(e) {
await this.close();
const errMes = `Open table (${query.tablePath}): ${e.message}`;
if (!query.lazyOpen)
throw new Error(errMes);
else
this.fileError = errMes;
} finally {
this.openingLock.ret();
this.opening = false;
}
}
async open(query = {}) {
if (query.lazyOpen) {
this._open(query);
} else {
await this._open(query);
}
}
async close() {
if (this.closed)
return;
this.opened = false;
this.closed = true;
if (!this.inMemory) {
while (this.savingChanges) {
await utils.sleep(10);
}
}
//for GC
if (this.reducer)
await this.reducer._destroy();
this.reducer = null;
if (this.rowsInterface)
await this.rowsInterface.destroy();
this.rowsInterface = null;
this.tableRowsFile = null;
}
/*
query = {
quietIfExists: Boolean,
flag: Object || Array, {name: 'flag1', check: '(r) => r.id > 10'}
hash: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false}
index: Object || Array, {field: 'field1', type: 'string', depth: 11, allowUndef: false}
}
result = {}
*/
async create(query) {
await this.openingLock.get(false);
this.checkErrors();
await this.lock.get();
try {
this.deltaStep++;
try {
if (query.flag) {
for (const flag of utils.paramToArray(query.flag)) {
await this.reducer._addFlag(flag, query.quietIfExists, this.deltaStep);
}
}
if (query.hash) {
for (const hash of utils.paramToArray(query.hash)) {
await this.reducer._addHash(hash, query.quietIfExists, this.deltaStep);
}
}
if (query.index) {
for (const index of utils.paramToArray(query.index)) {
await this.reducer._addIndex(index, query.quietIfExists, this.deltaStep);
}
}
this.changes.push([this.deltaStep, 1]);
} catch(e) {
this.changes.push([this.deltaStep, 0]);
throw e;
}
return {};
} finally {
this.saveChanges();//no await
this.lock.ret();
}
}
/*
query = {
flag: Object || Array, {name: 'flag1'}
hash: Object || Array, {field: 'field1'}
index: Object || Array, {field: 'field1'}
}
result = {}
*/
async drop(query) {
await this.openingLock.get(false);
this.checkErrors();
await this.lock.get();
try {
this.deltaStep++;
try {
if (query.flag) {
for (const flag of utils.paramToArray(query.flag)) {
await this.reducer._delFlag(flag.name, this.deltaStep);
}
}
if (query.hash) {
for (const hash of utils.paramToArray(query.hash)) {
await this.reducer._delHash(hash.field, this.deltaStep);
}
}
if (query.index) {
for (const index of utils.paramToArray(query.index)) {
await this.reducer._delIndex(index.field, this.deltaStep);
}
}
this.changes.push([this.deltaStep, 1]);
} catch(e) {
this.changes.push([this.deltaStep, 0]);
throw e;
}
return {};
} finally {
this.saveChanges();//no await
this.lock.ret();
}
}
/*
result = {
inMemory: Boolean,
flag: Array, [{name: 'flag1', check: '(r) => r.id > 10'}, ...]
hash: Array, [{field: 'field1', type: 'string', depth: 11, allowUndef: false}, ...]
index: Array, [{field: 'field1', type: 'string', depth: 11, allowUndef: false}, ...]
}
*/
async getMeta() {
this.checkErrors();
return {
inMemory: this.inMemory,
flag: this.reducer._listFlag(),
hash: this.reducer._listHash(),
index: this.reducer._listIndex(),
};
}
prepareWhere(where) {
if (typeof(where) !== 'string')
throw new Error('query.where must be a string');
return `async(__tr) => {${where.replace(/@@/g, 'return await __tr.').replace(/@/g, 'await __tr.')}}`;
}
/*
query = {
distinct: 'fieldName' || Array,
count: Boolean,
map: '(r) => ({id1: r.id, ...})',
where: `@@index('field1', 10, 20)`,
sort: '(a, b) => a.id - b.id',
limit: 10,
offset: 10,
}
result = Array
*/
async select(query = {}) {
await this.openingLock.get(false);
this.checkErrors();
let ids;//iterator
if (query.where) {
const where = this.prepareWhere(query.where);
const whereFunc = new Function(`return ${where}`)();
ids = await whereFunc(this.reducer);
} else {
ids = this.rowsInterface.getAllIds();
}
let found = [];
let distinct = () => true;
if (query.distinct) {
const distFields = (Array.isArray(query.distinct) ? query.distinct : [query.distinct]);
const dist = new Map();
distinct = (row) => {
let uniq = '';
for (const field of distFields) {
const value = row[field];
uniq += `${(value === undefined ? '___' : '')}${field}:${value}`;
}
if (dist.has(uniq))
return false;
dist.set(uniq, true);
return true;
};
}
if (!query.where && !query.distinct && query.count) {//some optimization
found = [{count: this.rowsInterface.getAllIdsSize()}];
} else {//full running
for (const id of ids) {
const row = await this.rowsInterface.getRow(id);
if (row && distinct(row)) {
found.push(row);
}
}
if (query.count) {
found = [{count: found.length}];
}
}
let result = [];
if (query.map) {
const mapFunc = new Function(`return ${query.map}`)();
for (const row of found) {
result.push(mapFunc(row));
}
} else {
result = found;
}
if (query.sort) {
const sortFunc = new Function(`return ${query.sort}`)();
result.sort(sortFunc);
}
if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) {
const offset = query.offset || 0;
const limit = (query.hasOwnProperty('limit') ? query.limit : result.length);
result = result.slice(offset, offset + limit);
}
return utils.cloneDeep(result);
}
/*
query = {
replace: Boolean,
(!) rows: Array,
}
result = {
(!) inserted: Number,
(!) replaced: Number,
}
*/
async insert(query = {}) {
await this.openingLock.get(false);
this.checkErrors();
await this.lock.get();
try {
if (!Array.isArray(query.rows)) {
throw new Error('query.rows must be an array');
}
const newRows = utils.cloneDeep(query.rows);
const replace = query.replace;
//autoIncrement correction
for (const newRow of newRows) {
if (typeof(newRow.id) === 'number' && newRow.id >= this.autoIncrement)
this.autoIncrement = newRow.id + 1;
}
const oldRows = [];
const newRowsStr = [];
//checks
for (const newRow of newRows) {
if (newRow.hasOwnProperty('___meta'))
throw new Error(`Use of field with name '___meta' is forbidden`);
if (newRow.id === undefined) {
newRow.id = this.autoIncrement;
this.autoIncrement++;
}
const t = typeof(newRow.id);
if (t !== 'number' && t !== 'string') {
throw new Error(`Row id bad type, 'number' or 'string' expected, got ${t}`);
}
const oldRow = await this.rowsInterface.getRow(newRow.id);
if (!replace && oldRow) {
throw new Error(`Record id:${newRow.id} already exists`);
}
oldRows.push((oldRow ? oldRow : {}));
newRowsStr.push(JSON.stringify(newRow));//because of stringify errors
}
const result = {inserted: 0, replaced: 0};
this.deltaStep++;
try {
//reducer
this.reducer._update(oldRows, newRows, this.deltaStep);
//insert
for (let i = 0; i < newRows.length; i++) {
const newRow = newRows[i];
const newRowStr = newRowsStr[i];
const oldRow = oldRows[i];
this.rowsInterface.setRow(newRow.id, newRow, newRowStr, this.deltaStep);
if (oldRow.id !== undefined)
result.replaced++;
else
result.inserted++;
}
this.changes.push([this.deltaStep, 1]);
} catch(e) {
this.changes.push([this.deltaStep, 0]);
throw e;
}
await this.waitForSaveChanges();
return result;
} finally {
this.saveChanges();//no await
this.lock.ret();
}
}
/*
query = {
(!) mod: '(r) => r.count++',
where: `@@index('field1', 10, 20)`,
sort: '(a, b) => a.id - b.id',
limit: 10,
offset: 10,
}
result = {
(!) updated: Number,
}
*/
async update(query = {}) {
await this.openingLock.get(false);
this.checkErrors();
await this.lock.get();
try {
if (typeof(query.mod) !== 'string') {
throw new Error('query.mod must be a string');
}
const modFunc = new Function(`return ${query.mod}`)();
//where
let ids;//iterator
if (query.where) {
const where = this.prepareWhere(query.where);
const whereFunc = new Function(`return ${where}`)();
ids = await whereFunc(this.reducer);
} else {
ids = this.rowsInterface.getAllIds();
}
//oldRows
let oldRows = [];
for (const id of ids) {
const oldRow = await this.rowsInterface.getRow(id);
if (oldRow) {
oldRows.push(oldRow);
}
}
if (query.sort) {
const sortFunc = new Function(`return ${query.sort}`)();
oldRows.sort(sortFunc);
}
let newRows = utils.cloneDeep(oldRows);
if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) {
const offset = query.offset || 0;
const limit = (query.hasOwnProperty('limit') ? query.limit : newRows.length);
newRows = newRows.slice(offset, offset + limit);
oldRows = oldRows.slice(offset, offset + limit);
}
//mod & checks
const context = {};
const newRowsStr = [];
for (const newRow of newRows) {
modFunc(newRow, context);
const t = typeof(newRow.id);
if (t !== 'number' && t !== 'string') {
throw new Error(`Row id bad type, 'number' or 'string' expected, got ${t}`);
}
//autoIncrement correction
if (t === 'number' && newRow.id >= this.autoIncrement)
this.autoIncrement = newRow.id + 1;
if (newRow.hasOwnProperty('___meta'))
throw new Error(`Use of field with name '___meta' is forbidden`);
newRowsStr.push(JSON.stringify(newRow));//because of stringify errors
}
this.deltaStep++;
const result = {updated: 0};
try {
//reducer
this.reducer._update(oldRows, newRows, this.deltaStep);
//replace
for (let i = 0; i < newRows.length; i++) {
const newRow = newRows[i];
const newRowStr = newRowsStr[i];
// oldRow.id === newRow.id always here, so
this.rowsInterface.setRow(newRow.id, newRow, newRowStr, this.deltaStep);
result.updated++;
}
this.changes.push([this.deltaStep, 1]);
} catch(e) {
this.changes.push([this.deltaStep, 0]);
throw e;
}
await this.waitForSaveChanges();
return result;
} finally {
this.saveChanges();//no await
this.lock.ret();
}
}
/*
query = {
where: `@@index('field1', 10, 20)`,
sort: '(a, b) => a.id - b.id',
limit: 10,
offset: 10,
}
result = {
(!) deleted: Number,
}
*/
async delete(query = {}) {
await this.openingLock.get(false);
this.checkErrors();
await this.lock.get();
try {
//where
let ids;//iterator
if (query.where) {
const where = this.prepareWhere(query.where);
const whereFunc = new Function(`return ${where}`)();
ids = await whereFunc(this.reducer);
} else {
ids = this.rowsInterface.getAllIds();
}
//oldRows
let oldRows = [];
let newRows = [];
for (const id of ids) {
const oldRow = await this.rowsInterface.getRow(id);
if (oldRow) {
oldRows.push(oldRow);
newRows.push({});
}
}
if (query.sort) {
const sortFunc = new Function(`return ${query.sort}`)();
oldRows.sort(sortFunc);
}
if (query.hasOwnProperty('limit') || query.hasOwnProperty('offset')) {
const offset = query.offset || 0;
const limit = (query.hasOwnProperty('limit') ? query.limit : newRows.length);
newRows = newRows.slice(offset, offset + limit);
oldRows = oldRows.slice(offset, offset + limit);
}
this.deltaStep++;
const result = {deleted: 0};
try {
//reducer
this.reducer._update(oldRows, newRows, this.deltaStep);
//delete
for (let i = 0; i < oldRows.length; i++) {
const oldRow = oldRows[i];
this.rowsInterface.deleteRow(oldRow.id, this.deltaStep);
result.deleted++;
}
this.changes.push([this.deltaStep, 1]);
} catch(e) {
this.changes.push([this.deltaStep, 0]);
throw e;
}
await this.waitForSaveChanges();
return result;
} finally {
this.saveChanges();//no await
this.lock.ret();
}
}
async saveState(state) {
await fs.writeFile(`${this.tablePath}/state`, state);
}
async saveChanges() {
this.needSaveChanges = true;
if (this.savingChanges)
return;
if (this.inMemory) {
this.changes = [];
return;
}
try {
this.checkErrors();
} catch(e) {
return;
}
this.savingChanges = true;
try {
await utils.sleep(0);
while (this.needSaveChanges) {
this.needSaveChanges = false;
await this.saveState('0');
while (this.changes.length) {
const len = this.changes.length;
let i = 0;
while (i < len) {
const [deltaStep, isOk] = this.changes[i];
i++;
if (isOk) {
await this.tableRowsFile.saveDelta(deltaStep);
await this.reducer._saveDelta(deltaStep);
} else {
await this.tableRowsFile.cancelDelta(deltaStep);
await this.reducer._cancelDelta(deltaStep);
}
}
this.changes = this.changes.slice(i);
}
await this.saveState('1');
if (this.forceFileClosing) {
await this.tableRowsFile.closeAllFiles();
await this.reducer._closeAllFiles();
}
}
} catch(e) {
console.error(e.message);
this.fileError = e.message;
} finally {
this.savingChanges = false;
}
}
}
module.exports = Table;