Make WriteBufferWaterMark be a struct and validate low and high watermarks. (#21)
Motivation: We used Range for WriteBufferWaterMark which did not do any validation of the low and high marks. Modifications: Make WriteBufferWaterMark a struct and validate low and high values Result: More correct impl for watermarks.
This commit is contained in:
parent
49270cc761
commit
c95051db78
|
@ -124,15 +124,40 @@ public enum BacklogOption: ChannelOption {
|
|||
case const(())
|
||||
}
|
||||
|
||||
/// The watermark used to detect once `Channel.writable` returns `true` or `false`.
|
||||
public typealias WriteBufferWaterMark = Range<Int>
|
||||
/// The watermark used to detect when `Channel.isWritable` returns `true` or `false`.
|
||||
public struct WriteBufferWaterMark {
|
||||
/// The low mark setting for a `Channel`.
|
||||
///
|
||||
/// When the amount of buffered bytes in the `Channel`s outbound buffer drops below this value the `Channel` will be
|
||||
/// marked as writable again (after it was non-writable).
|
||||
public let low: Int
|
||||
|
||||
/// The high mark setting for a `Channel`.
|
||||
///
|
||||
/// When the amount of buffered bytes in the `Channel`s outbound exceeds this value the `Channel` will be
|
||||
/// marked as non-writable. It will be marked as writable again once the amount of buffered bytes drops below `low`.
|
||||
public let high: Int
|
||||
|
||||
/// Create a new instance.
|
||||
///
|
||||
/// Valid initialization is restricted to `1 <= low <= high`.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - low: The low watermark.
|
||||
/// - high: The high watermark.
|
||||
public init(low: Int, high: Int) {
|
||||
precondition(low >= 1, "low must be >= 1 but was \(low)")
|
||||
precondition(high >= low, "low must be <= high, but was low: \(low) high: \(high)")
|
||||
self.low = low
|
||||
self.high = high
|
||||
}
|
||||
}
|
||||
|
||||
/// `WriteBufferWaterMarkOption` allows to configure when a `Channel` should be marked as writable or not. Once the amount of bytes queued in a
|
||||
/// `Channel`s outbound buffer is larger then the upper value of the `WriteBufferWaterMark` the channel will be marked as non-writable and so
|
||||
/// `Channel`s outbound buffer is larger than `WriteBufferWaterMark.high` the channel will be marked as non-writable and so
|
||||
/// `Channel.isWritable` will return `false`. Once we were able to write some data out of the outbound buffer and the amount of bytes queued
|
||||
/// falls under the lower value of `WriteBufferWaterMark` the `Channel` will become writable again. Once this happens `Channel.writable` will return
|
||||
/// `true` again. These writability changes are also propagated through the `ChannelPipeline` via the `ChannelPipeline.fireChannelWritabilityChanged`
|
||||
/// method and so its possible to act on this in a `ChannelInboundHandler` implementation.
|
||||
/// falls below `WriteBufferWaterMark.low` the `Channel` will become writable again. Once this happens `Channel.writable` will return
|
||||
/// `true` again. These writability changes are also propagated through the `ChannelPipeline` and so can be intercepted via `ChannelInboundHandler.channelWritabilityChanged`.
|
||||
public enum WriteBufferWaterMarkOption: ChannelOption {
|
||||
public typealias AssociatedValueType = ()
|
||||
public typealias OptionType = WriteBufferWaterMark
|
||||
|
|
|
@ -422,7 +422,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
|
||||
private var state = PendingDatagramWritesState()
|
||||
|
||||
internal var waterMark: WriteBufferWaterMark = WriteBufferWaterMark(32 * 1024..<64 * 1024)
|
||||
internal var waterMark: WriteBufferWaterMark = WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
|
||||
internal let channelWritabilityFlag: Atomic<Bool> = Atomic(value: true)
|
||||
internal var writeSpinCount: UInt = 16
|
||||
private(set) var isOpen = true
|
||||
|
@ -475,7 +475,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
assert(self.isOpen)
|
||||
self.state.append(.init(data: envelope.data, promise: promise, address: envelope.remoteAddress))
|
||||
|
||||
if self.state.bytes > waterMark.upperBound && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
|
||||
if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
|
||||
// Returns false to signal the Channel became non-writable and we need to notify the user
|
||||
return false
|
||||
}
|
||||
|
@ -529,7 +529,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
private func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> OneWriteOperationResult {
|
||||
let (promises, result) = self.state.didWrite(data, messages: messages)
|
||||
|
||||
if self.state.bytes < waterMark.lowerBound {
|
||||
if self.state.bytes < waterMark.low {
|
||||
channelWritabilityFlag.store(true)
|
||||
}
|
||||
|
||||
|
|
|
@ -283,7 +283,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
|
|||
private var iovecs: UnsafeMutableBufferPointer<IOVector>
|
||||
private var storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>
|
||||
|
||||
internal var waterMark: WriteBufferWaterMark = WriteBufferWaterMark(32 * 1024..<64 * 1024)
|
||||
internal var waterMark: WriteBufferWaterMark = WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
|
||||
internal let channelWritabilityFlag: Atomic<Bool> = Atomic(value: true)
|
||||
|
||||
internal var writeSpinCount: UInt = 16
|
||||
|
@ -318,7 +318,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
|
|||
assert(self.isOpen)
|
||||
self.state.append(.init(data: data, promise: promise))
|
||||
|
||||
if self.state.bytes > waterMark.upperBound && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
|
||||
if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
|
||||
// Returns false to signal the Channel became non-writable and we need to notify the user
|
||||
return false
|
||||
}
|
||||
|
@ -365,7 +365,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
|
|||
private func didWrite(itemCount: Int, result: IOResult<Int>) -> OneWriteOperationResult {
|
||||
let (fulfillPromises, result) = self.state.didWrite(itemCount: itemCount, result: result)
|
||||
|
||||
if self.state.bytes < waterMark.lowerBound {
|
||||
if self.state.bytes < waterMark.low {
|
||||
channelWritabilityFlag.store(true)
|
||||
}
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ final class DatagramChannelTests: XCTestCase {
|
|||
}
|
||||
|
||||
func testDatagramChannelHasWatermark() throws {
|
||||
_ = try self.firstChannel.setOption(option: ChannelOptions.writeBufferWaterMark, value: 1..<1024).wait()
|
||||
_ = try self.firstChannel.setOption(option: ChannelOptions.writeBufferWaterMark, value: WriteBufferWaterMark(low: 1, high: 1024)).wait()
|
||||
|
||||
var buffer = self.firstChannel.allocator.buffer(capacity: 256)
|
||||
buffer.write(bytes: [UInt8](repeating: 5, count: 256))
|
||||
|
|
Loading…
Reference in New Issue