// node-oracledb/lib/aqQueue.js
// (repository listing: 290 lines, 10 KiB, JavaScript)

// Copyright (c) 2019, 2024, Oracle and/or its affiliates.
//-----------------------------------------------------------------------------
//
// This software is dual-licensed to you under the Universal Permissive License
// (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
// 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
// either license.
//
// If you elect to accept the software under the Apache License, Version 2.0,
// the following applies:
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//-----------------------------------------------------------------------------
'use strict';
const { Buffer } = require('buffer');
const errors = require('./errors.js');
const nodbUtil = require('./util.js');
const AqDeqOptions = require('./aqDeqOptions.js');
const AqEnqOptions = require('./aqEnqOptions.js');
const AqMessage = require('./aqMessage.js');
const BaseDbObject = require('./dbObject.js');
const transformer = require('./transformer.js');
const types = require('./types.js');
class AqQueue {

  //---------------------------------------------------------------------------
  // _isPayload()
  //
  // Reports whether the value can be used directly as a message payload, i.e.
  // it is a string, a Buffer or an instance of BaseDbObject.
  //---------------------------------------------------------------------------
  _isPayload(value) {
    if (typeof value === 'string') {
      return true;
    }
    if (Buffer.isBuffer(value)) {
      return true;
    }
    return value instanceof BaseDbObject;
  }

  //---------------------------------------------------------------------------
  // _makeMessage()
  //
  // Wraps a message implementation object in a new AqMessage, recording the
  // queue's payload type class on the wrapper. Used for the values returned
  // by enqOne()/deqOne()/enqMany()/deqMany().
  //---------------------------------------------------------------------------
  _makeMessage(msgImpl) {
    return Object.assign(new AqMessage(), {
      _impl: msgImpl,
      _payloadTypeClass: this._payloadTypeClass
    });
  }

  //---------------------------------------------------------------------------
  // _verifyMessage()
  //
  // Normalizes a value passed to enqOne()/enqMany(). The value is either a
  // bare payload (string, Buffer or database object), in which case no other
  // message properties are set, or an object with a "payload" property plus
  // optional enqueue properties. A new object is always returned; the
  // caller's object is never modified.
  //
  // The returned object's "payload" is the transformed JSON value for JSON
  // queues, a Buffer for string/Buffer payloads and the payload's _impl
  // otherwise. The optional properties correlation, delay, exceptionQueue,
  // expiration, priority and recipients are type checked when present.
  //---------------------------------------------------------------------------
  _verifyMessage(message) {

    // locate the payload and build the object that will be returned
    let rawPayload;
    let normalized;
    if (this._isPayload(message)) {
      rawPayload = message;
      normalized = {};
    } else {
      normalized = {...message};
      const candidate = normalized.payload;
      if (this._isJson || this._isPayload(candidate)) {
        rawPayload = candidate;
      } else if (this._payloadTypeClass) {
        // anything else is handed to the payload type class constructor
        rawPayload = new this._payloadTypeClass(candidate);
      } else {
        errors.throwErr(errors.ERR_INVALID_AQ_MESSAGE);
      }
    }

    // convert the payload to the form stored on the normalized message
    let storedPayload;
    if (this._isJson) {
      storedPayload = transformer.transformJsonValue(rawPayload);
    } else if (Buffer.isBuffer(rawPayload)) {
      storedPayload = rawPayload;
    } else if (typeof rawPayload === 'string') {
      storedPayload = Buffer.from(rawPayload);
    } else {
      storedPayload = rawPayload._impl;
    }
    normalized.payload = storedPayload;

    // type check each optional property that has been supplied; the order of
    // the entries is the order in which the properties are checked
    const optionChecks = [
      ["correlation", (v) => typeof v === 'string'],
      ["delay", (v) => Number.isInteger(v)],
      ["exceptionQueue", (v) => typeof v === 'string'],
      ["expiration", (v) => Number.isInteger(v)],
      ["priority", (v) => Number.isInteger(v)],
      ["recipients", (v) => nodbUtil.isArrayOfStrings(v)]
    ];
    for (const [propName, isValid] of optionChecks) {
      const propValue = normalized[propName];
      if (propValue !== undefined) {
        errors.assertParamPropValue(isValid(propValue), 1, propName);
      }
    }

    return normalized;
  }

  //---------------------------------------------------------------------------
  // create()
  //
  // Initializes the queue: works out the payload type from
  // options.payloadType (JSON, RAW when undefined or RAW, otherwise a
  // database object type given by name or by class), stores the queue name
  // and obtains the implementation object from conn._impl.getQueue().
  //
  // When the payload type is given as a string, options.payloadType is
  // replaced with the class returned by conn._getDbObjectClassForName().
  //---------------------------------------------------------------------------
  async create(conn, name, options) {
    const requestedType = options.payloadType;
    if (requestedType === types.DB_TYPE_JSON) {
      this._isJson = true;
      this._payloadType = types.DB_TYPE_JSON;
      this._payloadTypeName = "JSON";
    } else if (requestedType === undefined ||
        requestedType === types.DB_TYPE_RAW) {
      this._payloadType = types.DB_TYPE_RAW;
      this._payloadTypeName = "RAW";
    } else {
      if (typeof requestedType === 'string') {
        // a type name: look up the corresponding class via the connection
        const resolvedClass =
          await conn._getDbObjectClassForName(requestedType);
        this._payloadTypeClass = resolvedClass;
        options.payloadType = resolvedClass;
      } else {
        // otherwise the value must itself be a BaseDbObject subclass
        const isDbObjectClass = nodbUtil.isObject(requestedType) &&
          requestedType.prototype instanceof BaseDbObject;
        errors.assertParamPropValue(isDbObjectClass, 2, "payloadType");
        this._payloadTypeClass = requestedType;
      }
      this._payloadType = types.DB_TYPE_OBJECT;
      this._payloadTypeName = this._payloadTypeClass.prototype.name;
    }
    this._name = name;
    const queueImpl = await conn._impl.getQueue(name, this._payloadTypeClass,
      this._payloadType);
    this._impl = queueImpl;
  }

