405 lines
14 KiB
Swift
405 lines
14 KiB
Swift
//
|
|
// SoakTestWebSocket.swift
|
|
// Ably-iOS-SoakTest
|
|
//
|
|
// Copyright © 2019 Ably. All rights reserved.
|
|
//
|
|
|
|
import Foundation
|
|
import Ably.Private
|
|
|
|
class SoakTestWebSocket: NSObject, ARTWebSocket {
|
|
var readyState: ARTSRReadyState
|
|
var queue: DispatchQueue!
|
|
var delegate: ARTWebSocketDelegate?
|
|
|
|
let id = nextGlobalSerial()
|
|
let nextConnectionSerial: () -> Int64
|
|
|
|
required init(urlRequest request: URLRequest) {
|
|
readyState = .CLOSED
|
|
// TODO (maybe?): Extract connectionKey from params, resume conn state if
|
|
// connectionStateTtl hasn't passed yet.
|
|
nextConnectionSerial = serialSequence(label: "fakeConnection.\(id)", first: -1)
|
|
}
|
|
|
|
func setDelegateDispatchQueue(_ queue: DispatchQueue) {
|
|
self.queue = queue
|
|
}
|
|
|
|
func open() {
|
|
readyState = .CONNECTING
|
|
queue.afterSeconds(between: 0.1 ... ARTDefault.realtimeRequestTimeout() + 1.0) {
|
|
if true.times(9, outOf: 10) {
|
|
self.readyState = .OPEN
|
|
self.delegate?.webSocketDidOpen(self)
|
|
|
|
self.doIfStillOpen(afterSecondsBetween: 0.1 ... 3.0) {
|
|
if true.times(9, outOf: 10) {
|
|
self.messageToClient(action: .connected) { m in
|
|
m.connectionId = "fakeConnection.\(self.id)"
|
|
m.connectionDetails = ARTConnectionDetails(
|
|
clientId: "*",
|
|
connectionKey: "fakeConnectionKey.\(self.id))",
|
|
maxMessageSize: 999999999,
|
|
maxFrameSize: 999999999,
|
|
maxInboundRate: 999999999,
|
|
connectionStateTtl: 2.0,
|
|
serverId: "fakeServer",
|
|
maxIdleInterval: (0.0 ... 60.0).randomWithin()
|
|
)
|
|
}
|
|
self.ackMessages()
|
|
self.sendHeartbeats()
|
|
} else {
|
|
self.messageToClient(action: .error) { m in
|
|
m.error = ARTErrorInfo.create(from: fakeError)
|
|
}
|
|
}
|
|
}
|
|
|
|
self.doIfStillOpen(afterSecondsBetween: 3.0 ... 300.0) {
|
|
let (code, clean) : (ARTSRStatusCode, Bool) = [
|
|
(.codeAbnormal, false),
|
|
(.codeNormal, true),
|
|
(.codeGoingAway, true),
|
|
(.codePolicyViolated, true),
|
|
(.codeMessageTooBig, true),
|
|
(.codeInternalError, true),
|
|
].randomElement(using: &seededRandomNumberGenerator)!
|
|
self.delegate?.webSocket(self, didCloseWithCode: code.rawValue, reason: "fake close", wasClean: clean)
|
|
}
|
|
} else {
|
|
self.delegate?.webSocket(self, didFailWithError: fakeError)
|
|
}
|
|
}
|
|
}
|
|
|
|
func sendHeartbeats() {
|
|
doIfStillOpen(afterSecondsBetween: (2.0 ... 30.0)) {
|
|
self.messageToClient(action: .heartbeat)
|
|
self.sendHeartbeats()
|
|
}
|
|
}
|
|
|
|
func protocolMessage(action: ARTProtocolMessageAction, setUp: (ARTProtocolMessage) -> Void = { _ in }) -> ARTProtocolMessage {
|
|
return ARTProtocolMessage.build(action: action) { m in
|
|
m.connectionSerial = self.nextConnectionSerial()
|
|
m.timestamp = Date()
|
|
setUp(m)
|
|
}
|
|
}
|
|
|
|
func messageToClient(action: ARTProtocolMessageAction, setUp: (ARTProtocolMessage) -> Void = { _ in }) {
|
|
let message = protocolMessage(action: action, setUp: setUp)
|
|
self.delegate?.webSocket(self, didReceiveMessage: message)
|
|
}
|
|
|
|
func close(withCode code: Int, reason: String?) {
|
|
readyState = .CLOSING
|
|
queue.afterSeconds(between: 0.1 ... 3.0) {
|
|
if self.readyState != .CLOSING {
|
|
return
|
|
}
|
|
self.readyState = .CLOSED
|
|
}
|
|
}
|
|
|
|
var pendingSerials: [NSNumber] = []
|
|
|
|
func ackMessages() {
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 3.0) {
|
|
self.ackMessages()
|
|
|
|
if self.pendingSerials.count == 0 {
|
|
return
|
|
}
|
|
|
|
if true.times(1, outOf: 5) {
|
|
self.messageToClient(action: .ack) { m in
|
|
m.msgSerial = self.pendingSerials.first!
|
|
m.count = Int32(self.pendingSerials.count)
|
|
}
|
|
} else {
|
|
self.messageToClient(action: .nack) { m in
|
|
m.msgSerial = self.pendingSerials.first!
|
|
m.count = Int32(self.pendingSerials.count)
|
|
m.error = ARTErrorInfo.create(from: fakeError)
|
|
}
|
|
}
|
|
self.pendingSerials.removeAll()
|
|
}
|
|
}
|
|
|
|
func send(_ msgPackEncodedMessage: Any?) {
|
|
let message = try! msgPackEncoder.decodeProtocolMessage(msgPackEncodedMessage as! Data)
|
|
|
|
switch message.action {
|
|
case .close:
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 3.0) {
|
|
self.messageToClient(action: .closed)
|
|
self.doIfStillOpen(afterSecondsBetween: 0.1 ... 0.5) {
|
|
self.delegate?.webSocket(self, didCloseWithCode: 1000, reason: nil, wasClean: true)
|
|
}
|
|
}
|
|
case .message:
|
|
queue.async {
|
|
self.pendingSerials.append(message.msgSerial!)
|
|
}
|
|
case .attach:
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 3.0) {
|
|
let serial: Int64
|
|
if let s = self.serialForAttachedChannel[message.channel!] {
|
|
serial = s
|
|
} else {
|
|
if true.times(1, outOf: 10) {
|
|
self.messageToClient(action: .error) { m in
|
|
m.channel = message.channel
|
|
m.error = ARTErrorInfo.create(from: fakeError)
|
|
}
|
|
return
|
|
}
|
|
|
|
serial = -1
|
|
self.serialForAttachedChannel[message.channel!] = -1
|
|
}
|
|
|
|
let hasPresence = true.times(4, outOf: 5)
|
|
|
|
self.messageToClient(action: .attached) { m in
|
|
m.channel = message.channel
|
|
m.channelSerial = "somethingsomething:\(serial)"
|
|
if hasPresence {
|
|
m.flags = m.flags | Int64(ARTProtocolMessageFlag.hasPresence.rawValue)
|
|
}
|
|
}
|
|
|
|
if hasPresence {
|
|
self.startPresenceSync(channel: message.channel!)
|
|
}
|
|
|
|
self.sendMessages(channel: message.channel!)
|
|
self.sendPresenceMessages(channel: message.channel!)
|
|
}
|
|
case .detach:
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 3.0) {
|
|
self.messageToClient(action: .detached) { m in
|
|
m.channel = message.channel
|
|
if true.times(1, outOf: 5) {
|
|
m.error = ARTErrorInfo.create(from: fakeError)
|
|
}
|
|
}
|
|
|
|
self.serialForAttachedChannel.removeValue(forKey: message.channel!)
|
|
}
|
|
case .auth:
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 3.0) {
|
|
if true.times(1, outOf: 10) {
|
|
self.messageToClient(action: .error) { m in
|
|
m.error = authError()
|
|
}
|
|
} else {
|
|
self.messageToClient(action: .connected)
|
|
}
|
|
}
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
|
|
var serialForAttachedChannel: [String: Int64] = [:]
|
|
|
|
func nextMessageToClient(forChannel channel: String, action: ARTProtocolMessageAction, setUp: (ARTProtocolMessage) -> Void = { _ in }) {
|
|
guard var channelSerial = self.serialForAttachedChannel[channel] else {
|
|
return
|
|
}
|
|
channelSerial += 1
|
|
self.serialForAttachedChannel[channel] = channelSerial
|
|
|
|
messageToClient(action: action) { m in
|
|
m.channel = channel
|
|
m.channelSerial = "somethingsomething:\(channelSerial)"
|
|
setUp(m)
|
|
}
|
|
}
|
|
|
|
func sendMessages(channel: String) {
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 2.0) {
|
|
self.nextMessageToClient(forChannel: channel, action: .message) { m in
|
|
m.id = "message.\(nextGlobalSerial())"
|
|
m.messages = [ARTMessage(
|
|
name: "fakeMessage",
|
|
data: randomMessageData()
|
|
)]
|
|
}
|
|
|
|
self.sendMessages(channel: channel)
|
|
}
|
|
}
|
|
|
|
func sendPresenceMessages(channel: String) {
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 2.0) {
|
|
let presence = ARTPresenceMessage()
|
|
presence.clientId = "someone.\(nextGlobalSerial())"
|
|
presence.data = randomMessageData()
|
|
presence.action = .enter
|
|
|
|
self.nextMessageToClient(forChannel: channel, action: .presence) { m in
|
|
m.id = "presence:\(nextGlobalSerial())"
|
|
m.presence = [presence]
|
|
}
|
|
|
|
self.sendPresenceMessages(channel: channel)
|
|
|
|
self.updatePresence(channel: channel, clientId: presence.clientId!)
|
|
}
|
|
}
|
|
|
|
func updatePresence(channel: String, clientId: String) {
|
|
doIfStillOpen(afterSecondsBetween: 0.1 ... 10.0) {
|
|
let presence = ARTPresenceMessage()
|
|
presence.clientId = clientId
|
|
|
|
if true.times(3, outOf: 4) {
|
|
presence.action = .update
|
|
presence.data = randomMessageData()
|
|
} else {
|
|
presence.action = .leave
|
|
}
|
|
|
|
self.nextMessageToClient(forChannel: channel, action: .presence) { m in
|
|
m.id = "presence:\(nextGlobalSerial())"
|
|
m.presence = [presence]
|
|
}
|
|
|
|
if presence.action == .update {
|
|
self.updatePresence(channel: channel, clientId: clientId)
|
|
}
|
|
}
|
|
}
|
|
|
|
func startPresenceSync(channel: String) {
|
|
let numMembers = Int((1 ... 10).randomWithin())
|
|
let members = (1 ... numMembers).map { "member\($0)" }
|
|
let memberPages = Array(Groups(of: 3, outOf: members))
|
|
var iter = memberPages.enumerated().makeIterator()
|
|
|
|
sendPresenceSyncs(next: { iter.next() }, numPages: memberPages.count)
|
|
}
|
|
|
|
func sendPresenceSyncs(next: @escaping () -> (Int, [String])?, numPages: Int) {
|
|
guard let (i, page) = next() else {
|
|
return
|
|
}
|
|
|
|
var cursor: String
|
|
let isLast = i == numPages - 1
|
|
if isLast {
|
|
cursor = ""
|
|
} else {
|
|
cursor = "page\(i)"
|
|
}
|
|
|
|
doIfStillOpen(afterSecondsBetween: (0.1 ... 1.0)) {
|
|
self.messageToClient(action: .sync) { m in
|
|
m.channelSerial = "somethingsomething:\(cursor)"
|
|
m.presence = page.map { member in
|
|
let message = ARTPresenceMessage()
|
|
message.clientId = member
|
|
message.action = .present
|
|
return message
|
|
}
|
|
}
|
|
|
|
self.sendPresenceSyncs(next: next, numPages: numPages)
|
|
}
|
|
}
|
|
|
|
func doIfStillOpen(afterSecondsBetween between: ClosedRange<TimeInterval>, execute: @escaping () -> Void) {
|
|
queue.afterSeconds(between: between) {
|
|
if self.readyState != .OPEN {
|
|
return
|
|
}
|
|
execute()
|
|
}
|
|
}
|
|
}
|
|
|
|
let jsonEncoder = ARTJsonLikeEncoder(delegate: ARTJsonEncoder())
|
|
let msgPackEncoder = ARTJsonLikeEncoder(delegate: ARTMsgPackEncoder())
|
|
|
|
extension String {
|
|
func asError(code: Int = 1) -> NSError {
|
|
return NSError(domain: "io.ably", code: code, userInfo: [NSLocalizedDescriptionKey: self])
|
|
}
|
|
}
|
|
|
|
let fakeError = "fake error for soak test".asError()
|
|
|
|
func authError() -> ARTErrorInfo {
|
|
return ARTErrorInfo.create(from: "fake auth error".asError(code: 40140))
|
|
}
|
|
|
|
func serialSequence(label: String, first: Int64 = 0) -> (() -> Int64) {
|
|
let queue = DispatchQueue(label: "io.ably.soakTest.\(label)")
|
|
var serial: Int64 = first
|
|
|
|
return {
|
|
var assigned: Int64!
|
|
queue.sync {
|
|
assigned = serial
|
|
serial += 1
|
|
}
|
|
return assigned
|
|
}
|
|
}
|
|
|
|
let nextGlobalSerial = serialSequence(label: "globalSerial")
|
|
|
|
extension ARTProtocolMessage {
|
|
class func build(action: ARTProtocolMessageAction, setUp: (ARTProtocolMessage) -> Void) -> ARTProtocolMessage {
|
|
let m = ARTProtocolMessage()
|
|
m.action = action
|
|
m.id = "fakeProtocolMessage.\(nextGlobalSerial())"
|
|
setUp(m)
|
|
return m
|
|
}
|
|
}
|
|
|
|
private class Groups<S: Sequence>: Sequence, IteratorProtocol {
|
|
typealias Element = [S.Element]
|
|
|
|
let perGroup: Int
|
|
var outOf: S.Iterator
|
|
var done = false
|
|
|
|
required init(of: Int, outOf: S) {
|
|
self.perGroup = of
|
|
self.outOf = outOf.makeIterator()
|
|
}
|
|
|
|
func next() -> Element? {
|
|
if done {
|
|
return nil
|
|
}
|
|
|
|
var group: [S.Element]?
|
|
|
|
for _ in 0 ..< perGroup {
|
|
guard let next = outOf.next() else {
|
|
done = true
|
|
break
|
|
}
|
|
|
|
if group == nil {
|
|
group = []
|
|
}
|
|
group!.append(next)
|
|
}
|
|
|
|
return group
|
|
}
|
|
}
|
|
|
|
|