diff --git a/server/controllers/WebSocketController.js b/server/controllers/WebSocketController.js index c02b9ee..4ec35ac 100644 --- a/server/controllers/WebSocketController.js +++ b/server/controllers/WebSocketController.js @@ -98,13 +98,10 @@ class WebSocketController { } async getConfig(req, ws) { - if (Array.isArray(req.params)) { - const paramsSet = new Set(req.params); + const config = _.pick(this.config, this.config.webConfigParams); + config.dbConfig = await this.webWorker.dbConfig(); - this.send(_.pick(this.config, this.config.webConfigParams.filter(x => paramsSet.has(x))), req, ws); - } else { - throw new Error('params is not an array'); - } + this.send(config, req, ws); } async getWorkerState(req, ws) { diff --git a/server/core/DbSearcher.js b/server/core/DbSearcher.js new file mode 100644 index 0000000..82b4b8e --- /dev/null +++ b/server/core/DbSearcher.js @@ -0,0 +1,28 @@ +class DbSearcher { + constructor(db) { + this.db = db; + } + + async search(query) { + const db = this.db; + + let result = []; + + if (query.author) { + // + } else { + result = await db.select({ + table: 'author', + map: `(r) => ({id: r.id, author: r.author})` + }); + } + + if (query.limit) { + result = result.slice(0, query.limit); + } + + return result; + } +} + +module.exports = DbSearcher; \ No newline at end of file diff --git a/server/core/WebSocketConnection.js b/server/core/WebSocketConnection.js new file mode 100644 index 0000000..636f2e4 --- /dev/null +++ b/server/core/WebSocketConnection.js @@ -0,0 +1,240 @@ +const isBrowser = (typeof window !== 'undefined'); + +const utils = { + sleep: (ms) => { return new Promise(resolve => setTimeout(resolve, ms)); } +}; + +const cleanPeriod = 5*1000;//5 секунд + +class WebSocketConnection { + //messageLifeTime в секундах (проверка каждый cleanPeriod интервал) + constructor(url, openTimeoutSecs = 10, messageLifeTimeSecs = 30, webSocketOptions = {}) { + this.WebSocket = (isBrowser ? WebSocket : require('ws')); + this.url = url; + this.webSocketOptions = webSocketOptions; + + this.ws = null; + + this.listeners = []; + this.messageQueue = []; + this.messageLifeTime = messageLifeTimeSecs*1000; + this.openTimeout = openTimeoutSecs*1000; + this.requestId = 0; + + this.wsErrored = false; + this.closed = false; + + this.connecting = false; + this.periodicClean();//no await + } + + //рассылаем сообщение и удаляем те обработчики, которые его получили + emit(mes, isError) { + const len = this.listeners.length; + if (len > 0) { + let newListeners = []; + for (const listener of this.listeners) { + let emitted = false; + if (isError) { + listener.onError(mes); + emitted = true; + } else { + if ( (listener.requestId && mes.requestId && listener.requestId === mes.requestId) || + (!listener.requestId && !mes.requestId) ) { + listener.onMessage(mes); + emitted = true; + } + } + + if (!emitted) + newListeners.push(listener); + } + this.listeners = newListeners; + } + + return this.listeners.length != len; + } + + get isOpen() { + return (this.ws && this.ws.readyState == this.WebSocket.OPEN); + } + + processMessageQueue() { + let newMessageQueue = []; + for (const message of this.messageQueue) { + if (!this.emit(message.mes)) { + newMessageQueue.push(message); + } + } + + this.messageQueue = newMessageQueue; + } + + _open() { + return new Promise((resolve, reject) => { (async() => { + if (this.closed) + reject(new Error('Этот экземпляр класса уничтожен. Пожалуйста, создайте новый.')); + + if (this.connecting) { + let i = this.openTimeout/100; + while (i-- > 0 && this.connecting) { + await utils.sleep(100); + } + } + + //проверим подключение, и если нет, то подключимся заново + if (this.isOpen) { + resolve(this.ws); + } else { + this.connecting = true; + this.terminate(); + + if (isBrowser) { + const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:'); + const url = this.url || `${protocol}//${window.location.host}/ws`; + this.ws = new this.WebSocket(url); + } else { + this.ws = new this.WebSocket(this.url, this.webSocketOptions); + } + + const onopen = () => { + this.connecting = false; + resolve(this.ws); + }; + + const onmessage = (data) => { + try { + if (isBrowser) + data = data.data; + const mes = JSON.parse(data); + this.messageQueue.push({regTime: Date.now(), mes}); + + this.processMessageQueue(); + } catch (e) { + this.emit(e.message, true); + } + }; + + const onerror = (e) => { + this.emit(e.message, true); + reject(new Error(e.message)); + }; + + const onclose = (e) => { + this.emit(e.message, true); + reject(new Error(e.message)); + }; + + if (isBrowser) { + this.ws.onopen = onopen; + this.ws.onmessage = onmessage; + this.ws.onerror = onerror; + this.ws.onclose = onclose; + } else { + this.ws.on('open', onopen); + this.ws.on('message', onmessage); + this.ws.on('error', onerror); + this.ws.on('close', onclose); + } + + await utils.sleep(this.openTimeout); + reject(new Error('Соединение не удалось')); + } + })() }); + } + + //timeout в секундах (проверка каждый cleanPeriod интервал) + message(requestId, timeoutSecs = 4) { + return new Promise((resolve, reject) => { + this.listeners.push({ + regTime: Date.now(), + requestId, + timeout: timeoutSecs*1000, + onMessage: (mes) => { + resolve(mes); + }, + onError: (mes) => { + reject(new Error(mes)); + } + }); + + this.processMessageQueue(); + }); + } + + async send(req, timeoutSecs = 4) { + await this._open(); + if (this.isOpen) { + this.requestId = (this.requestId < 1000000 ? this.requestId + 1 : 1); + const requestId = this.requestId;//реентерабельность!!! + + this.ws.send(JSON.stringify(Object.assign({requestId}, req))); + + let resp = {}; + try { + resp = await this.message(requestId, timeoutSecs); + } catch(e) { + this.terminate(); + throw new Error('WebSocket не отвечает'); + } + + if (resp._rok) { + return requestId; + } else { + throw new Error('Запрос не принят сервером'); + } + } else { + throw new Error('WebSocket коннект закрыт'); + } + } + + terminate() { + if (this.ws) { + if (isBrowser) { + this.ws.close(); + } else { + this.ws.terminate(); + } + } + this.ws = null; + } + + close() { + this.terminate(); + this.closed = true; + } + + async periodicClean() { + while (!this.closed) { + try { + const now = Date.now(); + //чистка listeners + let newListeners = []; + for (const listener of this.listeners) { + if (now - listener.regTime < listener.timeout) { + newListeners.push(listener); + } else { + if (listener.onError) + listener.onError('Время ожидания ответа истекло'); + } + } + this.listeners = newListeners; + + //чистка messageQueue + let newMessageQueue = []; + for (const message of this.messageQueue) { + if (now - message.regTime < this.messageLifeTime) { + newMessageQueue.push(message); + } + } + this.messageQueue = newMessageQueue; + } catch(e) { + // + } + + await utils.sleep(cleanPeriod); + } + } +} + +module.exports = WebSocketConnection; \ No newline at end of file diff --git a/server/core/WebWorker.js b/server/core/WebWorker.js index 447c1c4..c306b72 100644 --- a/server/core/WebWorker.js +++ b/server/core/WebWorker.js @@ -4,6 +4,7 @@ const fs = require('fs-extra'); const WorkerState = require('./WorkerState'); const { JembaDbThread } = require('jembadb'); const DbCreator = require('./DbCreator'); +const DbSearcher = require('./DbSearcher'); const ayncExit = new (require('./AsyncExit'))(); const log = new (require('./AppLogger'))().log;//singleton @@ -32,6 +33,7 @@ class WebWorker { this.wState = this.workerState.getControl('server_state'); this.myState = ''; this.db = null; + this.dbSearcher = null; ayncExit.add(this.closeDb.bind(this)); @@ -125,8 +127,8 @@ class WebWorker { this.setMyState(ssDbLoading); log('Searcher DB open'); - this.db = new JembaDbThread(); - await this.db.lock({ + const db = new JembaDbThread(); + await db.lock({ dbPath, softLock: true, @@ -136,8 +138,16 @@ class WebWorker { }); //открываем все таблицы - await this.db.openAll(); - await this.db.close({table: 'title'}); + await db.openAll(); + + //закроем title для экономии памяти, откроем при необходимости + await db.close({table: 'title'}); + this.titleOpen = false; + + this.dbSearcher = new DbSearcher(db); + + db.wwCache = {}; + this.db = db; log('Searcher DB is ready'); } catch (e) { @@ -148,6 +158,24 @@ class WebWorker { } } + async dbConfig() { + this.checkMyState(); + + const db = this.db; + if (!db.wwCache.config) { + const rows = await db.select({table: 'config'}); + const config = {}; + + for (const row of rows) { + config[row.id] = row.value; + } + + db.wwCache.config = config; + } + + return db.wwCache.config; + } + async logServerStats() { while (1) {// eslint-disable-line try {