262 lines
8.6 KiB
JavaScript
262 lines
8.6 KiB
JavaScript
const WebSocketConnection = require('../WebSocketConnection');
|
||
const JembaConnManager = require('../../db/JembaConnManager');//singleton
|
||
|
||
const ayncExit = new (require('../AsyncExit'))();
|
||
const utils = require('../utils');
|
||
const log = new (require('../AppLogger'))().log;//singleton
|
||
|
||
// Time units expressed in milliseconds.
const minuteMs = 60*1000;
const hourMs = 60*minuteMs;
const dayMs = 24*hourMs;

// The single shared BUCClient; stays null until the BUCClient constructor runs for the first time.
let instance = null;
|
||
|
||
//singleton
|
||
/**
 * Client of the BUC (BookUpdateChecker) server.
 *
 * Singleton: the first `new BUCClient(config)` builds the client and starts
 * its background workers; every later `new BUCClient()` returns that same
 * object. The client
 *  - remembers every book URL passed to checkBuc() and periodically sends
 *    them to the server ('update-buc'),
 *  - periodically removes stale rows from the local 'buc' table,
 *  - periodically pulls fresh rows from the server ('get-buc') into 'buc'.
 */
class BUCClient {
    /**
     * @param {object} config - application config. Reads `bucServer.url`,
     *   `bucServer.accessToken`, `branch` and `bucEnabled`.
     */
    constructor(config) {
        if (!instance) {
            this.config = config;

            this.connManager = new JembaConnManager();
            this.appDb = this.connManager.db['app'];

            // The meaning of 10 and 30 is defined by WebSocketConnection (not visible here).
            // NOTE(review): if the options object is forwarded to the underlying TLS socket,
            // `rejectUnauthorized: false` turns off certificate verification - confirm this is intended.
            this.wsc = new WebSocketConnection(config.bucServer.url, 10, 30, {rejectUnauthorized: false});
            this.accessToken = config.bucServer.accessToken;

            // constants; the 'development' branch uses much shorter periods
            const isDevelopment = (this.config.branch === 'development');
            this.cleanQueryInterval = 300*dayMs;// rows whose queryTime is older than this are cleaned
            this.syncPeriod = (isDevelopment ? 1*minuteMs : 1*hourMs);// period of synchronization with the BUC server
            this.sendBookUrlsPeriod = (isDevelopment ? 1*1000 : 1*minuteMs);// period of sending bookUrls to the BUC server

            this.fromCheckTime = 1;
            this.bookUrls = new Set();

            this.main();//no await

            instance = this;
        }

        return instance;
    }

    /**
     * Sends one request (with the access token merged in) and waits for its single response.
     *
     * @param {object} query - request payload; its own keys override the defaults.
     * @returns {Promise<object>} the response message.
     * @throws {Error} when the response carries an `error` field.
     */
    async wsRequest(query) {
        const requestId = await this.wsc.send(Object.assign({accessToken: this.accessToken}, query), 60);
        const response = await this.wsc.message(requestId, 60);

        if (response.error)
            throw new Error(response.error);

        return response;
    }

    /**
     * Requests rows from the server ('get-buc') and feeds every received
     * portion to `callback` until a message with a state other than 'get' arrives.
     *
     * @param {number} fromCheckTime - value passed to the server as `fromCheckTime`.
     * @param {function(Array): Promise} callback - awaited for each `rows` portion.
     * @throws {Error} when a message carries an `error` field (same contract as wsRequest).
     */
    async wsGetBuc(fromCheckTime, callback) {
        const requestId = await this.wsc.send({accessToken: this.accessToken, action: 'get-buc', fromCheckTime}, 60);

        while (true) {//eslint-disable-line
            const res = await this.wsc.message(requestId, 60);

            // Do not mistake an error reply for a normal end of the stream.
            if (res.error)
                throw new Error(res.error);

            if (res.state !== 'get')
                break;

            await callback(res.rows);
        }
    }

    /**
     * Sends a list of book URLs to the server ('update-buc').
     *
     * @param {string[]} bookUrls
     * @returns {Promise<object>} the server response.
     * @throws {Error} when the server answers with an error.
     */
    async wsUpdateBuc(bookUrls) {
        return await this.wsRequest({action: 'update-buc', bookUrls});
    }

    /**
     * Queues the given URLs for the next upload to the server and returns
     * what the local 'buc' table currently holds for them.
     *
     * @param {Iterable<string>} bookUrls - used as row ids of the 'buc' table.
     * @returns {Promise<Array<{id: *, size: *}>>} matching local rows.
     */
    async checkBuc(bookUrls) {
        const db = this.appDb;

        for (const url of bookUrls)
            this.bookUrls.add(url);

        return await db.select({
            table: 'buc',
            map: `(r) => ({id: r.id, size: r.size})`,
            where: `@@id(${db.esc(bookUrls)})`,
        });
    }

    /**
     * Finds the greatest `checkTime` stored in the local 'buc' table.
     *
     * @returns {Promise<number>} that checkTime, or 1 when no row was selected.
     */
    async findMaxCheckTime() {
        const db = this.appDb;

        // done in one piece, may consume a lot of memory
        const rows = await db.select({
            table: 'buc',
            where: `
                const result = new Set();
                let max = 0;
                let maxId = null;

                @iter(@all(), (row) => {
                    if (row.checkTime > max) {
                        max = row.checkTime;
                        maxId = row.id;
                    }
                });

                if (maxId)
                    result.add(maxId);

                return result;
            `
        });

        return (rows.length ? rows[0].checkTime : 1);
    }

