Server : nginx/1.18.0 System : Linux localhost 6.14.3-x86_64-linode168 #1 SMP PREEMPT_DYNAMIC Mon Apr 21 19:47:55 EDT 2025 x86_64 User : www-data ( 33) PHP Version : 8.0.16 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /var/www/ecommerce/node_modules/websocket-extensions/lib/pipeline/ |
'use strict';
var RingBuffer = require('./ring_buffer');
var Functor = function(session, method) {
this._session = session;
this._method = method;
this._queue = new RingBuffer(Functor.QUEUE_SIZE);
this._stopped = false;
this.pending = 0;
};
Functor.QUEUE_SIZE = 8;
Functor.prototype.call = function(error, message, callback, context) {
if (this._stopped) return;
var record = { error: error, message: message, callback: callback, context: context, done: false },
called = false,
self = this;
this._queue.push(record);
if (record.error) {
record.done = true;
this._stop();
return this._flushQueue();
}
var handler = function(err, msg) {
if (!(called ^ (called = true))) return;
if (err) {
self._stop();
record.error = err;
record.message = null;
} else {
record.message = msg;
}
record.done = true;
self._flushQueue();
};
try {
this._session[this._method](message, handler);
} catch (err) {
handler(err);
}
};
Functor.prototype._stop = function() {
this.pending = this._queue.length;
this._stopped = true;
};
Functor.prototype._flushQueue = function() {
var queue = this._queue, record;
while (queue.length > 0 && queue.peek().done) {
record = queue.shift();
if (record.error) {
this.pending = 0;
queue.clear();
} else {
this.pending -= 1;
}
record.callback.call(record.context, record.error, record.message);
}
};
module.exports = Functor;