Решение проблемы откатов страницы чтения при нестабильной связи во время синхронизации
This commit is contained in:
@@ -49,6 +49,7 @@ class ServerStorage {
|
|||||||
this.keyInited = false;
|
this.keyInited = false;
|
||||||
this.commit = this.$store.commit;
|
this.commit = this.$store.commit;
|
||||||
this.prevServerStorageKey = null;
|
this.prevServerStorageKey = null;
|
||||||
|
this.identity = utils.randomHexString(20);
|
||||||
this.lock = new LockQueue(100);
|
this.lock = new LockQueue(100);
|
||||||
|
|
||||||
this.$root.generateNewServerStorageKey = () => {this.generateNewServerStorageKey()};
|
this.$root.generateNewServerStorageKey = () => {this.generateNewServerStorageKey()};
|
||||||
@@ -647,6 +648,8 @@ class ServerStorage {
|
|||||||
await this.setCachedRecentPatch(newRecentPatch);
|
await this.setCachedRecentPatch(newRecentPatch);
|
||||||
if (needSaveRecentMod && newRecentMod.rev)
|
if (needSaveRecentMod && newRecentMod.rev)
|
||||||
await this.setCachedRecentMod(newRecentMod);
|
await this.setCachedRecentMod(newRecentMod);
|
||||||
|
} else {
|
||||||
|
this.prevItemKey = null;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.ret();
|
this.lock.ret();
|
||||||
@@ -669,7 +672,7 @@ class ServerStorage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async storageApi(action, items, force) {
|
async storageApi(action, items, force) {
|
||||||
const request = {action, items};
|
const request = {action, identity: this.identity, items};
|
||||||
if (force)
|
if (force)
|
||||||
request.force = true;
|
request.force = true;
|
||||||
const encodedRequest = await this.encodeStorageItems(request);
|
const encodedRequest = await this.encodeStorageItems(request);
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ class JembaReaderStorage {
|
|||||||
if (!instance) {
|
if (!instance) {
|
||||||
this.connManager = new JembaConnManager();
|
this.connManager = new JembaConnManager();
|
||||||
this.db = this.connManager.db['reader-storage'];
|
this.db = this.connManager.db['reader-storage'];
|
||||||
|
|
||||||
|
this.cacheMap = new Map();
|
||||||
this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа
|
this.periodicCleanCache(3*3600*1000);//1 раз в 3 часа
|
||||||
|
|
||||||
instance = this;
|
instance = this;
|
||||||
@@ -20,6 +22,21 @@ class JembaReaderStorage {
|
|||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getCache(id) {
|
||||||
|
const obj = this.cacheMap.get(id);
|
||||||
|
if (obj)
|
||||||
|
obj.time = Date.now();
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
setCache(id, newObj) {
|
||||||
|
let obj = this.cacheMap.get(id);
|
||||||
|
if (!obj)
|
||||||
|
obj = {};
|
||||||
|
Object.assign(obj, newObj, {time: Date.now()});
|
||||||
|
this.cacheMap.set(id, obj);
|
||||||
|
}
|
||||||
|
|
||||||
async doAction(act) {
|
async doAction(act) {
|
||||||
try {
|
try {
|
||||||
if (!_.isObject(act.items))
|
if (!_.isObject(act.items))
|
||||||
@@ -34,7 +51,7 @@ class JembaReaderStorage {
|
|||||||
result = await this.getItems(act.items);
|
result = await this.getItems(act.items);
|
||||||
break;
|
break;
|
||||||
case 'set':
|
case 'set':
|
||||||
result = await this.setItems(act.items, act.force);
|
result = await this.setItems(act.items, act.identity, act.force);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new Error('Unknown action');
|
throw new Error('Unknown action');
|
||||||
@@ -53,8 +70,9 @@ class JembaReaderStorage {
|
|||||||
const db = this.db;
|
const db = this.db;
|
||||||
|
|
||||||
for (const id of Object.keys(items)) {
|
for (const id of Object.keys(items)) {
|
||||||
if (this.cache[id]) {
|
const obj = this.getCache(id);
|
||||||
result.items[id] = this.cache[id];
|
if (obj && obj.items) {
|
||||||
|
result.items[id] = obj.items;
|
||||||
} else {
|
} else {
|
||||||
const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}`
|
const rows = await db.select({//SQL`SELECT rev FROM storage WHERE id = ${id}`
|
||||||
table: 'storage',
|
table: 'storage',
|
||||||
@@ -63,7 +81,8 @@ class JembaReaderStorage {
|
|||||||
});
|
});
|
||||||
const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
|
const rev = (rows.length && rows[0].rev ? rows[0].rev : 0);
|
||||||
result.items[id] = {rev};
|
result.items[id] = {rev};
|
||||||
this.cache[id] = result.items[id];
|
|
||||||
|
this.setCache(id, {items: result.items[id]});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,7 +107,7 @@ class JembaReaderStorage {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
async setItems(items, force) {
|
async setItems(items, identity, force) {
|
||||||
let check = await this.checkItems(items);
|
let check = await this.checkItems(items);
|
||||||
|
|
||||||
//сначала проверим совпадение ревизий
|
//сначала проверим совпадение ревизий
|
||||||
@@ -96,32 +115,54 @@ class JembaReaderStorage {
|
|||||||
if (!_.isString(items[id].data))
|
if (!_.isString(items[id].data))
|
||||||
throw new Error('items.data is not a string');
|
throw new Error('items.data is not a string');
|
||||||
|
|
||||||
if (!force && check.items[id].rev + 1 !== items[id].rev)
|
//identity необходимо для работы при нестабильной связи,
|
||||||
|
//одному и тому же клиенту разрешается перезаписывать данные при расхождении на 0 или 1 ревизию
|
||||||
|
const obj = this.getCache(id) || {};
|
||||||
|
const sameClient = (identity && obj.identity === identity);
|
||||||
|
if (identity && obj.identity !== identity) {
|
||||||
|
obj.identity = identity;
|
||||||
|
this.setCache(id, obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
const revDiff = items[id].rev - check.items[id].rev;
|
||||||
|
const allowUpdate = force || revDiff === 1 || (sameClient && (revDiff === 0 || revDiff === 1));
|
||||||
|
if (!allowUpdate)
|
||||||
return {state: 'reject', items: check.items};
|
return {state: 'reject', items: check.items};
|
||||||
}
|
}
|
||||||
|
|
||||||
const db = this.db;
|
const db = this.db;
|
||||||
const newRev = {};
|
|
||||||
for (const id of Object.keys(items)) {
|
for (const id of Object.keys(items)) {
|
||||||
await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
|
await db.insert({//SQL`INSERT OR REPLACE INTO storage (id, rev, time, data) VALUES (${id}, ${items[id].rev}, strftime('%s','now'), ${items[id].data})`);
|
||||||
table: 'storage',
|
table: 'storage',
|
||||||
replace: true,
|
replace: true,
|
||||||
rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
|
rows: [{id, rev: items[id].rev, time: utils.toUnixTime(Date.now()), data: items[id].data}],
|
||||||
});
|
});
|
||||||
newRev[id] = {rev: items[id].rev};
|
this.setCache(id, {items: {rev: items[id].rev}});
|
||||||
}
|
}
|
||||||
|
|
||||||
Object.assign(this.cache, newRev);
|
|
||||||
|
|
||||||
return {state: 'success'};
|
return {state: 'success'};
|
||||||
}
|
}
|
||||||
|
|
||||||
periodicCleanCache(timeout) {
|
periodicCleanCache(timeout) {
|
||||||
this.cache = {};
|
try {
|
||||||
|
const sorted = [];
|
||||||
|
for (const [id, obj] of this.cacheMap)
|
||||||
|
sorted.push(Object.assign({id}, obj));
|
||||||
|
|
||||||
setTimeout(() => {
|
sorted.sort((a, b) => b.time - a.time);
|
||||||
this.periodicCleanCache(timeout);
|
|
||||||
}, timeout);
|
for (const obj of sorted) {
|
||||||
|
//оставляем только 1000 недавних
|
||||||
|
if (this.cacheMap.size <= 1000)
|
||||||
|
break;
|
||||||
|
|
||||||
|
this.cacheMap.delete(obj.id);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
setTimeout(() => {
|
||||||
|
this.periodicCleanCache(timeout);
|
||||||
|
}, timeout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user