Pipelines that write before they read must remain readable.
* Test that we can write and then read * Correctly unregister writability
This commit is contained in:
parent
6590bb45a6
commit
4c37eee3f8
|
@ -1045,6 +1045,17 @@ class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
|||
}
|
||||
}
|
||||
|
||||
private func unregisterForWritable() {
|
||||
switch interestedEvent {
|
||||
case .all:
|
||||
safeReregister(interested: .read)
|
||||
case .write:
|
||||
safeReregister(interested: .none)
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
public final func flush0(promise: Promise<Void>?) {
|
||||
assert(eventLoop.inEventLoop)
|
||||
|
||||
|
@ -1201,13 +1212,7 @@ class BaseSocketChannel<T : BaseSocket> : SelectableChannel, ChannelCore {
|
|||
return
|
||||
}
|
||||
|
||||
if readPending {
|
||||
// Start reading again
|
||||
safeReregister(interested: .read)
|
||||
} else {
|
||||
// No read pending so just set the interested event to .none
|
||||
safeReregister(interested: .none)
|
||||
}
|
||||
unregisterForWritable()
|
||||
}
|
||||
|
||||
public final func readable() {
|
||||
|
|
|
@ -82,6 +82,41 @@ class EchoServerClientTest : XCTestCase {
|
|||
handler.assertChannelActiveFired()
|
||||
}
|
||||
|
||||
func testWriteThenRead() throws {
|
||||
let group = try MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer {
|
||||
_ = try? group.close()
|
||||
}
|
||||
|
||||
let serverChannel = try ServerBootstrap(group: group)
|
||||
.option(option: ChannelOptions.Socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
.handler(childHandler: ChannelInitializer(initChannel: { channel in
|
||||
return channel.pipeline.add(handler: EchoServer())
|
||||
})).bind(to: "127.0.0.1", on: 0).wait()
|
||||
|
||||
defer {
|
||||
_ = serverChannel.close()
|
||||
}
|
||||
|
||||
let numBytes = 16 * 1024
|
||||
let countingHandler = ByteCountingHandler(numBytes: numBytes, promise: group.next().newPromise())
|
||||
let clientChannel = try ClientBootstrap(group: group)
|
||||
.handler(handler: countingHandler)
|
||||
.connect(to: serverChannel.localAddress!).wait()
|
||||
|
||||
defer {
|
||||
_ = clientChannel.close()
|
||||
}
|
||||
|
||||
var buffer = clientChannel.allocator.buffer(capacity: numBytes)
|
||||
for i in 0..<numBytes {
|
||||
buffer.write(integer: UInt8(i % 256))
|
||||
}
|
||||
try clientChannel.writeAndFlush(data: IOData(buffer)).wait()
|
||||
|
||||
try countingHandler.assertReceived(buffer: buffer)
|
||||
}
|
||||
|
||||
private final class ByteCountingHandler : ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
|
||||
|
@ -132,4 +167,17 @@ class EchoServerClientTest : XCTestCase {
|
|||
XCTAssert(promise.futureResult.fulfilled)
|
||||
}
|
||||
}
|
||||
|
||||
private final class EchoServer: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias OutboundOut = ByteBuffer
|
||||
|
||||
func channelRead(ctx: ChannelHandlerContext, data: IOData) {
|
||||
ctx.write(data: data, promise: nil)
|
||||
}
|
||||
|
||||
func channelReadComplete(ctx: ChannelHandlerContext) {
|
||||
ctx.flush(promise: nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue