diff --git a/server/controllers/WebSocketController.js b/server/controllers/WebSocketController.js new file mode 100644 index 00000000..e31aacb7 --- /dev/null +++ b/server/controllers/WebSocketController.js @@ -0,0 +1,111 @@ +const WebSocket = require ('ws'); +const WorkerState = require('../core/WorkerState');//singleton +const utils = require('../core/utils'); + +const cleanPeriod = 1*60*1000;//1 минута +const closeSocketOnIdle = 5*60*1000;//5 минут + +class WebSocketController { + constructor(wss, config) { + this.config = config; + this.workerState = new WorkerState(); + + this.wss = wss; + + wss.on('connection', (ws) => { + ws.on('message', (message) => { + this.onMessage(ws, message); + }); + }); + + setTimeout(() => { this.periodicClean(); }, cleanPeriod); + } + + periodicClean() { + try { + const now = Date.now(); + this.wss.clients.forEach((ws) => { + if (!ws.lastActivity || now - ws.lastActivity > closeSocketOnIdle - 50) { + ws.terminate(); + } + }); + } finally { + setTimeout(() => { this.periodicClean(); }, cleanPeriod); + } + } + + async onMessage(ws, message) { + let req = {}; + try { + ws.lastActivity = Date.now(); + req = JSON.parse(message); + switch (req.action) { + case 'test': + this.test(req, ws); break; + case 'worker-get-state': + this.workerGetState(req, ws); break; + case 'worker-get-state-finish': + this.workerGetStateFinish(req, ws); break; + + default: + throw new Error(`Action not found: ${req.action}`); + } + } catch (e) { + this.send({error: e.message}, req, ws); + } + } + + send(res, req, ws) { + if (ws.readyState == WebSocket.OPEN) { + ws.lastActivity = Date.now(); + let r = Object.assign({}, res); + if (req.requestId) + r.requestId = req.requestId; + ws.send(JSON.stringify(r)); + } + } + + //Actions + async test(req, ws) { + this.send({message: 'Liberama project is awesome'}, req, ws); + } + + async workerGetState(req, ws) { + if (!req.workerId) + throw new Error(`key 'workerId' is wrong`); + + const state = this.workerState.getState(req.workerId); + this.send((state ? state : {}), req, ws); + } + + async workerGetStateFinish(req, ws) { + if (!req.workerId) + throw new Error(`key 'workerId' is wrong`); + + const refreshPause = 200; + let i = 0; + let state = {}; + while (1) {// eslint-disable-line no-constant-condition + const prevProgress = state.progress || -1; + const prevState = state.state || ''; + state = this.workerState.getState(req.workerId); + + this.send((state ? state : {}), req, ws); + if (!state) break; + + if (state.state != 'finish' && state.state != 'error') + await utils.sleep(refreshPause); + else + break; + + i++; + if (i > 2*60*1000/refreshPause) {//2 мин ждем телодвижений воркера + this.send({state: 'error', error: 'Время ожидания процесса истекло'}, req, ws); + } + i = (prevProgress != state.progress || prevState != state.state ? 1 : i); + } + } + +} + +module.exports = WebSocketController; diff --git a/server/controllers/WorkerController.js b/server/controllers/WorkerController.js index 00742c0e..e0ef77a9 100644 --- a/server/controllers/WorkerController.js +++ b/server/controllers/WorkerController.js @@ -26,6 +26,7 @@ class WorkerController extends BaseController { return false; } + //TODO: удалить бесполезную getStateFinish async getStateFinish(req, res) { const request = req.body; let error = ''; diff --git a/server/controllers/index.js b/server/controllers/index.js index fbbad7b7..9c2f4f07 100644 --- a/server/controllers/index.js +++ b/server/controllers/index.js @@ -2,4 +2,5 @@ module.exports = { MiscController: require('./MiscController'), ReaderController: require('./ReaderController'), WorkerController: require('./WorkerController'), + WebSocketController: require('./WebSocketController'), } \ No newline at end of file diff --git a/server/index.js b/server/index.js index 94bf47d9..9a15a6b2 100644 --- a/server/index.js +++ b/server/index.js @@ -4,6 +4,8 @@ const path = require('path'); const argv = require('minimist')(process.argv.slice(2)); const express = require('express'); const compression = require('compression'); +const http = require('http'); +const WebSocket = require ('ws'); async function init() { //config @@ -46,10 +48,13 @@ async function main() { const config = new (require('./config'))().config;//singleton //servers - for (let server of config.servers) { - if (server.mode !== 'none') { + for (let serverCfg of config.servers) { + if (serverCfg.mode !== 'none') { const app = express(); - const serverConfig = Object.assign({}, config, server); + const server = http.createServer(app); + const wss = new WebSocket.Server({ server, maxPayload: 10*1024*1024 }); + + const serverConfig = Object.assign({}, config, serverCfg); let devModule = undefined; if (serverConfig.branch == 'development') { @@ -73,7 +78,7 @@ async function main() { } })); - require('./routes').initRoutes(app, serverConfig); + require('./routes').initRoutes(app, wss, serverConfig); if (devModule) { devModule.logErrors(app); @@ -84,7 +89,7 @@ async function main() { }); } - app.listen(serverConfig.port, serverConfig.ip, function() { + server.listen(serverConfig.port, serverConfig.ip, function() { log(`Server-${serverConfig.serverName} is ready on ${serverConfig.ip}:${serverConfig.port}, mode: ${serverConfig.mode}`); }); } diff --git a/server/routes.js b/server/routes.js index 09b96239..93eb129a 100644 --- a/server/routes.js +++ b/server/routes.js @@ -2,10 +2,11 @@ const c = require('./controllers'); const utils = require('./core/utils'); const multer = require('multer'); -function initRoutes(app, config) { +function initRoutes(app, wss, config) { const misc = new c.MiscController(config); const reader = new c.ReaderController(config); const worker = new c.WorkerController(config); + new c.WebSocketController(wss, config); //access const [aAll, aNormal, aSite, aReader, aOmnireader] = // eslint-disable-line no-unused-vars