    /**
     * Endless worker: every `sendBookUrlsPeriod` ms uploads the queued book URLs.
     * Errors are logged and the loop goes on.
     */
    async periodicSendBookUrls() {
        while (true) {//eslint-disable-line
            try {
                // send this.bookUrls
                if (this.bookUrls.size) {
                    log(`client: remote update buc begin`);

                    const updated = await this.sendPendingBookUrls();

                    log(`client: remote update buc end, updated ${updated} urls`);
                }
            } catch (e) {
                log(LM_ERR, e.stack);
            }

            await utils.sleep(this.sendBookUrlsPeriod);
        }
    }

    /**
     * Takes everything queued in `this.bookUrls` and uploads it in chunks.
     * URLs that were not accepted - or not sent at all because a request
     * threw - are put back into the queue, so nothing is lost.
     *
     * @param {number} [chunkSize=100] - number of URLs per request.
     * @returns {Promise<number>} how many URLs the server accepted.
     * @throws {Error} whatever wsUpdateBuc threw (after re-queueing).
     */
    async sendPendingBookUrls(chunkSize = 100) {
        const pending = Array.from(this.bookUrls);
        // Start a fresh queue right away: checkBuc() may add URLs while we are awaiting.
        this.bookUrls = new Set();

        let updated = 0;
        for (let i = 0; i < pending.length; i += chunkSize) {
            const chunk = pending.slice(i, i + chunkSize);

            let res;
            try {
                res = await this.wsUpdateBuc(chunk);
            } catch (e) {
                // This chunk and all following ones were not delivered - retry them next time.
                for (const url of pending.slice(i))
                    this.bookUrls.add(url);
                throw e;
            }

            if (!res.error && res.state === 'success') {
                // update success
                updated += chunk.length;
            } else {
                for (const url of chunk)
                    this.bookUrls.add(url);

                log(LM_ERR, `update-buc error: ${(res.error ? res.error : `wrong state "${res.state}"`)}`);
            }
        }

        return updated;
    }

    /**
     * Endless worker: every `syncPeriod` ms cleans the local 'buc' table and
     * then synchronizes it with the BUC server. Errors are logged and the loop goes on.
     */
    async periodicSync() {
        while (true) {//eslint-disable-line
            try {
                // clean our 'buc' table
                log(`client: clean 'buc' table begin`);
                await this.cleanBucTable();
                log(`client: clean 'buc' table end`);

                // synchronization with the BUC server
                log(`client: sync 'buc' table begin`);
                await this.syncBucTable();
                log(`client: sync 'buc' table end`);
            } catch (e) {
                log(LM_ERR, e.stack);
            }

            await utils.sleep(this.syncPeriod);
        }
    }

    /**
     * Walks the whole 'buc' table in portions of up to 1000 rows and deletes
     * the rows whose `queryTime` is not newer than now - cleanQueryInterval.
     * The walk position is kept on the DB side under the item name 'clean';
     * it is always removed at the end, even when the walk fails, so that the
     * next pass starts from the beginning.
     */
    async cleanBucTable() {
        const db = this.appDb;
        const cleanTime = Date.now() - this.cleanQueryInterval;

        try {
            while (true) {//eslint-disable-line
                // select all rows, portion by portion
                const rows = await db.select({
                    table: 'buc',
                    where: `
                        let iter = @getItem('clean');
                        if (!iter) {
                            iter = @all();
                            @setItem('clean', iter);
                        }

                        const ids = new Set();
                        let id = iter.next();
                        while (!id.done) {
                            ids.add(id.value);
                            if (ids.size >= 1000)
                                break;
                            id = iter.next();
                        }

                        return ids;
                    `
                });

                if (!rows.length)
                    break;

                const toDelIds = rows
                    .filter((row) => row.queryTime <= cleanTime)
                    .map((row) => row.id);

                // nothing stale in this portion - no need to touch the DB
                if (!toDelIds.length)
                    continue;

                // deletion
                const res = await db.delete({
                    table: 'buc',
                    where: `@@id(${db.esc(toDelIds)})`,
                });

                log(`client: clean 'buc' deleted ${res.deleted}`);
            }
        } finally {
            await db.select({
                table: 'buc',
                where: `
                    @delItem('clean');
                    return new Set();
                `
            });
        }
    }

    /**
     * Pulls rows from the BUC server starting half an hour before
     * `this.fromCheckTime`, stores them into 'buc' (replacing existing rows)
     * and moves `this.fromCheckTime` up to the greatest received checkTime.
     */
    async syncBucTable() {
        const db = this.appDb;

        // Minus half an hour, just in case. The margin is applied to the stored
        // value itself, so it accumulates while the server returns no rows.
        this.fromCheckTime -= 30*minuteMs;

        await this.wsGetBuc(this.fromCheckTime, async(rows) => {
            const res = await db.insert({
                table: 'buc',
                replace: true,
                rows
            });

            log(`client: sync 'buc' table, inserted ${res.inserted} rows, replaced ${res.replaced}`);

            // Advance only after the rows are stored: a failed insert must not skip them.
            for (const row of rows) {
                if (row.checkTime > this.fromCheckTime)
                    this.fromCheckTime = row.checkTime;
            }
        });
    }

    /**
     * Entry point started by the constructor: restores `fromCheckTime` from
     * the local table and launches both endless workers. On any failure
     * (including `config.bucEnabled` being falsy) logs it as fatal and asks
     * the application to exit with code 1.
     */
    async main() {
        try {
            if (!this.config.bucEnabled)
                throw new Error('BookUpdateChecker disabled');

            this.fromCheckTime = await this.findMaxCheckTime();

            this.periodicSendBookUrls();//no await
            this.periodicSync();//no await

            log(`BUC Client Worker started`);
        } catch (e) {
            log(LM_FATAL, e.stack);
            ayncExit.exit(1);
        }
    }
}
|
||
|
||
// Export the class; its constructor always returns the one shared instance.
module.exports = BUCClient;