rename ELF.then to ELF.flatMap (#760)

Motivation:

ELF's API should be as close as possible to the new Result's API.
Therefore, we should rename `then` to `flatMap`

Modifications:

- renamed `then` to `flatMap`
- renamed `thenIfError` to `flatMapError`
- renamed ELF's generic parameter from `T` to `Value`

Result:

- more like Result
- fixes #688
This commit is contained in:
Johannes Weiss 2019-01-21 16:41:04 +00:00 committed by Cory Benfield
parent 9033aa6eff
commit 3e7d6a7bfd
40 changed files with 293 additions and 276 deletions

View File

@ -219,7 +219,7 @@ public func swiftMain() -> Int {
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).flatMap {
channel.pipeline.add(handler: SimpleHTTPServer())
}
}.bind(host: "127.0.0.1", port: 0).wait()
@ -233,7 +233,7 @@ public func swiftMain() -> Int {
let clientChannel = try ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: repeatedRequestsHandler)
}
}
@ -349,32 +349,32 @@ public func swiftMain() -> Int {
@inline(never)
func doThenAndFriends(loop: EventLoop) {
let p = loop.makePromise(of: Int.self)
let f = p.futureResult.then { (r: Int) -> EventLoopFuture<Int> in
let f = p.futureResult.flatMap { (r: Int) -> EventLoopFuture<Int> in
// This call allocates a new Future, and
// so does then(), so this is two Futures.
// so does flatMap(), so this is two Futures.
return loop.makeSucceededFuture(result: r + 1)
}.thenThrowing { (r: Int) -> Int in
// thenThrowing allocates a new Future, and calls then
}.flatMapThrowing { (r: Int) -> Int in
// flatMapThrowing allocates a new Future, and calls `flatMap`
// which also allocates, so this is two.
return r + 2
}.map { (r: Int) -> Int in
// map allocates a new future, and calls then which
// map allocates a new future, and calls `flatMap` which
// also allocates, so this is two.
return r + 2
}.thenThrowing { (r: Int) -> Int in
// thenThrowing allocates a future on the error path and
// calls then, which also allocates, so this is two.
}.flatMapThrowing { (r: Int) -> Int in
// flatMapThrowing allocates a future on the error path and
// calls `flatMap`, which also allocates, so this is two.
throw MyError()
}.thenIfError { (err: Error) -> EventLoopFuture<Int> in
// This call allocates a new Future, and so does thenIfError,
}.flatMapError { (err: Error) -> EventLoopFuture<Int> in
// This call allocates a new Future, and so does flatMapError,
// so this is two Futures.
return loop.makeFailedFuture(error: err)
}.thenIfErrorThrowing { (err: Error) -> Int in
// thenIfError allocates a new Future, and calls thenIfError,
}.flatMapErrorThrowing { (err: Error) -> Int in
// flatMapError allocates a new Future, and calls flatMapError,
// so this is two Futures
throw err
}.mapIfError { (err: Error) -> Int in
// mapIfError allocates a future, and calls thenIfError, so
// mapIfError allocates a future, and calls flatMapError, so
// this is two Futures.
return 1
}

View File

@ -29,7 +29,7 @@
/// // Set the handlers that are applied to the accepted child `Channel`s.
/// .childChannelInitializer { channel in
/// // Ensure we don't read faster then we can write by adding the BackPressureHandler into the pipeline.
/// channel.pipeline.add(handler: BackPressureHandler()).then { () in
/// channel.pipeline.add(handler: BackPressureHandler()).flatMap { () in
/// // make sure to instantiate your `ChannelHandlers` inside of
/// // the closure as it will be invoked once per connection.
/// channel.pipeline.add(handler: MyChannelHandler())
@ -214,20 +214,20 @@ public final class ServerBootstrap {
}
return eventLoop.submit {
return serverChannelInit(serverChannel).then {
return serverChannelInit(serverChannel).flatMap {
serverChannel.pipeline.add(handler: AcceptHandler(childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions))
}.then {
}.flatMap {
serverChannelOptions.applyAll(channel: serverChannel)
}.then {
}.flatMap {
register(eventLoop, serverChannel)
}.map {
serverChannel as Channel
}.thenIfError { error in
}.flatMapError { error in
serverChannel.close0(error: error, mode: .all, promise: nil)
return eventLoop.makeFailedFuture(error: error)
}
}.then {
}.flatMap {
$0
}
}
@ -258,7 +258,7 @@ public final class ServerBootstrap {
@inline(__always)
func setupChildChannel() -> EventLoopFuture<Void> {
return self.childChannelOptions.applyAll(channel: accepted).then { () -> EventLoopFuture<Void> in
return self.childChannelOptions.applyAll(channel: accepted).flatMap { () -> EventLoopFuture<Void> in
childEventLoop.assertInEventLoop()
return childChannelInit(accepted)
}
@ -267,7 +267,7 @@ public final class ServerBootstrap {
@inline(__always)
func fireThroughPipeline(_ future: EventLoopFuture<Void>) {
ctxEventLoop.assertInEventLoop()
future.then { (_) -> EventLoopFuture<Void> in
future.flatMap { (_) -> EventLoopFuture<Void> in
ctxEventLoop.assertInEventLoop()
guard !ctx.pipeline.destroyed else {
return ctx.eventLoop.makeFailedFuture(error: ChannelError.ioOnClosedChannel)
@ -285,7 +285,7 @@ public final class ServerBootstrap {
} else {
fireThroughPipeline(childEventLoop.submit {
return setupChildChannel()
}.then { $0 }.hopTo(eventLoop: ctxEventLoop))
}.flatMap { $0 }.hopTo(eventLoop: ctxEventLoop))
}
}
@ -307,9 +307,9 @@ private extension Channel {
// this is pretty delicate at the moment:
// In many cases `body` must be _synchronously_ follow `register`, otherwise in our current
// implementation, `epoll` will send us `EPOLLHUP`. To have it run synchronously, we need to invoke the
// `then` on the eventloop that the `register` will succeed on.
// `flatMap` on the eventloop that the `register` will succeed on.
self.eventLoop.assertInEventLoop()
return self.register().then {
return self.register().flatMap {
self.eventLoop.assertInEventLoop()
return body(self)
}
@ -476,15 +476,15 @@ public final class ClientBootstrap {
return eventLoop.makeFailedFuture(error: error)
}
return channelInitializer(channel).then {
return channelInitializer(channel).flatMap {
self.channelOptions.applyAll(channel: channel)
}.then {
}.flatMap {
let promise = eventLoop.makePromise(of: Void.self)
channel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult
}.map {
channel
}.thenIfError { error in
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error: error)
}
@ -508,13 +508,13 @@ public final class ClientBootstrap {
@inline(__always)
func setupChannel() -> EventLoopFuture<Channel> {
eventLoop.assertInEventLoop()
channelInitializer(channel).then {
channelInitializer(channel).flatMap {
channelOptions.applyAll(channel: channel)
}.then {
}.flatMap {
channel.registerAndDoSynchronously(body)
}.map {
channel
}.thenIfError { error in
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error: error)
}.cascade(promise: promise)
@ -524,7 +524,7 @@ public final class ClientBootstrap {
if eventLoop.inEventLoop {
return setupChannel()
} else {
return eventLoop.submit(setupChannel).then { $0 }
return eventLoop.submit(setupChannel).flatMap { $0 }
}
}
}
@ -642,7 +642,7 @@ public final class DatagramBootstrap {
protocolFamily: address.protocolFamily)
}
return bind0(makeChannel: makeChannel) { (eventLoop, channel) in
channel.register().then {
channel.register().flatMap {
channel.bind(to: address)
}
}
@ -660,13 +660,13 @@ public final class DatagramBootstrap {
return eventLoop.makeFailedFuture(error: error)
}
return channelInitializer(channel).then {
return channelInitializer(channel).flatMap {
channelOptions.applyAll(channel: channel)
}.then {
}.flatMap {
registerAndBind(eventLoop, channel)
}.map {
channel
}.thenIfError { error in
}.flatMapError { error in
eventLoop.makeFailedFuture(error: error)
}
}

View File

@ -86,13 +86,13 @@
///
/// ```
/// ChannelPipeline p = ...
/// let future = p.add(name: "1", handler: InboundHandlerA()).then {
/// let future = p.add(name: "1", handler: InboundHandlerA()).flatMap {
/// p.add(name: "2", handler: InboundHandlerB())
/// }.then {
/// }.flatMap {
/// p.add(name: "3", handler: OutboundHandlerA())
/// }.then {
/// }.flatMap {
/// p.add(name: "4", handler: OutboundHandlerB())
/// }.then {
/// }.flatMap {
/// p.add(name: "5", handler: InboundOutboundHandlerX())
/// }
/// // Handle the future as well ....

View File

@ -325,8 +325,8 @@ extension EventLoop {
/// - parameters:
/// - result: the value that is used by the `EventLoopFuture`.
/// - returns: a succeeded `EventLoopFuture`.
public func makeSucceededFuture<T>(result: T) -> EventLoopFuture<T> {
return EventLoopFuture<T>(eventLoop: self, result: result, file: "n/a", line: 0)
public func makeSucceededFuture<Success>(result: Success) -> EventLoopFuture<Success> {
return EventLoopFuture<Success>(eventLoop: self, result: result, file: "n/a", line: 0)
}
public func next() -> EventLoop {

View File

@ -20,7 +20,7 @@ import NIOConcurrencyHelpers
/// result into another, then return a list of callbacks from the target future that are now ready to be invoked.
///
/// In particular, note that _run() here continues to obtain and execute lists of callbacks until it completes.
/// This eliminates recursion when processing `then()` chains.
/// This eliminates recursion when processing `flatMap()` chains.
private struct CallbackList: ExpressibleByArrayLiteral {
typealias Element = () -> CallbackList
var firstCallback: Element?
@ -108,8 +108,8 @@ private struct CallbackList: ExpressibleByArrayLiteral {
/// A promise to provide a result later.
///
/// This is the provider API for `EventLoopFuture<T>`. If you want to return an
/// unfulfilled `EventLoopFuture<T>` -- presumably because you are interfacing to
/// This is the provider API for `EventLoopFuture<Value>`. If you want to return an
/// unfulfilled `EventLoopFuture<Value>` -- presumably because you are interfacing to
/// some asynchronous service that will return a real result later, follow this
/// pattern:
///
@ -132,7 +132,7 @@ private struct CallbackList: ExpressibleByArrayLiteral {
/// of the following:
///
/// * If you have an `EventLoopFuture` and want to do something else after it completes,
/// use `.then()`
/// use `.flatMap()`
/// * If you just want to get a value back after running something on another thread,
/// use `EventLoopFuture<ResultType>.async()`
/// * If you already have a value and need an `EventLoopFuture<>` object to plug into
@ -140,10 +140,10 @@ private struct CallbackList: ExpressibleByArrayLiteral {
/// or `eventLoop.newFailedFuture(error:)`.
///
/// - note: `EventLoopPromise` has reference semantics.
public struct EventLoopPromise<T> {
public struct EventLoopPromise<Value> {
/// The `EventLoopFuture` which is used by the `EventLoopPromise`. You can use it to add callbacks which are notified once the
/// `EventLoopPromise` is completed.
public let futureResult: EventLoopFuture<T>
public let futureResult: EventLoopFuture<Value>
/// General initializer
///
@ -152,18 +152,18 @@ public struct EventLoopPromise<T> {
/// - file: The file this promise was allocated in, for debugging purposes.
/// - line: The line this promise was allocated on, for debugging purposes.
init(eventLoop: EventLoop, file: StaticString, line: UInt) {
futureResult = EventLoopFuture<T>(eventLoop: eventLoop, file: file, line: line)
futureResult = EventLoopFuture<Value>(eventLoop: eventLoop, file: file, line: line)
}
/// Deliver a successful result to the associated `EventLoopFuture<T>` object.
/// Deliver a successful result to the associated `EventLoopFuture<Value>` object.
///
/// - parameters:
/// - result: The successful result of the operation.
public func succeed(result: T) {
public func succeed(result: Value) {
_resolve(value: .success(result))
}
/// Deliver an error to the associated `EventLoopFuture<T>` object.
/// Deliver an error to the associated `EventLoopFuture<Value>` object.
///
/// - parameters:
/// - error: The error from the operation.
@ -179,7 +179,7 @@ public struct EventLoopPromise<T> {
///
/// - parameters:
/// - value: The value to fire the future with.
private func _resolve(value: Result<T, Error>) {
private func _resolve(value: Result<Value, Error>) {
if futureResult.eventLoop.inEventLoop {
_setValue(value: value)._run()
} else {
@ -194,7 +194,7 @@ public struct EventLoopPromise<T> {
/// - parameters:
/// - value: The result of the promise.
/// - returns: The callback list to run.
fileprivate func _setValue(value: Result<T, Error>) -> CallbackList {
fileprivate func _setValue(value: Result<Value, Error>) -> CallbackList {
return futureResult._setValue(value: value)
}
}
@ -202,10 +202,10 @@ public struct EventLoopPromise<T> {
/// Holder for a result that will be provided later.
///
/// Functions that promise to do work asynchronously can return an `EventLoopFuture<T>`.
/// Functions that promise to do work asynchronously can return an `EventLoopFuture<Value>`.
/// The recipient of such an object can then observe it to be notified when the operation completes.
///
/// The provider of a `EventLoopFuture<T>` can create and return a placeholder object
/// The provider of a `EventLoopFuture<Value>` can create and return a placeholder object
/// before the actual result is available. For example:
///
/// ```
@ -230,12 +230,12 @@ public struct EventLoopPromise<T> {
/// - [Scala](http://docs.scala-lang.org/overviews/core/futures.html)
/// - [Python](https://docs.google.com/document/d/10WOZgLQaYNpOrag-eTbUm-JUCCfdyfravZ4qSOQPg1M/edit)
///
/// If you receive a `EventLoopFuture<T>` from another function, you have a number of options:
/// The most common operation is to use `then()` or `map()` to add a function that will be called
/// with the eventual result. Both methods returns a new `EventLoopFuture<T>` immediately
/// If you receive a `EventLoopFuture<Value>` from another function, you have a number of options:
/// The most common operation is to use `flatMap()` or `map()` to add a function that will be called
/// with the eventual result. Both methods returns a new `EventLoopFuture<Value>` immediately
/// that will receive the return value from your function, but they behave differently. If you have
/// a function that can return synchronously, the `map` function will transform the result `T` to a
/// the new result value `U` and return an `EventLoopFuture<U>`.
/// a function that can return synchronously, the `map` function will transform the result of type
/// `Value` to a the new result of type `NewValue` and return an `EventLoopFuture<NewValue>`.
///
/// ```
/// let networkData = getNetworkData(args)
@ -247,34 +247,34 @@ public struct EventLoopPromise<T> {
/// }
/// ```
///
/// If however you need to do more asynchronous processing, you can call `then()`. The return value of the
/// function passed to `then` must be a new `EventLoopFuture<U>` object: the return value of `then()` is
/// a new `EventLoopFuture<U>` that will contain the eventual result of both the original operation and
/// If however you need to do more asynchronous processing, you can call `flatMap()`. The return value of the
/// function passed to `flatMap` must be a new `EventLoopFuture<NewValue>` object: the return value of `flatMap()` is
/// a new `EventLoopFuture<NewValue>` that will contain the eventual result of both the original operation and
/// the subsequent one.
///
/// ```
/// // When converted network data is available, begin the database operation.
/// let databaseResult: EventLoopFuture<DBResult> = processedResult.then { (p: Processed) -> EventLoopFuture<DBResult> in
/// let databaseResult: EventLoopFuture<DBResult> = processedResult.flatMap { (p: Processed) -> EventLoopFuture<DBResult> in
/// return someDatabaseOperation(p)
/// }
/// ```
///
/// In essence, future chains created via `then()` provide a form of data-driven asynchronous programming
/// In essence, future chains created via `flatMap()` provide a form of data-driven asynchronous programming
/// that allows you to dynamically declare data dependencies for your various operations.
///
/// `EventLoopFuture` chains created via `then()` are sufficient for most purposes. All of the registered
/// `EventLoopFuture` chains created via `flatMap()` are sufficient for most purposes. All of the registered
/// functions will eventually run in order. If one of those functions throws an error, that error will
/// bypass the remaining functions. You can use `thenIfError()` to handle and optionally recover from
/// bypass the remaining functions. You can use `flatMapError()` to handle and optionally recover from
/// errors in the middle of a chain.
///
/// At the end of an `EventLoopFuture` chain, you can use `whenSuccess()` or `whenFailure()` to add an
/// observer callback that will be invoked with the result or error at that point. (Note: If you ever
/// find yourself invoking `promise.succeed()` from inside a `whenSuccess()` callback, you probably should
/// use `then()` or `cascade(promise:)` instead.)
/// use `flatMap()` or `cascade(promise:)` instead.)
///
/// `EventLoopFuture` objects are typically obtained by:
/// * Using `EventLoopFuture<T>.async` or a similar wrapper function.
/// * Using `.then()` on an existing future to create a new future for the next step in a series of operations.
/// * Using `EventLoopFuture<Value>.async` or a similar wrapper function.
/// * Using `.flatMap()` on an existing future to create a new future for the next step in a series of operations.
/// * Initializing an `EventLoopFuture` that already has a value or an error
///
/// ### Threading and Futures
@ -323,9 +323,9 @@ public struct EventLoopPromise<T> {
/// or `EventLoopFuture` callbacks need to invoke a lock (either directly or in the form of `DispatchQueue`) this
/// should be considered a code smell worth investigating: the `EventLoop`-based synchronization guarantees of
/// `EventLoopFuture` should be sufficient to guarantee thread-safety.
public final class EventLoopFuture<T> {
public final class EventLoopFuture<Value> {
// TODO: Provide a tracing facility. It would be nice to be able to set '.debugTrace = true' on any EventLoopFuture or EventLoopPromise and have every subsequent chained EventLoopFuture report the success result or failure error. That would simplify some debugging scenarios.
fileprivate var value: Result<T, Error>? {
fileprivate var value: Result<Value, Error>? {
didSet {
_isFulfilled.store(true)
}
@ -342,13 +342,13 @@ public final class EventLoopFuture<T> {
return _isFulfilled.load()
}
/// Callbacks that should be run when this `EventLoopFuture<T>` gets a value.
/// Callbacks that should be run when this `EventLoopFuture<Value>` gets a value.
/// These callbacks may give values to other `EventLoopFuture`s; if that happens,
/// they return any callbacks from those `EventLoopFuture`s so that we can run
/// the entire chain from the top without recursing.
fileprivate var callbacks: CallbackList = CallbackList()
private init(eventLoop: EventLoop, value: Result<T, Error>?, file: StaticString, line: UInt) {
private init(eventLoop: EventLoop, value: Result<Value, Error>?, file: StaticString, line: UInt) {
self.eventLoop = eventLoop
self.value = value
self._isFulfilled = UnsafeEmbeddedAtomic(value: value != nil)
@ -365,12 +365,12 @@ public final class EventLoopFuture<T> {
self.init(eventLoop: eventLoop, value: nil, file: file, line: line)
}
/// A EventLoopFuture<T> that has already succeeded
convenience init(eventLoop: EventLoop, result: T, file: StaticString, line: UInt) {
/// A EventLoopFuture<Value> that has already succeeded
convenience init(eventLoop: EventLoop, result: Value, file: StaticString, line: UInt) {
self.init(eventLoop: eventLoop, value: .success(result), file: file, line: line)
}
/// A EventLoopFuture<T> that has already failed
/// A EventLoopFuture<Value> that has already failed
convenience init(eventLoop: EventLoop, error: Error, file: StaticString, line: UInt) {
self.init(eventLoop: eventLoop, value: .failure(error), file: file, line: line)
}
@ -397,21 +397,21 @@ extension EventLoopFuture: Equatable {
}
}
// 'then' and 'map' implementations. This is really the key of the entire system.
// 'flatMap' and 'map' implementations. This is really the key of the entire system.
extension EventLoopFuture {
/// When the current `EventLoopFuture<T>` is fulfilled, run the provided callback,
/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback,
/// which will provide a new `EventLoopFuture`.
///
/// This allows you to dynamically dispatch new asynchronous tasks as phases in a
/// longer series of processing steps. Note that you can use the results of the
/// current `EventLoopFuture<T>` when determining how to dispatch the next operation.
/// current `EventLoopFuture<Value>` when determining how to dispatch the next operation.
///
/// This works well when you have APIs that already know how to return `EventLoopFuture`s.
/// You can do something with the result of one and just return the next future:
///
/// ```
/// let d1 = networkRequest(args).future()
/// let d2 = d1.then { t -> EventLoopFuture<U> in
/// let d2 = d1.flatMap { t -> EventLoopFuture<NewValue> in
/// . . . something with t . . .
/// return netWorkRequest(args)
/// }
@ -420,14 +420,14 @@ extension EventLoopFuture {
/// }
/// ```
///
/// Note: In a sense, the `EventLoopFuture<U>` is returned before it's created.
/// Note: In a sense, the `EventLoopFuture<NewValue>` is returned before it's created.
///
/// - parameters:
/// - callback: Function that will receive the value of this `EventLoopFuture` and return
/// a new `EventLoopFuture`.
/// - returns: A future that will receive the eventual value.
public func then<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) -> EventLoopFuture<U>) -> EventLoopFuture<U> {
let next = EventLoopPromise<U>(eventLoop: eventLoop, file: file, line: line)
public func flatMap<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) -> EventLoopFuture<NewValue>) -> EventLoopFuture<NewValue> {
let next = EventLoopPromise<NewValue>(eventLoop: eventLoop, file: file, line: line)
_whenComplete {
switch self.value! {
case .success(let t):
@ -447,12 +447,12 @@ extension EventLoopFuture {
return next.futureResult
}
/// When the current `EventLoopFuture<T>` is fulfilled, run the provided callback, which
/// performs a synchronous computation and returns a new value of type `U`. The provided
/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
/// performs a synchronous computation and returns a new value of type `NewValue`. The provided
/// callback may optionally `throw`.
///
/// Operations performed in `thenThrowing` should not block, or they will block the entire
/// event loop. `thenThrowing` is intended for use when you have a data-driven function that
/// Operations performed in `flatMapThrowing` should not block, or they will block the entire
/// event loop. `flatMapThrowing` is intended for use when you have a data-driven function that
/// performs a simple data transformation that can potentially error.
///
/// If your callback function throws, the returned `EventLoopFuture` will error.
@ -461,22 +461,24 @@ extension EventLoopFuture {
/// - callback: Function that will receive the value of this `EventLoopFuture` and return
/// a new value lifted into a new `EventLoopFuture`.
/// - returns: A future that will receive the eventual value.
public func thenThrowing<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) throws -> U) -> EventLoopFuture<U> {
return self.then(file: file, line: line) { (value: T) -> EventLoopFuture<U> in
public func flatMapThrowing<NewValue>(file: StaticString = #file,
line: UInt = #line,
_ callback: @escaping (Value) throws -> NewValue) -> EventLoopFuture<NewValue> {
return self.flatMap(file: file, line: line) { (value: Value) -> EventLoopFuture<NewValue> in
do {
return EventLoopFuture<U>(eventLoop: self.eventLoop, result: try callback(value), file: file, line: line)
return EventLoopFuture<NewValue>(eventLoop: self.eventLoop, result: try callback(value), file: file, line: line)
} catch {
return EventLoopFuture<U>(eventLoop: self.eventLoop, error: error, file: file, line: line)
return EventLoopFuture<NewValue>(eventLoop: self.eventLoop, error: error, file: file, line: line)
}
}
}
/// When the current `EventLoopFuture<T>` is in an error state, run the provided callback, which
/// may recover from the error and returns a new value of type `U`. The provided callback may optionally `throw`,
/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
/// may recover from the error and returns a new value of type `Value`. The provided callback may optionally `throw`,
/// in which case the `EventLoopFuture` will be in a failed state with the new thrown error.
///
/// Operations performed in `thenIfErrorThrowing` should not block, or they will block the entire
/// event loop. `thenIfErrorThrowing` is intended for use when you have the ability to synchronously
/// Operations performed in `flatMapErrorThrowing` should not block, or they will block the entire
/// event loop. `flatMapErrorThrowing` is intended for use when you have the ability to synchronously
/// recover from errors.
///
/// If your callback function throws, the returned `EventLoopFuture` will error.
@ -485,8 +487,8 @@ extension EventLoopFuture {
/// - callback: Function that will receive the error value of this `EventLoopFuture` and return
/// a new value lifted into a new `EventLoopFuture`.
/// - returns: A future that will receive the eventual value or a rethrown error.
public func thenIfErrorThrowing(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) throws -> T) -> EventLoopFuture<T> {
return self.thenIfError(file: file, line: line) { value in
public func flatMapErrorThrowing(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) throws -> Value) -> EventLoopFuture<Value> {
return self.flatMapError(file: file, line: line) { value in
do {
return EventLoopFuture(eventLoop: self.eventLoop, result: try callback(value), file: file, line: line)
} catch {
@ -495,8 +497,8 @@ extension EventLoopFuture {
}
}
/// When the current `EventLoopFuture<T>` is fulfilled, run the provided callback, which
/// performs a synchronous computation and returns a new value of type `U`.
/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
/// performs a synchronous computation and returns a new value of type `NewValue`.
///
/// Operations performed in `map` should not block, or they will block the entire event
/// loop. `map` is intended for use when you have a data-driven function that performs
@ -518,18 +520,18 @@ extension EventLoopFuture {
/// - callback: Function that will receive the value of this `EventLoopFuture` and return
/// a new value lifted into a new `EventLoopFuture`.
/// - returns: A future that will receive the eventual value.
public func map<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) -> (U)) -> EventLoopFuture<U> {
if U.self == T.self && U.self == Void.self {
whenSuccess(callback as! (T) -> Void)
return self as! EventLoopFuture<U>
public func map<NewValue>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Value) -> (NewValue)) -> EventLoopFuture<NewValue> {
if NewValue.self == Value.self && NewValue.self == Void.self {
whenSuccess(callback as! (Value) -> Void)
return self as! EventLoopFuture<NewValue>
} else {
return then { return EventLoopFuture<U>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line) }
return flatMap { return EventLoopFuture<NewValue>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line) }
}
}
/// When the current `EventLoopFuture<T>` is in an error state, run the provided callback, which
/// may recover from the error by returning an `EventLoopFuture<U>`. The callback is intended to potentially
/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
/// may recover from the error by returning an `EventLoopFuture<NewValue>`. The callback is intended to potentially
/// recover from the error by returning a new `EventLoopFuture` that will eventually contain the recovered
/// result.
///
@ -539,8 +541,8 @@ extension EventLoopFuture {
/// - callback: Function that will receive the error value of this `EventLoopFuture` and return
/// a new value lifted into a new `EventLoopFuture`.
/// - returns: A future that will receive the recovered value.
public func thenIfError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
let next = EventLoopPromise<T>(eventLoop: eventLoop, file: file, line: line)
public func flatMapError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture<Value>) -> EventLoopFuture<Value> {
let next = EventLoopPromise<Value>(eventLoop: eventLoop, file: file, line: line)
_whenComplete {
switch self.value! {
case .success(let t):
@ -560,8 +562,8 @@ extension EventLoopFuture {
return next.futureResult
}
/// When the current `EventLoopFuture<T>` is in an error state, run the provided callback, which
/// can recover from the error and return a new value of type `U`. The provided callback may not `throw`,
/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
/// can recover from the error and return a new value of type `Value`. The provided callback may not `throw`,
/// so this function should be used when the error is always recoverable.
///
/// Operations performed in `mapIfError` should not block, or they will block the entire
@ -572,9 +574,9 @@ extension EventLoopFuture {
/// - callback: Function that will receive the error value of this `EventLoopFuture` and return
/// a new value lifted into a new `EventLoopFuture`.
/// - returns: A future that will receive the recovered value.
public func mapIfError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> T) -> EventLoopFuture<T> {
return thenIfError(file: file, line: line) {
return EventLoopFuture<T>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line)
public func mapIfError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> Value) -> EventLoopFuture<Value> {
return flatMapError(file: file, line: line) {
return EventLoopFuture<Value>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line)
}
}
@ -600,7 +602,7 @@ extension EventLoopFuture {
}
}
fileprivate func _whenCompleteWithValue(_ callback: @escaping (Result<T, Error>) -> Void) {
fileprivate func _whenCompleteWithValue(_ callback: @escaping (Result<Value, Error>) -> Void) {
_whenComplete {
callback(self.value!)
return CallbackList()
@ -611,13 +613,13 @@ extension EventLoopFuture {
/// `EventLoopFuture` has a success result.
///
/// An observer callback cannot return a value, meaning that this function cannot be chained
/// from. If you are attempting to create a computation pipeline, consider `map` or `then`.
/// from. If you are attempting to create a computation pipeline, consider `map` or `flatMap`.
/// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise`
/// in the body of this function, consider using `cascade` instead.
///
/// - parameters:
/// - callback: The callback that is called with the successful result of the `EventLoopFuture`.
public func whenSuccess(_ callback: @escaping (T) -> Void) {
public func whenSuccess(_ callback: @escaping (Value) -> Void) {
_whenComplete {
if case .success(let t) = self.value! {
callback(t)
@ -630,7 +632,7 @@ extension EventLoopFuture {
/// `EventLoopFuture` has a failure result.
///
/// An observer callback cannot return a value, meaning that this function cannot be chained
/// from. If you are attempting to create a computation pipeline, consider `mapIfError` or `thenIfError`.
/// from. If you are attempting to create a computation pipeline, consider `mapIfError` or `flatMapError`.
/// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise`
/// in the body of this function, consider using `cascade` instead.
///
@ -654,7 +656,7 @@ extension EventLoopFuture {
///
/// - parameters:
/// - callback: The callback that is called when the `EventLoopFuture` is fulfilled.
public func whenComplete(_ callback: @escaping (Result<T, Error>) -> Void) {
public func whenComplete(_ callback: @escaping (Result<Value, Error>) -> Void) {
_whenComplete {
callback(self.value!)
return CallbackList()
@ -662,8 +664,8 @@ extension EventLoopFuture {
}
/// Internal: Set the value and return a list of callbacks that should be invoked as a result.
fileprivate func _setValue(value: Result<T, Error>) -> CallbackList {
/// Internal: Set the value and return a list of callbacks that should be invoked as a result.
fileprivate func _setValue(value: Result<Value, Error>) -> CallbackList {
self.eventLoop.assertInEventLoop()
if self.value == nil {
self.value = value
@ -681,10 +683,12 @@ extension EventLoopFuture {
/// provided `EventLoopFuture` both succeed. It then provides the pair
/// of results. If either one fails, the combined `EventLoopFuture` will fail with
/// the first error encountered.
public func and<U>(_ other: EventLoopFuture<U>, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(T,U)> {
let promise = EventLoopPromise<(T,U)>(eventLoop: eventLoop, file: file, line: line)
var tvalue: T?
var uvalue: U?
public func and<OtherValue>(_ other: EventLoopFuture<OtherValue>,
file: StaticString = #file,
line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> {
let promise = EventLoopPromise<(Value, OtherValue)>(eventLoop: eventLoop, file: file, line: line)
var tvalue: Value?
var uvalue: OtherValue?
assert(self.eventLoop === promise.futureResult.eventLoop)
_whenComplete { () -> CallbackList in
@ -721,9 +725,11 @@ extension EventLoopFuture {
}
/// Return a new EventLoopFuture that contains this "and" another value.
/// This is just syntactic sugar for `future.and(loop.newSucceedFuture<U>(result: result))`.
public func and<U>(result: U, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(T,U)> {
return and(EventLoopFuture<U>(eventLoop: self.eventLoop, result: result, file: file, line: line))
/// This is just syntactic sugar for `future.and(loop.newSucceedFuture<NewValue>(result: result))`.
public func and<OtherValue>(result: OtherValue,
file: StaticString = #file,
line: UInt = #line) -> EventLoopFuture<(Value, OtherValue)> {
return and(EventLoopFuture<OtherValue>(eventLoop: self.eventLoop, result: result, file: file, line: line))
}
}
@ -736,11 +742,11 @@ extension EventLoopFuture {
/// tidy up your computational pipelines. For example:
///
/// ```
/// doWork().then {
/// doWork().flatMap {
/// doMoreWork($0)
/// }.then {
/// }.flatMap {
/// doYetMoreWork($0)
/// }.thenIfError {
/// }.flatMapError {
/// maybeRecoverFromError($0)
/// }.map {
/// transformData($0)
@ -749,7 +755,7 @@ extension EventLoopFuture {
///
/// - parameters:
/// - promise: The `EventLoopPromise` to fulfill with the results of this future.
public func cascade(promise: EventLoopPromise<T>?) {
public func cascade(promise: EventLoopPromise<Value>?) {
guard let promise = promise else { return }
_whenCompleteWithValue { v in
switch v {
@ -770,7 +776,7 @@ extension EventLoopFuture {
///
/// - parameters:
/// - promise: The `EventLoopPromise` to fulfill with the results of this future.
public func cascadeFailure<U>(promise: EventLoopPromise<U>?) {
public func cascadeFailure<NewValue>(promise: EventLoopPromise<NewValue>?) {
guard let promise = promise else { return }
self.whenFailure { err in
promise.fail(error: err)
@ -790,7 +796,7 @@ extension EventLoopFuture {
///
/// - returns: The value of the `EventLoopFuture` when it completes.
/// - throws: The error value of the `EventLoopFuture` if it errors.
public func wait(file: StaticString = #file, line: UInt = #line) throws -> T {
public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value {
if !(self.eventLoop is EmbeddedEventLoop) {
let explainer: () -> String = { """
BUG DETECTED: wait() must not be called when on an EventLoop.
@ -807,7 +813,7 @@ Further information:
precondition(MultiThreadedEventLoopGroup.currentEventLoop == nil, explainer(), file: file, line: line)
}
var v: Result<T, Error>? = nil
var v: Result<Value, Error>? = nil
let lock = ConditionLock(value: 0)
_whenComplete { () -> CallbackList in
lock.lock()
@ -841,12 +847,13 @@ extension EventLoopFuture {
/// a failure is encountered, it will immediately fail the overall EventLoopFuture.
///
/// - parameters:
/// - futures: An array of `EventLoopFuture<U>` to wait for.
/// - futures: An array of `EventLoopFuture<NewValue>` to wait for.
/// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`.
/// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`.
public func fold<U>(_ futures: [EventLoopFuture<U>], with combiningFunction: @escaping (T, U) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
let body = futures.reduce(self) { (f1: EventLoopFuture<T>, f2: EventLoopFuture<U>) -> EventLoopFuture<T> in
let newFuture = f1.and(f2).then { (args: (T, U)) -> EventLoopFuture<T> in
public func fold<OtherValue>(_ futures: [EventLoopFuture<OtherValue>],
with combiningFunction: @escaping (Value, OtherValue) -> EventLoopFuture<Value>) -> EventLoopFuture<Value> {
let body = futures.reduce(self) { (f1: EventLoopFuture<Value>, f2: EventLoopFuture<OtherValue>) -> EventLoopFuture<Value> in
let newFuture = f1.and(f2).flatMap { (args: (Value, OtherValue)) -> EventLoopFuture<Value> in
let (f1Value, f2Value) = args
self.eventLoop.assertInEventLoop()
return combiningFunction(f1Value, f2Value)
@ -877,10 +884,10 @@ extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
/// The new `EventLoopFuture` contains the result of reducing the `initialResult` with the
/// values of the `[EventLoopFuture<U>]`.
/// values of the `[EventLoopFuture<NewValue>]`.
///
/// This function makes copies of the result for each EventLoopFuture, for a version which avoids
/// making copies, check out `reduce<U>(into:)`.
/// making copies, check out `reduce<NewValue>(into:)`.
///
/// The returned `EventLoopFuture` will fail as soon as a failure is encountered in any of the
/// `futures`. However, the failure will not occur until all preceding
@ -894,10 +901,13 @@ extension EventLoopFuture {
/// - eventLoop: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - nextPartialResult: The bifunction used to produce partial results.
/// - returns: A new `EventLoopFuture` with the reduced value.
public static func reduce<U>(_ initialResult: T, _ futures: [EventLoopFuture<U>], eventLoop: EventLoop, _ nextPartialResult: @escaping (T, U) -> T) -> EventLoopFuture<T> {
public static func reduce<InputValue>(_ initialResult: Value,
_ futures: [EventLoopFuture<InputValue>],
eventLoop: EventLoop,
_ nextPartialResult: @escaping (Value, InputValue) -> Value) -> EventLoopFuture<Value> {
let f0 = eventLoop.makeSucceededFuture(result: initialResult)
let body = f0.fold(futures) { (t: T, u: U) -> EventLoopFuture<T> in
let body = f0.fold(futures) { (t: Value, u: InputValue) -> EventLoopFuture<Value> in
eventLoop.makeSucceededFuture(result: nextPartialResult(t, u))
}
@ -906,7 +916,7 @@ extension EventLoopFuture {
/// Returns a new `EventLoopFuture` that fires only when all the provided futures complete.
/// The new `EventLoopFuture` contains the result of combining the `initialResult` with the
/// values of the `[EventLoopFuture<U>]`. This function is analogous to the standard library's
/// values of the `[EventLoopFuture<NewValue>]`. This function is analogous to the standard library's
/// `reduce(into:)`, which does not make copies of the result type for each `EventLoopFuture`.
///
/// The returned `EventLoopFuture` will fail as soon as a failure is encountered in any of the
@ -921,12 +931,15 @@ extension EventLoopFuture {
/// - eventLoop: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire.
/// - updateAccumulatingResult: The bifunction used to combine partialResults with new elements.
/// - returns: A new `EventLoopFuture` with the combined value.
public static func reduce<U>(into initialResult: T, _ futures: [EventLoopFuture<U>], eventLoop: EventLoop, _ updateAccumulatingResult: @escaping (inout T, U) -> Void) -> EventLoopFuture<T> {
let p0 = eventLoop.makePromise(of: T.self)
var result: T = initialResult
public static func reduce<InputValue>(into initialResult: Value,
_ futures: [EventLoopFuture<InputValue>],
eventLoop: EventLoop,
_ updateAccumulatingResult: @escaping (inout Value, InputValue) -> Void) -> EventLoopFuture<Value> {
let p0 = eventLoop.makePromise(of: Value.self)
var result: Value = initialResult
let f0 = eventLoop.makeSucceededFuture(result: ())
let future = f0.fold(futures) { (_: (), value: U) -> EventLoopFuture<Void> in
let future = f0.fold(futures) { (_: (), value: InputValue) -> EventLoopFuture<Void> in
eventLoop.assertInEventLoop()
updateAccumulatingResult(&result, value)
return eventLoop.makeSucceededFuture(result: ())
@ -956,19 +969,19 @@ public extension EventLoopFuture {
/// - parameters:
/// - target: The `EventLoop` that the returned `EventLoopFuture` will run on.
/// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop.
func hopTo(eventLoop target: EventLoop) -> EventLoopFuture<T> {
func hopTo(eventLoop target: EventLoop) -> EventLoopFuture<Value> {
if target === self.eventLoop {
// We're already on that event loop, nothing to do here. Save an allocation.
return self
}
let hoppingPromise = target.makePromise(of: T.self)
let hoppingPromise = target.makePromise(of: Value.self)
self.cascade(promise: hoppingPromise)
return hoppingPromise.futureResult
}
}
/// Execute the given function and synchronously complete the given `EventLoopPromise` (if not `nil`).
func executeAndComplete<T>(_ promise: EventLoopPromise<T>?, _ body: () throws -> T) {
func executeAndComplete<Value>(_ promise: EventLoopPromise<Value>?, _ body: () throws -> Value) {
do {
let result = try body()
promise?.succeed(result: result)

View File

@ -124,8 +124,8 @@ public struct NonBlockingFileIO {
if remainingReads > 1 || (remainingReads == 1 && lastReadSize > 0) {
let readSize = remainingReads > 1 ? chunkSize : lastReadSize
assert(readSize > 0)
return self.read(fileHandle: fileHandle, byteCount: readSize, allocator: allocator, eventLoop: eventLoop).then { buffer in
chunkHandler(buffer).then { () -> EventLoopFuture<Void> in
return self.read(fileHandle: fileHandle, byteCount: readSize, allocator: allocator, eventLoop: eventLoop).flatMap { buffer in
chunkHandler(buffer).flatMap { () -> EventLoopFuture<Void> in
eventLoop.assertInEventLoop()
return _read(remainingReads: remainingReads - 1)
}

View File

@ -685,7 +685,7 @@ internal extension Selector where R == NIORegistration {
return closeChannel(chan)
}
}.map { future in
future.thenIfErrorThrowing { error in
future.flatMapErrorThrowing { error in
if let error = error as? ChannelError, error == .alreadyClosed {
return ()
} else {

View File

@ -454,7 +454,7 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
let ch = data.forceAsOther() as SocketChannel
ch.eventLoop.execute {
ch.register().thenThrowing {
ch.register().flatMapThrowing {
guard ch.isOpen else {
throw ChannelError.ioOnClosedChannel
}

View File

@ -122,7 +122,7 @@ let bootstrap = ServerBootstrap(group: group)
// Set the handlers that are applied to the accepted Channels
.childChannelInitializer { channel in
// Add handler that will buffer data until a \n is received
channel.pipeline.add(handler: ByteToMessageHandler(LineDelimiterCodec())).then { v in
channel.pipeline.add(handler: ByteToMessageHandler(LineDelimiterCodec())).flatMap { v in
// It's important we use the same handler for all accepted channels. The ChatHandler is thread-safe!
channel.pipeline.add(handler: chatHandler)
}

View File

@ -48,7 +48,7 @@ let bootstrap = ServerBootstrap(group: group)
// Set the handlers that are appled to the accepted Channels
.childChannelInitializer { channel in
// Ensure we don't read faster than we can write by adding the BackPressureHandler into the pipeline.
channel.pipeline.add(handler: BackPressureHandler()).then { v in
channel.pipeline.add(handler: BackPressureHandler()).flatMap { v in
channel.pipeline.add(handler: EchoHandler())
}
}

View File

@ -313,7 +313,7 @@ public final class HTTPServerPipelineHandler: ChannelDuplexHandler {
// we just received the .end that we're missing so we can fall through to closing the connection
fallthrough
case .quiescingLastRequestEndReceived:
ctx.write(data).then {
ctx.write(data).flatMap {
ctx.close()
}.cascade(promise: promise)
case .acceptingEvents, .quiescingWaitingForRequestEnd:

View File

@ -185,13 +185,13 @@ public class HTTPServerUpgradeHandler: ChannelInboundHandler {
// internal handler, then call the user code, and then finally when the user code is done we do
// our final cleanup steps, namely we replay the received data we buffered in the meantime and
// then remove ourselves from the pipeline.
self.removeExtraHandlers(ctx: ctx).then {
self.removeExtraHandlers(ctx: ctx).flatMap {
self.sendUpgradeResponse(ctx: ctx, upgradeRequest: request, responseHeaders: responseHeaders)
}.then {
}.flatMap {
self.removeHandler(ctx: ctx, handler: self.httpEncoder)
}.map { (_: Bool) in
self.upgradeCompletionHandler(ctx)
}.then {
}.flatMap {
upgrader.upgrade(ctx: ctx, upgradeRequest: request)
}.map {
ctx.fireUserInboundEventTriggered(HTTPUpgradeEvents.upgradeComplete(toProtocol: proto, upgradeRequest: request))

View File

@ -342,11 +342,11 @@ private final class HTTPHandler: ChannelInboundHandler {
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
}
return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))))
}.then { () -> EventLoopFuture<Void> in
}.flatMap { () -> EventLoopFuture<Void> in
let p = ctx.eventLoop.makePromise(of: Void.self)
self.completeResponse(ctx, trailers: nil, promise: p)
return p.futureResult
}.thenIfError { error in
}.flatMapError { error in
if !responseStarted {
let response = httpResponseHead(request: request, status: .ok)
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
@ -364,11 +364,11 @@ private final class HTTPHandler: ChannelInboundHandler {
case .sendfile:
let response = responseHead(request: request, fileRegion: region)
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region)))).then {
ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region)))).flatMap {
let p = ctx.eventLoop.makePromise(of: Void.self)
self.completeResponse(ctx, trailers: nil, promise: p)
return p.futureResult
}.thenIfError { (_: Error) in
}.flatMapError { (_: Error) in
ctx.close()
}.whenComplete { (_: Result<Void, Error>) in
_ = try? file.close()
@ -516,7 +516,7 @@ let bootstrap = ServerBootstrap(group: group)
// Set the handlers that are applied to the accepted Channels
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).then {
channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
channel.pipeline.add(handler: HTTPHandler(fileIO: fileIO, htdocsPath: htdocs))
}
}

View File

@ -73,7 +73,7 @@ var datagramBootstrap = DatagramBootstrap(group: group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEPORT), value: 1)
.channelInitializer { channel in
return channel.pipeline.add(handler: ChatMessageEncoder()).then {
return channel.pipeline.add(handler: ChatMessageEncoder()).flatMap {
channel.pipeline.add(handler: ChatMessageDecoder())
}
}
@ -81,10 +81,10 @@ var datagramBootstrap = DatagramBootstrap(group: group)
// We cast our channel to MulticastChannel to obtain the multicast operations.
let datagramChannel = try datagramBootstrap
.bind(host: "0.0.0.0", port: 7654)
.then { channel -> EventLoopFuture<Channel> in
.flatMap { channel -> EventLoopFuture<Channel> in
let channel = channel as! MulticastChannel
return channel.joinGroup(chatMulticastGroup, interface: targetInterface).map { channel }
}.then { channel -> EventLoopFuture<Channel> in
}.flatMap { channel -> EventLoopFuture<Channel> in
guard let targetInterface = targetInterface else {
return channel.eventLoop.makeSucceededFuture(result: channel)
}

View File

@ -126,7 +126,7 @@ let serverChannel = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true).flatMap {
channel.pipeline.add(handler: SimpleHTTPServer())
}
}.bind(host: "127.0.0.1", port: 0).wait()
@ -558,9 +558,9 @@ try measureAndPrint(desc: "no-net_http1_10k_reqs_1_conn") {
done = true
}
try channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: true,
withErrorHandling: true).then {
withErrorHandling: true).flatMap {
channel.pipeline.add(handler: SimpleHTTPServer())
}.then {
}.flatMap {
channel.pipeline.add(handler: measuringHandler, first: true)
}.wait()
@ -580,7 +580,7 @@ measureAndPrint(desc: "http1_10k_reqs_1_conn") {
let clientChannel = try! ClientBootstrap(group: group)
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: repeatedRequestsHandler)
}
}
@ -601,7 +601,7 @@ measureAndPrint(desc: "http1_10k_reqs_100_conns") {
let clientChannel = try! ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: repeatedRequestsHandler)
}
}

View File

@ -147,15 +147,15 @@ public final class WebSocketUpgrader: HTTPProtocolUpgrader {
public func upgrade(ctx: ChannelHandlerContext, upgradeRequest: HTTPRequestHead) -> EventLoopFuture<Void> {
/// We never use the automatic error handling feature of the WebSocketFrameDecoder: we always use the separate channel
/// handler.
var upgradeFuture = ctx.pipeline.add(handler: WebSocketFrameEncoder()).then {
var upgradeFuture = ctx.pipeline.add(handler: WebSocketFrameEncoder()).flatMap {
ctx.pipeline.add(handler: ByteToMessageHandler(WebSocketFrameDecoder(maxFrameSize: self.maxFrameSize, automaticErrorHandling: false)))
}
if self.automaticErrorHandling {
upgradeFuture = upgradeFuture.then { ctx.pipeline.add(handler: WebSocketProtocolErrorHandler())}
upgradeFuture = upgradeFuture.flatMap { ctx.pipeline.add(handler: WebSocketProtocolErrorHandler())}
}
return upgradeFuture.then {
return upgradeFuture.flatMap {
self.upgradePipelineHandler(ctx.channel, upgradeRequest)
}
}

View File

@ -222,7 +222,7 @@ let bootstrap = ServerBootstrap(group: group)
channel.pipeline.remove(handler: httpHandler, promise: nil)
}
)
return channel.pipeline.configureHTTPServerPipeline(withServerUpgrade: config).then {
return channel.pipeline.configureHTTPServerPipeline(withServerUpgrade: config).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}

View File

@ -287,7 +287,7 @@ class HTTPDecoderTest: XCTestCase {
let part = self.unwrapInboundIn(data)
switch part {
case .end:
_ = ctx.pipeline.remove(handler: self).then { _ in
_ = ctx.pipeline.remove(handler: self).flatMap { _ in
ctx.pipeline.add(handler: self.collector)
}
default:

View File

@ -337,7 +337,7 @@ class HTTPServerClientTest : XCTestCase {
// Set the handlers that are appled to the accepted Channels
.childChannelInitializer { channel in
// Ensure we don't read faster then we can write by adding the BackPressureHandler into the pipeline.
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}.bind(host: "127.0.0.1", port: 0).wait())
@ -348,7 +348,7 @@ class HTTPServerClientTest : XCTestCase {
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: accumulation)
}
}
@ -395,7 +395,7 @@ class HTTPServerClientTest : XCTestCase {
// Set the handlers that are appled to the accepted Channels
.childChannelInitializer { channel in
// Ensure we don't read faster then we can write by adding the BackPressureHandler into the pipeline.
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}.bind(host: "127.0.0.1", port: 0).wait())
@ -406,7 +406,7 @@ class HTTPServerClientTest : XCTestCase {
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: accumulation)
}
}
@ -453,7 +453,7 @@ class HTTPServerClientTest : XCTestCase {
let serverChannel = try assertNoThrowWithValue(ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}.bind(host: "127.0.0.1", port: 0).wait())
@ -464,7 +464,7 @@ class HTTPServerClientTest : XCTestCase {
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: accumulation)
}
}
@ -512,7 +512,7 @@ class HTTPServerClientTest : XCTestCase {
// Set the handlers that are appled to the accepted Channels
.childChannelInitializer { channel in
// Ensure we don't read faster then we can write by adding the BackPressureHandler into the pipeline.
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}.bind(host: "127.0.0.1", port: 0).wait())
@ -552,7 +552,7 @@ class HTTPServerClientTest : XCTestCase {
let serverChannel = try assertNoThrowWithValue(ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}.bind(host: "127.0.0.1", port: 0).wait())
@ -562,7 +562,7 @@ class HTTPServerClientTest : XCTestCase {
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: accumulation)
}
}
@ -597,7 +597,7 @@ class HTTPServerClientTest : XCTestCase {
let serverChannel = try assertNoThrowWithValue(ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).then {
channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: false).flatMap {
channel.pipeline.add(handler: httpHandler)
}
}.bind(host: "127.0.0.1", port: 0).wait())
@ -607,7 +607,7 @@ class HTTPServerClientTest : XCTestCase {
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().then {
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.add(handler: accumulation)
}
}

View File

@ -113,7 +113,7 @@ class HTTPServerProtocolErrorHandlerTest: XCTestCase {
}
let channel = EmbeddedChannel()
XCTAssertNoThrow(try channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).then {
XCTAssertNoThrow(try channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
channel.pipeline.add(handler: DelayWriteHandler())
}.wait())

View File

@ -133,7 +133,7 @@ class HTTPTest: XCTestCase {
let bd1 = try sendAndCheckRequests(expecteds, body: body, trailers: trailers, sendStrategy: { (reqString, chan) in
var buf = chan.allocator.buffer(capacity: 1024)
buf.write(string: reqString)
return chan.eventLoop.makeSucceededFuture(result: ()).thenThrowing {
return chan.eventLoop.makeSucceededFuture(result: ()).flatMapThrowing {
try chan.writeInbound(buf)
}
})
@ -145,7 +145,7 @@ class HTTPTest: XCTestCase {
var buf = chan.allocator.buffer(capacity: 1024)
buf.write(string: "\(c)")
writeFutures.append(chan.eventLoop.makeSucceededFuture(result: ()).thenThrowing {
writeFutures.append(chan.eventLoop.makeSucceededFuture(result: ()).flatMapThrowing {
try chan.writeInbound(buf)
})
}

View File

@ -83,7 +83,7 @@ private func serverHTTPChannelWithAutoremoval(group: EventLoopGroup,
.childChannelInitializer { channel in
p.succeed(result: channel)
let upgradeConfig = (upgraders: upgraders, completionHandler: upgradeCompletionHandler)
return channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: pipelining, withServerUpgrade: upgradeConfig).then {
return channel.pipeline.configureHTTPServerPipeline(withPipeliningAssistance: pipelining, withServerUpgrade: upgradeConfig).flatMap {
let futureResults = extraHandlers.map { channel.pipeline.add(handler: $0) }
return EventLoopFuture<Void>.andAll(futureResults, eventLoop: channel.eventLoop)
}

View File

@ -254,16 +254,16 @@ public class AcceptBackoffHandlerTest: XCTestCase {
group: group))
XCTAssertNoThrow(try serverChannel.setOption(option: ChannelOptions.autoRead, value: false).wait())
XCTAssertNoThrow(try serverChannel.pipeline.add(handler: readCountHandler).then { _ in
XCTAssertNoThrow(try serverChannel.pipeline.add(handler: readCountHandler).flatMap { _ in
serverChannel.pipeline.add(name: self.acceptHandlerName, handler: AcceptBackoffHandler(backoffProvider: backoffProvider))
}.wait())
XCTAssertNoThrow(try eventLoop.submit {
// this is pretty delicate at the moment:
// `bind` must be _synchronously_ follow `register`, otherwise in our current implementation, `epoll` will
// send us `EPOLLHUP`. To have it run synchronously, we need to invoke the `then` on the eventloop that the
// send us `EPOLLHUP`. To have it run synchronously, we need to invoke the `flatMap` on the eventloop that the
// `register` will succeed.
serverChannel.register().then { () -> EventLoopFuture<()> in
serverChannel.register().flatMap { () -> EventLoopFuture<()> in
return serverChannel.bind(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 0))
}
}.wait().wait() as Void)

View File

@ -388,7 +388,7 @@ class ChannelNotificationTest: XCTestCase {
var buffer = clientChannel.allocator.buffer(capacity: 2)
buffer.write(string: "X")
XCTAssertNoThrow(try clientChannel.writeAndFlush(buffer).then {
XCTAssertNoThrow(try clientChannel.writeAndFlush(buffer).flatMap {
clientChannel.close()
}.wait())
XCTAssertNoThrow(try promise.futureResult.wait())

View File

@ -184,7 +184,7 @@ class ChannelPipelineTest: XCTestCase {
let channel = EmbeddedChannel()
let h = FireChannelReadOnRemoveHandler()
_ = try channel.pipeline.add(handler: h).then {
_ = try channel.pipeline.add(handler: h).flatMap {
channel.pipeline.remove(handler: h)
}.wait()
@ -363,7 +363,7 @@ class ChannelPipelineTest: XCTestCase {
weakHandlerContext2 = ctx
}
weakHandler2 = handler2
XCTAssertNoThrow(try channel.pipeline.add(handler: handler1).then {
XCTAssertNoThrow(try channel.pipeline.add(handler: handler1).flatMap {
channel.pipeline.add(handler: handler2)
}.wait())
}()

View File

@ -1118,7 +1118,7 @@ public class ChannelTests: XCTestCase {
let verificationHandler = ShutdownVerificationHandler(shutdownEvent: .output, promise: group.next().makePromise())
let future = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.add(handler: verificationHandler).then {
channel.pipeline.add(handler: verificationHandler).flatMap {
channel.pipeline.add(handler: byteCountingHandler)
}
}
@ -1181,7 +1181,7 @@ public class ChannelTests: XCTestCase {
let verificationHandler = ShutdownVerificationHandler(shutdownEvent: .input, promise: group.next().makePromise())
let future = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.add(handler: VerifyNoReadHandler()).then {
channel.pipeline.add(handler: VerifyNoReadHandler()).flatMap {
channel.pipeline.add(handler: verificationHandler)
}
}
@ -1508,7 +1508,7 @@ public class ChannelTests: XCTestCase {
public func expectRead(loop: EventLoop) -> EventLoopFuture<Void> {
return loop.submit {
self.waitingForReadPromise = loop.makePromise()
}.then {
}.flatMap {
self.waitingForReadPromise!.futureResult
}
}
@ -1964,11 +1964,11 @@ public class ChannelTests: XCTestCase {
try serverWriteHappenedPromise.futureResult.wait()
try sc.pipeline.add(handler: ReadDoesNotHappen(hasRegisteredPromise: clientHasRegistered,
hasUnregisteredPromise: clientHasUnregistered,
hasReadPromise: clientHasRead)).then {
hasReadPromise: clientHasRead)).flatMap {
// this will succeed and should not cause the socket to be read even though there'll be something
// available to be read immediately
sc.register()
}.then {
}.flatMap {
// this would normally fail but our special Socket subclass will let it succeed.
sc.connect(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 123))
}.wait()
@ -2129,12 +2129,12 @@ public class ChannelTests: XCTestCase {
XCTAssertNoThrow(try sc.eventLoop.submit {
// this is pretty delicate at the moment:
// `bind` must be _synchronously_ follow `register`, otherwise in our current implementation, `epoll` will
// send us `EPOLLHUP`. To have it run synchronously, we need to invoke the `then` on the eventloop that the
// send us `EPOLLHUP`. To have it run synchronously, we need to invoke the `flatMap` on the eventloop that the
// `register` will succeed.
sc.register().then {
sc.register().flatMap {
sc.pipeline.add(handler: VerifyThingsAreRightHandler(allDone: allDone))
}.then {
}.flatMap {
sc.connect(to: serverChannel.localAddress!)
}
}.wait().wait() as Void)
@ -2175,8 +2175,8 @@ public class ChannelTests: XCTestCase {
let allDone = group.next().makePromise(of: Void.self)
let cf = try! sc.eventLoop.submit {
sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).then {
sc.register().then {
sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).flatMap {
sc.register().flatMap {
sc.connect(to: serverChannel.localAddress!)
}
}
@ -2221,8 +2221,8 @@ public class ChannelTests: XCTestCase {
let allDone = group.next().makePromise(of: Void.self)
try! sc.eventLoop.submit {
let f = sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).then {
sc.register().then {
let f = sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).flatMap {
sc.register().flatMap {
sc.connect(to: serverChannel.localAddress!)
}
}
@ -2293,7 +2293,7 @@ public class ChannelTests: XCTestCase {
}
XCTAssertNoThrow(try sc.eventLoop.submit {
sc.register().then {
sc.register().flatMap {
sc.connect(to: serverChannel.localAddress!)
}
}.wait().wait() as Void)
@ -2304,7 +2304,7 @@ public class ChannelTests: XCTestCase {
// this callback must be attached before we call the close
let f = p.futureResult.map {
XCTFail("shouldn't be reached")
}.thenIfError { err in
}.flatMapError { err in
XCTAssertNotNil(err as? DummyError)
return sc.close()
}
@ -2519,7 +2519,7 @@ public class ChannelTests: XCTestCase {
XCTAssertTrue(serverChannel.isActive)
// we allow auto-read again to make sure that the socket buffer is drained on write error
// (cf. https://github.com/apple/swift-nio/issues/593)
ctx.channel.setOption(option: ChannelOptions.autoRead, value: true).then {
ctx.channel.setOption(option: ChannelOptions.autoRead, value: true).flatMap {
// let's trigger the write error
var buffer = ctx.channel.allocator.buffer(capacity: 16)
buffer.write(staticString: "THIS WILL FAIL ANYWAY")

View File

@ -390,7 +390,7 @@ public class ByteToMessageDecoderTest: XCTestCase {
buffer.write(staticString: "4567890x")
XCTAssertNoThrow(try channel.writeInbound(buffer))
channel.pipeline.context(handlerType: ByteToMessageHandler<PairOfBytesDecoder>.self).then { ctx in
channel.pipeline.context(handlerType: ByteToMessageHandler<PairOfBytesDecoder>.self).flatMap { ctx in
return channel.pipeline.remove(ctx: ctx)
}.map {
XCTAssertTrue($0)

View File

@ -18,7 +18,7 @@ import XCTest
private extension Channel {
func waitForDatagrams(count: Int) throws -> [AddressedEnvelope<ByteBuffer>] {
return try self.pipeline.context(name: "ByteReadRecorder").then { context in
return try self.pipeline.context(name: "ByteReadRecorder").flatMap { context in
if let future = (context.handler as? DatagramReadRecorder<ByteBuffer>)?.notifyForDatagrams(count) {
return future
}

View File

@ -75,7 +75,7 @@ class EchoServerClientTest : XCTestCase {
let promise = group.next().makePromise(of: ByteBuffer.self)
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.add(handler: WriteOnConnectHandler(toWrite: "X")).then { v2 in
channel.pipeline.add(handler: WriteOnConnectHandler(toWrite: "X")).flatMap { v2 in
channel.pipeline.add(handler: ByteCountingHandler(numBytes: 10000, promise: promise))
}
}
@ -444,7 +444,7 @@ class EchoServerClientTest : XCTestCase {
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
// When we've received all the bytes we know the connection is up. Remove the handler.
_ = bytesReceivedPromise.futureResult.then { (_: ByteBuffer) in
_ = bytesReceivedPromise.futureResult.flatMap { (_: ByteBuffer) in
channel.pipeline.remove(handler: byteCountingHandler)
}
@ -498,7 +498,7 @@ class EchoServerClientTest : XCTestCase {
let promise = group.next().makePromise(of: ByteBuffer.self)
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.add(handler: WriteOnConnectHandler(toWrite: stringToWrite)).then {
channel.pipeline.add(handler: WriteOnConnectHandler(toWrite: stringToWrite)).flatMap {
channel.pipeline.add(handler: ByteCountingHandler(numBytes: stringToWrite.utf8.count, promise: promise))
}
}
@ -627,13 +627,13 @@ class EchoServerClientTest : XCTestCase {
buffer.write(string: str)
// write it four times and then close the connect.
ctx.writeAndFlush(NIOAny(buffer)).then {
ctx.writeAndFlush(NIOAny(buffer)).flatMap {
ctx.writeAndFlush(NIOAny(buffer))
}.then {
}.flatMap {
ctx.writeAndFlush(NIOAny(buffer))
}.then {
}.flatMap {
ctx.writeAndFlush(NIOAny(buffer))
}.then {
}.flatMap {
ctx.close()
}.whenComplete { (_: Result<Void, Error>) in
self.dpGroup.leave()
@ -656,7 +656,7 @@ class EchoServerClientTest : XCTestCase {
//.channelOption(ChannelOptions.autoRead, value: false)
.channelOption(ChannelOptions.recvAllocator, value: FixedSizeRecvByteBufferAllocator(capacity: 2))
.channelInitializer { channel in
channel.pipeline.add(handler: WriteHandler()).then {
channel.pipeline.add(handler: WriteHandler()).flatMap {
channel.pipeline.add(handler: countingHandler)
}
}.connect(to: serverChannel.localAddress!).wait())
@ -801,7 +801,7 @@ class EchoServerClientTest : XCTestCase {
// but we're trying to connect to (depending on the system configuration and resolver) IPv4 and IPv6
let clientChannel = try assertNoThrowWithValue(ClientBootstrap(group: group)
.connect(host: "localhost", port: Int(serverChannel.localAddress!.port!))
.thenIfError {
.flatMapError {
promise.fail(error: $0)
return group.next().makeFailedFuture(error: $0)
}

View File

@ -51,8 +51,8 @@ extension EventLoopFutureTest {
("testReduceIntoWithMultipleEventLoops", testReduceIntoWithMultipleEventLoops),
("testThenThrowingWhichDoesNotThrow", testThenThrowingWhichDoesNotThrow),
("testThenThrowingWhichDoesThrow", testThenThrowingWhichDoesThrow),
("testThenIfErrorThrowingWhichDoesNotThrow", testThenIfErrorThrowingWhichDoesNotThrow),
("testThenIfErrorThrowingWhichDoesThrow", testThenIfErrorThrowingWhichDoesThrow),
("testflatMapErrorThrowingWhichDoesNotThrow", testflatMapErrorThrowingWhichDoesNotThrow),
("testflatMapErrorThrowingWhichDoesThrow", testflatMapErrorThrowingWhichDoesThrow),
("testOrderOfFutureCompletion", testOrderOfFutureCompletion),
("testEventLoopHoppingInThen", testEventLoopHoppingInThen),
("testEventLoopHoppingInThenWithFailures", testEventLoopHoppingInThenWithFailures),

View File

@ -478,7 +478,7 @@ class EventLoopFutureTest : XCTestCase {
let p = eventLoop.makePromise(of: String.self)
p.futureResult.map {
$0.count
}.thenThrowing {
}.flatMapThrowing {
1 + $0
}.whenSuccess {
ran = true
@ -497,7 +497,7 @@ class EventLoopFutureTest : XCTestCase {
let p = eventLoop.makePromise(of: String.self)
p.futureResult.map {
$0.count
}.thenThrowing { (x: Int) throws -> Int in
}.flatMapThrowing { (x: Int) throws -> Int in
XCTAssertEqual(5, x)
throw DummyError.dummyError
}.map { (x: Int) -> Int in
@ -511,7 +511,7 @@ class EventLoopFutureTest : XCTestCase {
XCTAssertTrue(ran)
}
func testThenIfErrorThrowingWhichDoesNotThrow() {
func testflatMapErrorThrowingWhichDoesNotThrow() {
enum DummyError: Error, Equatable {
case dummyError
}
@ -520,10 +520,10 @@ class EventLoopFutureTest : XCTestCase {
let p = eventLoop.makePromise(of: String.self)
p.futureResult.map {
$0.count
}.thenIfErrorThrowing {
}.flatMapErrorThrowing {
XCTAssertEqual(.some(DummyError.dummyError), $0 as? DummyError)
return 5
}.thenIfErrorThrowing { (_: Error) in
}.flatMapErrorThrowing { (_: Error) in
XCTFail("shouldn't have been called")
return 5
}.whenSuccess {
@ -534,7 +534,7 @@ class EventLoopFutureTest : XCTestCase {
XCTAssertTrue(ran)
}
func testThenIfErrorThrowingWhichDoesThrow() {
func testflatMapErrorThrowingWhichDoesThrow() {
enum DummyError: Error, Equatable {
case dummyError1
case dummyError2
@ -544,7 +544,7 @@ class EventLoopFutureTest : XCTestCase {
let p = eventLoop.makePromise(of: String.self)
p.futureResult.map {
$0.count
}.thenIfErrorThrowing { (x: Error) throws -> Int in
}.flatMapErrorThrowing { (x: Error) throws -> Int in
XCTAssertEqual(.some(DummyError.dummyError1), x as? DummyError)
throw DummyError.dummyError2
}.map { (x: Int) -> Int in
@ -583,7 +583,7 @@ class EventLoopFutureTest : XCTestCase {
var prev: EventLoopFuture<Int> = elg.next().makeSucceededFuture(result: 0)
(1..<20).forEach { (i: Int) in
let p = elg.next().makePromise(of: Int.self)
prev.then { (i2: Int) -> EventLoopFuture<Int> in
prev.flatMap { (i2: Int) -> EventLoopFuture<Int> in
XCTAssertEqual(i - 1, i2)
p.succeed(result: i)
return p.futureResult
@ -605,7 +605,7 @@ class EventLoopFutureTest : XCTestCase {
var prev: EventLoopFuture<Int> = elg.next().makeSucceededFuture(result: 0)
(1..<n).forEach { (i: Int) in
let p = elg.next().makePromise(of: Int.self)
prev.then { (i2: Int) -> EventLoopFuture<Int> in
prev.flatMap { (i2: Int) -> EventLoopFuture<Int> in
XCTAssertEqual(i - 1, i2)
if i == n/2 {
p.fail(error: DummyError.dummy)
@ -613,7 +613,7 @@ class EventLoopFutureTest : XCTestCase {
p.succeed(result: i)
}
return p.futureResult
}.thenIfError { error in
}.flatMapError { error in
p.fail(error: error)
return p.futureResult
}.whenSuccess { i2 in
@ -801,7 +801,7 @@ class EventLoopFutureTest : XCTestCase {
XCTAssertFalse(loop1 === loop2)
let failingPromise = loop2.makePromise(of: Void.self)
let failingFuture = failingPromise.futureResult.thenIfErrorThrowing { error in
let failingFuture = failingPromise.futureResult.flatMapErrorThrowing { error in
XCTAssertEqual(error as? EventLoopFutureTestError, EventLoopFutureTestError.example)
XCTAssertTrue(loop2.inEventLoop)
throw error

View File

@ -365,9 +365,9 @@ public class EventLoopTest : XCTestCase {
}
let channel = try SocketChannel(eventLoop: loop, protocolFamily: AF_INET)
try channel.eventLoop.submit {
channel.pipeline.add(handler: wedgeHandler).then {
channel.pipeline.add(handler: wedgeHandler).flatMap {
channel.register()
}.then {
}.flatMap {
// connecting here to stop epoll from throwing EPOLLHUP at us
channel.connect(to: serverChannel.localAddress!)
}.cascade(promise: connectPromise)
@ -517,7 +517,7 @@ public class EventLoopTest : XCTestCase {
protocolFamily: serverSocket.localAddress!.protocolFamily))
XCTAssertNoThrow(try channel.pipeline.add(handler: assertHandler).wait() as Void)
XCTAssertNoThrow(try channel.eventLoop.submit {
channel.register().then {
channel.register().flatMap {
channel.connect(to: serverSocket.localAddress!)
}
}.wait().wait() as Void)

View File

@ -153,14 +153,14 @@ class FileRegionTest : XCTestCase {
}
try content.write(toFile: filePath, atomically: false, encoding: .ascii)
do {
() = try clientChannel.writeAndFlush(NIOAny(fr1)).then {
() = try clientChannel.writeAndFlush(NIOAny(fr1)).flatMap {
let frFuture = clientChannel.write(NIOAny(fr2))
var buffer = clientChannel.allocator.buffer(capacity: bytes.count)
buffer.write(bytes: bytes)
let bbFuture = clientChannel.write(NIOAny(buffer))
clientChannel.close(promise: nil)
clientChannel.flush()
return frFuture.then { bbFuture }
return frFuture.flatMap { bbFuture }
}.wait()
XCTFail("no error happened even though we closed before flush")
} catch let e as ChannelError {

View File

@ -112,7 +112,7 @@ private extension Channel {
func state() -> ConnectRecorder.State {
return try! self.pipeline.context(name: CONNECT_RECORDER).map {
($0.handler as! ConnectRecorder).state
}.thenIfErrorThrowing {
}.flatMapErrorThrowing {
switch $0 {
case ChannelPipelineError.notFound:
return .closed
@ -236,7 +236,7 @@ private func buildEyeballer(host: String,
public class HappyEyeballsTest : XCTestCase {
func testIPv4OnlyResolution() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -260,7 +260,7 @@ public class HappyEyeballsTest : XCTestCase {
func testIPv6OnlyResolution() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -316,7 +316,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAAAAQueryReturningFirst() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -348,7 +348,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstDelayElapses() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -390,7 +390,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstThenAAAAReturns() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -424,7 +424,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstThenAAAAErrors() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -458,7 +458,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstThenEmptyAAAA() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().flatMapThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target

View File

@ -77,7 +77,7 @@ class IdleStateHandlerTest : XCTestCase {
let serverChannel = try assertNoThrowWithValue(ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.add(handler: handler).then { f in
channel.pipeline.add(handler: handler).flatMap { f in
channel.pipeline.add(handler: TestWriteHandler(writeToChannel, assertEventFn))
}
}.bind(host: "127.0.0.1", port: 0).wait())

View File

@ -70,7 +70,7 @@ final class MulticastTest: XCTestCase {
return DatagramBootstrap(group: self.group)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.bind(host: host, port: port)
.then { channel in
.flatMap { channel in
let channel = channel as! MulticastChannel
do {
@ -79,7 +79,7 @@ final class MulticastTest: XCTestCase {
} catch {
return channel.eventLoop.makeFailedFuture(error: error)
}
}.then { (channel: MulticastChannel) -> EventLoopFuture<MulticastChannel> in
}.flatMap { (channel: MulticastChannel) -> EventLoopFuture<MulticastChannel> in
let provider = channel as! SocketOptionProvider
switch channel.localAddress! {

View File

@ -155,9 +155,9 @@ public class SocketChannelTest : XCTestCase {
let promise = serverChannel.eventLoop.makePromise(of: IOError.self)
XCTAssertNoThrow(try serverChannel.eventLoop.submit {
serverChannel.pipeline.add(handler: AcceptHandler(promise)).then {
serverChannel.pipeline.add(handler: AcceptHandler(promise)).flatMap {
serverChannel.register()
}.then {
}.flatMap {
serverChannel.bind(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 0))
}
}.wait().wait() as Void)
@ -237,11 +237,11 @@ public class SocketChannelTest : XCTestCase {
eventLoop: eventLoop as! SelectableEventLoop))
let promise = channel.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(try channel.pipeline.add(handler: ActiveVerificationHandler(promise)).then {
XCTAssertNoThrow(try channel.pipeline.add(handler: ActiveVerificationHandler(promise)).flatMap {
channel.register()
}.then {
}.flatMap {
channel.connect(to: try! SocketAddress(ipAddress: "127.0.0.1", port: 9999))
}.then {
}.flatMap {
channel.close()
}.wait())
@ -320,7 +320,7 @@ public class SocketChannelTest : XCTestCase {
buffer.write(staticString: "hello")
let writeFut = clientChannel.write(buffer).map {
XCTFail("Must not succeed")
}.thenIfError { error in
}.flatMapError { error in
XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
return clientChannel.close()
}

View File

@ -68,14 +68,14 @@ final class SocketOptionProviderTest: XCTestCase {
let v4LoopbackInterface = try! System.enumerateInterfaces().filter { $0.address == v4LoopbackAddress }.first!
self.ipv4DatagramChannel = try? assertNoThrowWithValue(
DatagramBootstrap(group: group).bind(host: "127.0.0.1", port: 0).then { channel in
DatagramBootstrap(group: group).bind(host: "127.0.0.1", port: 0).flatMap { channel in
return (channel as! MulticastChannel).joinGroup(try! SocketAddress(ipAddress: "224.0.2.66", port: 0), interface: v4LoopbackInterface).map { channel }
}.wait()
)
// The IPv6 setup is allowed to fail, some hosts don't have IPv6.
let v6LoopbackInterface = try! System.enumerateInterfaces().filter { $0.address == v6LoopbackAddress }.first
self.ipv6DatagramChannel = try? DatagramBootstrap(group: group).bind(host: "::1", port: 0).then { channel in
self.ipv6DatagramChannel = try? DatagramBootstrap(group: group).bind(host: "::1", port: 0).flatMap { channel in
return (channel as! MulticastChannel).joinGroup(try! SocketAddress(ipAddress: "ff12::beeb", port: 0), interface: v6LoopbackInterface).map { channel }
}.wait()
}
@ -92,7 +92,7 @@ final class SocketOptionProviderTest: XCTestCase {
let provider = try assertNoThrowWithValue(self.convertedChannel())
let newTimeout = timeval(tv_sec: 5, tv_usec: 0)
let retrievedTimeout = try assertNoThrowWithValue(provider.unsafeSetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_RCVTIMEO, value: newTimeout).then {
let retrievedTimeout = try assertNoThrowWithValue(provider.unsafeSetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_RCVTIMEO, value: newTimeout).flatMap {
provider.unsafeGetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_RCVTIMEO) as EventLoopFuture<timeval>
}.wait())
@ -112,7 +112,7 @@ final class SocketOptionProviderTest: XCTestCase {
let provider = try assertNoThrowWithValue(self.convertedChannel())
let newReuseAddr = 1 as CInt
let retrievedReuseAddr = try assertNoThrowWithValue(provider.unsafeSetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_REUSEADDR, value: newReuseAddr).then {
let retrievedReuseAddr = try assertNoThrowWithValue(provider.unsafeSetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_REUSEADDR, value: newReuseAddr).flatMap {
provider.unsafeGetSocketOption(level: SocketOptionLevel(SOL_SOCKET), name: SO_REUSEADDR) as EventLoopFuture<CInt>
}.wait())
@ -150,7 +150,7 @@ final class SocketOptionProviderTest: XCTestCase {
let newLingerValue = linger(l_onoff: 1, l_linger: 64)
let provider = try self.convertedChannel()
XCTAssertNoThrow(try provider.setSoLinger(newLingerValue).then {
XCTAssertNoThrow(try provider.setSoLinger(newLingerValue).flatMap {
provider.getSoLinger()
}.map {
XCTAssertEqual($0.l_linger, newLingerValue.l_linger)
@ -171,7 +171,7 @@ final class SocketOptionProviderTest: XCTestCase {
return
}
XCTAssertNoThrow(try provider.setIPMulticastIF(address).then {
XCTAssertNoThrow(try provider.setIPMulticastIF(address).flatMap {
provider.getIPMulticastIF()
}.map {
XCTAssertEqual($0.s_addr, address.s_addr)
@ -180,7 +180,7 @@ final class SocketOptionProviderTest: XCTestCase {
func testIpMulticastTtl() throws {
let provider = try assertNoThrowWithValue(self.ipv4MulticastProvider())
XCTAssertNoThrow(try provider.setIPMulticastTTL(6).then {
XCTAssertNoThrow(try provider.setIPMulticastTTL(6).flatMap {
provider.getIPMulticastTTL()
}.map {
XCTAssertEqual($0, 6)
@ -189,7 +189,7 @@ final class SocketOptionProviderTest: XCTestCase {
func testIpMulticastLoop() throws {
let provider = try assertNoThrowWithValue(self.ipv4MulticastProvider())
XCTAssertNoThrow(try provider.setIPMulticastLoop(1).then {
XCTAssertNoThrow(try provider.setIPMulticastLoop(1).flatMap {
provider.getIPMulticastLoop()
}.map {
XCTAssertNotEqual($0, 0)
@ -209,7 +209,7 @@ final class SocketOptionProviderTest: XCTestCase {
return
}
XCTAssertNoThrow(try provider.setIPv6MulticastIF(CUnsignedInt(loopbackInterface.interfaceIndex)).then {
XCTAssertNoThrow(try provider.setIPv6MulticastIF(CUnsignedInt(loopbackInterface.interfaceIndex)).flatMap {
provider.getIPv6MulticastIF()
}.map {
XCTAssertEqual($0, CUnsignedInt(loopbackInterface.interfaceIndex))
@ -222,7 +222,7 @@ final class SocketOptionProviderTest: XCTestCase {
return
}
XCTAssertNoThrow(try provider.setIPv6MulticastHops(6).then {
XCTAssertNoThrow(try provider.setIPv6MulticastHops(6).flatMap {
provider.getIPv6MulticastHops()
}.map {
XCTAssertEqual($0, 6)
@ -235,7 +235,7 @@ final class SocketOptionProviderTest: XCTestCase {
return
}
XCTAssertNoThrow(try provider.setIPv6MulticastLoop(1).then {
XCTAssertNoThrow(try provider.setIPv6MulticastLoop(1).flatMap {
provider.getIPv6MulticastLoop()
}.map {
XCTAssertNotEqual($0, 0)

View File

@ -103,9 +103,9 @@ public class WebSocketFrameDecoderTest: XCTestCase {
private func swapDecoder(for handler: ChannelHandler) {
// We need to insert a decoder that doesn't do error handling. We still insert
// an encoder because we want to fail gracefully if a frame is written.
XCTAssertNoThrow(try self.decoderChannel.pipeline.context(handlerType: ByteToMessageHandler<WebSocketFrameDecoder>.self).then {
XCTAssertNoThrow(try self.decoderChannel.pipeline.context(handlerType: ByteToMessageHandler<WebSocketFrameDecoder>.self).flatMap {
self.decoderChannel.pipeline.remove(handler: $0.handler)
}.then { (_: Bool) in
}.flatMap { (_: Bool) in
self.decoderChannel.pipeline.add(handler: handler)
}.wait())
}

View File

@ -29,3 +29,7 @@
returns `Int` and has had its return value made discardable.
- removed ContiguousCollection
- EventLoopFuture.whenComplete now provides Result<T, Error>
- renamed `EventLoopFuture.then` to `EventLoopFuture.flatMap`
- renamed `EventLoopFuture.thenIfError` to `EventLoopFuture.flatMapError`
- renamed `EventLoopFuture.thenThrowing` to `EventLoopFuture.flatMapThrowing`
- renamed `EventLoopFuture`'s generic parameter from `T` to `Value`