Use the pre-succeeded void future when possible for async pipeline operations (#1766)

Motivation:

We added synchronous pipeline operations to allow the caller to save
allocations when they know they are already on the correct event loop.
However, we missed a trick! In some cases the caller cannot guarantee
they are on the correct event loop and must use an asynchronous method
instead. If that method returns a void future and is called on the event
loop, then we can perform the operation synchronously and return a
cached void future.

Modifications:

- Add API to `EventLoop` for creating a 'completed' future with a
  `Result` (similar to `EventLoopPromise.completeWith`)
- Add an equivalent for making completed void futures
- Use these when asynchronously adding handlers and the caller is
  already on the right event loop.

Result:

- Fewer allocations on the happiest of happy paths when adding handlers
  asynchronously to a pipeline.
This commit is contained in:
George Barnett 2021-02-25 13:08:00 +00:00 committed by GitHub
parent 9b230f89a4
commit 178636d999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 96 additions and 50 deletions

View File

@ -170,16 +170,17 @@ public final class ChannelPipeline: ChannelInvoker {
public func addHandler(_ handler: ChannelHandler,
name: String? = nil,
position: ChannelPipeline.Position = .last) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
let future: EventLoopFuture<Void>
if self.eventLoop.inEventLoop {
promise.completeWith(self.addHandlerSync(handler, name: name, position: position))
future = self.eventLoop.makeCompletedFuture(self.addHandlerSync(handler, name: name, position: position))
} else {
self.eventLoop.execute {
promise.completeWith(self.addHandlerSync(handler, name: name, position: position))
future = self.eventLoop.submit {
try self.addHandlerSync(handler, name: name, position: position).get()
}
}
return promise.futureResult
return future
}
/// Synchronously add a `ChannelHandler` to the `ChannelPipeline`.
@ -934,17 +935,17 @@ extension ChannelPipeline {
/// - returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
public func addHandlers(_ handlers: [ChannelHandler],
position: ChannelPipeline.Position = .last) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
let future: EventLoopFuture<Void>
if self.eventLoop.inEventLoop {
promise.completeWith(self.addHandlersSync(handlers, position: position))
future = self.eventLoop.makeCompletedFuture(self.addHandlersSync(handlers, position: position))
} else {
self.eventLoop.execute {
promise.completeWith(self.addHandlersSync(handlers, position: position))
future = self.eventLoop.submit {
try self.addHandlersSync(handlers, position: position).get()
}
}
return promise.futureResult
return future
}
/// Adds the provided channel handlers to the pipeline in the order given, taking account
@ -1717,11 +1718,11 @@ extension ChannelPipeline: CustomDebugStringConvertible {
let maxOutgoingTypeNameCount = debugInfos.filter { $0.isOutgoing }
.map { $0.typeName.count }
.max() ?? 0
func whitespace(count: Int) -> String {
return String(repeating: " ", count: count)
}
if debugInfos.isEmpty {
desc.append(" <no handlers>")
} else {
@ -1747,10 +1748,10 @@ extension ChannelPipeline: CustomDebugStringConvertible {
desc.append(line.joined())
}
}
return desc.joined(separator: "\n")
}
/// Returns the first `ChannelHandler` of the given type.
///
/// - parameters:
@ -1761,7 +1762,7 @@ extension ChannelPipeline: CustomDebugStringConvertible {
guard let typedContext = context.handler as? Handler else {
preconditionFailure("Expected channel handler of type \(Handler.self), got \(type(of: context.handler)) instead.")
}
return typedContext
}
}
@ -1780,7 +1781,7 @@ extension ChannelPipeline: CustomDebugStringConvertible {
return typedContext
}
}
private struct ChannelHandlerDebugInfo {
let handler: ChannelHandler
let name: String
@ -1794,7 +1795,7 @@ extension ChannelPipeline: CustomDebugStringConvertible {
return "\(type(of: self.handler))"
}
}
private func collectHandlerDebugInfos() -> [ChannelHandlerDebugInfo] {
var handlers = [ChannelHandlerDebugInfo]()
var node = self.head?.next

View File

@ -262,7 +262,7 @@ public protocol EventLoop: EventLoopGroup {
/// Asserts that the current thread is the one tied to this `EventLoop`.
/// Otherwise, the process will be abnormally terminated as per the semantics of `preconditionFailure(_:file:line:)`.
func preconditionInEventLoop(file: StaticString, line: UInt)
/// Asserts that the current thread is _not_ the one tied to this `EventLoop`.
/// Otherwise, the process will be abnormally terminated as per the semantics of `preconditionFailure(_:file:line:)`.
func preconditionNotInEventLoop(file: StaticString, line: UInt)
@ -372,7 +372,7 @@ extension TimeAmount: AdditiveArithmetic {
public static func + (lhs: TimeAmount, rhs: TimeAmount) -> TimeAmount {
return TimeAmount(lhs.nanoseconds + rhs.nanoseconds)
}
public static func +=(lhs: inout TimeAmount, rhs: TimeAmount) {
lhs = lhs + rhs
}
@ -380,7 +380,7 @@ extension TimeAmount: AdditiveArithmetic {
public static func - (lhs: TimeAmount, rhs: TimeAmount) -> TimeAmount {
return TimeAmount(lhs.nanoseconds - rhs.nanoseconds)
}
public static func -=(lhs: inout TimeAmount, rhs: TimeAmount) {
lhs = lhs - rhs
}
@ -551,7 +551,7 @@ extension EventLoop {
_ task: @escaping () throws -> EventLoopFuture<T>) -> Scheduled<T> {
let promise: EventLoopPromise<T> = self.makePromise(file:#file, line: line)
let scheduled = self.scheduleTask(deadline: deadline, task)
scheduled.futureResult.flatMap { $0 }.cascade(to: promise)
return .init(promise: promise, cancellationTask: { scheduled.cancel() })
}
@ -572,7 +572,7 @@ extension EventLoop {
_ task: @escaping () throws -> EventLoopFuture<T>) -> Scheduled<T> {
let promise: EventLoopPromise<T> = self.makePromise(file: file, line: line)
let scheduled = self.scheduleTask(in: delay, task)
scheduled.futureResult.flatMap { $0 }.cascade(to: promise)
return .init(promise: promise, cancellationTask: { scheduled.cancel() })
}
@ -608,6 +608,21 @@ extension EventLoop {
}
}
/// Creates and returns a new `EventLoopFuture` that is marked as succeeded or failed with the value held by `result`.
///
/// - Parameters:
/// - result: The value that is used by the `EventLoopFuture`
/// - Returns: A completed `EventLoopFuture`.
@inlinable
public func makeCompletedFuture<Success>(_ result: Result<Success, Error>) -> EventLoopFuture<Success> {
switch result {
case .success(let value):
return self.makeSucceededFuture(value)
case .failure(let error):
return self.makeFailedFuture(error)
}
}
/// An `EventLoop` forms a singular `EventLoopGroup`, returning itself as the 'next' `EventLoop`.
///
/// - returns: Itself, because an `EventLoop` forms a singular `EventLoopGroup`.
@ -922,7 +937,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers, selectorFactory: selectorFactory)
}
/// Creates a `MultiThreadedEventLoopGroup` instance which uses the given `ThreadInitializer`s. One `NIOThread` per `ThreadInitializer` is created and used.
///
/// - arguments:

View File

@ -82,6 +82,8 @@ extension EventLoopTest {
("testEventLoopsWithPreSucceededFuturesCacheThem", testEventLoopsWithPreSucceededFuturesCacheThem),
("testEventLoopsWithoutPreSucceededFuturesDoNotCacheThem", testEventLoopsWithoutPreSucceededFuturesDoNotCacheThem),
("testSelectableEventLoopHasPreSucceededFuturesOnlyOnTheEventLoop", testSelectableEventLoopHasPreSucceededFuturesOnlyOnTheEventLoop),
("testMakeCompletedFuture", testMakeCompletedFuture),
("testMakeCompletedVoidFuture", testMakeCompletedVoidFuture),
]
}
}

View File

@ -25,14 +25,14 @@ public final class EventLoopTest : XCTestCase {
var result: Bool?
scheduled.futureResult.whenSuccess { result = $0 }
eventLoop.run() // run without time advancing should do nothing
XCTAssertFalse(scheduled.futureResult.isFulfilled)
XCTAssertNil(result)
eventLoop.advanceTime(by: .seconds(1)) // should fire now
XCTAssertTrue(scheduled.futureResult.isFulfilled)
XCTAssertNotNil(result)
XCTAssertTrue(result == true)
}
@ -46,14 +46,14 @@ public final class EventLoopTest : XCTestCase {
var result: Bool?
scheduled.futureResult.whenSuccess { result = $0 }
eventLoop.run() // run without time advancing should do nothing
XCTAssertFalse(scheduled.futureResult.isFulfilled)
XCTAssertNil(result)
eventLoop.advanceTime(by: .seconds(1)) // should fire now
XCTAssertTrue(scheduled.futureResult.isFulfilled)
XCTAssertNotNil(result)
XCTAssertTrue(result == true)
}
@ -102,7 +102,7 @@ public final class EventLoopTest : XCTestCase {
var error: Error?
scheduled.futureResult.whenSuccess { result = $0 }
scheduled.futureResult.whenFailure { error = $0 }
eventLoop.advanceTime(by: .milliseconds(500)) // advance halfway to firing time
scheduled.cancel()
eventLoop.advanceTime(by: .milliseconds(500)) // advance the rest of the way
@ -123,7 +123,7 @@ public final class EventLoopTest : XCTestCase {
var error: Error?
scheduled.futureResult.whenSuccess { result = $0 }
scheduled.futureResult.whenFailure { error = $0 }
eventLoop.advanceTime(by: .milliseconds(500)) // advance halfway to firing time
scheduled.cancel()
eventLoop.advanceTime(by: .milliseconds(500)) // advance the rest of the way
@ -173,7 +173,7 @@ public final class EventLoopTest : XCTestCase {
var error: Error?
scheduled.futureResult.whenSuccess { result = $0 }
scheduled.futureResult.whenFailure { error = $0 }
scheduled.cancel()
eventLoop.advanceTime(by: .seconds(1))
@ -192,7 +192,7 @@ public final class EventLoopTest : XCTestCase {
var error: Error?
scheduled.futureResult.whenSuccess { result = $0 }
scheduled.futureResult.whenFailure { error = $0 }
scheduled.cancel()
eventLoop.advanceTime(by: .seconds(1))
@ -706,7 +706,7 @@ public final class EventLoopTest : XCTestCase {
XCTAssertTrue(result.isEmpty)
}
public func testRepeatedTaskThatIsImmediatelyCancelledNotifies() throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
@ -1309,7 +1309,7 @@ public final class EventLoopTest : XCTestCase {
var error: Error?
scheduled.futureResult.whenSuccess { result = $0 }
scheduled.futureResult.whenFailure { error = $0 }
scheduled.cancel()
XCTAssertTrue(scheduled.futureResult.isFulfilled)
@ -1366,6 +1366,34 @@ public final class EventLoopTest : XCTestCase {
XCTAssert(futureInside1 === futureInside2)
}.wait())
}
func testMakeCompletedFuture() {
let eventLoop = EmbeddedEventLoop()
defer {
XCTAssertNoThrow(try eventLoop.syncShutdownGracefully())
}
XCTAssertEqual(try eventLoop.makeCompletedFuture(.success("foo")).wait(), "foo")
struct DummyError: Error {}
let future = eventLoop.makeCompletedFuture(Result<String, Error>.failure(DummyError()))
XCTAssertThrowsError(try future.wait()) { error in
XCTAssertTrue(error is DummyError)
}
}
func testMakeCompletedVoidFuture() {
let eventLoop = EventLoopWithPreSucceededFuture()
defer {
XCTAssertNoThrow(try eventLoop.syncShutdownGracefully())
}
let future1 = eventLoop.makeCompletedFuture(.success(()))
let future2 = eventLoop.makeSucceededVoidFuture()
let future3 = eventLoop.makeSucceededFuture(())
XCTAssert(future1 === future2)
XCTAssert(future2 === future3)
}
}
fileprivate class EventLoopWithPreSucceededFuture: EventLoop {

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=50
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30540
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4100
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=180010
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=178010
- MAX_ALLOCS_ALLOWED_1000_udp_reqs=16050
- MAX_ALLOCS_ALLOWED_1000_udpbootstraps=2000
- MAX_ALLOCS_ALLOWED_1000_udpconnections=102050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=476050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=471050
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_creating_10000_headers=100 # 5.2 improvement 10000
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
@ -46,7 +46,7 @@ services:
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_udp_1000_reqs_1_conn=16250
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=202050
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=200050
- SANITIZER_ARG=--sanitize=thread
- INTEGRATION_TESTS_ARG=-f tests_0[013-9]

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=50
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30540
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4100
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=179010
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=177010
- MAX_ALLOCS_ALLOWED_1000_udp_reqs=16050
- MAX_ALLOCS_ALLOWED_1000_udpbootstraps=2000
- MAX_ALLOCS_ALLOWED_1000_udpconnections=101050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=471050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=466050
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_creating_10000_headers=100
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
@ -46,7 +46,7 @@ services:
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_udp_1000_reqs_1_conn=16250
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=202050
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=200050
- SANITIZER_ARG=--sanitize=thread
performance-test:

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=50
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=31990
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=3100
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=188050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=186050
- MAX_ALLOCS_ALLOWED_1000_udp_reqs=18050
- MAX_ALLOCS_ALLOWED_1000_udpbootstraps=2000
- MAX_ALLOCS_ALLOWED_1000_udpconnections=107050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=948050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=943050
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_creating_10000_headers=10100
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
@ -46,7 +46,7 @@ services:
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_udp_1000_reqs_1_conn=18250
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=213050
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=211050
performance-test:
image: swift-nio:18.04-5.0

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=50
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30540
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=3100
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=180050
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=178050
- MAX_ALLOCS_ALLOWED_1000_udp_reqs=16050
- MAX_ALLOCS_ALLOWED_1000_udpbootstraps=2000
- MAX_ALLOCS_ALLOWED_1000_udpconnections=102050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=473050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=468050
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_creating_10000_headers=10100
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
@ -46,7 +46,7 @@ services:
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_udp_1000_reqs_1_conn=16250
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=202050
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=200050
performance-test:
image: swift-nio:18.04-5.1

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_1000_getHandlers_sync=50
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30540
- MAX_ALLOCS_ALLOWED_1000_tcpbootstraps=4100
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=181050 # regression from 5.3 which was 179010
- MAX_ALLOCS_ALLOWED_1000_tcpconnections=179050 # regression from 5.3 which was 177010
- MAX_ALLOCS_ALLOWED_1000_udpbootstraps=2000
- MAX_ALLOCS_ALLOWED_1000_udpconnections=102050 # regression from 5.3 which was 101050
- MAX_ALLOCS_ALLOWED_1000_udp_reqs=16050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=473050 # regression from 5.3 which was 471050
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=468050 # regression from 5.3 which was 466050
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
- MAX_ALLOCS_ALLOWED_creating_10000_headers=100
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
@ -46,7 +46,7 @@ services:
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_udp_1000_reqs_1_conn=16250
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=204050 # regression from 5.3 which was 202050
- MAX_ALLOCS_ALLOWED_udp_1_reqs_1000_conn=202050 # regression from 5.3 which was 200050
# - SANITIZER_ARG=--sanitize=thread # TSan broken still
performance-test: