Fix potential cursor issues while using DRCP

This commit is contained in:
Sharad Chandran R 2024-02-05 15:27:12 +05:30
parent 586e71ce5c
commit 8dadfbda65
5 changed files with 181 additions and 87 deletions

View File

@ -33,6 +33,8 @@ Common Changes
Thin Mode Changes
++++++++++++++++++
#) Fixed potential cursor issues when using DRCP.
#) Fixed bug in reading PLS_INTEGER type when used in PL/SQL records.
#) Error ``NJS-141: errors in array DML exceed 65535`` is now raised

View File

@ -41,6 +41,7 @@ const process = require('process');
const types = require('../types.js');
const errors = require("../errors.js");
const messages = require('./protocol/messages');
const StatementCache = require('./statementCache.js');
const finalizationRegistry = new global.FinalizationRegistry((heldValue) => {
heldValue.disconnect();
@ -626,11 +627,8 @@ class ThinConnectionImpl extends ConnectionImpl {
this.statementCache = null;
this.currentSchema = "";
this.invokeSessionCallback = true;
this.statementCache = new Map();
this.statementCacheSize = params.stmtCacheSize;
this._numCursorsToClose = 0;
this._currentSchemaModified = false;
this._cursorsToClose = new Set();
this._tempLobsToClose = [];
this._tempLobsTotalSize = 0;
this._drcpEstablishSession = false;
@ -734,6 +732,7 @@ class ThinConnectionImpl extends ConnectionImpl {
throw err;
}
this.statementCache = new StatementCache(this.statementCacheSize);
// maintain a list of partially populated database object types
this._partialDbObjectTypes = [];
@ -746,51 +745,10 @@ class ThinConnectionImpl extends ConnectionImpl {
}
//---------------------------------------------------------------------------
// Sets that a statement is no longer in use
// Return the statement to the statement cache, if applicable
//---------------------------------------------------------------------------
_returnStatement(statement) {
if (statement.bindInfoList) {
statement.bindInfoList.forEach(bindInfo => {
bindInfo.bindVar = null;
});
}
if (statement.queryVars) {
statement.queryVars.forEach(queryVar => {
queryVar.values.fill(null);
});
}
if (statement.returnToCache) {
statement.inUse = false;
} else if (statement.cursorId !== 0) {
this._addCursorToClose(statement.cursorId);
}
}
//---------------------------------------------------------------------------
// Adds the cursors that needs to be closed to the _cursorsToClose set
//---------------------------------------------------------------------------
_addCursorToClose(cursorId) {
if (this._cursorsToClose.has(cursorId)) {
const reason = `attempt to close cursor ${cursorId} twice`;
errors.throwErr(errors.ERR_INTERNAL, reason);
}
this._cursorsToClose.add(cursorId);
}
//---------------------------------------------------------------------------
// Adjusts the statement cache to remove least recently used statements
//---------------------------------------------------------------------------
_adjustStatementCache() {
while (this.statementCache.size > this.statementCacheSize) {
const sql = this.statementCache.keys().next().value;
const stmt = this.statementCache.get(sql);
this.statementCache.delete(sql);
if (stmt.inUse) {
stmt.returnToCache = false;
} else if (stmt.cursorId !== 0) {
this._addCursorToClose(stmt.cursorId);
}
}
this.statementCache.returnStatement(statement);
}
//---------------------------------------------------------------------------
@ -900,14 +858,6 @@ class ThinConnectionImpl extends ConnectionImpl {
return resultSet;
}
//---------------------------------------------------------------------------
// Clears the statement cache for the connection
//---------------------------------------------------------------------------
resetStatementCache() {
this.statementCache.clear();
this._cursorsToClose.clear();
}
//---------------------------------------------------------------------------
// getDbObjectClass()
//
@ -972,31 +922,10 @@ class ThinConnectionImpl extends ConnectionImpl {
// new session is going to be used.
//---------------------------------------------------------------------------
_getStatement(sql, cacheStatement = false) {
let statement = this.statementCache.get(sql);
if (!statement) {
statement = new Statement();
statement._prepare(sql);
if (cacheStatement && !this._drcpEstablishSession && !statement.isDdl &&
this.statementCacheSize > 0) {
statement.returnToCache = true;
this.statementCache.set(sql, statement);
this._adjustStatementCache();
}
} else if (statement.inUse || !cacheStatement ||
this._drcpEstablishSession) {
if (!cacheStatement) {
this.statementCache.delete(sql);
statement.returnToCache = false;
}
if (statement.inUse || this._drcpEstablishSession) {
statement = statement._copy();
}
} else {
this.statementCache.delete(sql);
this.statementCache.set(sql, statement);
if (this._drcpEstablishSession) {
cacheStatement = false;
}
statement.inUse = true;
return statement;
return this.statementCache.getStatement(sql, cacheStatement);
}
//---------------------------------------------------------------------------
@ -1091,7 +1020,7 @@ class ThinConnectionImpl extends ConnectionImpl {
// the connection object
//---------------------------------------------------------------------------
getStmtCacheSize() {
return this.statementCacheSize;
return this.statementCache._maxSize;
}
setCallTimeout(timeout) {

View File

@ -282,7 +282,7 @@ class Message {
const flags = buf.readUB4(); // session flags
if (flags & constants.TNS_SESSGET_SESSION_CHANGED) {
if (this.connection._drcpEstablishSession) {
this.connection.resetStatementCache();
this.connection.statementCache.clearOpenCursors();
}
}
this.connection._drcpEstablishSession = false;
@ -297,7 +297,7 @@ class Message {
if (this.connection._currentSchemaModified) {
this._writeCurrentSchemaPiggyback(buf);
}
if (this.connection._cursorsToClose.size > 0 && !this.connection._drcpEstablishSession) {
if (this.connection.statementCache._cursorsToClose.size > 0 && !this.connection._drcpEstablishSession) {
this.writeCloseCursorsPiggyBack(buf);
}
if (
@ -326,11 +326,7 @@ class Message {
writeCloseCursorsPiggyBack(buf) {
this.writePiggybackHeader(buf, constants.TNS_FUNC_CLOSE_CURSORS);
buf.writeUInt8(1);
buf.writeUB4(this.connection._cursorsToClose.size);
for (const cursorNum of this.connection._cursorsToClose.keys()) {
buf.writeUB4(cursorNum);
}
this.connection._cursorsToClose.clear();
this.connection.statementCache.writeCursorsToClose(buf);
}
writeCloseTempLobsPiggyback(buf) {

View File

@ -117,7 +117,7 @@ class MessageWithData extends Message {
this.errorOccurred = false;
this.statement.moreRowsToFetch = false;
} else if (this.errorInfo.num !== 0 && this.errorInfo.cursorId !== 0) {
this.connection.statementCache.delete(this.statement.sql);
this.connection.statementCache._cachedStatements.delete(this.statement.sql);
this.statement.returnToCache = false;
}
if (this.errorInfo.batchErrors) {

167
lib/thin/statementCache.js Normal file
View File

@ -0,0 +1,167 @@
// Copyright (c) 2024, Oracle and/or its affiliates.
//-----------------------------------------------------------------------------
//
// This software is dual-licensed to you under the Universal Permissive License
// (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
// 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
// either license.
//
// If you elect to accept the software under the Apache License, Version 2.0,
// the following applies:
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Node file defining the StatementCache class used to manage cached statements
//-----------------------------------------------------------------------------
const errors = require("../errors.js");
const { Statement } = require("./statement");
class StatementCache {
constructor(maxSize) {
this._cachedStatements = new Map();
this._maxSize = maxSize;
this._cursorsToClose = new Set();
this._openCursors = new Set();
}
//---------------------------------------------------------------------------
// _addCursorToClose()
//
// Add the statement's cursor to the list of cursors that need to be closed.
//---------------------------------------------------------------------------
_addCursorToClose(statement) {
if (this._cursorsToClose.has(statement.cursorId)) {
const reason = `attempt to close cursor ${statement.cursorId} twice`;
errors.throwErr(errors.ERR_INTERNAL, reason);
}
if (statement.cursorId != 0) {
this._cursorsToClose.add(statement.cursorId);
}
this._openCursors.delete(statement);
}
//---------------------------------------------------------------------------
// _adjustCache()
// Adjust the cache so that no more than the maximum number of statements
// are cached by removing least recently used statements
//---------------------------------------------------------------------------
_adjustCache() {
while (this._cachedStatements.size > this._maxSize) {
const sql = this._cachedStatements.keys().next().value;
const stmt = this._cachedStatements.get(sql);
this._cachedStatements.delete(sql);
if (stmt.inUse) {
stmt.returnToCache = false;
} else if (stmt.cursorId !== 0) {
this._addCursorToClose(stmt);
}
}
}
//---------------------------------------------------------------------------
//clearOpenCursors() {
// Clears the list of open cursors and removes the list of cursors that
// need to be closed. This is required when a DRCP session change has
// taken place as the cursor ID values are invalidated.
//---------------------------------------------------------------------------
clearOpenCursors() {
const newOpenCursors = new Set();
for (const stmt of this._openCursors) {
stmt.cursorId = 0;
if (stmt.inUse) {
newOpenCursors.add(stmt);
}
}
this._openCursors = newOpenCursors;
}
//---------------------------------------------------------------------------
// get_statement()
// Get a statement from the statement cache, or prepare a new statement
// for use. If a statement is already in use or the statement is not
// supposed to be cached, a copy will be made (and not returned to the
// cache).
//---------------------------------------------------------------------------
getStatement(sql, cacheStatement = false) {
let stmt = null;
if (sql) {
stmt = this._cachedStatements.get(sql);
if (!cacheStatement) {
this._openCursors.add(stmt);
this._cachedStatements.delete(sql);
}
}
if (!stmt) {
stmt = new Statement();
if (sql) {
stmt._prepare(sql);
}
if (cacheStatement && !stmt.isDdl && this._maxSize > 0) {
stmt.returnToCache = true;
this._cachedStatements.set(sql, stmt);
this._adjustCache();
}
this._openCursors.add(stmt);
} else if (stmt.inUse || !cacheStatement) {
stmt = stmt._copy();
this._openCursors.add(stmt);
} else {
this._cachedStatements.delete(sql);
this._cachedStatements.set(sql, stmt);
}
stmt.inUse = true;
return stmt;
}
//---------------------------------------------------------------------------
// returnStatement()
// Return the statement to the statement cache, if applicable. If the
// statement must not be returned to the statement cache, add the cursor
// id to the list of cursor ids to close on the next round trip to the
// database. Clear all bind variables and fetch variables in order to
// ensure that unnecessary references are not retained.
//---------------------------------------------------------------------------
returnStatement(statement) {
if (statement.bindInfoList) {
statement.bindInfoList.forEach(bindInfo => {
bindInfo.bindVar = null;
});
}
if (statement.queryVars) {
statement.queryVars.forEach(queryVar => {
queryVar.values.fill(null);
});
}
if (statement.returnToCache) {
statement.inUse = false;
} else if (statement.cursorId !== 0) {
this._addCursorToClose(statement);
}
}
//---------------------------------------------------------------------------
// writeCursorsToClose()
// Write the list of cursors to close to the buffer.
//---------------------------------------------------------------------------
writeCursorsToClose(buf) {
buf.writeUB4(this._cursorsToClose.size);
for (const cursorNum of this._cursorsToClose.keys()) {
buf.writeUB4(cursorNum);
}
this._cursorsToClose.clear();
}
}
module.exports = StatementCache;