Port IdleStateHandler from Netty

This commit is contained in:
Norman Maurer 2017-10-29 09:46:48 +01:00
parent 6173bee782
commit 9c7829a065
5 changed files with 338 additions and 0 deletions

View File

@ -104,3 +104,182 @@ public class ChannelInitializer: _ChannelInboundHandler {
}
}
}
/// Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
public class IdleStateHandler : ChannelInboundHandler, ChannelOutboundHandler {
public typealias InboundIn = NIOAny
public typealias InboundOut = NIOAny
public typealias OutboundIn = NIOAny
public typealias OutboundOut = NIOAny
enum IdleStateEvent {
/// Will be triggered when no write was performed for the specified period of time
case write
/// Will be triggered when no read was performed for the specified period of time
case read
/// Will be triggered when neither read nor write was performed for the specified period of time
case all
}
public let readTimeout: TimeAmount?
public let writeTimeout: TimeAmount?
public let allTimeout: TimeAmount?
private var reading = false
private var lastReadTime: TimeAmount?
private var lastWriteCompleteTime: TimeAmount?
private var scheduledReaderTask: Scheduled<Void>?
private var scheduledWriterTask: Scheduled<Void>?
private var scheduledAllTask: Scheduled<Void>?
public init(readTimeout: TimeAmount? = nil, writeTimeout: TimeAmount? = nil, allTimeout: TimeAmount? = nil) {
self.readTimeout = readTimeout
self.writeTimeout = writeTimeout
self.allTimeout = allTimeout;
}
public func handlerAdded(ctx: ChannelHandlerContext) {
if let channel = ctx.channel, channel.isActive {
initIdleTasks(ctx)
}
}
public func handlerRemoved(ctx: ChannelHandlerContext) {
cancelIdleTasks(ctx)
}
public func channelActive(ctx: ChannelHandlerContext) {
initIdleTasks(ctx)
}
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
if readTimeout != nil || allTimeout != nil {
reading = true
}
ctx.fireChannelRead(data: data)
}
public func channelReadComplete(ctx: ChannelHandlerContext) {
if (readTimeout != nil || allTimeout != nil) && reading {
lastReadTime = TimeAmount.now()
reading = false
}
ctx.fireChannelReadComplete()
}
public func write(ctx: ChannelHandlerContext, data: NIOAny, promise: Promise<Void>?) {
guard writeTimeout != nil || allTimeout != nil else {
ctx.write(data: data, promise: promise)
return
}
let writePromise = promise ?? ctx.eventLoop.newPromise()
writePromise.futureResult.whenComplete { _ in
self.lastWriteCompleteTime = TimeAmount.now()
}
ctx.write(data: data, promise: writePromise)
}
private func shouldReschedule(_ ctx: ChannelHandlerContext) -> Bool {
if let channel = ctx.channel, channel.isActive {
return true
}
return false
}
private func newReadTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> () -> () {
return {
guard self.shouldReschedule(ctx) else {
return
}
guard !self.reading else {
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newReadTimeoutTask(ctx, timeout))
return
}
let diff = TimeAmount.now().nanoseconds - (self.lastReadTime?.nanoseconds ?? 0)
if diff >= timeout.nanoseconds {
// Reader is idle - set a new timeout and trigger an event through the pipeline
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newReadTimeoutTask(ctx, timeout))
ctx.fireUserInboundEventTriggered(event: IdleStateEvent.read)
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(timeout.nanoseconds - diff), self.newReadTimeoutTask(ctx, timeout))
}
}
}
private func newWriteTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> () -> () {
return {
guard self.shouldReschedule(ctx) else {
return
}
let lastWriteTime = self.lastWriteCompleteTime?.nanoseconds ?? 0
let diff = TimeAmount.now().nanoseconds - lastWriteTime
if diff >= timeout.nanoseconds {
// Writer is idle - set a new timeout and notify the callback.
self.scheduledWriterTask = ctx.eventLoop.scheduleTask(in: timeout, self.newWriteTimeoutTask(ctx, timeout))
ctx.fireUserInboundEventTriggered(event: IdleStateEvent.write)
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
self.scheduledWriterTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(timeout.nanoseconds - diff), self.newWriteTimeoutTask(ctx, timeout))
}
}
}
private func newAllTimeoutTask(_ ctx: ChannelHandlerContext, _ timeout: TimeAmount) -> () -> () {
return {
guard self.shouldReschedule(ctx) else {
return
}
guard !self.reading else {
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newAllTimeoutTask(ctx, timeout))
return
}
let lastRead = self.lastReadTime?.nanoseconds ?? 0
let lastWrite = self.lastWriteCompleteTime?.nanoseconds ?? 0
let diff = TimeAmount.now().nanoseconds - (lastRead > lastWrite ? lastRead : lastWrite)
if diff >= timeout.nanoseconds {
// Reader is idle - set a new timeout and trigger an event through the pipeline
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: timeout, self.newAllTimeoutTask(ctx, timeout))
ctx.fireUserInboundEventTriggered(event: IdleStateEvent.all)
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
self.scheduledReaderTask = ctx.eventLoop.scheduleTask(in: .nanoseconds(timeout.nanoseconds - diff), self.newAllTimeoutTask(ctx, timeout))
}
}
}
private func schedule(_ ctx: ChannelHandlerContext, _ amount: TimeAmount?, _ fn: @escaping (ChannelHandlerContext, TimeAmount) -> () -> ()) -> Scheduled<Void>? {
if let timeout = amount {
return ctx.eventLoop.scheduleTask(in: timeout, fn(ctx, timeout))
}
return nil;
}
private func initIdleTasks(_ ctx: ChannelHandlerContext) {
let now = TimeAmount.now()
lastReadTime = now
lastWriteCompleteTime = now
scheduledReaderTask = schedule(ctx, readTimeout, newReadTimeoutTask)
scheduledWriterTask = schedule(ctx, writeTimeout, newWriteTimeoutTask)
scheduledAllTask = schedule(ctx, allTimeout, newAllTimeoutTask)
}
private func cancelIdleTasks(_ ctx: ChannelHandlerContext) {
scheduledReaderTask?.cancel()
scheduledWriterTask?.cancel()
scheduledAllTask?.cancel()
scheduledReaderTask = nil
scheduledWriterTask = nil
scheduledAllTask = nil
}
}

