Improve the EmbeddedChannel to be able to easily capture inbound and outbound messages
This commit is contained in:
parent
c2c5c70610
commit
614f689e05
|
@ -83,7 +83,8 @@ class EmbeddedChannelCore : ChannelCore {
|
|||
deinit { closePromise.succeed(result: ()) }
|
||||
|
||||
var outboundBuffer: [Any] = []
|
||||
|
||||
var inboundBuffer: [Any] = []
|
||||
|
||||
func close0(error: Error, promise: Promise<Void>?) {
|
||||
promise?.succeed(result: ())
|
||||
|
||||
|
@ -111,7 +112,7 @@ class EmbeddedChannelCore : ChannelCore {
|
|||
}
|
||||
|
||||
func write0(data: IOData, promise: Promise<Void>?) {
|
||||
outboundBuffer.append(data.forceAsByteBuffer())
|
||||
addToBuffer(buffer: &outboundBuffer, data: data)
|
||||
promise?.succeed(result: ())
|
||||
}
|
||||
|
||||
|
@ -128,6 +129,16 @@ class EmbeddedChannelCore : ChannelCore {
|
|||
}
|
||||
|
||||
func channelRead0(data: IOData) {
|
||||
addToBuffer(buffer: &inboundBuffer, data: data)
|
||||
}
|
||||
|
||||
private func addToBuffer(buffer: inout [Any], data: IOData) {
|
||||
switch data {
|
||||
case .byteBuffer(let buf):
|
||||
buffer.append(buf)
|
||||
case .other(let other):
|
||||
buffer.append(other)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,14 +159,33 @@ public class EmbeddedChannel : Channel {
|
|||
return true
|
||||
}
|
||||
|
||||
public func finish() throws -> Bool {
|
||||
try close().wait()
|
||||
return !channelcore.outboundBuffer.isEmpty || !channelcore.outboundBuffer.isEmpty
|
||||
}
|
||||
|
||||
private var _pipeline: ChannelPipeline!
|
||||
public let allocator: ByteBufferAllocator = ByteBufferAllocator()
|
||||
public var eventLoop: EventLoop = EmbeddedEventLoop()
|
||||
|
||||
public var localAddress: SocketAddress? = nil
|
||||
public var remoteAddress: SocketAddress? = nil
|
||||
var outboundBuffer: [Any] { return (_unsafe as! EmbeddedChannelCore).outboundBuffer }
|
||||
|
||||
|
||||
public func readOutbound<T>() -> T? {
|
||||
return readFromBuffer(buffer: &(_unsafe as! EmbeddedChannelCore).outboundBuffer)
|
||||
}
|
||||
|
||||
public func readInbound<T>() -> T? {
|
||||
return readFromBuffer(buffer: &(_unsafe as! EmbeddedChannelCore).inboundBuffer)
|
||||
}
|
||||
|
||||
private func readFromBuffer<T>(buffer: inout [Any]) -> T? {
|
||||
guard !buffer.isEmpty else {
|
||||
return nil
|
||||
}
|
||||
return (buffer.removeFirst() as! T)
|
||||
}
|
||||
|
||||
init() {
|
||||
_pipeline = ChannelPipeline(channel: self)
|
||||
|
||||
|
|
|
@ -71,7 +71,9 @@ class ChannelPipelineTest: XCTestCase {
|
|||
|
||||
_ = channel.write(data: .other("msg"))
|
||||
_ = try channel.flush().wait()
|
||||
XCTAssertEqual(buf, channel.outboundBuffer[0] as! ByteBuffer)
|
||||
XCTAssertEqual(buf, channel.readOutbound())
|
||||
XCTAssertNil(channel.readOutbound())
|
||||
|
||||
}
|
||||
|
||||
private final class TestChannelOutboundHandler: ChannelOutboundHandler {
|
||||
|
|
|
@ -30,27 +30,10 @@ public class ByteToMessageDecoderTest: XCTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private final class InboundDataCollector: ChannelInboundHandler {
|
||||
|
||||
private let fn: (IOData) -> Void
|
||||
|
||||
init(_ fn: @escaping (IOData) -> Void) {
|
||||
self.fn = fn
|
||||
}
|
||||
|
||||
public func channelRead(ctx: ChannelHandlerContext, data: IOData) {
|
||||
self.fn(data)
|
||||
}
|
||||
}
|
||||
|
||||
func testDecoder() throws {
|
||||
let channel = EmbeddedChannel()
|
||||
|
||||
var dataArray = [Int32]()
|
||||
_ = try channel.pipeline.add(handler: ByteToInt32Decoder()).wait()
|
||||
_ = try channel.pipeline.add(handler: InboundDataCollector { data in
|
||||
dataArray.append(data.forceAsOther())
|
||||
}).wait()
|
||||
|
||||
var buffer = channel.allocator.buffer(capacity: 32)
|
||||
buffer.write(integer: Int32(1))
|
||||
|
@ -58,23 +41,21 @@ public class ByteToMessageDecoderTest: XCTestCase {
|
|||
buffer.moveWriterIndex(to: writerIndex - 1)
|
||||
|
||||
channel.pipeline.fireChannelRead(data: .byteBuffer(buffer))
|
||||
XCTAssertTrue(dataArray.isEmpty)
|
||||
XCTAssertNil(channel.readInbound())
|
||||
|
||||
channel.pipeline.fireChannelRead(data: .byteBuffer(buffer.slice(at: writerIndex - 1, length: 1)!))
|
||||
XCTAssertEqual(1, dataArray.count)
|
||||
|
||||
var buffer2 = channel.allocator.buffer(capacity: 32)
|
||||
buffer2.write(integer: Int32(2))
|
||||
buffer2.write(integer: Int32(3))
|
||||
channel.pipeline.fireChannelRead(data: .byteBuffer(buffer2))
|
||||
XCTAssertEqual(3, dataArray.count)
|
||||
|
||||
|
||||
try channel.close().wait()
|
||||
XCTAssertEqual(3, dataArray.count)
|
||||
|
||||
XCTAssertEqual(Int32(1), dataArray[0])
|
||||
XCTAssertEqual(Int32(2), dataArray[1])
|
||||
XCTAssertEqual(Int32(3), dataArray[2])
|
||||
|
||||
XCTAssertEqual(Int32(1), channel.readInbound())
|
||||
XCTAssertEqual(Int32(2), channel.readInbound())
|
||||
XCTAssertEqual(Int32(3), channel.readInbound())
|
||||
XCTAssertNil(channel.readInbound())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ class EmbeddedChannelTest: XCTestCase {
|
|||
|
||||
var ranBlock = false
|
||||
f.whenSuccess { () -> Void in
|
||||
XCTAssertEqual(buf, channel.outboundBuffer[0] as! ByteBuffer)
|
||||
XCTAssertEqual(buf, channel.readOutbound())
|
||||
ranBlock = true
|
||||
}
|
||||
try f.wait()
|
||||
|
|
Loading…
Reference in New Issue