From 16b30c922af5a9cdf9f302800a528325107c24fd Mon Sep 17 00:00:00 2001 From: Book Pauk Date: Mon, 18 Jul 2022 23:54:25 +0700 Subject: [PATCH] =?UTF-8?q?=D0=A3=D0=BB=D1=83=D1=87=D1=88=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D1=8B=20=D1=81=20?= =?UTF-8?q?=D1=83=D0=B4=D0=B0=D0=BB=D0=B5=D0=BD=D0=BD=D1=8B=D0=BC=20=D1=85?= =?UTF-8?q?=D1=80=D0=B0=D0=BD=D0=B8=D0=BB=D0=B8=D1=89=D0=B5=D0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/core/Reader/ReaderWorker.js | 121 +++++++++++++++++++++++------ 1 file changed, 97 insertions(+), 24 deletions(-) diff --git a/server/core/Reader/ReaderWorker.js b/server/core/Reader/ReaderWorker.js index a51565ef..bfc4e372 100644 --- a/server/core/Reader/ReaderWorker.js +++ b/server/core/Reader/ReaderWorker.js @@ -14,6 +14,8 @@ const utils = require('../utils'); const log = new (require('../AppLogger'))().log;//singleton const cleanDirPeriod = 30*60*1000;//раз в полчаса +const remoteSendPeriod = 119*1000;//примерно раз 2 минуты + const queue = new LimitedQueue(5, 100, 2*60*1000 + 15000);//2 минуты ожидание подвижек let instance = null; @@ -45,20 +47,27 @@ class ReaderWorker { ); } - this.remoteConfig = { - '/tmp': { + this.dirConfigArr = [ + { dir: this.config.tempPublicDir, + remoteDir: '/tmp', maxSize: this.config.maxTempPublicDirSize, moveToRemote: true, }, - '/upload': { + { dir: this.config.uploadDir, + remoteDir: '/upload', maxSize: this.config.maxUploadPublicDirSize, moveToRemote: true, } - }; + ]; + //преобразуем в объект для большего удобства + this.dirConfig = {}; + for (const configRec of this.dirConfigArr) + this.dirConfig[configRec.remoteDir] = configRec; - this.periodicCleanDir(this.remoteConfig);//no await + this.remoteFilesToSend = []; + this.periodicCleanDir();//no await instance = this; } @@ -159,6 +168,8 @@ class ReaderWorker { const finishFilename = path.basename(compFilename); wState.finish({path: `/tmp/${finishFilename}`, size: stat.size}); + this.pushRemoteSend(compFilename, '/tmp'); + } catch (e) { log(LM_ERR, e.stack); let mes = e.message.split('|FORLOG|'); @@ -199,6 +210,7 @@ class ReaderWorker { if (!await fs.pathExists(outFilename)) { await fs.move(file.path, outFilename); + this.pushRemoteSend(outFilename, '/upload'); } else { await utils.touchFile(outFilename); await fs.remove(file.path); @@ -213,6 +225,7 @@ class ReaderWorker { if (!await fs.pathExists(outFilename)) { await fs.writeFile(outFilename, buf); + this.pushRemoteSend(outFilename, '/upload'); } else { await utils.touchFile(outFilename); } @@ -230,8 +243,8 @@ class ReaderWorker { async restoreRemoteFile(filename, remoteDir) { let targetDir = ''; - if (this.remoteConfig[remoteDir]) - targetDir = this.remoteConfig[remoteDir].dir; + if (this.dirConfig[remoteDir]) + targetDir = this.dirConfig[remoteDir].dir; else throw new Error(`restoreRemoteFile: unknown remoteDir value (${remoteDir})`); @@ -252,7 +265,56 @@ class ReaderWorker { return targetName; } - async cleanDir(dir, remoteDir, maxSize, moveToRemote) { + pushRemoteSend(fileName, remoteDir) { + if (this.remoteStorage + && this.dirConfig[remoteDir] + && this.dirConfig[remoteDir].moveToRemote) { + this.remoteFilesToSend.push({fileName, remoteDir}); + } + } + + async remoteSendFile(sendFileRec) { + const {fileName, remoteDir} = sendFileRec; + const sent = this.remoteSent; + + if (!fileName || sent[fileName]) + return; + + log(`remoteStorage.putFile ${remoteDir}/${path.basename(fileName)}`); + + //отправляем в remoteStorage + await this.remoteStorage.putFile(fileName, remoteDir); + + sent[fileName] = true; + await this.appDb.insert({table: 'remote_sent', ignore: true, rows: [{id: fileName, remoteDir}]}); + } + + async remoteSendAll() { + if (!this.remoteStorage) + return; + + const newSendQueue = []; + while (this.remoteFilesToSend.length) { + const sendFileRec = this.remoteFilesToSend.shift(); + + if (sendFileRec.remoteDir + && this.dirConfig[sendFileRec.remoteDir] + && this.dirConfig[sendFileRec.remoteDir].moveToRemote) { + + try { + await this.remoteSendFile(sendFileRec); + } catch (e) { + newSendQueue.push(sendFileRec) + log(LM_ERR, e.stack); + } + } + } + + this.remoteFilesToSend = newSendQueue; + } + + async cleanDir(config) { + const {dir, remoteDir, maxSize, moveToRemote} = config; const sent = this.remoteSent; const list = await fs.readdir(dir); @@ -278,26 +340,20 @@ class ReaderWorker { for (const file of files) { foundFiles.add(file.name); - if (sent[file.name]) - continue; - - //отправляем в remoteStorage + //отсылаем на всякий случай перед удалением, если вдруг remoteSend не справился try { - log(`remoteStorage.putFile ${remoteDir}/${path.basename(file.name)}`); - await this.remoteStorage.putFile(file.name, remoteDir); - - sent[file.name] = true; - await this.appDb.insert({table: 'remote_sent', ignore: true, rows: [{id: file.name, remoteDir}]}); + await this.remoteSendFile({fileName: file.name, remoteDir}); } catch (e) { log(LM_ERR, e.stack); } } //почистим remoteSent и БД - //несколько неоптимально, таскает все записи из БД + //несколько неоптимально, таскает все записи из таблицы const rows = await this.appDb.select({table: 'remote_sent'}); for (const row of rows) { - if (row.remoteDir === remoteDir && !foundFiles.has(row.id)) { + if ((row.remoteDir === remoteDir && !foundFiles.has(row.id)) + || !this.dirConfig[row.remoteDir]) { delete sent[row.id]; await this.appDb.delete({table: 'remote_sent', where: `@@id(${this.appDb.esc(row.id)})`}); } @@ -325,7 +381,7 @@ class ReaderWorker { log(`removed ${j} files`); } - async periodicCleanDir(cleanConfig) { + async periodicCleanDir() { try { if (!this.remoteSent) this.remoteSent = {}; @@ -338,17 +394,34 @@ class ReaderWorker { } } + let lastCleanDirTime = 0; + let lastRemoteSendTime = 0; while (1) {// eslint-disable-line no-constant-condition - - for (const [remoteDir, config] of Object.entries(cleanConfig)) { + //отсылка в удаленное хранилище + if (Date.now() - lastRemoteSendTime >= remoteSendPeriod) { try { - await this.cleanDir(config.dir, remoteDir, config.maxSize, config.moveToRemote); + await this.remoteSendAll(); } catch(e) { log(LM_ERR, e.stack); } + + lastRemoteSendTime = Date.now(); } - await utils.sleep(cleanDirPeriod); + //чистка папок + if (Date.now() - lastCleanDirTime >= cleanDirPeriod) { + for (const config of Object.values(this.dirConfig)) { + try { + await this.cleanDir(config); + } catch(e) { + log(LM_ERR, e.stack); + } + } + + lastCleanDirTime = Date.now(); + } + + await utils.sleep(60*1000);//интервал проверки 1 минута } } catch (e) { log(LM_FATAL, e.message);