Переход на JembaDb
This commit is contained in:
@@ -37,6 +37,14 @@ module.exports = {
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
|
||||||
|
jembaDb: [
|
||||||
|
{
|
||||||
|
dbName: 'reader-storage',
|
||||||
|
thread: true,
|
||||||
|
openAll: true,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
|
||||||
servers: [
|
servers: [
|
||||||
{
|
{
|
||||||
serverName: '1',
|
serverName: '1',
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
const BaseController = require('./BaseController');
|
const BaseController = require('./BaseController');
|
||||||
const ReaderWorker = require('../core/Reader/ReaderWorker');//singleton
|
const ReaderWorker = require('../core/Reader/ReaderWorker');//singleton
|
||||||
const ReaderStorage = require('../core/Reader/ReaderStorage');//singleton
|
const JembaReaderStorage = require('../core/Reader/JembaReaderStorage');//singleton
|
||||||
const WorkerState = require('../core/WorkerState');//singleton
|
const WorkerState = require('../core/WorkerState');//singleton
|
||||||
|
|
||||||
class ReaderController extends BaseController {
|
class ReaderController extends BaseController {
|
||||||
constructor(config) {
|
constructor(config) {
|
||||||
super(config);
|
super(config);
|
||||||
this.readerStorage = new ReaderStorage();
|
this.readerStorage = new JembaReaderStorage();
|
||||||
this.readerWorker = new ReaderWorker(config);
|
this.readerWorker = new ReaderWorker(config);
|
||||||
this.workerState = new WorkerState();
|
this.workerState = new WorkerState();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ const WebSocket = require ('ws');
|
|||||||
const _ = require('lodash');
|
const _ = require('lodash');
|
||||||
|
|
||||||
const ReaderWorker = require('../core/Reader/ReaderWorker');//singleton
|
const ReaderWorker = require('../core/Reader/ReaderWorker');//singleton
|
||||||
const ReaderStorage = require('../core/Reader/ReaderStorage');//singleton
|
const JembaReaderStorage = require('../core/Reader/JembaReaderStorage');//singleton
|
||||||
const WorkerState = require('../core/WorkerState');//singleton
|
const WorkerState = require('../core/WorkerState');//singleton
|
||||||
const log = new (require('../core/AppLogger'))().log;//singleton
|
const log = new (require('../core/AppLogger'))().log;//singleton
|
||||||
const utils = require('../core/utils');
|
const utils = require('../core/utils');
|
||||||
@@ -15,7 +15,7 @@ class WebSocketController {
|
|||||||
this.config = config;
|
this.config = config;
|
||||||
this.isDevelopment = (config.branch == 'development');
|
this.isDevelopment = (config.branch == 'development');
|
||||||
|
|
||||||
this.readerStorage = new ReaderStorage();
|
this.readerStorage = new JembaReaderStorage();
|
||||||
this.readerWorker = new ReaderWorker(config);
|
this.readerWorker = new ReaderWorker(config);
|
||||||
this.workerState = new WorkerState();
|
this.workerState = new WorkerState();
|
||||||
|
|
||||||
|
|||||||
122
server/core/Reader/JembaReaderStorage.js
Normal file
122
server/core/Reader/JembaReaderStorage.js
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
const _ = require('lodash');
|
||||||
|
|
||||||
|
const utils = require('../utils');
|
||||||
|
const JembaConnManager = require('../../db/JembaConnManager');//singleton
|
||||||
|
|
||||||
|
let instance = null;
|
||||||
|
|
||||||
|
//singleton
|
||||||
|
class JembaReaderStorage {
|
||||||
|
constructor() {
|
||||||
|
if (!instance) {
|
||||||
|
this.connManager = new JembaConnManager();
|
||||||
|
this.db = this.connManager.db['reader-storage'];
|
||||||
|
this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа
|
||||||
|
|
||||||
|
instance = this;
|
||||||
|
}
|
||||||
|
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
async doAction(act) {
|
||||||
|
if (!_.isObject(act.items))
|
||||||
|
throw new Error('items is not an object');
|
||||||
|
|
||||||
|
let result = {};
|
||||||
|
switch (act.action) {
|
||||||
|
case 'check':
|
||||||
|
result = await this.checkItems(act.items);
|
||||||
|
break;
|
||||||
|
case 'get':
|
||||||
|
result = await this.getItems(act.items);
|
||||||
|
break;
|
||||||
|
case 'set':
|
||||||
|
result = await this.setItems(act.items, act.force);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error('Unknown action');
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
async checkItems(items) {
|
||||||
|
let result = {state: 'success', items: {}};
|
||||||
|
|
||||||
|
const db = this.db;
|
||||||
|
|
||||||
|
for (const id of Object.keys(items)) {
|
||||||
|
if (this.cache[id]) {
|
||||||
|
result.items[id] = this.cache[id];
|
||||||
|
} else {
|
||||||
|
const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}`
|
||||||
|
table: 'storage',
|
||||||
|
map: '(r) => ({rev: r.rev})',
|
||||||
|
where: `@@id(${db.esc(id)})`
|
||||||
|
});
|
||||||
|
const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
|
||||||
|
result.items[id] = {rev};
|
||||||
|
this.cache[id] = result.items[id];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getItems(items) {
|
||||||
|
let result = {state: 'success', items: {}};
|
||||||
|
|
||||||
|
const db = this.db;
|
||||||
|
|
||||||
|
for (const id of Object.keys(items)) {
|
||||||
|
const rows = await db.select({//SQL`SELECT rev, data FROM storage WHERE id = ${id}`);
|
||||||
|
table: 'storage',
|
||||||
|
where: `@@id(${db.esc(id)})`
|
||||||
|
});
|
||||||
|
const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
|
||||||
|
const data = (rows.length && rows[0].data ? rows[0].data : '');
|
||||||
|
result.items[id] = {rev, data};
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
async setItems(items, force) {
|
||||||
|
let check = await this.checkItems(items);
|
||||||
|
|
||||||
|
//сначала проверим совпадение ревизий
|
||||||
|
for (const id of Object.keys(items)) {
|
||||||
|
if (!_.isString(items[id].data))
|
||||||
|
throw new Error('items.data is not a string');
|
||||||
|
|
||||||
|
if (!force && check.items[id].rev + 1 !== items[id].rev)
|
||||||
|
return {state: 'reject', items: check.items};
|
||||||
|
}
|
||||||
|
|
||||||
|
const db = this.db;
|
||||||
|
const newRev = {};
|
||||||
|
for (const id of Object.keys(items)) {
|
||||||
|
await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
|
||||||
|
table: 'storage',
|
||||||
|
replace: true,
|
||||||
|
rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
|
||||||
|
});
|
||||||
|
newRev[id] = {rev: items[id].rev};
|
||||||
|
}
|
||||||
|
|
||||||
|
Object.assign(this.cache, newRev);
|
||||||
|
|
||||||
|
return {state: 'success'};
|
||||||
|
}
|
||||||
|
|
||||||
|
periodicCleanCache(timeout) {
|
||||||
|
this.cache = {};
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
this.periodicCleanCache(timeout);
|
||||||
|
}, timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = JembaReaderStorage;
|
||||||
@@ -38,6 +38,10 @@ function sleep(ms) {
|
|||||||
return new Promise(resolve => setTimeout(resolve, ms));
|
return new Promise(resolve => setTimeout(resolve, ms));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function toUnixTime(time) {
|
||||||
|
return parseInt(time/1000);
|
||||||
|
}
|
||||||
|
|
||||||
function randomHexString(len) {
|
function randomHexString(len) {
|
||||||
return crypto.randomBytes(len).toString('hex')
|
return crypto.randomBytes(len).toString('hex')
|
||||||
}
|
}
|
||||||
@@ -126,6 +130,7 @@ module.exports = {
|
|||||||
bufferRemoveZeroes,
|
bufferRemoveZeroes,
|
||||||
getFileHash,
|
getFileHash,
|
||||||
sleep,
|
sleep,
|
||||||
|
toUnixTime,
|
||||||
randomHexString,
|
randomHexString,
|
||||||
touchFile,
|
touchFile,
|
||||||
spawnProcess,
|
spawnProcess,
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ class JembaConnManager {
|
|||||||
this.config = config;
|
this.config = config;
|
||||||
this._db = {};
|
this._db = {};
|
||||||
|
|
||||||
for (const dbConfig of this.config.db) {
|
for (const dbConfig of this.config.jembaDb) {
|
||||||
const dbPath = `${this.config.dataDir}/db/${dbConfig.dbName}`;
|
const dbPath = `${this.config.dataDir}/db/${dbConfig.dbName}`;
|
||||||
|
|
||||||
//бэкап
|
//бэкап
|
||||||
@@ -90,7 +90,7 @@ class JembaConnManager {
|
|||||||
if (!this.inited)
|
if (!this.inited)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
for (const dbConfig of this.config.db) {
|
for (const dbConfig of this.config.jembaDb) {
|
||||||
await this._db[dbConfig.dbName].closeDb();
|
await this._db[dbConfig.dbName].closeDb();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,8 +2,7 @@ module.exports = {
|
|||||||
up: [
|
up: [
|
||||||
//CREATE TABLE storage (id TEXT PRIMARY KEY, rev INTEGER, time INTEGER, data TEXT);
|
//CREATE TABLE storage (id TEXT PRIMARY KEY, rev INTEGER, time INTEGER, data TEXT);
|
||||||
['create', {
|
['create', {
|
||||||
table: 'storage',
|
table: 'storage'
|
||||||
hash: {field: 'id', type: 'string', depth: 100}
|
|
||||||
}],
|
}],
|
||||||
],
|
],
|
||||||
down: [
|
down: [
|
||||||
|
|||||||
@@ -46,6 +46,9 @@ async function init() {
|
|||||||
//connections
|
//connections
|
||||||
const connManager = new (require('./db/ConnManager'))();//singleton
|
const connManager = new (require('./db/ConnManager'))();//singleton
|
||||||
await connManager.init(config);
|
await connManager.init(config);
|
||||||
|
|
||||||
|
const jembaConnManager = new (require('./db/JembaConnManager'))();//singleton
|
||||||
|
await jembaConnManager.init(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
|
|||||||
Reference in New Issue
Block a user