View File

@ -88,6 +88,10 @@ public struct TimeAmount {
public static func hours(_ amount: Int) -> TimeAmount {
return TimeAmount(UInt64(amount) * 1000 * 1000 * 1000 * 60 * 60)
}
public static func now() -> TimeAmount {
return nanoseconds(DispatchTime.now().uptimeNanoseconds)
}
}
extension TimeAmount: Comparable {

View File

@ -45,6 +45,7 @@ import XCTest
testCase(HTTPServerClientTest.allTests),
testCase(HTTPTest.allTests),
testCase(HTTPUpgradeTestCase.allTests),
testCase(IdleStateHandlerTest.allTests),
testCase(MarkedCircularBufferTests.allTests),
testCase(MessageToByteEncoderTest.allTests),
testCase(OpenSSLIntegrationTest.allTests),

View File

@ -0,0 +1,36 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
///
/// IdleStateHandlerTest+XCTest.swift
///
import XCTest
///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension IdleStateHandlerTest {
static var allTests : [(String, (IdleStateHandlerTest) -> () throws -> Void)] {
return [
("testIdleRead", testIdleRead),
("testIdleWrite", testIdleWrite),
("testIdleAllWrite", testIdleAllWrite),
("testIdleAllRead", testIdleAllRead),
]
}
}

View File

@ -0,0 +1,118 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import XCTest
@testable import NIO
class IdleStateHandlerTest : XCTestCase {
func testIdleRead() throws {
try testIdle(IdleStateHandler(readTimeout: .seconds(1)), false, { v in
if case IdleStateHandler.IdleStateEvent.read = v {
return true
} else {
return false
}
})
}
func testIdleWrite() throws {
try testIdle(IdleStateHandler(writeTimeout: .seconds(1)), true, { v in
if case IdleStateHandler.IdleStateEvent.write = v {
return true
} else {
return false
}
})
}
func testIdleAllWrite() throws {
try testIdle(IdleStateHandler(allTimeout: .seconds(1)), true, { v in
if case IdleStateHandler.IdleStateEvent.all = v {
return true
} else {
return false
}
})
}
func testIdleAllRead() throws {
try testIdle(IdleStateHandler(allTimeout: .seconds(1)), false, { v in
if case IdleStateHandler.IdleStateEvent.all = v {
return true
} else {
return false
}
})
}
private func testIdle(_ handler: IdleStateHandler, _ writeToChannel: Bool, _ assertEventFn: @escaping (Any) -> Bool) throws {
let group = try MultiThreadedEventLoopGroup(numThreads: 1)
defer {
try! group.syncShutdownGracefully()
}
class TestWriteHandler: _ChannelInboundHandler {
private var read = false
private let writeToChannel: Bool
private let assertEventFn: (Any) -> Bool
init(_ writeToChannel: Bool, _ assertEventFn: @escaping (Any) -> Bool) {
self.writeToChannel = writeToChannel
self.assertEventFn = assertEventFn
}
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
self.read = true
}
public func userInboundEventTriggered(ctx: ChannelHandlerContext, event: Any) {
if !self.writeToChannel {
XCTAssertTrue(self.read)
}
XCTAssertTrue(assertEventFn(event))
ctx.close(promise: nil)
}
public func channelActive(ctx: ChannelHandlerContext) {
if writeToChannel {
var buffer = ctx.channel!.allocator.buffer(capacity: 4)
buffer.write(staticString: "test")
ctx.writeAndFlush(data: NIOAny(buffer), promise: nil)
}
}
}
let serverChannel = try ServerBootstrap(group: group)
.option(option: ChannelOptions.Socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.handler(childHandler: ChannelInitializer(initChannel: { channel in
channel.pipeline.add(handler: handler).then(callback: { f in
channel.pipeline.add(handler: TestWriteHandler(writeToChannel, assertEventFn))
})
})).bind(to: "127.0.0.1", on: 0).wait()
defer {
_ = serverChannel.close()
}
let clientChannel = try ClientBootstrap(group: group).connect(to: serverChannel.localAddress!).wait()
if !writeToChannel {
var buffer = clientChannel.allocator.buffer(capacity: 4)
buffer.write(staticString: "test")
try clientChannel.writeAndFlush(data: NIOAny(buffer)).wait()
}
try clientChannel.closeFuture.wait()
}
}