Работа над BookUpdateChecker

This commit is contained in:
Book Pauk
2022-07-26 18:43:42 +07:00
parent e6008b5ec4
commit 2c47b2bee3
6 changed files with 158 additions and 13 deletions

View File

@@ -60,6 +60,7 @@ module.exports = {
keysFile: 'server',
ip: '0.0.0.0',
port: '33443',
accessToken: '',
}*/
],
@@ -75,6 +76,7 @@ module.exports = {
/*
bucServer: {
url: 'wss://127.0.0.1:33443',
accessToken: '',
}
*/
};

View File

@@ -13,6 +13,7 @@ class BookUpdateCheckerController {
this.config = config;
this.isDevelopment = (config.branch == 'development');
this.accessToken = config.accessToken;
this.bucServer = new BUCServer(config);
this.wss = wss;
@@ -57,6 +58,9 @@ class BookUpdateCheckerController {
//pong for WebSocketConnection
this.send({_rok: 1}, req, ws);
if (req.accessToken !== this.accessToken)
throw new Error('Access denied');
switch (req.action) {
case 'test':
await this.test(req, ws); break;

View File

@@ -1,3 +1,4 @@
const WebSocketConnection = require('./WebSocketConnection');
const JembaConnManager = require('../../db/JembaConnManager');//singleton
const ayncExit = new (require('../AsyncExit'))();
@@ -6,6 +7,7 @@ const log = new (require('../AppLogger'))().log;//singleton
const minuteMs = 60*1000;
const hourMs = 60*minuteMs;
const dayMs = 24*hourMs;
let instance = null;
@@ -18,14 +20,20 @@ class BUCClient {
this.connManager = new JembaConnManager();
this.db = this.connManager.db['book-update-server'];
this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false});
this.accessToken = config.bucServer.accessToken;
//константы
if (this.config.branch !== 'development') {
this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
this.syncPeriod = 1*hourMs;//период синхронизации с сервером BUC
} else {
this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших
this.syncPeriod = 1*minuteMs;//период синхронизации с сервером BUC
}
this.fromCheckTime = 1;
this.bookUrls = new Set();
this.main();//no await
@@ -35,23 +43,109 @@ class BUCClient {
return instance;
}
async wsRequest(query) {
const response = await this.wsc.message(
await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 600),
600
);
if (response.error)
throw new Error(response.error);
return response;
}
async wsUpdateBuc(bookUrls) {
return await this.wsRequest({action: 'update-buc', bookUrls});
}
async checkBuc(bookUrls) {
return [];
const db = this.db;
for (const url of bookUrls)
this.bookUrls.add(url);
const rows = await db.select({
table: 'buc',
where: `@@id(${db.esc(bookUrls)})`
});
return rows;
}
async findMaxCheckTime() {
const db = this.db;
let result = 1;
//одним куском, возможно будет жрать память
const rows = await db.select({
table: 'buc',
where: `
const result = new Set();
let max = 0;
let maxId = null;
@iter(@all(), (row) => {
if (row.checkTime > max) {
max = row.checkTime;
maxId = row.id;
}
};
if (maxId)
result.add(maxId);
return result;
`
});
if (rows.length)
result = rows[0].checkTime;
return result;
}
async main() {
if (!this.config.bucEnabled)
throw new Error('BookUpdateChecker disabled');
async periodicSync() {
while (1) {//eslint-disable-line
try {
//сначала отправим this.bookUrls
const arr = Array.from(this.bookUrls);
this.bookUrls = new Set();
const chunkSize = 100;
for (let i = 0; i < arr.length; i += chunkSize) {
const chunk = arr.slice(i, i + chunkSize);
const res = await this.wsUpdateBuc(chunk);
if (!res.error && res.state == 'success') {
//update success
} else {
for (const url of chunk) {
this.bookUrls.add(url);
}
log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`);
}
}
//почистим нашу таблицу 'buc'
this.cleanQueryInterval
//синхронизация с сервером BUC
} catch (e) {
log(LM_ERR, e.stack);
}
await utils.sleep(this.syncPeriod);
}
}
async main() {
try {
if (!this.config.bucEnabled)
throw new Error('BookUpdateChecker disabled');
this.fromCheckTime = await this.findMaxCheckTime();
//this.periodicSync();//no await
this.periodicSync();//no await
log(`BUC Client started`);
} catch (e) {

View File

@@ -95,11 +95,58 @@ class BUCServer {
const db = this.db;
const now = Date.now();
await db.update({
const rows = await db.select({
table: 'buc',
mod: `(r) => r.queryTime = ${db.esc(now)}`,
map: `(r) => ({id: r.id})`,
where: `@@id(${db.esc(bookUrls)})`
});
const exists = new Set();
for (const row of rows) {
exists.add(row.id);
}
const toUpdateIds = [];
const toInsertRows = [];
for (let id of bookUrls) {
if (!id)
continue;
if (id.length > 1000) {
id = id.substring(0, 1000);
}
if (exists.has(id)) {
toUpdateIds.push(id);
} else {
toInsertRows.push({
id,
queryTime: now,
checkTime: 0, // 0 - never checked
modTime: '',
size: 0,
checkSum: '', //sha256
state: 0, // 0 - not processing, 1 - processing
error: '',
});
}
}
if (toUpdateIds.length) {
await db.update({
table: 'buc',
mod: `(r) => r.queryTime = ${db.esc(now)}`,
where: `@@id(${db.esc(toUpdateIds)})`
});
}
if (toInsertRows.length) {
await db.insert({
table: 'buc',
ignore: true,
rows: toInsertRows,
});
}
}
async fillCheckQueue() {
@@ -147,11 +194,10 @@ console.log(rows);
if (rows.length) {
const ids = [];
for (let i = 0; i < rows.length; i++) {
for (const row of rows) {
if (this.checkQueue.length >= this.maxCheckQueueLength)
break;
const row = rows[i];
ids.push(row.id);
this.checkQueue.push(row);
}

View File

@@ -5,15 +5,13 @@ module.exports = {
id, // book URL
queryTime: Number,
checkTime: Number, // 0 - never checked
modTime: String,
size: Number,
checkSum: String, //sha256
state: Number, // 0 - not processing, 1 - processing
error: String,
}*/
table: 'buc',
index: [
{field: 'queryTime', type: 'number'},
]
table: 'buc'
}],
],
down: [

View File

@@ -5,6 +5,7 @@ module.exports = {
id, // book URL
queryTime: Number,
checkTime: Number, // 0 - never checked
modTime: String,
size: Number,
checkSum: String, //sha256
state: Number, // 0 - not processing, 1 - processing