From 86aa3511bfdf148be8896fd19682a6d5fac31da3 Mon Sep 17 00:00:00 2001 From: Book Pauk Date: Sun, 23 Oct 2022 16:32:55 +0700 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D1=8B=D0=B9=20=D0=BC=D0=BE?= =?UTF-8?q?=D0=B4=D1=83=D0=BB=D1=8C=20HeavyCalc=20=D0=B4=D0=BB=D1=8F=20?= =?UTF-8?q?=D1=82=D1=8F=D0=B6=D0=B5=D0=BB=D1=8B=D1=85=20=D0=B2=D1=8B=D1=87?= =?UTF-8?q?=D0=B8=D1=81=D0=BB=D0=B5=D0=BD=D0=B8=D0=B9=20=D0=B2=20=D0=BE?= =?UTF-8?q?=D1=82=D0=B4=D0=B5=D0=BB=D1=8C=D0=BD=D0=BE=D0=BC=20=D0=BF=D0=BE?= =?UTF-8?q?=D1=82=D0=BE=D0=BA=D0=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/core/HeavyCalc.js | 133 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 server/core/HeavyCalc.js diff --git a/server/core/HeavyCalc.js b/server/core/HeavyCalc.js new file mode 100644 index 0000000..a12f31e --- /dev/null +++ b/server/core/HeavyCalc.js @@ -0,0 +1,133 @@ +const { Worker } = require('worker_threads'); + +class CalcThread { + constructor() { + this.worker = null; + this.listeners = new Map(); + this.requestId = 0; + + this.runWorker(); + } + + terminate() { + if (this.worker) { + this.worker.terminate(); + + for (const listener of this.listeners.values()) { + listener({error: 'Worker terminated'}); + } + } + this.worker = null; + } + + runWorker() { + const workerProc = () => { + const { parentPort } = require('worker_threads'); + + const sleep = (ms) => { + return new Promise(resolve => setTimeout(resolve, ms)); + }; + + if (parentPort) { + parentPort.on('message', async(mes) => { + let result = {}; + try { + const fn = new Function(`'use strict'; return ${mes.fn}`)(); + result.result = await fn(mes.args, sleep); + } catch (e) { + result = {error: e.message}; + } + + result.requestId = mes.requestId; + parentPort.postMessage(result); + }); + } + }; + + const worker = new Worker(`const wp = ${workerProc.toString()}; wp();`, {eval: true}); + + worker.on('message', (mes) => { + const listener = this.listeners.get(mes.requestId); + if (listener) { + this.listeners.delete(mes.requestId); + listener(mes); + } + }); + + worker.on('error', (err) => { + console.error(err); + }); + + worker.on('exit', () => { + this.terminate(); + }); + + this.worker = worker; + } + + //async + run(params) {//args, fn + return new Promise((resolve, reject) => { + this.requestId++; + + this.listeners.set(this.requestId, (mes) => { + if (mes.error) + reject(new Error(mes.error)); + else + resolve(mes.result); + }); + + if (this.worker) { + this.worker.postMessage({requestId: this.requestId, args: params.args, fn: params.fn.toString()}); + } else { + reject(new Error('Worker does not exist')); + } + }); + } +} + +class HeavyCalc { + constructor(opts = {}) { + this.threads = opts.threads || 1; + this.singleton = opts.singleton || false; + this.terminated = false; + + this.workers = []; + this.load = []; + for (let i = 0; i < this.threads; i++) { + const worker = new CalcThread(); + this.workers.push(worker); + this.load.push(0); + } + } + + async run(params) { + if (this.terminated || !this.workers.length) + throw new Error('All workers terminated'); + + //находим поток с минимальной нагрузкой + let found = 0; + for (let i = 1; i < this.load.length; i++) { + if (this.load[i] < this.load[found]) + found = i; + } + + try { + this.load[found]++; + return await this.workers[found].run(params); + } finally { + this.load[found]--; + } + } + + terminate() { + for (let i = 0; i < this.workers.length; i++) { + this.workers[i].terminate(); + } + this.workers = []; + this.load = []; + this.terminated = true; + } +} + +module.exports = HeavyCalc; \ No newline at end of file