don't blow up when holding onto a ChannelPipeline of a deinitialised Channel
This commit is contained in:
parent
ebf351abda
commit
11a609396d
|
@ -121,7 +121,7 @@ extension sockaddr_storage {
|
|||
}
|
||||
}
|
||||
|
||||
class BaseSocket : Selectable {
|
||||
class BaseSocket: Selectable {
|
||||
public let descriptor: Int32
|
||||
public private(set) var open: Bool
|
||||
|
||||
|
|
|
@ -190,9 +190,10 @@ public final class ServerBootstrap {
|
|||
|
||||
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
||||
let accepted = self.unwrapInboundIn(data)
|
||||
let eventLoop = ctx.channel.eventLoop
|
||||
self.childChannelOptions.applyAll(channel: accepted).whenComplete { v in
|
||||
// We must return to the server channel.
|
||||
ctx.channel.eventLoop.execute {
|
||||
eventLoop.execute {
|
||||
switch v {
|
||||
case .failure(let err):
|
||||
self.closeAndFire(ctx: ctx, accepted: accepted, err: err)
|
||||
|
|
|
@ -219,4 +219,3 @@ public enum ChannelEvent: Equatable {
|
|||
/// Output portion of the `Channel` was closed.
|
||||
case outputClosed
|
||||
}
|
||||
|
||||
|
|
|
@ -141,12 +141,18 @@ public final class ChannelPipeline : ChannelInvoker {
|
|||
internal private(set) var destroyed: Bool = false
|
||||
|
||||
/// The `EventLoop` that is used by the underlying `Channel`.
|
||||
public var eventLoop: EventLoop {
|
||||
return channel.eventLoop
|
||||
}
|
||||
public let eventLoop: EventLoop
|
||||
|
||||
/// The `Channel` that this `ChannelPipeline` belongs to.
|
||||
public unowned let channel: Channel
|
||||
///
|
||||
/// - warning: This is unsafe as it's only valid if the `Channel` is still open
|
||||
private unowned let _channel: Channel
|
||||
|
||||
/// The `Channel` that this `ChannelPipeline` belongs to.
|
||||
internal var channel: Channel {
|
||||
assert(self.eventLoop.inEventLoop)
|
||||
return !self.destroyed ? self._channel : DeadChannel(pipeline: self)
|
||||
}
|
||||
|
||||
/// Add a `ChannelHandler` to the `ChannelPipeline`.
|
||||
///
|
||||
|
@ -662,7 +668,8 @@ public final class ChannelPipeline : ChannelInvoker {
|
|||
|
||||
// Only executed from Channel
|
||||
init (channel: Channel) {
|
||||
self.channel = channel
|
||||
self._channel = channel
|
||||
self.eventLoop = channel.eventLoop
|
||||
|
||||
self.head = ChannelHandlerContext(name: "head", handler: HeadChannelHandler.sharedInstance, pipeline: self)
|
||||
self.tail = ChannelHandlerContext(name: "tail", handler: TailChannelHandler.sharedInstance, pipeline: self)
|
||||
|
@ -797,7 +804,7 @@ public final class ChannelHandlerContext : ChannelInvoker {
|
|||
public let pipeline: ChannelPipeline
|
||||
|
||||
public var channel: Channel {
|
||||
return pipeline.channel
|
||||
return self.pipeline.channel
|
||||
}
|
||||
|
||||
public var handler: ChannelHandler {
|
||||
|
@ -1276,7 +1283,7 @@ public final class ChannelHandlerContext : ChannelInvoker {
|
|||
|
||||
extension ChannelPipeline: CustomDebugStringConvertible {
|
||||
public var debugDescription: String {
|
||||
var desc = "ChannelPipeline:\n"
|
||||
var desc = "ChannelPipeline (\(ObjectIdentifier(self))):\n"
|
||||
var node = self.head
|
||||
while let ctx = node {
|
||||
let inboundStr = ctx.handler is _ChannelInboundHandler ? "I" : ""
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
//===----------------------------------------------------------------------===//
|
||||
//
|
||||
// This source file is part of the SwiftNIO open source project
|
||||
//
|
||||
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
|
||||
// Licensed under Apache License v2.0
|
||||
//
|
||||
// See LICENSE.txt for license information
|
||||
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
//===----------------------------------------------------------------------===//
|
||||
|
||||
/// A `DeadChannelCore` is a `ChannelCore` for a `DeadChannel`. A `DeadChannel` is used as a replacement `Channel` when
|
||||
/// the original `Channel` is closed. Given that the original `Channel` is closed the `DeadChannelCore` should fail
|
||||
/// all operations.
|
||||
private final class DeadChannelCore: ChannelCore {
|
||||
func register0(promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func bind0(to: SocketAddress, promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func connect0(to: SocketAddress, promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func write0(data: NIOAny, promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func flush0(promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func read0(promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.alreadyClosed)
|
||||
}
|
||||
|
||||
func triggerUserOutboundEvent0(event: Any, promise: EventLoopPromise<Void>?) {
|
||||
promise?.fail(error: ChannelError.ioOnClosedChannel)
|
||||
}
|
||||
|
||||
func channelRead0(data: NIOAny) {
|
||||
// a `DeadChannel` should never be in any running `ChannelPipeline` and therefore the `TailChannelHandler`
|
||||
// should never invoke this.
|
||||
fatalError("\(#function) called on DeadChannelCore")
|
||||
}
|
||||
|
||||
func errorCaught0(error: Error) {
|
||||
// a `DeadChannel` should never be in any running `ChannelPipeline` and therefore the `TailChannelHandler`
|
||||
// should never invoke this.
|
||||
fatalError("\(#function) called on DeadChannelCore")
|
||||
}
|
||||
}
|
||||
|
||||
/// This represents a `Channel` which is already closed and therefore all the operations do fail.
|
||||
/// A `ChannelPipeline` that is associated with a closed `Channel` must be careful to no longer use that original
|
||||
/// channel as it only holds an unowned reference to the original `Channel`. `DeadChannel` serves as a replacement
|
||||
/// that can be used when the original `Channel` might no longer be valid.
|
||||
internal final class DeadChannel: Channel {
|
||||
let pipeline: ChannelPipeline
|
||||
|
||||
var eventLoop: EventLoop {
|
||||
return self.pipeline.eventLoop
|
||||
}
|
||||
|
||||
internal init(pipeline: ChannelPipeline) {
|
||||
self.pipeline = pipeline
|
||||
}
|
||||
|
||||
var allocator: ByteBufferAllocator {
|
||||
return ByteBufferAllocator()
|
||||
}
|
||||
|
||||
var closeFuture: EventLoopFuture<Void> {
|
||||
return self.pipeline.eventLoop.newSucceedFuture(result: ())
|
||||
}
|
||||
|
||||
let localAddress: SocketAddress? = nil
|
||||
|
||||
let remoteAddress: SocketAddress? = nil
|
||||
|
||||
let parent: Channel? = nil
|
||||
|
||||
func setOption<T>(option: T, value: T.OptionType) -> EventLoopFuture<Void> where T : ChannelOption {
|
||||
return EventLoopFuture(eventLoop: self.pipeline.eventLoop, error: ChannelError.ioOnClosedChannel, file: #file, line: #line)
|
||||
}
|
||||
|
||||
func getOption<T>(option: T) throws -> T.OptionType where T : ChannelOption {
|
||||
throw ChannelError.ioOnClosedChannel
|
||||
}
|
||||
|
||||
let isWritable = false
|
||||
let isActive = false
|
||||
let _unsafe: ChannelCore = DeadChannelCore()
|
||||
}
|
|
@ -589,6 +589,14 @@ internal final class SelectableEventLoop : EventLoop {
|
|||
}
|
||||
}
|
||||
|
||||
extension SelectableEventLoop: CustomStringConvertible {
|
||||
var description: String {
|
||||
return self.tasksLock.withLock {
|
||||
return "SelectableEventLoop { selector = \(self.selector), scheduledTasks = \(self.scheduledTasks.description) }"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Provides an endless stream of `EventLoop`s to use.
|
||||
public protocol EventLoopGroup: class {
|
||||
|
|
|
@ -43,7 +43,6 @@ final class Selector<R: Registration> {
|
|||
private typealias EventType = kevent
|
||||
#endif
|
||||
|
||||
|
||||
private let fd: Int32
|
||||
private var eventsCapacity = 64
|
||||
private var events: UnsafeMutablePointer<EventType>
|
||||
|
@ -444,6 +443,12 @@ final class Selector<R: Registration> {
|
|||
}
|
||||
}
|
||||
|
||||
extension Selector: CustomStringConvertible {
|
||||
var description: String {
|
||||
return "Selector { descriptor = \(self.fd) }"
|
||||
}
|
||||
}
|
||||
|
||||
/// An event that is triggered once the `Selector` was able to select something.
|
||||
struct SelectorEvent<R> {
|
||||
public let registration: R
|
||||
|
|
|
@ -1070,3 +1070,15 @@ final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension SocketChannel: CustomStringConvertible {
|
||||
var description: String {
|
||||
return "SocketChannel { descriptor = \(self.selectable.descriptor), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
|
||||
}
|
||||
}
|
||||
|
||||
extension ServerSocketChannel: CustomStringConvertible {
|
||||
var description: String {
|
||||
return "ServerSocketChannel { descriptor = \(self.selectable.descriptor), localAddress = \(self.localAddress.debugDescription), remoteAddress = \(self.remoteAddress.debugDescription) }"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ extension ChannelTests {
|
|||
("testCloseInput", testCloseInput),
|
||||
("testHalfClosure", testHalfClosure),
|
||||
("testRejectsInvalidData", testRejectsInvalidData),
|
||||
("testWeDontCrashIfChannelReleasesBeforePipeline", testWeDontCrashIfChannelReleasesBeforePipeline),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1275,4 +1275,68 @@ public class ChannelTests: XCTestCase {
|
|||
XCTFail("Got \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
func testWeDontCrashIfChannelReleasesBeforePipeline() throws {
|
||||
final class StuffHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = Never
|
||||
|
||||
let promise: EventLoopPromise<ChannelPipeline>
|
||||
|
||||
init(promise: EventLoopPromise<ChannelPipeline>) {
|
||||
self.promise = promise
|
||||
}
|
||||
|
||||
func channelRegistered(ctx: ChannelHandlerContext) {
|
||||
self.promise.succeed(result: ctx.channel.pipeline)
|
||||
}
|
||||
}
|
||||
weak var weakClientChannel: Channel? = nil
|
||||
weak var weakServerChannel: Channel? = nil
|
||||
weak var weakServerChildChannel: Channel? = nil
|
||||
|
||||
let group = MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer {
|
||||
XCTAssertNoThrow(try group.syncShutdownGracefully())
|
||||
}
|
||||
|
||||
let promise: EventLoopPromise<ChannelPipeline> = group.next().newPromise()
|
||||
|
||||
try {
|
||||
let serverChildChannelPromise: EventLoopPromise<Channel> = group.next().newPromise()
|
||||
let serverChannel = try ServerBootstrap(group: group)
|
||||
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
.childChannelInitializer { channel in
|
||||
serverChildChannelPromise.succeed(result: channel)
|
||||
channel.close(promise: nil)
|
||||
return channel.eventLoop.newSucceedFuture(result: ())
|
||||
}
|
||||
.bind(to: "127.0.0.1", on: 0).wait()
|
||||
|
||||
let clientChannel = try ClientBootstrap(group: group)
|
||||
.channelInitializer {
|
||||
$0.pipeline.add(handler: StuffHandler(promise: promise))
|
||||
}
|
||||
.connect(to: serverChannel.localAddress!).wait()
|
||||
weakClientChannel = clientChannel
|
||||
weakServerChannel = serverChannel
|
||||
weakServerChildChannel = try serverChildChannelPromise.futureResult.wait()
|
||||
_ = try? clientChannel.close().wait()
|
||||
XCTAssertNoThrow(try serverChannel.close().wait())
|
||||
}()
|
||||
let pipeline = try promise.futureResult.wait()
|
||||
do {
|
||||
try pipeline.eventLoop.submit { () -> Channel in
|
||||
XCTAssertTrue(pipeline.channel is DeadChannel)
|
||||
return pipeline.channel
|
||||
}.wait().write(data: NIOAny(())).wait()
|
||||
XCTFail("shouldn't have been reached")
|
||||
} catch let e as ChannelError where e == .ioOnClosedChannel {
|
||||
// OK
|
||||
} catch {
|
||||
XCTFail("wrong error \(error) received")
|
||||
}
|
||||
XCTAssertNil(weakClientChannel, "weakClientChannel not nil, looks like we leaked it!")
|
||||
XCTAssertNil(weakServerChannel, "weakServerChannel not nil, looks like we leaked it!")
|
||||
XCTAssertNil(weakServerChildChannel, "weakServerChildChannel not nil, looks like we leaked it!")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue