454 lines
18 KiB
JavaScript
454 lines
18 KiB
JavaScript
/* Copyright (c) 2023, Oracle and/or its affiliates. */

/******************************************************************************
 *
 * This software is dual-licensed to you under the Universal Permissive License
 * (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
 * 2.0 as shown at https://www.apache.org/licenses/LICENSE-2.0. You may choose
 * either license.
 *
 * If you elect to accept the software under the Apache License, Version 2.0,
 * the following applies:
 *
 * Licensed under the Apache License, Version 2.0 (the `License`);
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an `AS IS` BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * NAME
 *   281. optimisticLock.js
 *
 * DESCRIPTION
 *   Testing optimistic locking to handle concurrent updates to a database
 *
 *****************************************************************************/
|
|
'use strict';
|
|
|
|
const oracledb = require('oracledb');
|
|
const assert = require('assert');
|
|
const dbConfig = require('./dbconfig.js');
|
|
const testsUtil = require('./testsUtil.js');
|
|
|
|
describe('281. optimisticLock.js', function() {
|
|
|
|
// Set by the suite-level before() hook; false means the suite is skipped.
let isRunnable = false;
// Name of the scratch table used by the 281.1 tests.
const TABLE = 'my_table';

// Skip the whole suite unless testsUtil.checkPrerequisites() says the
// environment qualifies.
// NOTE(review): the two arguments are presumably the minimum client and
// server version numbers - confirm against testsUtil.js.
before(async function() {
  isRunnable = await testsUtil.checkPrerequisites(2100000000, 2300000000);
  if (!isRunnable) {
    this.skip();
  }
});
|
|
|
|
describe('281.1 locking in tables', function() {
|
|
// Connection shared by every hook and test in this describe block.
let connection = null;

/**
 * Returns a promise that resolves (with no value) once `ms` milliseconds
 * have elapsed. The tests use it to make workers pause between statements.
 *
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
|
|
|
|
// Open the shared connection and create the test table. The `version`
// column is what the optimistic-locking UPDATE statements compare against.
before(async function() {
  connection = await oracledb.getConnection(dbConfig);
  const sql = `CREATE TABLE ${TABLE} (
                 id NUMBER(10) PRIMARY KEY,
                 value VARCHAR2(100),
                 version NUMBER(10)
               )`;
  await testsUtil.createTable(connection, TABLE, sql);
});
|
|
|
|
// Drop the test table, then release the shared connection.
after(async function() {
  await testsUtil.dropTable(connection, TABLE);
  await connection.close();
});
|
|
|
|
// Empty the table after every test so each one starts from a clean slate.
afterEach(async function() {
  await connection.execute(`TRUNCATE TABLE ${TABLE}`);
});
|
|
|
|
it('281.1.1 multiple workers that update the same row in the database at the same time', async function() {
  // Seed the row that every worker will try to update.
  await connection.execute(
    `INSERT INTO ${TABLE} (id, value, version) VALUES (:id, :value, :version)`,
    { id: 1, value: 'foo', version: 1 }
  );

  // Simulates one worker. The UPDATE only matches while the stored version
  // equals the version the worker expects, and bumps the version on success.
  // Worker N expects version N, so the rowsAffected assertion only holds
  // when the updates are applied in worker order 1..workerCount on the
  // shared connection.
  async function updateRow(workerId) {
    const updateQuery = `
      UPDATE ${TABLE}
      SET value = :value, version = version + 1
      WHERE id = :id AND version = :version
    `;
    const updateParams = { id: 1, value: `worker ${workerId}`, version: workerId };
    const result = await connection.execute(updateQuery, updateParams);
    assert.strictEqual(result.rowsAffected, 1);
  }

  // Start every worker without awaiting them one by one.
  const workerCount = 10;
  const workers = [];
  for (let i = 1; i <= workerCount; i++) {
    workers.push(updateRow(i));
  }

  // Wait for all workers to finish.
  await Promise.all(workers);

  // The value written by the last worker must be the one left in the row.
  const selectQuery = `SELECT value FROM ${TABLE} WHERE id = :id`;
  const selectResult = await connection.execute(selectQuery, { id: 1 });
  assert.strictEqual(selectResult.rows[0][0], `worker ${workerCount}`);
});
|
|
|
|
it('281.1.2 two workers update and insert data into the same table at the same time', async function() {
  /*
    One of the workers will insert a new row and not commit the transaction,
    while the other worker will update an existing row.
    This should result in the second worker being blocked by the first worker,
    until the first worker either commits or rolls back its transaction.

    NOTE(review): both workers run on the single shared `connection`, and
    updateRow() commits on that same connection - confirm whether blocking
    between transactions is really exercised by this test.
  */

  // Seed the row that the update worker will modify.
  await connection.execute(
    `INSERT INTO ${TABLE} (id, value, version) VALUES (:id, :value, :version)`,
    { id: 1, value: 'foo', version: 1 }
  );

  // Simulates a worker that inserts row id=2 and does not commit.
  async function insertRow(workerId) {
    const insertQuery = `
      INSERT INTO ${TABLE} (id, value, version)
      VALUES (:id, :value, :version)
    `;
    const insertParams = { id: 2, value: `worker ${workerId}`, version: 1 };
    await connection.execute(insertQuery, insertParams, { autoCommit: false });
  }

  // Simulates a worker that updates row id=1 (expecting version 1) and
  // then commits.
  async function updateRow(workerId) {
    const updateQuery = `
      UPDATE ${TABLE}
      SET value = :value, version = version + 1
      WHERE id = :id AND version = :version
    `;
    const updateParams = { id: 1, value: `worker ${workerId}`, version: 1 };
    await connection.execute(updateQuery, updateParams);
    await connection.commit();
  }

  // Start the insert and update workers concurrently and wait for both.
  const workers = [insertRow(1), updateRow(2)];
  await Promise.all(workers);

  // Row 1 must carry the updater's value, row 2 the inserter's value.
  const selectQuery = `SELECT value FROM ${TABLE} WHERE id IN (:id1, :id2) ORDER BY id`;
  const selectParams = { id1: 1, id2: 2 };
  const selectResult = await connection.execute(selectQuery, selectParams);
  const finalValues = selectResult.rows.map(row => row[0]);
  assert.strictEqual(finalValues[0], "worker 2");
  assert.strictEqual(finalValues[1], "worker 1");

  // Roll back anything that is still uncommitted on the connection.
  await connection.rollback();
});
|
|
|
|
it('281.1.3 two workers suspends the current session to update and insert', async function() {
  /*
    Each worker will acquire a lock on the row and prevent other workers from updating it.
    The worker then simulates a delay before updating the row where multiple workers might
    be contending to access and update the same row simultaneously.

    After updating the row, the worker checks if the update affected exactly one row and
    commits the transaction if successful.

    The test case waits for both workers to finish and collects their results. It then
    verifies that exactly one worker succeeded and one worker failed, indicating that
    the optimistic locking mechanism prevented concurrent updates and handled errors correctly.
  */

  await connection.execute(`
    INSERT INTO ${TABLE} (id, value, version)
    VALUES (1, 'initial', 1)
  `);

  const numworkers = 2;

  // Runs one worker. Resolves to 'success' when its versioned UPDATE hit
  // exactly one row and was committed, otherwise to the message of the
  // error that stopped it. It never rejects.
  async function runworkerTask(i) {
    try {
      /*
        This is done to simulate multiple users accessing a shared resource
        at the same time, and to observe how the nodejs driver behaves when
        two or more users are attempting to access the same tables simultaneously
      */
      // Delay execution for 1 second
      await sleep(1000);

      // Select the row for update. `autoCommit` is the execute() option
      // name; the previous `isAutoCommit` key is not an execute() option.
      const result = await connection.execute(`
        SELECT id, value, version
        FROM ${TABLE}
        WHERE id = 1
        FOR UPDATE
      `, [], { autoCommit: false });

      const row = result.rows[0];

      // Simulate a 1 second delay between reading and updating the row
      await sleep(1000);

      // Bump the version, but only if the row still has the version we read
      const updateResult = await connection.execute(`
        UPDATE ${TABLE}
        SET value = :value, version = :newVersion
        WHERE id = :id AND version = :oldVersion
      `, {
        value: `worker ${i}`,
        newVersion: row[2] + 1,
        id: row[0],
        oldVersion: row[2]
      }, { autoCommit: false });

      // A stale version matches no row: report that as a failure
      if (updateResult.rowsAffected !== 1) {
        throw new Error('Failed to update row');
      }

      // Commit the transaction
      await connection.commit();
      return 'success';
    } catch (err) {
      return err.message;
    }
  }

  // Start all workers, then wait for them and collect their results
  const workerTasks = [];
  for (let i = 0; i < numworkers; i++) {
    workerTasks.push(runworkerTask(i));
  }
  const results = await Promise.all(workerTasks);

  // Verify that one worker succeeded and one worker failed
  assert.strictEqual(results.includes('success'), true);
  assert.strictEqual(results.includes('Failed to update row'), true);
});
|
|
});
|
|
|
|
describe('281.2 optimistic Locking in JSON Relational Duality View', function() {
|
|
// Connection shared by every hook and test in this describe block.
let connection = null;

// Base tables behind the duality views. employees.version is the column
// the tests write through the VERSION field of emp_ov.
const createTableEmp = `CREATE TABLE employees (employee_id NUMBER(6)
                        primary key, first_name varchar2(4000),
                        last_name varchar2(4000), version NUMBER(10), department_id NUMBER(4))`;

const createTableDept = `CREATE TABLE departments (department_id NUMBER(4)
                         primary key, department_name VARCHAR2(30),
                         manager_id NUMBER(6))`;

// Foreign key employees.department_id -> departments.
const alterTableEmp = `ALTER TABLE employees ADD
                       (CONSTRAINT emp_dept_fk FOREIGN KEY (department_id)
                       REFERENCES departments)`;

// Employee-centric duality view: one document per employee with its
// department nested under 'department_info'.
const createEmpView = `CREATE or replace JSON relational duality VIEW EMP_OV
                       AS
                       select JSON {
                         'EMPLOYEE_ID' is emp.EMPLOYEE_ID,
                         'FIRST_NAME' is emp.FIRST_NAME,
                         'LAST_NAME' is emp.last_name,
                         'VERSION' is emp.version,
                         'department_info' is
                         (
                           select JSON
                           {
                             'DEPARTMENT_ID' is dept.department_id,
                             'departmentname' is dept.department_name WITH(UPDATE)
                           }
                           from departments dept WITH(UPDATE,CHECK ETAG)
                           where dept.department_id = emp.department_id
                         )
                       returning JSON}
                       from employees emp WITH(INSERT,UPDATE,DELETE)`;

// Department-centric duality view: one document per department with its
// employees aggregated under 'EMP_INFO'.
const createDeptView = `CREATE OR REPLACE JSON relational duality VIEW dept_ov
                        AS
                        select JSON{
                          'department_id' is dept.DEPARTMENT_ID,
                          'department_name' is dept.DEPARTMENT_NAME,
                          'EMP_INFO' is
                          ( select
                            json_arrayagg
                            (
                              JSON
                              {
                                'employee_id' is emp.employee_id,
                                'FIRST_NAME' is emp.FIRST_NAME
                              }
                            )
                            from employees emp WITH (INSERT,UPDATE,DELETE)
                            where emp.department_id = dept.department_id
                          )
                          returning json
                        }
                        from departments dept WITH (INSERT,UPDATE,DELETE,CHECK ETAG)`;
|
|
|
|
// Open the shared connection, then create both tables, the foreign key
// between them and the two duality views, in that order.
before(async function() {
  connection = await oracledb.getConnection(dbConfig);
  await testsUtil.createTable(connection, 'employees', createTableEmp);
  await testsUtil.createTable(connection, 'departments', createTableDept);
  await connection.execute(alterTableEmp);
  await connection.execute(createEmpView);
  await connection.execute(createDeptView);
});
|
|
|
|
// Remove the tables and views created for this block, then release the
// shared connection. employees is dropped first because it holds the
// foreign key that references departments.
after(async function() {
  await testsUtil.dropTable(connection, 'employees');
  await testsUtil.dropTable(connection, 'departments');
  await connection.execute(`DROP VIEW IF EXISTS emp_ov`);
  await connection.execute(`DROP VIEW IF EXISTS dept_ov`);
  await connection.close();
});
|
|
|
|
// Clear both tables after every test. employees goes first because its
// rows reference departments through the foreign key.
afterEach(async function() {
  await connection.execute(`DELETE from employees`);
  await connection.execute(`DELETE from departments`);
});
|
|
|
|
it('281.2.1 multiple workers that update the view at the same time', async function() {
  // Seed one department and one employee that belongs to it.
  await connection.execute(`INSERT INTO departments VALUES
    ( 10
    ,'Administration'
    , 100
    )`);

  await connection.execute(`INSERT INTO employees VALUES
    ( 100
    , 'Steven'
    , 'King'
    , 1
    , 10
    )`);

  // Simulates one worker: replaces the document of employee 100 through
  // the emp_ov duality view, writing the worker id as the VERSION field.
  async function updateRow(workerId) {
    const queryUpdate = `update emp_ov set data = '{"EMPLOYEE_ID":100,"FIRST_NAME":"Lex","LAST_NAME":"De Haan",
      "VERSION": ${workerId},"department_info":{"DEPARTMENT_ID":10,"departmentname":"newdept"}}'
      where json_value(data,'$.EMPLOYEE_ID') = 100`;
    const result = await connection.execute(queryUpdate);
    assert.strictEqual(result.rowsAffected, 1);
  }

  // Start every worker without awaiting them one by one.
  const workerCount = 10;
  const workers = [];
  for (let i = 1; i <= workerCount; i++) {
    workers.push(updateRow(i));
  }

  // Wait for all workers to finish.
  await Promise.all(workers);

  // The VERSION written by the last worker must be the one that is stored.
  const query = `select * from emp_ov order by 1`;
  const result = await connection.execute(query);
  assert.strictEqual(result.rows[0][0].VERSION, workerCount);
});
|
|
|
|
it('281.2.2 two workers update and insert data into the same table at the same time', async function() {
  /*
    One of the workers will insert a new row and not commit the transaction,
    while the other worker will update an existing row.
    This should result in the second worker being blocked by the first worker,
    until the first worker either commits or rolls back its transaction.

    NOTE(review): both workers run on the single shared `connection`, and
    updateRow() commits on that same connection - confirm whether blocking
    between transactions is really exercised by this test.
  */

  // Seed one department and one employee that belongs to it.
  await connection.execute(`INSERT INTO departments VALUES
    ( 10
    ,'Administration'
    , 100
    )`);

  await connection.execute(`INSERT INTO employees VALUES
    ( 100
    , 'Steven'
    , 'King'
    , 0
    , 10
    )`);

  // Simulates a worker that inserts a new employee document through
  // emp_ov without committing. The id is the text "100" followed by the
  // worker id (1001 for worker 1), not the numeric sum.
  async function insertRow(workerId) {
    const queryInsert = `insert into emp_ov values ('{"EMPLOYEE_ID":100${workerId},"FIRST_NAME":"Lex",
      "LAST_NAME":"De Haan","VERSION": ${workerId},"department_info":{"DEPARTMENT_ID":10,"departmentname":"Administration"}}')`;

    const result = await connection.execute(queryInsert, {}, { autoCommit: false });
    assert.strictEqual(result.rowsAffected, 1);
  }

  // Simulates a worker that replaces the document of employee 100 through
  // emp_ov and then commits.
  async function updateRow(workerId) {
    const queryUpdate = `update emp_ov set data = '{"EMPLOYEE_ID":100,"FIRST_NAME":"Harry","LAST_NAME":"Potter",
      "VERSION": ${workerId},"department_info":{"DEPARTMENT_ID":10,"departmentname":"Wizardry"}}'
      where json_value(data,'$.EMPLOYEE_ID') = 100`;
    const result = await connection.execute(queryUpdate);
    assert.strictEqual(result.rowsAffected, 1);
    await connection.commit();
  }

  // Start the insert and update workers concurrently and wait for both.
  const workers = [insertRow(1), updateRow(2)];
  await Promise.all(workers);

  // Expect two documents: the updated employee (VERSION 2) first, then
  // the inserted one (VERSION 1).
  const query = `select * from emp_ov order by 1`;
  const result = await connection.execute(query);

  assert.strictEqual(result.rows.length, 2);
  assert.strictEqual(result.rows[0][0].VERSION, 2);
  assert.strictEqual(result.rows[1][0].VERSION, 1);

  // Roll back anything that is still uncommitted on the connection.
  await connection.rollback();
});
|
|
});
|
|
});
|