// node-oracledb/test/aq6.js
//
// 337 lines
// 9.8 KiB
// JavaScript
// Raw Permalink Blame History
//
// This file contains ambiguous Unicode characters
//
// This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* Copyright (c) 2024, Oracle and/or its affiliates. */
/******************************************************************************
*
* This software is dual-licensed to you under the Universal Permissive License
* (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
* 2.0 as shown at https://www.apache.org/licenses/LICENSE-2.0. You may choose
* either license.
*
* If you elect to accept the software under the Apache License, Version 2.0,
* the following applies:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* NAME
* 282. aq6.js
*
* DESCRIPTION
* Test Oracle Advanced Queuing (AQ).
* Test cases for msgId from enqOne/enqMany/deqOne/deqMany.
*
*****************************************************************************/
'use strict';
const oracledb = require('oracledb');
const assert = require('assert');
const dbConfig = require('./dbconfig.js');
const testsUtil = require('./testsUtil.js');
describe('282. aq6.js', function() {
// Schema used for this suite; created in before() and dropped in after().
const AQ_USER = 'NODB_SCHEMA_AQTEST1';
// Fresh random password for AQ_USER, generated once when the suite is defined.
const AQ_USER_PWD = testsUtil.generateRandomPassword();
// Queue table backing the RAW-payload queue of 282.1.
const RAW_TABLE = 'NODB_RAW_QUEUE_TAB';

// Cleared by before() when the environment cannot run these tests;
// after() checks it to decide whether there is anything to tear down.
let isRunnable = true;
// Connection opened as AQ_USER in before() and shared by all tests below.
let conn;
// Suite setup: skip when the environment does not qualify, otherwise create
// the AQ test schema and open the connection shared by every test.
before(async function() {
  // NOTE(review): the two numbers are presumably minimum client/server
  // version thresholds -- confirm against testsUtil.checkPrerequisites().
  const meetsPrerequisites = await testsUtil.checkPrerequisites(2100000000, 2100000000);

  // Requires DBA privilege, a non-thin driver and satisfied prerequisites.
  const canRun = dbConfig.test.DBA_PRIVILEGE && !oracledb.thin && meetsPrerequisites;
  if (!canRun) {
    // Recorded so that after() knows nothing was created.
    isRunnable = false;
    this.skip();
  }

  await testsUtil.createAQtestUser(AQ_USER, AQ_USER_PWD);
  conn = await oracledb.getConnection({
    user: AQ_USER,
    password: AQ_USER_PWD,
    connectString: dbConfig.connectString
  });
}); // before()
// Suite teardown: close the shared connection and drop the AQ test schema.
// Does nothing when before() marked the suite as not runnable.
after(async function() {
  if (!isRunnable) return;

  // conn is still undefined when before() failed ahead of getConnection().
  // Guard the close so that failure does not surface here as a TypeError
  // and the schema drop below is still attempted.
  if (conn) {
    await conn.close();
  }
  await testsUtil.dropAQtestUser(AQ_USER);
}); // after()
// 282.1: msgId checks for a queue whose payload type is RAW.
// Every message object returned by enqOne(), deqOne() and deqMany() must
// carry a non-empty Buffer in its msgId property.
describe('282.1 msgId in QUEUE_PAYLOAD_TYPE as RAW', function() {
  const rawQueueName = "NODB_RAW_QUEUE";

  // Create the RAW queue table, then create and start the queue on it.
  before(async function() {
    // The statement text is kept flush-left so the string sent to the
    // database is unchanged.
    const plsql = `
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => '${AQ_USER}.${RAW_TABLE}',
QUEUE_PAYLOAD_TYPE => 'RAW'
);
DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => '${AQ_USER}.${rawQueueName}',
QUEUE_TABLE => '${AQ_USER}.${RAW_TABLE}'
);
DBMS_AQADM.START_QUEUE(
QUEUE_NAME => '${AQ_USER}.${rawQueueName}'
);
END;
`;
    await conn.execute(plsql);
  });

  it('282.1.1 msgId in enqOne/deqOne', async () => {
    let msg;

    // Enqueue
    const queue1 = await conn.getQueue(rawQueueName);
    const messageString = 'This is my message';
    msg = await queue1.enqOne(messageString);
    assert(msg);
    assert(msg.msgId.length > 0);
    assert(msg.msgId instanceof Buffer);
    await conn.commit();

    // Dequeue
    const queue2 = await conn.getQueue(rawQueueName);
    msg = await queue2.deqOne();
    assert(msg);
    assert(msg.msgId.length > 0);
    assert(msg.msgId instanceof Buffer);
    await conn.commit();
  }); // 282.1.1

  it('282.1.2 msgId in enqMany/deqMany', async () => {
    // Enqueue
    const queue1 = await conn.getQueue(rawQueueName);
    queue1.enqOptions.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
    const messages1 = [
      "Message 1",
      "Message 2",
      {
        expiration: 10,
        payload: "Message 3"
      },
      "Message 4"
    ];
    await queue1.enqMany(messages1);

    // Dequeue
    const queue2 = await conn.getQueue(rawQueueName);
    // This handle is only used for dequeuing, so the visibility has to be
    // set on deqOptions; setting it on enqOptions had no effect on deqMany().
    queue2.deqOptions.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
    const msgs = await queue2.deqMany(5); // get at most 5 messages

    // Fail instead of passing vacuously when nothing was dequeued.
    assert.strictEqual(msgs.length, messages1.length);
    for (const dequeued of msgs) {
      assert(dequeued.msgId.length > 0);
      assert(dequeued.msgId instanceof Buffer);
    }
  }); // 282.1.2
});
// 282.2: msgId checks for a single-consumer queue whose payload type is JSON.
// Messages returned by enqOne(), deqOne() and deqMany() must carry a
// non-empty Buffer in their msgId property.
describe('282.2 msgId in QUEUE_PAYLOAD_TYPE as JSON', function() {
  const objQueueName = "NODB_ADDR_QUEUE7";
  const objType = "JSON";
  const objTable = "NODB_TAB_JSON";

  // Create the JSON queue table, then create and start the queue on it.
  before(async function() {
    // The statement text is kept flush-left so the string sent to the
    // database is unchanged.
    const plsql = `
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => '${AQ_USER}.${objTable}',
multiple_consumers => FALSE,
QUEUE_PAYLOAD_TYPE => '${objType}'
);
DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => '${AQ_USER}.${objQueueName}',
QUEUE_TABLE => '${AQ_USER}.${objTable}'
);
DBMS_AQADM.START_QUEUE(
QUEUE_NAME => '${AQ_USER}.${objQueueName}'
);
END;
`;
    await conn.execute(plsql);
  });

  it('282.2.1 enqOne and deqOne Json val as array type', async function() {
    let msg;

    // Enqueue
    const queue = await conn.getQueue(objQueueName,
      { payloadType: oracledb.DB_TYPE_JSON }
    );
    msg = await queue.enqOne({
      payload: { "employees": [ "Employee1", "Employee2", "Employee3" ] },
    });
    // The enqueue result was previously assigned but never checked; verify
    // it the same way the RAW and DB-object enqOne tests do.
    assert(msg);
    assert(msg.msgId.length > 0);
    assert(msg.msgId instanceof Buffer);
    await conn.commit();

    // Dequeue
    const options = { payloadType: oracledb.DB_TYPE_JSON };
    const queue2 = await conn.getQueue(objQueueName, options);
    msg = await queue2.deqOne();
    assert(msg);
    assert(msg.msgId.length > 0);
    assert(msg.msgId instanceof Buffer);
    await conn.commit();
  }); //282.2.1

  it('282.2.2 JSON type in enqMany/deqMany', async () => {
    // Enqueue
    const queue3 = await conn.getQueue(objQueueName,
      { payloadType: oracledb.DB_TYPE_JSON });
    const empList = [
      {payload: { empName: "Employee #1", empId: 101 }},
      {payload: { empName: "Employee #2", empId: 102 }},
      {payload: { empName: "Employee #3", empId: 103 }}
    ];
    await queue3.enqMany(empList);
    await conn.commit();

    // Dequeue
    const options = { payloadType: oracledb.DB_TYPE_JSON };
    const queue4 = await conn.getQueue(objQueueName, options);
    Object.assign(queue4.deqOptions,
      {
        navigation: oracledb.AQ_DEQ_NAV_FIRST_MSG,
        wait: oracledb.AQ_DEQ_NO_WAIT
      }
    );
    const msgs = await queue4.deqMany(5); // get at most 5 messages

    // Fail instead of passing vacuously when nothing was dequeued.
    assert.strictEqual(msgs.length, empList.length);
    for (const dequeued of msgs) {
      assert(dequeued.msgId.length > 0);
      assert(dequeued.msgId instanceof Buffer);
    }
  }); //282.2.2
});
// 282.3: msgId checks for a queue whose payload is an Oracle Database
// object type. Messages returned by enqOne(), deqOne() and deqMany() must
// carry a non-empty Buffer in their msgId property.
describe('282.3 msgId as Oracle Database Object AQ Messages', function() {
  const objQueueName = "NODB_ADDR_QUEUE";
  const objType = "NODB_ADDR_TYP";
  const objTable = "NODB_TAB_ADDR";

  // Create the object type, then the queue table and a started queue on it.
  before(async function() {
    // The statement texts are kept flush-left so the strings sent to the
    // database are unchanged.

    // Create the Type
    let plsql = `
CREATE OR REPLACE TYPE ${objType} AS OBJECT (
NAME VARCHAR2(10),
ADDRESS VARCHAR2(50)
);
`;
    await conn.execute(plsql);

    // Create and start a queue
    plsql = `
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
QUEUE_TABLE => '${AQ_USER}.${objTable}',
QUEUE_PAYLOAD_TYPE => '${objType}'
);
DBMS_AQADM.CREATE_QUEUE(
QUEUE_NAME => '${AQ_USER}.${objQueueName}',
QUEUE_TABLE => '${AQ_USER}.${objTable}'
);
DBMS_AQADM.START_QUEUE(
QUEUE_NAME => '${AQ_USER}.${objQueueName}'
);
END;
`;
    await conn.execute(plsql);
  });

  it('282.3.1 msgId in enqOne/deqOne', async () => {
    let msg;
    const addrData = {
      NAME: "scott",
      ADDRESS: "The Kennel"
    };

    // Enqueue
    const queue1 = await conn.getQueue(
      objQueueName,
      { payloadType: objType }
    );
    const message = new queue1.payloadTypeClass(addrData);
    msg = await queue1.enqOne(message);
    assert(msg);
    assert(msg.msgId.length > 0);
    assert(msg.msgId instanceof Buffer);
    await conn.commit();

    // Dequeue
    const queue2 = await conn.getQueue(
      objQueueName,
      { payloadType: objType }
    );
    msg = await queue2.deqOne();
    assert(msg);
    assert(msg.msgId.length > 0);
    assert(msg.msgId instanceof Buffer);
    await conn.commit();
  }); // 282.3.1

  // Attribute values for the messages enqueued by 282.3.2; the keys match
  // the attributes of the object type created in before().
  const addrArray = [
    {
      NAME: "John",
      ADDRESS: "100 Oracle Parkway Redwood City, CA US 94065"
    },
    {
      NAME: "Jenny",
      ADDRESS: "200 Oracle Parkway Redwood City, CA US 94065"
    },
    {
      NAME: "Laura",
      ADDRESS: "300 Oracle Parkway Redwood City, CA US 94065"
    },
    {
      NAME: "Lawrence",
      ADDRESS: "400 Oracle Parkway Redwood City, CA US 94065"
    }
  ];

  it('282.3.2 msgId in deqMany() with DB object array', async () => {
    // Enqueue
    const queue1 = await conn.getQueue(
      objQueueName,
      { payloadType: objType }
    );
    const msgArray = addrArray.map((addr) => new queue1.payloadTypeClass(addr));
    // There is no commit between this enqueue and the dequeue below; both
    // run on the same connection.
    await queue1.enqMany(msgArray);

    // Dequeue
    const queue2 = await conn.getQueue(
      objQueueName,
      { payloadType: objType }
    );
    const msgs = await queue2.deqMany(5); // get at most 5 messages

    // Fail instead of passing vacuously when nothing was dequeued.
    assert.strictEqual(msgs.length, addrArray.length);
    for (const dequeued of msgs) {
      assert(dequeued.msgId.length > 0);
      assert(dequeued.msgId instanceof Buffer);
    }
  }); // 282.3.2
});
});