Provide a convenience API for dispatching blocking work (#1563) (#1662)

Motivation:

SwiftNIO lacks a convenience API for performing blocking IO / tasks. As
this is a fairly common task it then requires the clients to make ad hoc
implementations that address this requirement.

Modifications:

Extension to DispatchQueue with the following method to schedule a work
item to the `DispatchQueue` and return and `EventLoopFuture` for the
result returned:

- `asyncWithFuture<NewValue>(eventLoop: EventLoop, _ callbackMayBlock: @escaping () throws -> NewValue) -> EventLoopFuture<NewValue>`

Added new unit tests for this function both when the promise succeeds
and fails.

Extention to EventLoopFuture with the following public functions:

- `flatMapBlocking<NewValue)(onto queue DispatchQueue, _ callbackMayBlock: @escpaing (Value) throws -> NewValue) -> EventLoopFuture<NewValue>`
- `whenSuccessBlocking(onto queue DispatchQueue, _ callbackMayBlock: @escaping (Value) -> Void) -> EventLoopFuture<NewValue>`
- `whenFailureBlocking()onto queue DispatchQueue, _ callbackMayBlock: @escaping (Error) -> Void) -> EventLoopFuture<NewValue>`
- `whenCompleteBlocking(onto queue DispatchQueue, _ callbackMayBlock: @escaping (Result<Value, Error>) -> Void) -> EventLoopFuture<NewValue>`

These functions may all be called safely with callbacks that perform blocking IO / Tasks.

Added new unit tests to EventLoopFutureTest.swift for each new function.

Result:

New public API for `EventLoopFuture` that allows scheduling of blocking IO / Tasks.
This commit is contained in:
Graeme Jenkinson 2020-09-30 17:04:21 +01:00 committed by GitHub
parent 7c42e5a45d
commit b5c1696033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 346 additions and 2 deletions

View File

@ -0,0 +1,47 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Dispatch
extension DispatchQueue {
/// Schedules a work item for immediate execution and immediately returns with an `EventLoopFuture` providing the
/// result. For example:
///
/// let futureResult = DispatchQueue.main.asyncWithFuture(eventLoop: myEventLoop) { () -> String in
/// callbackMayBlock()
/// }
/// try let value = futureResult.wait()
///
/// - parameters:
/// - eventLoop: the `EventLoop` on which to proceses the IO / task specified by `callbackMayBlock`.
/// - callbackMayBlock: The scheduled callback for the IO / task.
/// - returns a new `EventLoopFuture<ReturnType>` with value returned by the `block` parameter.
@inlinable
public func asyncWithFuture<NewValue>(
eventLoop: EventLoop,
_ callbackMayBlock: @escaping () throws -> NewValue
) -> EventLoopFuture<NewValue> {
let promise = eventLoop.makePromise(of: NewValue.self)
self.async {
do {
let result = try callbackMayBlock()
promise.succeed(result)
} catch {
promise.fail(error)
}
}
return promise.futureResult
}
}

View File

@ -2,7 +2,7 @@
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Copyright (c) 2017-2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//
import NIOConcurrencyHelpers
import Dispatch
/// Internal list of callbacks.
///
@ -1402,3 +1403,78 @@ extension EventLoopFuture {
}
}
}
// MARK: may block
extension EventLoopFuture {
/// Chain an `EventLoopFuture<NewValue>` providing the result of a IO / task that may block. For example:
///
/// promise.futureResult.flatMapBlocking(onto: DispatchQueue.global()) { value in Int
/// blockingTask(value)
/// }
///
/// - parameters:
/// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled.
/// - callbackMayBlock: Function that will receive the value of this `EventLoopFuture` and return
/// a new `EventLoopFuture`.
@inlinable
public func flatMapBlocking<NewValue>(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Value) throws -> NewValue)
-> EventLoopFuture<NewValue> {
return self.flatMap { result in
queue.asyncWithFuture(eventLoop: self.eventLoop) { try callbackMayBlock(result) }
}
}
/// Adds an observer callback to this `EventLoopFuture` that is called when the
/// `EventLoopFuture` has a success result. The observer callback is permitted to block.
///
/// An observer callback cannot return a value, meaning that this function cannot be chained
/// from. If you are attempting to create a computation pipeline, consider `map` or `flatMap`.
/// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise`
/// in the body of this function, consider using `cascade` instead.
///
/// - parameters:
/// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled.
/// - callbackMayBlock: The callback that is called with the successful result of the `EventLoopFuture`.
@inlinable
public func whenSuccessBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Value) -> Void) {
self.whenSuccess { value in
queue.async { callbackMayBlock(value) }
}
}
/// Adds an observer callback to this `EventLoopFuture` that is called when the
/// `EventLoopFuture` has a failure result. The observer callback is permitted to block.
///
/// An observer callback cannot return a value, meaning that this function cannot be chained
/// from. If you are attempting to create a computation pipeline, consider `recover` or `flatMapError`.
/// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise`
/// in the body of this function, consider using `cascade` instead.
///
/// - parameters:
/// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled.
/// - callbackMayBlock: The callback that is called with the failed result of the `EventLoopFuture`.
@inlinable
public func whenFailureBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Error) -> Void) {
self.whenFailure { err in
queue.async { callbackMayBlock(err) }
}
}
/// Adds an observer callback to this `EventLoopFuture` that is called when the
/// `EventLoopFuture` has any result. The observer callback is permitted to block.
///
/// Unlike its friends `whenSuccess` and `whenFailure`, `whenComplete` does not receive the result
/// of the `EventLoopFuture`. This is because its primary purpose is to do the appropriate cleanup
/// of any resources that needed to be kept open until the `EventLoopFuture` had resolved.
///
/// - parameters:
/// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is schedulded.
/// - callbackMayBlock: The callback that is called when the `EventLoopFuture` is fulfilled.
@inlinable
public func whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Result<Value, Error>) -> Void) {
self.whenComplete { value in
queue.async { callbackMayBlock(value) }
}
}
}

