488 lines
15 KiB
JavaScript
488 lines
15 KiB
JavaScript
// Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved
|
|
|
|
//-----------------------------------------------------------------------------
|
|
//
|
|
// You may not use the identified files except in compliance with the Apache
|
|
// License, Version 2.0 (the "License.")
|
|
//
|
|
// You may obtain a copy of the License at
|
|
// http://www.apache.org/licenses/LICENSE-2.0.
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
//
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
//-----------------------------------------------------------------------------
|
|
|
|
'use strict';
|
|
|
|
const EventEmitter = require('events');
|
|
const nodbUtil = require('./util.js');
|
|
|
|
// completeConnectionRequest does the actual work of getting a connection from
|
|
// a pool. It's abstracted out so it can be called from getConnection and
|
|
// checkRequestQueue consistently.
|
|
function completeConnectionRequest(config, getConnectionCb) {
|
|
const self = this;
|
|
|
|
// Incrementing _connectionsOut prior to making the async call to get a connection
|
|
// to prevent other connection requests from exceeding the poolMax.
|
|
self._connectionsOut += 1;
|
|
|
|
self._getConnection(config, function(err, connInst, newSession) {
|
|
if (err) {
|
|
// Decrementing _connectionsOut if we didn't actually get a connection
|
|
// and then rechecking the queue.
|
|
self._connectionsOut -= 1;
|
|
|
|
if (self._enableStats) {
|
|
self._totalFailedRequests += 1;
|
|
}
|
|
|
|
process.nextTick(function() {
|
|
checkRequestQueue.call(self);
|
|
});
|
|
|
|
getConnectionCb(err);
|
|
|
|
return;
|
|
}
|
|
|
|
connInst.on('_after_close', function() {
|
|
self._connectionsOut -= 1;
|
|
if (self._connectionsOut == 0 && self._closeCb)
|
|
self._closeCb();
|
|
|
|
checkRequestQueue.call(self);
|
|
});
|
|
|
|
// Invoke tag fixup callback method if one has been specified and
|
|
// the actual tag on the connection doesn't match the one
|
|
// requested, or the connection is freshly created.
|
|
let requestedTag = config.tag || "";
|
|
if (typeof self.sessionCallback === 'function' &&
|
|
(newSession || connInst.tag != requestedTag)) {
|
|
self.sessionCallback(connInst, requestedTag,
|
|
function(err) {
|
|
if (err) {
|
|
connInst.close({drop: true}, function() {
|
|
getConnectionCb(err);
|
|
});
|
|
return;
|
|
}
|
|
getConnectionCb(null, connInst);
|
|
}
|
|
);
|
|
|
|
// otherwise, simply invoke the user's callback immediately
|
|
} else {
|
|
getConnectionCb(null, connInst);
|
|
}
|
|
});
|
|
}
|
|
|
|
// Requests for connections from pools are queued. checkRequestQueue determines
|
|
// when requests for connections should be completed and cancels any timeout
|
|
// that may have been associated with the request.
|
|
function checkRequestQueue() {
|
|
const self = this;
|
|
let payload;
|
|
let waitTime;
|
|
|
|
if (self._connRequestQueue.length === 0 || self._connectionsOut === self.poolMax) {
|
|
return; // no need to do any work
|
|
}
|
|
|
|
payload = self._connRequestQueue.shift();
|
|
|
|
if (self._enableStats) {
|
|
self._totalRequestsDequeued += 1;
|
|
|
|
waitTime = Date.now() - payload.enqueuedTime;
|
|
|
|
self._totalTimeInQueue += waitTime;
|
|
self._minTimeInQueue = Math.min(self._minTimeInQueue, waitTime);
|
|
self._maxTimeInQueue = Math.max(self._maxTimeInQueue, waitTime);
|
|
}
|
|
|
|
if (self._usingQueueTimeout) {
|
|
clearTimeout(payload.timeoutHandle);
|
|
|
|
delete self._connRequestTimersMap[payload.timerIdx];
|
|
payload.timeoutHandle = null;
|
|
payload.timerIdx = null;
|
|
}
|
|
|
|
completeConnectionRequest.call(self, payload.config,
|
|
payload.getConnectionCb);
|
|
}
|
|
|
|
// onRequestTimeout is used to prevent requests for connections from sitting in the
|
|
// queue for too long. The number of milliseconds can be set via queueTimeout
|
|
// property of the poolAttrs used when creating a pool.
|
|
function onRequestTimeout(timerIdx) {
|
|
const self = this;
|
|
let payloadToDequeue = self._connRequestTimersMap[timerIdx];
|
|
let requestIndex;
|
|
|
|
if (payloadToDequeue) {
|
|
if (self._enableStats) {
|
|
self._totalRequestTimeouts += 1;
|
|
self._totalTimeInQueue += Date.now() - payloadToDequeue.enqueuedTime;
|
|
}
|
|
|
|
requestIndex = self._connRequestQueue.indexOf(payloadToDequeue);
|
|
|
|
self._connRequestQueue.splice(requestIndex, 1);
|
|
delete self._connRequestTimersMap[timerIdx];
|
|
|
|
payloadToDequeue.getConnectionCb(new Error(nodbUtil.getErrorMessage('NJS-040', self.queueTimeout)));
|
|
}
|
|
}
|
|
|
|
// This function gets a connection from the pool. If there are fewer
|
|
// connections out than the poolMax setting, then the request will return
|
|
// immediately; otherwise, the request will be queued for up to queueTimeout
|
|
// milliseconds.
|
|
function getConnection(a1, a2) {
|
|
const self = this;
|
|
let payload;
|
|
let timeoutHandle;
|
|
let timerIdx;
|
|
let poolMax;
|
|
let getConnectionCb;
|
|
let options = {};
|
|
|
|
nodbUtil.assert(arguments.length >= 1 && arguments.length <= 2, 'NJS-009');
|
|
switch (arguments.length) {
|
|
case 1:
|
|
getConnectionCb = a1;
|
|
break;
|
|
case 2:
|
|
nodbUtil.assert(nodbUtil.isObject(a1), 'NJS-005', 1);
|
|
options = a1;
|
|
getConnectionCb = a2;
|
|
break;
|
|
}
|
|
nodbUtil.assert(typeof getConnectionCb === 'function', 'NJS-005', arguments.length);
|
|
|
|
if (self.status === self._oracledb.POOL_STATUS_DRAINING) { // closing soon
|
|
getConnectionCb(new Error(nodbUtil.getErrorMessage('NJS-064')));
|
|
return;
|
|
} else if (self.status === self._oracledb.POOL_STATUS_CLOSED) {
|
|
getConnectionCb(new Error(nodbUtil.getErrorMessage('NJS-065')));
|
|
return;
|
|
}
|
|
|
|
if (self._enableStats) {
|
|
self._totalConnectionRequests += 1;
|
|
}
|
|
|
|
// getting the poolMax setting on the pool may fail if the pool is no longer
|
|
// valid
|
|
try {
|
|
poolMax = self.poolMax;
|
|
} catch (err) {
|
|
getConnectionCb(err);
|
|
if (self._enableStats) {
|
|
self._totalFailedRequests += 1;
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (self._connectionsOut < poolMax) {
|
|
completeConnectionRequest.call(self, options, getConnectionCb);
|
|
} else { // need to queue the request
|
|
if (self._usingQueueTimeout) {
|
|
self._connRequestTimersIdx += 1;
|
|
timerIdx = self._connRequestTimersIdx;
|
|
|
|
timeoutHandle = setTimeout(
|
|
function() {
|
|
onRequestTimeout.call(self, timerIdx);
|
|
},
|
|
self.queueTimeout
|
|
);
|
|
}
|
|
|
|
payload = {
|
|
timerIdx: timerIdx,
|
|
timeoutHandle: timeoutHandle,
|
|
getConnectionCb: getConnectionCb,
|
|
config : options
|
|
};
|
|
|
|
if (self._usingQueueTimeout) {
|
|
self._connRequestTimersMap[timerIdx] = payload;
|
|
}
|
|
|
|
self._connRequestQueue.push(payload);
|
|
|
|
if (self._enableStats) {
|
|
payload.enqueuedTime = Date.now();
|
|
self._totalRequestsEnqueued += 1;
|
|
self._maxQueueLength = Math.max(self._maxQueueLength, self._connRequestQueue.length);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
function close(a1, a2) {
|
|
const upperLimitTimeout = 2147483647;
|
|
const self = this;
|
|
let forceClose;
|
|
let drainTime;
|
|
let timeoutCb;
|
|
let closeCb;
|
|
|
|
nodbUtil.assert(arguments.length === 1 || arguments.length === 2, 'NJS-009');
|
|
|
|
switch (arguments.length) {
|
|
case 1:
|
|
nodbUtil.assert(typeof a1 === 'function', 'NJS-005', 1);
|
|
closeCb = a1;
|
|
drainTime = 0;
|
|
forceClose = false;
|
|
break;
|
|
case 2:
|
|
nodbUtil.assert(typeof a1 === 'number', 'NJS-005', 1);
|
|
nodbUtil.assert(typeof a2 === 'function', 'NJS-005', 2);
|
|
closeCb = a2;
|
|
forceClose = true;
|
|
|
|
// If connectionsOut is === 0, closePool is called right away, as there is no need to wait at all.
|
|
drainTime = self._connectionsOut === 0 ? 0 : a1 * 1000; // a1 represents seconds of time
|
|
|
|
// SetTimeout does not accept numbers greater than a 32-bit signed integer.
|
|
if (drainTime < 0 || isNaN(drainTime) || drainTime > upperLimitTimeout) {
|
|
closeCb(new Error(nodbUtil.getErrorMessage('NJS-005', 1)));
|
|
return;
|
|
}
|
|
break;
|
|
}
|
|
|
|
if (self.status === self._oracledb.POOL_STATUS_DRAINING) { // already closing soon
|
|
closeCb(new Error(nodbUtil.getErrorMessage('NJS-064')));
|
|
return;
|
|
} else if (self.status === self._oracledb.POOL_STATUS_CLOSED) {
|
|
closeCb(new Error(nodbUtil.getErrorMessage('NJS-065')));
|
|
return;
|
|
}
|
|
|
|
// create function which will be called when the drain time has expired or
|
|
// when all of the connections have been released back to the pool
|
|
self._closeCb = function() {
|
|
self._close({forceClose : forceClose}, function(err) {
|
|
if (!err) {
|
|
self._status = self._oracledb.POOL_STATUS_CLOSED;
|
|
self.emit('_after_close', self);
|
|
}
|
|
self._closeCb = undefined;
|
|
if (timeoutCb)
|
|
clearTimeout(timeoutCb);
|
|
closeCb(err);
|
|
});
|
|
};
|
|
|
|
if (forceClose) {
|
|
self._status = self._oracledb.POOL_STATUS_DRAINING;
|
|
}
|
|
if (drainTime === 0) {
|
|
self._closeCb();
|
|
} else {
|
|
timeoutCb = setTimeout(self._closeCb, drainTime);
|
|
}
|
|
|
|
}
|
|
|
|
|
|
// logStats is used to add a hidden method (_logStats) to each pool instance.
|
|
// This provides an easy way to log out the statistics related information
|
|
// that's collected when _enableStats is set to true when creating a pool. This
|
|
// functionality may be altered or enhanced in the future.
|
|
function logStats() {
|
|
const self = this;
|
|
let averageTimeInQueue;
|
|
|
|
if (self.status === self._oracledb.POOL_STATUS_CLOSED) {
|
|
throw new Error(nodbUtil.getErrorMessage('NJS-065'));
|
|
}
|
|
|
|
if (self._enableStats !== true) {
|
|
console.log('Pool statistics not enabled');
|
|
return;
|
|
}
|
|
|
|
averageTimeInQueue = 0;
|
|
|
|
if (self._totalRequestsEnqueued !== 0) {
|
|
averageTimeInQueue = Math.round(self._totalTimeInQueue/self._totalRequestsEnqueued);
|
|
}
|
|
|
|
console.log('\nPool statistics:');
|
|
console.log('...total up time (milliseconds):', Date.now() - self._createdDate);
|
|
console.log('...total connection requests:', self._totalConnectionRequests);
|
|
console.log('...total requests enqueued:', self._totalRequestsEnqueued);
|
|
console.log('...total requests dequeued:', self._totalRequestsDequeued);
|
|
console.log('...total requests failed:', self._totalFailedRequests);
|
|
console.log('...total request timeouts:', self._totalRequestTimeouts);
|
|
console.log('...max queue length:', self._maxQueueLength);
|
|
console.log('...sum of time in queue (milliseconds):', self._totalTimeInQueue);
|
|
console.log('...min time in queue (milliseconds):', self._minTimeInQueue);
|
|
console.log('...max time in queue (milliseconds):', self._maxTimeInQueue);
|
|
console.log('...avg time in queue (milliseconds):', averageTimeInQueue);
|
|
console.log('...pool connections in use:', self.connectionsInUse);
|
|
console.log('...pool connections open:', self.connectionsOpen);
|
|
console.log('Related pool attributes:');
|
|
console.log('...poolAlias:', self.poolAlias);
|
|
console.log('...queueTimeout (milliseconds):', self.queueTimeout);
|
|
console.log('...poolMin:', self.poolMin);
|
|
console.log('...poolMax:', self.poolMax);
|
|
console.log('...poolIncrement:', self.poolIncrement);
|
|
console.log('...poolTimeout (seconds):', self.poolTimeout);
|
|
console.log('...poolPingInterval:', self.poolPingInterval);
|
|
console.log('...sessionCallback:',
|
|
typeof self.sessionCallback === 'function' ? self.sessionCallback.name :
|
|
(typeof self.sessionCallback === 'string' ? '"' + self.sessionCallback + '"' : self.sessionCallback));
|
|
console.log('...stmtCacheSize:', self.stmtCacheSize);
|
|
console.log('Pool status:');
|
|
console.log('...status:', self.status);
|
|
console.log('Related environment variables:');
|
|
console.log('...process.env.UV_THREADPOOL_SIZE:', process.env.UV_THREADPOOL_SIZE);
|
|
}
|
|
|
|
// sets up the instance with additional attributes used for logging statistics
|
|
// and managing the connection queue
|
|
function setup(poolAttrs, poolAlias, oracledb) {
|
|
const self = this;
|
|
let queueTimeout;
|
|
|
|
if (typeof poolAttrs.queueTimeout !== 'undefined') {
|
|
queueTimeout = poolAttrs.queueTimeout;
|
|
} else {
|
|
queueTimeout = oracledb.queueTimeout;
|
|
}
|
|
|
|
// Using Object.defineProperties to add properties to the Pool instance with
|
|
// special properties, such as enumerable but not writable.
|
|
Object.defineProperties(
|
|
self,
|
|
{
|
|
queueTimeout: { // milliseconds a connection request can spend in queue before being failed
|
|
enumerable: true,
|
|
get: function() {
|
|
return queueTimeout;
|
|
}
|
|
},
|
|
_closeCb: { // performs close when drainTime expires or connections closed
|
|
writable: true
|
|
},
|
|
_enableStats: { // true means pool stats will be recorded
|
|
value: poolAttrs._enableStats === true
|
|
},
|
|
_logStats: { // output pool stats
|
|
value: logStats
|
|
},
|
|
_createdDate: {
|
|
value: Date.now()
|
|
},
|
|
_totalConnectionRequests: { // total number of pool.getConnection requests
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_totalRequestsEnqueued: { // number of pool.getConnection requests added to queue
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_totalRequestsDequeued: { // number of pool.getConnection requests removed from queue because a pool connection became available
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_totalFailedRequests: { // number of pool.getConnection requests that failed at the C layer
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_totalRequestTimeouts: { // number of queued pool.getConnection requests that timed out without being satisfied
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_totalTimeInQueue: { // sum of time in milliseconds that all pool.getConnection requests spent in the queue
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_maxQueueLength: { // maximum length of pool queue
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_minTimeInQueue: { // shortest amount of time (milliseconds) that any pool.getConnection request spent in queue
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_maxTimeInQueue: { // longest amount of time (milliseconds) that any pool.getConnection request spent in queue
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_usingQueueTimeout: {
|
|
value: queueTimeout !== 0
|
|
},
|
|
_connectionsOut: { // number of connections checked out from the pool. Must be inc/dec in the main thread in JS
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_connRequestQueue: {
|
|
value: [],
|
|
writable: true
|
|
},
|
|
_connRequestTimersIdx: {
|
|
value: 0,
|
|
writable: true
|
|
},
|
|
_connRequestTimersMap: {
|
|
value: {},
|
|
writable: true
|
|
},
|
|
_status: { // open/closing/closed
|
|
value: oracledb.POOL_STATUS_OPEN,
|
|
writable: true
|
|
},
|
|
poolAlias: {
|
|
enumerable: true,
|
|
get: function() {
|
|
return poolAlias;
|
|
}
|
|
},
|
|
status: { // open/closing/closed
|
|
enumerable: true,
|
|
get: function() {
|
|
return this._status;
|
|
}
|
|
},
|
|
sessionCallback: { // session callback
|
|
enumerable: true,
|
|
get: function() {
|
|
return poolAttrs.sessionCallback;
|
|
}
|
|
}
|
|
}
|
|
);
|
|
}
|
|
|
|
|
|
class Pool extends EventEmitter {
|
|
|
|
_extend(oracledb) {
|
|
this._oracledb = oracledb;
|
|
this._setup = setup;
|
|
this.close = nodbUtil.promisify(oracledb, close);
|
|
this.getConnection = nodbUtil.promisify(oracledb, getConnection);
|
|
this.terminate = this.close;
|
|
}
|
|
|
|
}
|
|
|
|
|
|
module.exports = Pool;
|