
290 lines
10 KiB

// Copyright (c) 2019, 2024, Oracle and/or its affiliates.
// This software is dual-licensed to you under the Universal Permissive License
// (UPL) 1.0 as shown at and Apache License
// 2.0 as shown at You may choose
// either license.
// If you elect to accept the software under the Apache License, Version 2.0,
// the following applies:
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
'use strict';
const { Buffer } = require('buffer');
const errors = require('./errors.js');
const nodbUtil = require('./util.js');
const AqDeqOptions = require('./aqDeqOptions.js');
const AqEnqOptions = require('./aqEnqOptions.js');
const AqMessage = require('./aqMessage.js');
const BaseDbObject = require('./dbObject.js');
const transformer = require('./transformer.js');
const types = require('./types.js');
class AqQueue {
// _isPayload()
// Returns a boolean indicating if the value is a valid payload.
_isPayload(value) {
return (typeof value === 'string' || Buffer.isBuffer(value) ||
value instanceof BaseDbObject);
// _makeMessage()
// For enqOne()/deqOne()/enqMany()/deqMany(), wrap the return value with JS
// layer object.
_makeMessage(msgImpl) {
const msg = new AqMessage();
msg._impl = msgImpl;
msg._payloadTypeClass = this._payloadTypeClass;
return msg;
// _verifyMessage()
// Messages that can be enqueued must be a string, Buffer or database object
// (in which case all message properties are defaulted) or an object
// containing a "payload" property along with the other properties to use
// during the enqueue. A normalized object is returned.
_verifyMessage(message) {
// validate we have a payload of the correct type
let payload;
if (this._isPayload(message)) {
payload = message;
message = {};
} else {
message = {...message};
if (this._isJson || this._isPayload(message.payload)) {
payload = message.payload;
} else if (this._payloadTypeClass) {
payload = new this._payloadTypeClass(message.payload);
} else {
// validate payload
if (this._isJson) {
message.payload = transformer.transformJsonValue(payload);
} else if (typeof payload === 'string') {
message.payload = Buffer.from(payload);
} else if (Buffer.isBuffer(payload)) {
message.payload = payload;
} else {
message.payload = payload._impl;
// validate options, if applicable
if (message.correlation !== undefined) {
errors.assertParamPropValue(typeof message.correlation === 'string', 1,
if (message.delay !== undefined) {
errors.assertParamPropValue(Number.isInteger(message.delay), 1, "delay");
if (message.exceptionQueue !== undefined) {
errors.assertParamPropValue(typeof message.exceptionQueue === 'string',
1, "exceptionQueue");
if (message.expiration !== undefined) {
errors.assertParamPropValue(Number.isInteger(message.expiration), 1,
if (message.priority !== undefined) {
errors.assertParamPropValue(Number.isInteger(message.priority), 1,
if (message.recipients !== undefined) {
1, "recipients");
return message;
// create()
// Creates the queue and populates some internal attributes.
async create(conn, name, options) {
if (options.payloadType === types.DB_TYPE_JSON) {
this._isJson = true;
this._payloadType = types.DB_TYPE_JSON;
this._payloadTypeName = "JSON";
} else if (options.payloadType === undefined ||
options.payloadType === types.DB_TYPE_RAW) {
this._payloadType = types.DB_TYPE_RAW;
this._payloadTypeName = "RAW";
} else {
if (typeof options.payloadType === 'string') {
// DB Object type
const cls = await conn._getDbObjectClassForName(options.payloadType);
this._payloadTypeClass = cls;
options.payloadType = cls;
} else {
errors.assertParamPropValue(nodbUtil.isObject(options.payloadType) &&
options.payloadType.prototype instanceof BaseDbObject, 2, "payloadType");
this._payloadTypeClass = options.payloadType;
this._payloadType = types.DB_TYPE_OBJECT;
this._payloadTypeName =;
this._name = name;
this._impl = await conn._impl.getQueue(name, this._payloadTypeClass,
// deqMany()
// Returns an array of messages from the queue, up to the maximum specified,
// if any are available.
async deqMany(maxMessages) {
errors.assertArgCount(arguments, 1, 1);
errors.assertParamValue(Number.isInteger(maxMessages) && maxMessages > 0,
const msgImpls = await this._impl.deq(maxMessages);
return => this._makeMessage(i));
// deqOne()
// Returns a single message from the queue, if one is available.
async deqOne() {
errors.assertArgCount(arguments, 0, 0);
const msgImpls = await this._impl.deq(1);
if (msgImpls.length > 0)
return this._makeMessage(msgImpls[0]);
// deqOptions
// Property for the dequeue options associated with the queue.
get deqOptions() {
if (!this._deqOptions) {
const deqOptions = new AqDeqOptions();
deqOptions._impl = this._impl.deqOptions;
this._deqOptions = deqOptions;
return this._deqOptions;
// enqMany()
// Enqueues multiple messages into the queue at the same time, avoiding
// multiple round-trips.
async enqMany(messages) {
errors.assertArgCount(arguments, 1, 1);
errors.assertParamValue(Array.isArray(messages) && messages.length > 0, 1);
const verifiedMessages = new Array(messages.length);
for (let i = 0; i < messages.length; i++) {
verifiedMessages[i] = this._verifyMessage(messages[i]);
const msgImpls = await this._impl.enq(verifiedMessages);
return => this._makeMessage(i));
// enqOne()
// Enqueues a single message into the queue.
async enqOne(message) {
errors.assertArgCount(arguments, 1, 1);
message = this._verifyMessage(message);
const msgImpls = await this._impl.enq([message]);
return this._makeMessage(msgImpls[0]);
// enqOptions
// Property for the enqueue options associated with the queue.
get enqOptions() {
if (!this._enqOptions) {
const enqOptions = new AqEnqOptions();
enqOptions._impl = this._impl.enqOptions;
this._enqOptions = enqOptions;
return this._enqOptions;
// name
// Property for the name of the queue.
get name() {
return this._name;
// payloadType
// Property for the payload type.
get payloadType() {
return this._payloadType;
// payloadTypeName
// Property for the payload type name.
get payloadTypeName() {
return this._payloadTypeName;
// payloadTypeClass
// Property for the payload type class.
get payloadTypeClass() {
return this._payloadTypeClass;
module.exports = AqQueue;