Files
liberama/server/core/LimitedQueue.js

119 lines
3.5 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
class LimitedQueue {
constructor(enqueueAfter = 10, size = 100, timeout = 60*60*1000) {//timeout в ms
this.size = size;
this.timeout = timeout;
this.abortCount = 0;
this.enqueueAfter = enqueueAfter;
this.freed = enqueueAfter;
this.listeners = [];
}
_addListener(listener) {
this.listeners.push(listener);
}
//отсылаем сообщение первому ожидающему и удаляем его из списка
_emitFree() {
if (this.listeners.length > 0) {
let listener = this.listeners.shift();
listener.onFree();
for (let i = 0; i < this.listeners.length; i++) {
this.listeners[i].onPlaceChange(i + 1);
}
}
}
get(onPlaceChange) {
return new Promise((resolve, reject) => {
if (this.destroyed)
reject(new Error('destroyed'));
const take = () => {
if (this.freed <= 0)
throw new Error('Ошибка получения ресурсов в очереди ожидания');
this.freed--;
this.resetTimeout();
let aCount = this.abortCount;
return {
ret: () => {
if (aCount == this.abortCount) {
this.freed++;
this._emitFree();
aCount = -1;
this.resetTimeout();
}
},
abort: () => {
return (aCount != this.abortCount);
},
resetTimeout: this.resetTimeout.bind(this)
};
};
if (this.freed > 0) {
resolve(take());
} else {
if (this.listeners.length < this.size) {
this._addListener({
onFree: () => {
resolve(take());
},
onError: (err) => {
reject(err);
},
onPlaceChange: (i) => {
if (onPlaceChange)
onPlaceChange(i);
}
});
if (onPlaceChange)
onPlaceChange(this.listeners.length);
} else {
reject(new Error('Превышен размер очереди ожидания'));
}
}
});
}
resetTimeout() {
if (this.timer)
clearTimeout(this.timer);
this.timer = setTimeout(() => { this.clean(); }, this.timeout);
}
clean() {
this.timer = null;
if (this.freed < this.enqueueAfter) {
this.abortCount++;
//чистка listeners
for (const listener of this.listeners) {
listener.onError('Время ожидания в очереди истекло');
}
this.listeners = [];
this.freed = this.enqueueAfter;
}
}
destroy() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
for (const listener of this.listeners) {
listener.onError('destroy');
}
this.listeners = [];
this.abortCount++;
this.destroyed = true;
}
}
module.exports = LimitedQueue;