diff --git a/server/config/base.js b/server/config/base.js index e72f0dc4..d11c1a5b 100644 --- a/server/config/base.js +++ b/server/config/base.js @@ -60,6 +60,7 @@ module.exports = { keysFile: 'server', ip: '0.0.0.0', port: '33443', + accessToken: '', }*/ ], @@ -75,6 +76,7 @@ module.exports = { /* bucServer: { url: 'wss://127.0.0.1:33443', + accessToken: '', } */ }; diff --git a/server/controllers/BookUpdateCheckerController.js b/server/controllers/BookUpdateCheckerController.js index 8b54616e..ba349641 100644 --- a/server/controllers/BookUpdateCheckerController.js +++ b/server/controllers/BookUpdateCheckerController.js @@ -13,6 +13,7 @@ class BookUpdateCheckerController { this.config = config; this.isDevelopment = (config.branch == 'development'); + this.accessToken = config.accessToken; this.bucServer = new BUCServer(config); this.wss = wss; @@ -57,6 +58,9 @@ class BookUpdateCheckerController { //pong for WebSocketConnection this.send({_rok: 1}, req, ws); + if (req.accessToken !== this.accessToken) + throw new Error('Access denied'); + switch (req.action) { case 'test': await this.test(req, ws); break; diff --git a/server/core/BookUpdateChecker/BUCClient.js b/server/core/BookUpdateChecker/BUCClient.js index 3379c00c..074cbaf0 100644 --- a/server/core/BookUpdateChecker/BUCClient.js +++ b/server/core/BookUpdateChecker/BUCClient.js @@ -1,3 +1,4 @@ +const WebSocketConnection = require('./WebSocketConnection'); const JembaConnManager = require('../../db/JembaConnManager');//singleton const ayncExit = new (require('../AsyncExit'))(); @@ -6,6 +7,7 @@ const log = new (require('../AppLogger'))().log;//singleton const minuteMs = 60*1000; const hourMs = 60*minuteMs; +const dayMs = 24*hourMs; let instance = null; @@ -18,14 +20,20 @@ class BUCClient { this.connManager = new JembaConnManager(); this.db = this.connManager.db['book-update-server']; + this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false}); + this.accessToken = config.bucServer.accessToken; + //константы if (this.config.branch !== 'development') { + this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших this.syncPeriod = 1*hourMs;//период синхронизации с сервером BUC } else { + this.cleanQueryInterval = 300*dayMs;//интервал очистки устаревших this.syncPeriod = 1*minuteMs;//период синхронизации с сервером BUC } this.fromCheckTime = 1; + this.bookUrls = new Set(); this.main();//no await @@ -35,23 +43,109 @@ class BUCClient { return instance; } + async wsRequest(query) { + const response = await this.wsc.message( + await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 600), + 600 + ); + if (response.error) + throw new Error(response.error); + return response; + } + + async wsUpdateBuc(bookUrls) { + return await this.wsRequest({action: 'update-buc', bookUrls}); + } + async checkBuc(bookUrls) { - return []; + const db = this.db; + + for (const url of bookUrls) + this.bookUrls.add(url); + + const rows = await db.select({ + table: 'buc', + where: `@@id(${db.esc(bookUrls)})` + }); + + return rows; } async findMaxCheckTime() { + const db = this.db; + let result = 1; + //одним куском, возможно будет жрать память + const rows = await db.select({ + table: 'buc', + where: ` + const result = new Set(); + let max = 0; + let maxId = null; + + @iter(@all(), (row) => { + if (row.checkTime > max) { + max = row.checkTime; + maxId = row.id; + } + }; + + if (maxId) + result.add(maxId); + + return result; + ` + }); + + if (rows.length) + result = rows[0].checkTime; + return result; } - async main() { - if (!this.config.bucEnabled) - throw new Error('BookUpdateChecker disabled'); + async periodicSync() { + while (1) {//eslint-disable-line + try { + //сначала отправим this.bookUrls + const arr = Array.from(this.bookUrls); + this.bookUrls = new Set(); + const chunkSize = 100; + for (let i = 0; i < arr.length; i += chunkSize) { + const chunk = arr.slice(i, i + chunkSize); + + const res = await this.wsUpdateBuc(chunk); + if (!res.error && res.state == 'success') { + //update success + } else { + for (const url of chunk) { + this.bookUrls.add(url); + } + log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`); + } + } + + //почистим нашу таблицу 'buc' + this.cleanQueryInterval + + //синхронизация с сервером BUC + } catch (e) { + log(LM_ERR, e.stack); + } + + await utils.sleep(this.syncPeriod); + } + } + + async main() { try { + if (!this.config.bucEnabled) + throw new Error('BookUpdateChecker disabled'); + this.fromCheckTime = await this.findMaxCheckTime(); - //this.periodicSync();//no await + + this.periodicSync();//no await log(`BUC Client started`); } catch (e) { diff --git a/server/core/BookUpdateChecker/BUCServer.js b/server/core/BookUpdateChecker/BUCServer.js index e83bea14..331c70c8 100644 --- a/server/core/BookUpdateChecker/BUCServer.js +++ b/server/core/BookUpdateChecker/BUCServer.js @@ -95,11 +95,58 @@ class BUCServer { const db = this.db; const now = Date.now(); - await db.update({ + const rows = await db.select({ table: 'buc', - mod: `(r) => r.queryTime = ${db.esc(now)}`, + map: `(r) => ({id: r.id})`, where: `@@id(${db.esc(bookUrls)})` }); + + const exists = new Set(); + for (const row of rows) { + exists.add(row.id); + } + + const toUpdateIds = []; + const toInsertRows = []; + for (let id of bookUrls) { + if (!id) + continue; + + if (id.length > 1000) { + id = id.substring(0, 1000); + } + + if (exists.has(id)) { + toUpdateIds.push(id); + } else { + toInsertRows.push({ + id, + queryTime: now, + checkTime: 0, // 0 - never checked + modTime: '', + size: 0, + checkSum: '', //sha256 + state: 0, // 0 - not processing, 1 - processing + error: '', + }); + } + } + + if (toUpdateIds.length) { + await db.update({ + table: 'buc', + mod: `(r) => r.queryTime = ${db.esc(now)}`, + where: `@@id(${db.esc(toUpdateIds)})` + }); + } + + if (toInsertRows.length) { + await db.insert({ + table: 'buc', + ignore: true, + rows: toInsertRows, + }); + } } async fillCheckQueue() { @@ -147,11 +194,10 @@ console.log(rows); if (rows.length) { const ids = []; - for (let i = 0; i < rows.length; i++) { + for (const row of rows) { if (this.checkQueue.length >= this.maxCheckQueueLength) break; - const row = rows[i]; ids.push(row.id); this.checkQueue.push(row); } diff --git a/server/db/jembaMigrations/app/002-create.js b/server/db/jembaMigrations/app/002-create.js index a7260e8b..cc0dc217 100644 --- a/server/db/jembaMigrations/app/002-create.js +++ b/server/db/jembaMigrations/app/002-create.js @@ -5,15 +5,13 @@ module.exports = { id, // book URL queryTime: Number, checkTime: Number, // 0 - never checked + modTime: String, size: Number, checkSum: String, //sha256 state: Number, // 0 - not processing, 1 - processing error: String, }*/ - table: 'buc', - index: [ - {field: 'queryTime', type: 'number'}, - ] + table: 'buc' }], ], down: [ diff --git a/server/db/jembaMigrations/book-update-server/001-create.js b/server/db/jembaMigrations/book-update-server/001-create.js index d091315a..089a61e5 100644 --- a/server/db/jembaMigrations/book-update-server/001-create.js +++ b/server/db/jembaMigrations/book-update-server/001-create.js @@ -5,6 +5,7 @@ module.exports = { id, // book URL queryTime: Number, checkTime: Number, // 0 - never checked + modTime: String, size: Number, checkSum: String, //sha256 state: Number, // 0 - not processing, 1 - processing