From 08d0d3e7f3678e706839517736cb90282358b3bc Mon Sep 17 00:00:00 2001 From: Book Pauk Date: Tue, 26 Jul 2022 20:12:44 +0700 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=20=D0=BD?= =?UTF-8?q?=D0=B0=D0=B4=20BookUpdateChecker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BookUpdateCheckerController.js | 4 +- server/core/BookUpdateChecker/BUCClient.js | 104 ++++++++++++++++-- server/core/BookUpdateChecker/BUCServer.js | 26 +++-- 3 files changed, 115 insertions(+), 19 deletions(-) diff --git a/server/controllers/BookUpdateCheckerController.js b/server/controllers/BookUpdateCheckerController.js index ba349641..6c5ef770 100644 --- a/server/controllers/BookUpdateCheckerController.js +++ b/server/controllers/BookUpdateCheckerController.js @@ -48,7 +48,7 @@ class BookUpdateCheckerController { let req = {}; try { if (this.isDevelopment) { - log(`WebSocket-IN: ${message.substr(0, 4000)}`); + log(`BUC-WebSocket-IN: ${message.substr(0, 4000)}`); } req = JSON.parse(message); @@ -88,7 +88,7 @@ class BookUpdateCheckerController { ws.send(message); if (this.isDevelopment) { - log(`WebSocket-OUT: ${message.substr(0, 4000)}`); + log(`BUC-WebSocket-OUT: ${message.substr(0, 4000)}`); } } diff --git a/server/core/BookUpdateChecker/BUCClient.js b/server/core/BookUpdateChecker/BUCClient.js index 074cbaf0..02c1882b 100644 --- a/server/core/BookUpdateChecker/BUCClient.js +++ b/server/core/BookUpdateChecker/BUCClient.js @@ -1,4 +1,4 @@ -const WebSocketConnection = require('./WebSocketConnection'); +const WebSocketConnection = require('../WebSocketConnection'); const JembaConnManager = require('../../db/JembaConnManager');//singleton const ayncExit = new (require('../AsyncExit'))(); @@ -18,7 +18,7 @@ class BUCClient { this.config = config; this.connManager = new JembaConnManager(); - this.db = this.connManager.db['book-update-server']; + this.appDb = this.connManager.db['app']; this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false}); this.accessToken = config.bucServer.accessToken; @@ -28,7 +28,7 @@ class BUCClient { this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших this.syncPeriod = 1*hourMs;//период синхронизации с сервером BUC } else { - this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших + this.cleanQueryInterval = 300*minuteMs;//300*dayMs;//интервал очистки устаревших this.syncPeriod = 1*minuteMs;//период синхронизации с сервером BUC } @@ -45,20 +45,33 @@ class BUCClient { async wsRequest(query) { const response = await this.wsc.message( - await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 600), - 600 + await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 60), + 60 ); if (response.error) throw new Error(response.error); return response; } + async wsGetBuc(fromCheckTime, callback) { + const requestId = await this.wsc.send({accessToken: this.accessToken, action: 'get-buc', fromCheckTime}, 60); + while (1) {//eslint-disable-line + const res = await this.wsc.message(requestId, 60); + + if (res.state == 'get') { + await callback(res.rows); + } else { + break; + } + } + } + async wsUpdateBuc(bookUrls) { return await this.wsRequest({action: 'update-buc', bookUrls}); } async checkBuc(bookUrls) { - const db = this.db; + const db = this.appDb; for (const url of bookUrls) this.bookUrls.add(url); @@ -72,7 +85,7 @@ class BUCClient { } async findMaxCheckTime() { - const db = this.db; + const db = this.appDb; let result = 1; @@ -89,7 +102,7 @@ class BUCClient { max = row.checkTime; maxId = row.id; } - }; + }); if (maxId) result.add(maxId); @@ -105,19 +118,25 @@ class BUCClient { } async periodicSync() { + const db = this.appDb; + while (1) {//eslint-disable-line try { //сначала отправим this.bookUrls + log(`client: remote update buc begin`); + const arr = Array.from(this.bookUrls); this.bookUrls = new Set(); const chunkSize = 100; + let updated = 0; for (let i = 0; i < arr.length; i += chunkSize) { const chunk = arr.slice(i, i + chunkSize); const res = await this.wsUpdateBuc(chunk); if (!res.error && res.state == 'success') { //update success + updated += chunk.length; } else { for (const url of chunk) { this.bookUrls.add(url); @@ -125,11 +144,78 @@ class BUCClient { log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`); } } + log(`client: remote update buc end, updated ${updated} urls`); //почистим нашу таблицу 'buc' - this.cleanQueryInterval + log(`client: clean 'buc' table begin`); + const cleanTime = Date.now() - this.cleanQueryInterval; + while (1) {//eslint-disable-line + //выборка всех по кусочкам + const rows = await db.select({ + table: 'buc', + where: ` + let iter = @getItem('clean'); + if (!iter) { + iter = @all(); + @setItem('clean', iter); + } + + const ids = new Set(); + let id = iter.next(); + while (!id.done && ids.size < 1000) { + ids.add(id.value); + id = iter.next(); + } + + return ids; + ` + }); + + if (rows.length) { + const toDelIds = []; + for (const row of rows) + if (row.queryTime <= cleanTime) + toDelIds.push(row.id); + + //удаление + const res = await db.delete({ + table: 'buc', + where: `@@id(${db.esc(toDelIds)})`, + }); + + log(`client: clean 'buc' deleted ${res.deleted}`); + } else { + break; + } + } + await db.select({ + table: 'buc', + where: ` + @delItem('clean'); + return new Set(); + ` + }); + + log(`client: clean 'buc' table end`); //синхронизация с сервером BUC + log(`client: sync 'buc' table begin`); + this.fromCheckTime -= 30*minuteMs;//минус полчаса на всякий случай + await this.wsGetBuc(this.fromCheckTime, async(rows) => { + for (const row of rows) { + if (row.checkTime > this.fromCheckTime) + this.fromCheckTime = row.checkTime; + } + + const res = await db.insert({ + table: 'buc', + replace: true, + rows + }); + + log(`client: sync 'buc' table, inserted ${res.inserted} rows, replaced ${res.replaced}`); + }); + log(`client: sync 'buc' table end`); } catch (e) { log(LM_ERR, e.stack); } diff --git a/server/core/BookUpdateChecker/BUCServer.js b/server/core/BookUpdateChecker/BUCServer.js index 331c70c8..f2822962 100644 --- a/server/core/BookUpdateChecker/BUCServer.js +++ b/server/core/BookUpdateChecker/BUCServer.js @@ -35,7 +35,7 @@ class BUCServer { this.periodicCheckWait = 500;//пауза, если нечего делать this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших - this.oldQueryInterval = 5*minuteMs;//интервал устаревания запроса на обновление + this.oldQueryInterval = 30*dayMs;//интервал устаревания запроса на обновление this.checkingInterval = 30*1000;//интервал проверки обновления одного и того же файла this.sameHostCheckInterval = 1000;//интервал проверки файла на том же сайте, не менее } @@ -63,14 +63,17 @@ class BUCServer { async getBuc(fromCheckTime, callback) { const db = this.db; + const iterName = utils.randomHexString(30); + while (1) {//eslint-disable-line const rows = await db.select({ table: 'buc', where: ` - let iter = @getItem('getBuc'); + let iter = @getItem(${db.esc(iterName)}); if (!iter) { iter = @dirtyIndexLR('checkTime', ${db.esc(fromCheckTime)}); - @setItem('getBuc', iter); + iter = iter.values(); + @setItem(${db.esc(iterName)}, iter); } const ids = new Set(); @@ -89,6 +92,14 @@ class BUCServer { else break; } + + await db.select({ + table: 'buc', + where: ` + @delItem(${db.esc(iterName)}); + return new Set(); + ` + }); } async updateBuc(bookUrls) { @@ -111,7 +122,7 @@ class BUCServer { for (let id of bookUrls) { if (!id) continue; - + if (id.length > 1000) { id = id.substring(0, 1000); } @@ -174,10 +185,7 @@ class BUCServer { rows = await db.select({table: 'buc', count: true}); log(LM_WARN, `'buc' table length: ${rows[0].count}`); -/* -rows = await db.select({table: 'buc'}); -console.log(rows); -*/ + now = Date.now(); //выборка кандидатов rows = await db.select({ @@ -191,6 +199,8 @@ console.log(rows); ` }); +//console.log(rows); + if (rows.length) { const ids = [];