Add query streaming (PR 321)

This commit is contained in:
Christopher Jones 2016-03-24 21:18:16 +11:00
parent b5365ec3e4
commit 58df9f11b6
9 changed files with 896 additions and 24 deletions

View File

@ -13,11 +13,10 @@ The add-on is stable, well documented, and has a comprehensive test suite.
The node-oracledb project is open source and maintained by Oracle Corp. The home page is on the
[Oracle Technology Network](http://www.oracle.com/technetwork/database/database-technologies/scripting-languages/node_js/).
### Node-oracledb supports:
- [SQL and PL/SQL execution](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#sqlexecution)
- [Fetching of large result sets](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#resultsethandling)
- Fetching of query results by [callbacks](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#resultsethandling) or [streams](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#streamingresults)
- [REF CURSORs](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#refcursors)
- [Large Objects: CLOBs and BLOBs](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#lobhandling)
- [Query results as JavaScript objects or array ](https://github.com/oracle/node-oracledb/blob/master/doc/api.md#queryoutputformats)

View File

@ -52,7 +52,8 @@ limitations under the License.
- 3.2.14 [queueRequests](#propdbqueuerequests)
- 3.2.15 [queueTimeout](#propdbqueuetimeout)
- 3.2.16 [stmtCacheSize](#propdbstmtcachesize)
- 3.2.17 [version](#propdbversion)
- 3.2.17 [streamNumRows](#propdbstreamnumrows)
- 3.2.18 [version](#propdbversion)
- 3.3 [Oracledb Methods](#oracledbmethods)
- 3.3.1 [createPool()](#createpool)
- 3.3.2 [getConnection()](#getconnectiondb)
@ -71,8 +72,9 @@ limitations under the License.
- 4.2.3.2 [execute(): Bind Parameters](#executebindParams)
- 4.2.3.3 [execute(): Options](#executeoptions)
- 4.2.3.4 [execute(): Callback Function](#executecallback)
- 4.2.4 [release()](#release)
- 4.2.5 [rollback()](#rollback)
- 4.2.4 [queryStream()](#querystream)
- 4.2.5 [release()](#release)
- 4.2.6 [rollback()](#rollback)
5. [Lob Class](#lobclass)
- 5.1 [Lob Properties](#lobproperties)
- 5.1.1 [chunkSize](#proplobchunksize)
@ -113,10 +115,11 @@ limitations under the License.
- 9.1 [SELECT Statements](#select)
- 9.1.1 [Fetching Rows](#fetchingrows)
- 9.1.2 [Result Set Handling](#resultsethandling)
- 9.1.3 [Query Output Formats](#queryoutputformats)
- 9.1.4 [Query Column Metadata](#querymeta)
- 9.1.5 [Result Type Mapping](#typemap)
- 9.1.6 [Row Prefetching](#rowprefetching)
- 9.1.3 [Streaming Query Results](#streamingresults)
- 9.1.4 [Query Output Formats](#queryoutputformats)
- 9.1.5 [Query Column Metadata](#querymeta)
- 9.1.6 [Result Type Mapping](#typemap)
- 9.1.7 [Row Prefetching](#rowprefetching)
10. [PL/SQL Execution](#plsqlexecution)
- 10.1 [PL/SQL Stored Procedures](#plsqlproc)
- 10.2 [PL/SQL Stored Functions](#plsqlfunc)
@ -744,7 +747,26 @@ var oracledb = require('oracledb');
oracledb.stmtCacheSize = 30;
```
#### <a name="propdbversion"></a> 3.2.17 version
#### <a name="propdbstreamnumrows"></a> 3.2.17 streamNumRows
A value used when streaming rows with [`queryStream()`](#querystream).
It does not limit the total number of rows returned by the stream.
The value is passed to internal [getRows()](#getrows) calls and is
used only for tuning because `getRows()` may be internally called one
or more times when streaming results.
The default value is 100.
This property may be overridden in a [`queryStream()`](#querystream) call.
##### Example
```javascript
var oracledb = require('oracledb');
oracledb.streamNumRows = 100;
```
#### <a name="propdbversion"></a> 3.2.18 version
```
readonly Number version
```
@ -1402,7 +1424,35 @@ rows affected, for example the number of rows inserted. For non-DML
statements such as queries, or if no rows are affected, then
`rowsAffected` will be zero.
#### <a name="release"></a> 4.2.4 release()
#### <a name="querystream"></a> 4.2.4 queryStream()
##### Prototype
```
stream.Readable queryStream(String sql, [Object bindParams, [Object options]]);
```
##### Return Value
This function will return a readable stream for queries.
##### Description
This function provides query streaming support. The input of this
function is same as `execute()` however a callback is not used.
Instead this function returns a stream used to fetch data. See
[Streaming Results](#streamingresults) for more information.
The connection must remain open until the stream is completely read.
##### Parameters
See [connection.execute()](#execute).
An additional options attribute `streamNumRows` can be set. This
overrides *Oracledb* [`streamNumRows`](#propdbstreamnumrows).
#### <a name="release"></a> 4.2.5 release()
##### Prototype
@ -1445,7 +1495,7 @@ Callback function parameter | Description
----------------------------|-------------
*Error error* | If `release()` succeeds, `error` is NULL. If an error occurs, then `error` contains the [error message](#errorobj).
#### <a name="rollback"></a> 4.2.5 rollback()
#### <a name="rollback"></a> 4.2.6 rollback()
##### Prototype
@ -2218,6 +2268,9 @@ A SQL or PL/SQL statement may be executed using the *Connection*
After all database calls on the connection complete, the application
should use the [`release()`](#release) call to release the connection.
Queries may optionally be streamed using the *Connection*
[`queryStream()`](#querystream) method.
### <a name="select"></a> 9.1 SELECT Statements
#### <a name="fetchingrows"></a> 9.1.1 Fetching Rows
@ -2244,8 +2297,10 @@ restricted to [`maxRows`](#propdbmaxrows):
#### <a name="resultsethandling"></a> 9.1.2 Result Set Handling
When the number of query rows is relatively big, or can't be
predicted, it is recommended to use a [`ResultSet`](#resultsetclass).
This prevents query results being unexpectedly truncated by the
predicted, it is recommended to use a [`ResultSet`](#resultsetclass)
with callbacks, as described in this section, or via the ResultSet
stream wrapper, as described [later](#streamingresults). This
prevents query results being unexpectedly truncated by the
[`maxRows`](#propdbmaxrows) limit and removes the need to oversize
`maxRows` to avoid such truncation. Otherwise, for queries that
return a known small number of rows, non-result set queries may have
@ -2343,7 +2398,65 @@ function fetchRowsFromRS(connection, resultSet, numRows)
}
```
#### <a name="queryoutputformats"></a> 9.1.3 Query Output Formats
#### <a name="streamingresults"></a> 9.1.3 Streaming Query Results
Streaming query results allows data to be piped to other streams, for
example when dealing with HTTP responses.
Use [`connection.queryStream()`](#querystream) to create a stream and
listen for events. Each row is returned as a `data` event. Query
metadata is available via a `metadata` event. The `end` event
indicates the end of the query results.
The connection must remain open until the stream is completely read.
Query results must be fetched to completion to avoid resource leaks.
The query stream implementation is a wrapper over the
[ResultSet Class](#resultsetclass). In particular, calls to
[getRows()](#getrows) are made internally to fetch each successive
subset of data, each row of which will generate a `data` event. The
number of rows fetched from the database by each `getRows()` call is
specified by the [`oracledb.streamNumRows`](#propdbstreamnumrows)
value or the `queryStream()` option attribute `streamNumRows`. This
value does not alter the number of rows returned by the stream since
`getRows()` will be called each time more rows are needed. However
the value can be used to tune performance.
There is no explicit ResultSet `close()` call for streaming query
results. This call will be executed internally when all data has been
fetched. If you need to be able to stop a query before retrieving all
data, use a [ResultSet with callbacks](#resultsethandling).
An example of streaming query results is:
```javascript
var stream = connection.queryStream('SELECT employees_name FROM employees',
[], // no bind variables
{ streamNumRows: 100 } // Used for tuning. Does not affect how many rows are returned.
// Default is 100
);
stream.on('error', function (error) {
// handle any error...
});
stream.on('data', function (data) {
// handle data row...
});
stream.on('end', function () {
// release connection...
});
stream.on('metadata', function (metadata) {
// access metadata of query
});
// listen to any other standard stream events...
```
#### <a name="queryoutputformats"></a> 9.1.4 Query Output Formats
Query rows may be returned as an array of column values, or as
Javascript objects, depending on the values of
@ -2413,7 +2526,7 @@ names follow Oracle's standard name-casing rules. They will commonly
be uppercase, since most applications create tables using unquoted,
case-insensitive names.
#### <a name="querymeta"></a> 9.1.4 Query Column Metadata
#### <a name="querymeta"></a> 9.1.5 Query Column Metadata
The column names of a query are returned in the
[`execute()`](#execute) callback's `result.metaData` parameter
@ -2446,7 +2559,7 @@ The names are in uppercase. This is the default casing behavior for
Oracle client programs when a database table is created with unquoted,
case-insensitive column names.
#### <a name="typemap"></a> 9.1.5 Result Type Mapping
#### <a name="typemap"></a> 9.1.6 Result Type Mapping
Oracle character, number and date columns can be selected. Data types
that are currently unsupported give a "datatype is not supported"
@ -2617,7 +2730,7 @@ you may want to bind using `type: oracledb.STRING`. Output would be:
{ x: '-71.48923', y: '42.72347' }
```
#### <a name="rowprefetching"></a> 9.1.6 Row Prefetching
#### <a name="rowprefetching"></a> 9.1.7 Row Prefetching
[Prefetching](http://docs.oracle.com/database/121/LNOCI/oci04sql.htm#LNOCI16355) is a query tuning feature allowing resource usage to be
optimized. It allows multiple rows to be returned in each network
@ -2850,7 +2963,7 @@ connection.execute(
```
The query rows can be handled using a
[ResultSet](http://localhost:8899/doc/api.md#resultsethandling).
[ResultSet](#resultsethandling).
Remember to first enable output using `DBMS_OUTPUT.ENABLE(NULL)`.

78
examples/selectstream.js Normal file
View File

@ -0,0 +1,78 @@
/* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. */
/******************************************************************************
*
* You may not use the identified files except in compliance with the Apache
* License, Version 2.0 (the "License.")
*
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*
* NAME
* selectstream.js
*
* DESCRIPTION
* Executes a basic query using a Readable Stream.
* Uses Oracle's sample HR schema.
*
* Scripts to create the HR schema can be found at:
* https://github.com/oracle/db-sample-schemas
*
*****************************************************************************/
var oracledb = require('oracledb');
var dbConfig = require('./dbconfig.js');
oracledb.getConnection(
{
user : dbConfig.user,
password : dbConfig.password,
connectString : dbConfig.connectString
},
function(err, connection)
{
if (err) {
console.error(err.message);
return;
}
var stream = connection.queryStream(
'SELECT first_name, last_name FROM employees ORDER BY employee_id',
[], // no bind variables
{ streamNumRows: 100 } // Used for tuning. Does not affect how many rows are returned.
// Default is 100
);
stream.on('error', function (error) {
// console.log("stream 'error' event");
console.error(error);
return;
});
stream.on('metadata', function (metadata) {
// console.log("stream 'metadata' event");
console.log(metadata);
});
stream.on('data', function (data) {
// console.log("stream 'data' event");
console.log(data);
});
stream.on('end', function () {
// console.log("stream 'end' event");
connection.release(
function(err) {
if (err) {
console.error(err.message);
}
});
});
});

View File

@ -18,6 +18,18 @@
*****************************************************************************/
var resultset = require('./resultset.js');
var Stream = require('./resultset-read-stream');
// The queryStream function is similar to execute except that it immediately
// returns a readable stream.
function queryStream(sql, binding, options) {
var self = this;
var stream;
stream = new Stream(self, sql, binding, options);
return stream;
}
// This execute function is used to override the execute method of the Connection
// class, which is defined in the C layer. The override allows us to do things
@ -110,18 +122,26 @@ module.break = function() {
// The extend method is used to extend the Connection instance from the C layer with
// custom properties and method overrides. References to the original methods are
// maintained so they can be invoked by the overriding method at the right time.
function extend(conn, pool) {
function extend(conn, oracledb, pool) {
// Using Object.defineProperties to add properties to the Connection instance with
// special properties, such as enumerable but not writable.
Object.defineProperties(
conn,
{
_oracledb: { // storing a reference to the base instance to avoid circular references with require
value: oracledb
},
_pool: {
value: pool
},
_execute: {
value: conn.execute
},
queryStream: {
value: queryStream,
enumerable: true,
writable: true
},
execute: {
value: execute,
enumerable: true,

View File

@ -48,7 +48,7 @@ function createPool(poolAttrs, createPoolCb) {
createPoolCb(err);
return;
}
pool.extend(poolInst, poolAttrs, self);
createPoolCb(undefined, poolInst);
@ -67,7 +67,7 @@ function getConnection(connAttrs, createConnectionCb) {
return;
}
connection.extend(connInst);
connection.extend(connInst, self);
createConnectionCb(undefined, connInst);
});
@ -155,6 +155,11 @@ function extend(oracledb) {
value: oracledbCLib.ResultSet,
enumerable: true
},
streamNumRows: {
value: 100,
enumerable: true,
writable: true
},
queueRequests: {
value: true,
enumerable: true,

View File

@ -48,7 +48,7 @@ function completeConnectionRequest(getConnectionCb) {
return;
}
connection.extend(connInst, self);
connection.extend(connInst, self._oracledb, self);
getConnectionCb(undefined, connInst);
});
@ -286,6 +286,9 @@ function extend(pool, poolAttrs, oracledb) {
Object.defineProperties(
pool,
{
_oracledb: { // storing a reference to the base instance to avoid circular references with require
value: oracledb
},
queueRequests: {
value: queueRequests, // true will queue requests when conn pool is maxed out
enumerable: true

View File

@ -0,0 +1,122 @@
/* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. */
/******************************************************************************
*
* You may not use the identified files except in compliance with the Apache
* License, Version 2.0 (the "License.")
*
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*
*****************************************************************************/
'use strict';
var util = require('util');
var stream = require('stream');
var Readable = stream.Readable;
var resultset = require('./resultset.js');
// A node.js read stream for resultsets (based on https://github.com/sagiegurari/simple-oracledb/blob/master/lib/resultset-read-stream.js).
function ResultSetReadStream(conn, sql, binding, options) {
var self = this;
binding = binding || [];
options = options || {};
options.resultSet = true;
self.streamNumRows = options.streamNumRows || conn._oracledb.streamNumRows;
Readable.call(self, {
objectMode: true
});
Object.defineProperty(self, 'nextRow', {
// Sets the nextRow value.
set: function (nextRow) {
self.next = nextRow;
if (self.inRead) {
self._read();
}
}
});
conn._execute(sql, binding, options, function(err, result) {
self._onExecuteDone(err, result);
});
}
util.inherits(ResultSetReadStream, Readable);
// The stream _read implementation which fetches the next row from the resultset.
ResultSetReadStream.prototype._read = function () {
var self = this;
self.inRead = false;
if (self.next) {
self.next(function onNextRowRead(error, data) {
if (error) {
self.emit('error', error);
} else if (data) {
self.push(data);
} else {
self.push(null);
}
});
} else {
self.inRead = true;
}
};
ResultSetReadStream.prototype._onExecuteDone = function(err, result) {
var self = this;
if (err) {
self.nextRow = function emitError(streamCallback) {
streamCallback(err);
};
return;
}
resultset.extend(result.resultSet);
self.emit('metadata', result.resultSet.metaData);
var close = function (streamCallback, causeError) {
result.resultSet.close(function onClose(closeError) {
streamCallback(causeError || closeError);
});
};
var readRows;
self.nextRow = function fetchNextRow(streamCallback) {
if (readRows && readRows.length) {
streamCallback(null, readRows.shift());
} else {
result.resultSet.getRows(self.streamNumRows, function onRow(rowError, rows) {
if (rowError) {
close(streamCallback, rowError);
} else if ((!rows) || (!rows.length)) {
close(streamCallback);
} else {
readRows = rows;
streamCallback(null, readRows.shift());
}
});
}
};
};
module.exports = ResultSetReadStream;

View File

@ -226,6 +226,21 @@
12.7.1 maxRows option is ignored when resultSet option is true
12.7.2 maxRows option is ignored with REF Cursor
13. stream.js
13.1 Testing ResultSet stream
13.1.1 stream results for oracle connection
13.1.2 stream results for oracle connection (outFormat: oracledb.OBJECT)
13.1.3 errors in query
13.1.4 no result
13.1.5 single row
13.1.6 multiple row
13.1.7 invalid SQL
13.1.8 Read CLOBs
13.1.9 Read CLOBs after stream close
13.1.10 meta data
13.1.11 stream results with bulk size set
13.1.12 stream stress test
21. datatypeAssist.js
22. dataTypeChar.js
@ -603,4 +618,3 @@
66.3 allows overwriting of public methods on connection instances
66.4 allows overwriting of public methods on resultset instances
66.5 allows overwriting of public methods on lob instances

518
test/stream.js Normal file
View File

@ -0,0 +1,518 @@
/* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. */
/******************************************************************************
*
* You may not use the identified files except in compliance with the Apache
* License, Version 2.0 (the "License.")
*
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*
* The node-oracledb test suite uses 'mocha', 'should' and 'async'.
* See LICENSE.md for relevant licenses.
*
* NAME
* 13. stream.js
*
* DESCRIPTION
* Testing driver query results via stream feature.
*
* NUMBERING RULE
* Test numbers follow this numbering rule:
* 1 - 20 are reserved for basic functional tests
* 21 - 50 are reserved for data type supporting tests
* 51 onwards are for other tests
*
*****************************************************************************/
var oracledb = require('oracledb');
var should = require('should');
var async = require('async');
var dbConfig = require('./dbconfig.js');
describe('13. stream.js', function () {
var connection = false;
if (dbConfig.externalAuth) {
var credential = {externalAuth: true, connectString: dbConfig.connectString};
} else {
var credential = dbConfig;
}
var createTable =
"BEGIN \
DECLARE \
e_table_exists EXCEPTION; \
PRAGMA EXCEPTION_INIT(e_table_exists, -00942); \
BEGIN \
EXECUTE IMMEDIATE ('DROP TABLE nodb_employees'); \
EXCEPTION \
WHEN e_table_exists \
THEN NULL; \
END; \
EXECUTE IMMEDIATE (' \
CREATE TABLE nodb_employees ( \
employees_id NUMBER, \
employees_name VARCHAR2(20), \
employees_history CLOB \
) \
'); \
END; ";
var insertRows =
"DECLARE \
x NUMBER := 0; \
n VARCHAR2(20); \
clobData CLOB;\
BEGIN \
FOR i IN 1..217 LOOP \
x := x + 1; \
n := 'staff ' || x; \
INSERT INTO nodb_employees VALUES (x, n, EMPTY_CLOB()) RETURNING employees_history INTO clobData; \
\
DBMS_LOB.WRITE(clobData, 20, 1, '12345678901234567890');\
END LOOP; \
END; ";
var rowsAmount = 217;
before(function (done) {
oracledb.getConnection(credential, function (err, conn) {
if (err) {
console.error(err);
return;
}
connection = conn;
connection.execute(createTable, function (err) {
if (err) {
console.error(err);
return;
}
connection.execute(insertRows, function (err) {
if (err) {
console.error(err);
return;
}
done();
});
});
});
});
after(function (done) {
connection.execute(
'DROP TABLE nodb_employees',
function (err) {
if (err) {
console.error(err.message);
return;
}
connection.release(function (err) {
if (err) {
console.error(err.message);
return;
}
done();
});
}
);
});
describe('13.1 Testing ResultSet stream', function () {
it('13.1.1 stream results for oracle connection', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees');
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered');
});
var counter = 0;
stream.on('data', function (data) {
should.exist(data);
counter++;
});
stream.on('end', function () {
should.equal(counter, rowsAmount);
setTimeout(done, 500);
});
});
it('13.1.2 stream results for oracle connection (outFormat: oracledb.OBJECT)', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees', {}, {
outFormat: oracledb.OBJECT
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered');
});
var counter = 0;
stream.on('data', function (data) {
should.exist(data);
counter++;
});
stream.on('end', function () {
should.equal(counter, rowsAmount);
setTimeout(done, 500);
});
});
it('13.1.3 errors in query', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT no_such_column FROM nodb_employees');
stream.on('error', function (error) {
should.exist(error);
setTimeout(done, 500);
});
stream.on('data', function (data) {
should.fail(data, null, 'Data event should not be triggered');
});
});
it('13.1.4 no result', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT * FROM nodb_employees WHERE employees_name = :name', {
name: 'TEST_NO_RESULT'
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered: ' + error);
});
var counter = 0;
stream.on('data', function (data) {
should.fail(data, null, 'Data event should not be triggered');
});
stream.on('end', function () {
should.equal(counter, 0);
setTimeout(done, 500);
});
});
it('13.1.5 single row', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees WHERE employees_name = :name', {
name: 'staff 10'
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered: ' + error);
});
var counter = 0;
stream.on('data', function (data) {
should.exist(data);
should.deepEqual(data, ['staff 10']);
counter++;
});
stream.on('end', function () {
should.equal(counter, 1);
setTimeout(done, 500);
});
});
it('13.1.6 multiple row', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees WHERE employees_id <= :maxId ORDER BY employees_id', {
maxId: 10
}, {
outFormat: oracledb.OBJECT
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered: ' + error);
});
var counter = 0;
stream.on('data', function (data) {
should.exist(data);
should.deepEqual(data, {
EMPLOYEES_NAME: 'staff ' + (counter + 1)
});
counter++;
});
stream.on('end', function () {
should.equal(counter, 10);
setTimeout(done, 500);
});
});
it('13.1.7 invalid SQL', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('UPDATE nodb_employees SET employees_name = :name WHERE employees_id :id', {
id: 10,
name: 'test_update'
}, {
outFormat: oracledb.OBJECT
});
stream.on('error', function (error) {
should.exist(error);
setTimeout(done, 500);
});
stream.on('data', function (data) {
should.fail(data, null, 'Data event should not be triggered');
});
});
it('13.1.8 Read CLOBs', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name, employees_history FROM nodb_employees where employees_id <= :maxId ORDER BY employees_id', {
maxId: 10
}, {
outFormat: oracledb.OBJECT
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered: ' + error);
});
var counter = 0;
var clobs = [];
var clobsRead = 0;
stream.on('data', function (data) {
var rowIndex = counter;
should.exist(data);
should.equal(data.EMPLOYEES_NAME, 'staff ' + (rowIndex + 1));
should.exist(data.EMPLOYEES_HISTORY);
should.equal(data.EMPLOYEES_HISTORY.constructor.name, 'Lob');
var clob = [];
data.EMPLOYEES_HISTORY.setEncoding('utf8');
data.EMPLOYEES_HISTORY.on('data', function (data) {
clob.push(data);
});
data.EMPLOYEES_HISTORY.on('end', function () {
clobs[rowIndex] = clob.join('');
should.equal(clobs[rowIndex], '12345678901234567890');
clobsRead++;
if (clobsRead === 10) {
should.equal(counter, 10);
setTimeout(done, 500);
}
});
counter++;
});
stream.on('end', function () {
should.equal(counter, 10);
});
});
it('13.1.9 Read CLOBs after stream close', function (done) {
connection.should.be.ok;
this.timeout(10000);
var stream = connection.queryStream('SELECT employees_name, employees_history FROM nodb_employees where employees_id <= :maxId ORDER BY employees_id', {
maxId: 10
}, {
outFormat: oracledb.OBJECT
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered: ' + error);
});
var counter = 0;
var clobs = [];
var clobsRead = 0;
stream.on('data', function (data) {
var rowIndex = counter;
should.exist(data);
should.equal(data.EMPLOYEES_NAME, 'staff ' + (rowIndex + 1));
should.exist(data.EMPLOYEES_HISTORY);
should.equal(data.EMPLOYEES_HISTORY.constructor.name, 'Lob');
var clob = [];
data.EMPLOYEES_HISTORY.setEncoding('utf8');
setTimeout(function () {
data.EMPLOYEES_HISTORY.on('data', function (data) {
clob.push(data);
});
data.EMPLOYEES_HISTORY.on('end', function () {
clobs[rowIndex] = clob.join('');
should.equal(clobs[rowIndex], '12345678901234567890');
clobsRead++;
if (clobsRead === 10) {
should.equal(counter, 10);
setTimeout(done, 500);
}
});
}, 5000);
counter++;
});
stream.on('end', function () {
should.equal(counter, 10);
});
});
it('13.1.10 meta data', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees WHERE employees_name = :name', {
name: 'staff 10'
});
var metaDataRead = false;
stream.on('metadata', function (metaData) {
should.deepEqual(metaData, [
{
name: 'EMPLOYEES_NAME'
}
]);
metaDataRead = true;
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered: ' + error);
});
stream.on('data', function () {
should.equal(metaDataRead, true);
});
stream.on('end', function () {
should.equal(metaDataRead, true);
setTimeout(done, 500);
});
});
it('13.1.11 stream results with bulk size set', function (done) {
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees', [], {
streamNumRows: 1
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered');
});
var counter = 0;
stream.on('data', function (data) {
should.exist(data);
counter++;
});
stream.on('end', function () {
should.equal(counter, rowsAmount);
setTimeout(done, 500);
});
});
it('13.1.12 stream stress test', function (done) {
this.timeout(30000);
connection.should.be.ok;
var stream = connection.queryStream('SELECT employees_name FROM nodb_employees', [], {
streamNumRows: 1
});
stream.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered');
});
var counter = 0;
var allData = [];
stream.on('data', function (data) {
should.exist(data);
allData.push(data);
counter++;
});
stream.on('end', function () {
should.equal(counter, rowsAmount);
var testDone = 0;
var subTest = function (callback) {
var query = connection.queryStream('SELECT employees_name FROM nodb_employees', [], {
streamNumRows: Math.floor((Math.random() * (500 - 1)) + 1)
});
query.on('error', function (error) {
should.fail(error, null, 'Error event should not be triggered');
});
var testCounter = 0;
var testData = [];
query.on('data', function (data) {
should.exist(data);
testData.push(data);
testCounter++;
});
query.on('end', function () {
should.equal(testCounter, rowsAmount);
should.deepEqual(testData, allData);
testDone++;
callback();
});
};
var tests = [];
var i;
for (i = 0; i < 50; i++) { // larger values can cause 'ORA-01000: maximum open cursors exceeded'
tests.push(subTest);
}
async.parallel(tests, function () {
should.equal(testDone, tests.length);
done();
})
});
});
});
});