Улучшение работы с удаленным хранилищем

This commit is contained in:
Book Pauk
2022-07-18 23:54:25 +07:00
parent c42ad66be6
commit 16b30c922a

View File

@@ -14,6 +14,8 @@ const utils = require('../utils');
const log = new (require('../AppLogger'))().log;//singleton
const cleanDirPeriod = 30*60*1000;//раз в полчаса
const remoteSendPeriod = 119*1000;//примерно раз 2 минуты
const queue = new LimitedQueue(5, 100, 2*60*1000 + 15000);//2 минуты ожидание подвижек
let instance = null;
@@ -45,20 +47,27 @@ class ReaderWorker {
);
}
this.remoteConfig = {
'/tmp': {
this.dirConfigArr = [
{
dir: this.config.tempPublicDir,
remoteDir: '/tmp',
maxSize: this.config.maxTempPublicDirSize,
moveToRemote: true,
},
'/upload': {
{
dir: this.config.uploadDir,
remoteDir: '/upload',
maxSize: this.config.maxUploadPublicDirSize,
moveToRemote: true,
}
};
];
//преобразуем в объект для большего удобства
this.dirConfig = {};
for (const configRec of this.dirConfigArr)
this.dirConfig[configRec.remoteDir] = configRec;
this.periodicCleanDir(this.remoteConfig);//no await
this.remoteFilesToSend = [];
this.periodicCleanDir();//no await
instance = this;
}
@@ -159,6 +168,8 @@ class ReaderWorker {
const finishFilename = path.basename(compFilename);
wState.finish({path: `/tmp/${finishFilename}`, size: stat.size});
this.pushRemoteSend(compFilename, '/tmp');
} catch (e) {
log(LM_ERR, e.stack);
let mes = e.message.split('|FORLOG|');
@@ -199,6 +210,7 @@ class ReaderWorker {
if (!await fs.pathExists(outFilename)) {
await fs.move(file.path, outFilename);
this.pushRemoteSend(outFilename, '/upload');
} else {
await utils.touchFile(outFilename);
await fs.remove(file.path);
@@ -213,6 +225,7 @@ class ReaderWorker {
if (!await fs.pathExists(outFilename)) {
await fs.writeFile(outFilename, buf);
this.pushRemoteSend(outFilename, '/upload');
} else {
await utils.touchFile(outFilename);
}
@@ -230,8 +243,8 @@ class ReaderWorker {
async restoreRemoteFile(filename, remoteDir) {
let targetDir = '';
if (this.remoteConfig[remoteDir])
targetDir = this.remoteConfig[remoteDir].dir;
if (this.dirConfig[remoteDir])
targetDir = this.dirConfig[remoteDir].dir;
else
throw new Error(`restoreRemoteFile: unknown remoteDir value (${remoteDir})`);
@@ -252,7 +265,56 @@ class ReaderWorker {
return targetName;
}
async cleanDir(dir, remoteDir, maxSize, moveToRemote) {
pushRemoteSend(fileName, remoteDir) {
if (this.remoteStorage
&& this.dirConfig[remoteDir]
&& this.dirConfig[remoteDir].moveToRemote) {
this.remoteFilesToSend.push({fileName, remoteDir});
}
}
async remoteSendFile(sendFileRec) {
const {fileName, remoteDir} = sendFileRec;
const sent = this.remoteSent;
if (!fileName || sent[fileName])
return;
log(`remoteStorage.putFile ${remoteDir}/${path.basename(fileName)}`);
//отправляем в remoteStorage
await this.remoteStorage.putFile(fileName, remoteDir);
sent[fileName] = true;
await this.appDb.insert({table: 'remote_sent', ignore: true, rows: [{id: fileName, remoteDir}]});
}
async remoteSendAll() {
if (!this.remoteStorage)
return;
const newSendQueue = [];
while (this.remoteFilesToSend.length) {
const sendFileRec = this.remoteFilesToSend.shift();
if (sendFileRec.remoteDir
&& this.dirConfig[sendFileRec.remoteDir]
&& this.dirConfig[sendFileRec.remoteDir].moveToRemote) {
try {
await this.remoteSendFile(sendFileRec);
} catch (e) {
newSendQueue.push(sendFileRec)
log(LM_ERR, e.stack);
}
}
}
this.remoteFilesToSend = newSendQueue;
}
async cleanDir(config) {
const {dir, remoteDir, maxSize, moveToRemote} = config;
const sent = this.remoteSent;
const list = await fs.readdir(dir);
@@ -278,26 +340,20 @@ class ReaderWorker {
for (const file of files) {
foundFiles.add(file.name);
if (sent[file.name])
continue;
//отправляем в remoteStorage
//отсылаем на всякий случай перед удалением, если вдруг remoteSend не справился
try {
log(`remoteStorage.putFile ${remoteDir}/${path.basename(file.name)}`);
await this.remoteStorage.putFile(file.name, remoteDir);
sent[file.name] = true;
await this.appDb.insert({table: 'remote_sent', ignore: true, rows: [{id: file.name, remoteDir}]});
await this.remoteSendFile({fileName: file.name, remoteDir});
} catch (e) {
log(LM_ERR, e.stack);
}
}
//почистим remoteSent и БД
//несколько неоптимально, таскает все записи из БД
//несколько неоптимально, таскает все записи из таблицы
const rows = await this.appDb.select({table: 'remote_sent'});
for (const row of rows) {
if (row.remoteDir === remoteDir && !foundFiles.has(row.id)) {
if ((row.remoteDir === remoteDir && !foundFiles.has(row.id))
|| !this.dirConfig[row.remoteDir]) {
delete sent[row.id];
await this.appDb.delete({table: 'remote_sent', where: `@@id(${this.appDb.esc(row.id)})`});
}
@@ -325,7 +381,7 @@ class ReaderWorker {
log(`removed ${j} files`);
}
async periodicCleanDir(cleanConfig) {
async periodicCleanDir() {
try {
if (!this.remoteSent)
this.remoteSent = {};
@@ -338,17 +394,34 @@ class ReaderWorker {
}
}
let lastCleanDirTime = 0;
let lastRemoteSendTime = 0;
while (1) {// eslint-disable-line no-constant-condition
for (const [remoteDir, config] of Object.entries(cleanConfig)) {
//отсылка в удаленное хранилище
if (Date.now() - lastRemoteSendTime >= remoteSendPeriod) {
try {
await this.cleanDir(config.dir, remoteDir, config.maxSize, config.moveToRemote);
await this.remoteSendAll();
} catch(e) {
log(LM_ERR, e.stack);
}
lastRemoteSendTime = Date.now();
}
await utils.sleep(cleanDirPeriod);
//чистка папок
if (Date.now() - lastCleanDirTime >= cleanDirPeriod) {
for (const config of Object.values(this.dirConfig)) {
try {
await this.cleanDir(config);
} catch(e) {
log(LM_ERR, e.stack);
}
}
lastCleanDirTime = Date.now();
}
await utils.sleep(60*1000);//интервал проверки 1 минута
}
} catch (e) {
log(LM_FATAL, e.message);