View File

@ -62,6 +62,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner {
testCase(ControlMessageTests.allTests),
testCase(CustomChannelTests.allTests),
testCase(DatagramChannelTests.allTests),
testCase(DispatchQueueWithFutureTest.allTests),
testCase(EchoServerClientTest.allTests),
testCase(EmbeddedChannelTest.allTests),
testCase(EmbeddedEventLoopTest.allTests),

View File

@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// DispatchQueue+WithFutureTest+XCTest.swift
//
import XCTest
///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///
extension DispatchQueueWithFutureTest {
@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
static var allTests : [(String, (DispatchQueueWithFutureTest) -> () throws -> Void)] {
return [
("testDispatchQueueAsyncWithFuture", testDispatchQueueAsyncWithFuture),
("testDispatchQueueAsyncWithFutureThrows", testDispatchQueueAsyncWithFutureThrows),
]
}
}

View File

@ -0,0 +1,69 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Dispatch
import NIO
import XCTest
enum DispatchQueueTestError: Error {
case example
}
class DispatchQueueWithFutureTest: XCTestCase {
func testDispatchQueueAsyncWithFuture() {
let eventLoop = EmbeddedEventLoop()
let sem = DispatchSemaphore(value: 0)
var nonBlockingRan = false
let futureResult: EventLoopFuture<String> = DispatchQueue.global().asyncWithFuture(eventLoop: eventLoop) {
() -> String in
sem.wait() // Block in callback
return "hello"
}
futureResult.whenSuccess { value in
XCTAssertEqual(value, "hello")
XCTAssertTrue(nonBlockingRan)
}
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
func testDispatchQueueAsyncWithFutureThrows() {
let eventLoop = EmbeddedEventLoop()
let sem = DispatchSemaphore(value: 0)
var nonBlockingRan = false
let futureResult: EventLoopFuture<String> = DispatchQueue.global().asyncWithFuture(eventLoop: eventLoop) {
() -> String in
sem.wait() // Block in callback
throw DispatchQueueTestError.example
}
futureResult.whenFailure { err in
XCTAssertEqual(err as! DispatchQueueTestError, DispatchQueueTestError.example)
XCTAssertTrue(nonBlockingRan)
}
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
}

View File

@ -90,6 +90,11 @@ extension EventLoopFutureTest {
("testEventLoopFutureOrReplacement", testEventLoopFutureOrReplacement),
("testEventLoopFutureOrNoElse", testEventLoopFutureOrNoElse),
("testEventLoopFutureOrElse", testEventLoopFutureOrElse),
("testFlatBlockingMapOnto", testFlatBlockingMapOnto),
("testWhenSuccessBlocking", testWhenSuccessBlocking),
("testWhenFailureBlocking", testWhenFailureBlocking),
("testWhenCompleteBlockingSuccess", testWhenCompleteBlockingSuccess),
("testWhenCompleteBlockingFailure", testWhenCompleteBlockingFailure),
]
}
}

View File

@ -2,7 +2,7 @@
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Copyright (c) 2017-2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
@ -1265,4 +1265,115 @@ class EventLoopFutureTest : XCTestCase {
XCTAssertEqual(try! promise.futureResult.unwrap(orElse: { x * 2 } ).wait(), 4)
}
func testFlatBlockingMapOnto() {
let eventLoop = EmbeddedEventLoop()
let p = eventLoop.makePromise(of: String.self)
let sem = DispatchSemaphore(value: 0)
var blockingRan = false
var nonBlockingRan = false
p.futureResult.map {
$0.count
}.flatMapBlocking(onto: DispatchQueue.global()) { value -> Int in
sem.wait() // Block in chained EventLoopFuture
blockingRan = true
return 1 + value
}.whenSuccess {
XCTAssertEqual($0, 6)
XCTAssertTrue(blockingRan)
XCTAssertTrue(nonBlockingRan)
}
p.succeed("hello")
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
func testWhenSuccessBlocking() {
let eventLoop = EmbeddedEventLoop()
let sem = DispatchSemaphore(value: 0)
var nonBlockingRan = false
let p = eventLoop.makePromise(of: String.self)
p.futureResult.whenSuccessBlocking(onto: DispatchQueue.global()) {
sem.wait() // Block in callback
XCTAssertEqual($0, "hello")
XCTAssertTrue(nonBlockingRan)
}
p.succeed("hello")
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
func testWhenFailureBlocking() {
let eventLoop = EmbeddedEventLoop()
let sem = DispatchSemaphore(value: 0)
var nonBlockingRan = false
let p = eventLoop.makePromise(of: String.self)
p.futureResult.whenFailureBlocking (onto: DispatchQueue.global()) { err in
sem.wait() // Block in callback
XCTAssertEqual(err as! EventLoopFutureTestError, EventLoopFutureTestError.example)
XCTAssertTrue(nonBlockingRan)
}
p.fail(EventLoopFutureTestError.example)
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
func testWhenCompleteBlockingSuccess() {
let eventLoop = EmbeddedEventLoop()
let sem = DispatchSemaphore(value: 0)
var nonBlockingRan = false
let p = eventLoop.makePromise(of: String.self)
p.futureResult.whenCompleteBlocking (onto: DispatchQueue.global()) { _ in
sem.wait() // Block in callback
XCTAssertTrue(nonBlockingRan)
}
p.succeed("hello")
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
func testWhenCompleteBlockingFailure() {
let eventLoop = EmbeddedEventLoop()
let sem = DispatchSemaphore(value: 0)
var nonBlockingRan = false
let p = eventLoop.makePromise(of: String.self)
p.futureResult.whenCompleteBlocking (onto: DispatchQueue.global()) { _ in
sem.wait() // Block in callback
XCTAssertTrue(nonBlockingRan)
}
p.fail(EventLoopFutureTestError.example)
let p2 = eventLoop.makePromise(of: Bool.self)
p2.futureResult.whenSuccess { _ in
nonBlockingRan = true
}
p2.succeed(true)
sem.signal()
}
}