  //---------------------------------------------------------------------------
  // deqMany()
  //
  // Dequeues using the given maximum, which must be a positive integer, and
  // returns an array with one AqMessage per message implementation returned
  // by the queue implementation.
  //---------------------------------------------------------------------------
  async deqMany(maxMessages) {
    errors.assertArgCount(arguments, 1, 1);
    const isPositiveInteger =
      Number.isInteger(maxMessages) && maxMessages > 0;
    errors.assertParamValue(isPositiveInteger, 1);
    const dequeuedImpls = await this._impl.deq(maxMessages);
    return dequeuedImpls.map((impl) => this._makeMessage(impl));
  }

  //---------------------------------------------------------------------------
  // deqOne()
  //
  // Dequeues at most one message. Returns an AqMessage, or undefined when
  // the queue implementation returned no messages.
  //---------------------------------------------------------------------------
  async deqOne() {
    errors.assertArgCount(arguments, 0, 0);
    const dequeuedImpls = await this._impl.deq(1);
    if (!(dequeuedImpls.length > 0)) {
      return undefined;
    }
    return this._makeMessage(dequeuedImpls[0]);
  }

  //---------------------------------------------------------------------------
  // deqOptions
  //
  // Dequeue options for this queue. The AqDeqOptions wrapper is created on
  // first access and the same instance is returned afterwards.
  //---------------------------------------------------------------------------
  get deqOptions() {
    if (this._deqOptions) {
      return this._deqOptions;
    }
    const wrapper = new AqDeqOptions();
    wrapper._impl = this._impl.deqOptions;
    this._deqOptions = wrapper;
    return wrapper;
  }

  //---------------------------------------------------------------------------
  // enqMany()
  //
  // Verifies every message in a non-empty array, passes them all to the
  // queue implementation in a single enq() call and returns an array of
  // AqMessage wrappers for the results.
  //---------------------------------------------------------------------------
  async enqMany(messages) {
    errors.assertArgCount(arguments, 1, 1);
    const isNonEmptyArray = Array.isArray(messages) && messages.length > 0;
    errors.assertParamValue(isNonEmptyArray, 1);
    const verified = Array.from(messages, (msg) => this._verifyMessage(msg));
    const enqueuedImpls = await this._impl.enq(verified);
    return enqueuedImpls.map((impl) => this._makeMessage(impl));
  }

  //---------------------------------------------------------------------------
  // enqOne()
  //
  // Verifies a single message, enqueues it and returns an AqMessage wrapper
  // for the first result of the enq() call.
  //---------------------------------------------------------------------------
  async enqOne(message) {
    errors.assertArgCount(arguments, 1, 1);
    const verified = this._verifyMessage(message);
    const enqueuedImpls = await this._impl.enq([verified]);
    return this._makeMessage(enqueuedImpls[0]);
  }

  //---------------------------------------------------------------------------
  // enqOptions
  //
  // Enqueue options for this queue. The AqEnqOptions wrapper is created on
  // first access and the same instance is returned afterwards.
  //---------------------------------------------------------------------------
  get enqOptions() {
    if (this._enqOptions) {
      return this._enqOptions;
    }
    const wrapper = new AqEnqOptions();
    wrapper._impl = this._impl.enqOptions;
    this._enqOptions = wrapper;
    return wrapper;
  }

  //---------------------------------------------------------------------------
  // name
  //
  // The queue name that was passed to create().
  //---------------------------------------------------------------------------
  get name() {
    return this._name;
  }

  //---------------------------------------------------------------------------
  // payloadType
  //
  // The payload type set by create(): one of types.DB_TYPE_JSON,
  // types.DB_TYPE_RAW or types.DB_TYPE_OBJECT.
  //---------------------------------------------------------------------------
  get payloadType() {
    return this._payloadType;
  }

  //---------------------------------------------------------------------------
  // payloadTypeName
  //
  // The payload type name set by create(): "JSON", "RAW" or the name found
  // on the payload type class prototype.
  //---------------------------------------------------------------------------
  get payloadTypeName() {
    return this._payloadTypeName;
  }

  //---------------------------------------------------------------------------
  // payloadTypeClass
  //
  // The database object class used for payloads; only set by create() for
  // object queues and undefined otherwise.
  //---------------------------------------------------------------------------
  get payloadTypeClass() {
    return this._payloadTypeClass;
  }

}
// The asynchronous public methods are passed through nodbUtil.wrapFns()
// before the class is exported.
// NOTE(review): wrapFns() is defined in util.js and is not visible here;
// presumably it applies the library's standard wrapping for async API
// methods -- confirm against util.js.
const wrappedMethodNames = ["deqOne", "deqMany", "enqOne", "enqMany"];
nodbUtil.wrapFns(AqQueue.prototype, ...wrappedMethodNames);

module.exports = AqQueue;