swift-nio/Sources/NIOPerformanceTester/UDPBenchmark.swift

276 lines
9.6 KiB
Swift

//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOPosix
final class UDPBenchmark {
/// Request to send.
private let data: ByteBuffer
/// Number of requests to send in each run.
private let numberOfRequests: Int
/// Setting for `.datagramVectorReadMessageCount`
private let vectorReads: Int
/// Number of writes before each flush (for the client; the server flushes at the end
/// of each read cycle).
private let vectorWrites: Int
private var group: EventLoopGroup!
private var server: Channel!
private var client: Channel!
private var clientHandler: EchoHandlerClient!
init(data: ByteBuffer, numberOfRequests: Int, vectorReads: Int, vectorWrites: Int) {
self.data = data
self.numberOfRequests = numberOfRequests
self.vectorReads = vectorReads
self.vectorWrites = vectorWrites
}
}
extension UDPBenchmark: Benchmark {
func setUp() throws {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let address = try SocketAddress.makeAddressResolvingHost("127.0.0.1", port: 0)
self.server = try DatagramBootstrap(group: group)
// zero is the same as not applying the option.
.channelOption(ChannelOptions.datagramVectorReadMessageCount, value: self.vectorReads)
.channelInitializer { channel in
return channel.pipeline.addHandler(EchoHandler())
}
.bind(to: address)
.wait()
let remoteAddress = self.server.localAddress!
self.client = try DatagramBootstrap(group: group)
// zero is the same as not applying the option.
.channelOption(ChannelOptions.datagramVectorReadMessageCount, value: self.vectorReads)
.channelInitializer { channel in
let handler = EchoHandlerClient(eventLoop: channel.eventLoop,
config: .init(remoteAddress: remoteAddress,
request: self.data,
requests: self.numberOfRequests,
writesPerFlush: self.vectorWrites))
return channel.pipeline.addHandler(handler)
}
.bind(to: address)
.wait()
self.clientHandler = try self.client.pipeline.handler(type: EchoHandlerClient.self).wait()
}
func tearDown() {
try! self.client.close().wait()
try! self.server.close().wait()
}
func run() throws -> Int {
try self.clientHandler.run().wait()
return self.vectorReads &+ self.vectorWrites
}
}
extension UDPBenchmark {
final class EchoHandler: ChannelInboundHandler {
typealias InboundIn = AddressedEnvelope<ByteBuffer>
typealias OutboundOut = AddressedEnvelope<ByteBuffer>
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
// echo back the message; skip the unwrap/rewrap.
context.write(data, promise: nil)
}
func channelReadComplete(context: ChannelHandlerContext) {
context.flush()
}
func errorCaught(context: ChannelHandlerContext, error: Error) {
fatalError("EchoHandler received errorCaught")
}
}
final class EchoHandlerClient: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = AddressedEnvelope<ByteBuffer>
typealias OutboundOut = AddressedEnvelope<ByteBuffer>
private let eventLoop: EventLoop
private let config: Config
private var state = State()
private var context: ChannelHandlerContext?
private struct State {
private enum _State {
case stopped
case running(Running)
struct Running {
/// Number of requests still to send.
var requestsToSend: Int
/// Number of responses still being waited for.
var responsesToRecieve: Int
/// Number of writes before the next flush, i.e. flush on zero.
var writesBeforeNextFlush: Int
/// Number of writes before each flush.
let writesPerFlush: Int
/// Completed once the `requestsToSend` and outstanding have dropped to zero.
let promise: EventLoopPromise<Void>
init(requests: Int, writesPerFlush: Int, promise: EventLoopPromise<Void>) {
self.requestsToSend = requests
self.responsesToRecieve = requests
self.writesBeforeNextFlush = writesPerFlush
self.writesPerFlush = writesPerFlush
self.promise = promise
}
}
}
private var state: _State
init() {
self.state = .stopped
}
mutating func run(requests: Int, writesPerFlush: Int, promise: EventLoopPromise<Void>) {
switch self.state {
case .stopped:
let running = _State.Running(requests: requests, writesPerFlush: writesPerFlush, promise: promise)
self.state = .running(running)
case .running:
fatalError("Invalid state")
}
}
enum Receive {
case write
case finished(EventLoopPromise<Void>)
}
mutating func receive() -> Receive {
switch self.state {
case .running(var running):
running.responsesToRecieve &-= 1
if running.responsesToRecieve == 0, running.requestsToSend == 0 {
self.state = .stopped
return .finished(running.promise)
} else {
self.state = .running(running)
return .write
}
case .stopped:
fatalError("Received too many messages")
}
}
enum Write {
case write(flush: Bool)
case doNothing
}
mutating func write() -> Write {
switch self.state {
case .stopped:
return .doNothing
case .running(var running):
guard running.requestsToSend > 0 else {
return .doNothing
}
running.requestsToSend &-= 1
running.writesBeforeNextFlush &-= 1
let flush: Bool
if running.writesBeforeNextFlush == 0 {
running.writesBeforeNextFlush = running.writesPerFlush
flush = true
} else {
flush = false
}
self.state = .running(running)
return .write(flush: flush)
}
}
}
init(eventLoop: EventLoop, config: Config) {
self.eventLoop = eventLoop
self.config = config
}
struct Config {
var remoteAddress: SocketAddress
var request: ByteBuffer
var requests: Int
var writesPerFlush: Int
}
func handlerAdded(context: ChannelHandlerContext) {
self.context = context
}
func handlerRemoved(context: ChannelHandlerContext) {
self.context = nil
}
func run() -> EventLoopFuture<Void> {
let p = self.eventLoop.makePromise(of: Void.self)
self.eventLoop.execute {
self._run(promise: p)
}
return p.futureResult
}
private func _run(promise: EventLoopPromise<Void>) {
self.state.run(requests: self.config.requests, writesPerFlush: self.config.writesPerFlush, promise: promise)
let context = self.context!
for _ in 0 ..< self.config.writesPerFlush {
self.maybeSend(context: context)
}
}
private func maybeSend(context: ChannelHandlerContext) {
switch self.state.write() {
case .doNothing:
()
case let .write(flush):
let envolope = AddressedEnvelope<ByteBuffer>(remoteAddress: self.config.remoteAddress, data: self.config.request)
context.write(self.wrapOutboundOut(envolope), promise: nil)
if flush {
context.flush()
}
}
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
switch self.state.receive() {
case .write:
self.maybeSend(context: context)
case .finished(let promise):
promise.succeed()
}
}
func errorCaught(context: ChannelHandlerContext, error: Error) {
fatalError("EchoHandlerClient received errorCaught")
}
}
}