ably-cocoa/Source/ARTRealtime.m

1592 lines
59 KiB
Objective-C

//
// ARTRealtime.m
//
//
#import "ARTRealtime+Private.h"
#import "ARTRealtimeChannel+Private.h"
#import "ARTStatus.h"
#import "ARTDefault.h"
#import "ARTRest+Private.h"
#import "ARTAuth+Private.h"
#import "ARTTokenDetails.h"
#import "ARTMessage.h"
#import "ARTClientOptions.h"
#import "ARTChannelOptions.h"
#import "ARTPresenceMessage.h"
#import "ARTWebSocketTransport+Private.h"
#import "ARTOSReachability.h"
#import "ARTNSArray+ARTFunctional.h"
#import "ARTPresenceMap.h"
#import "ARTProtocolMessage.h"
#import "ARTProtocolMessage+Private.h"
#import "ARTEventEmitter+Private.h"
#import "ARTQueuedMessage.h"
#import "ARTPendingMessage.h"
#import "ARTConnection+Private.h"
#import "ARTConnectionDetails.h"
#import "ARTStats.h"
#import "ARTRealtimeTransport.h"
#import "ARTFallback.h"
#import "ARTFallbackHosts.h"
#import "ARTAuthDetails.h"
#import "ARTGCD.h"
#import "ARTEncoder.h"
#import "ARTLog+Private.h"
#import "ARTRealtimeChannels+Private.h"
#import "ARTPush+Private.h"
#import "ARTQueuedDealloc.h"
@interface ARTConnectionStateChange ()
- (void)setRetryIn:(NSTimeInterval)retryIn;
@end
#pragma mark - ARTRealtime implementation
@implementation ARTRealtime {
ARTQueuedDealloc *_dealloc;
}
- (void)internalAsync:(void (^)(ARTRealtimeInternal * _Nonnull))use {
dispatch_async(_internal.queue, ^{
use(self->_internal);
});
}
- (void)internalSync:(void (^)(ARTRealtimeInternal * _Nonnull))use {
dispatch_sync(_internal.queue, ^{
use(self->_internal);
});
}
- (ARTConnection *)connection {
return [[ARTConnection alloc] initWithInternal:_internal.connection queuedDealloc:_dealloc];
}
- (ARTRealtimeChannels *)channels {
return [[ARTRealtimeChannels alloc] initWithInternal:_internal.channels queuedDealloc:_dealloc];
}
- (ARTAuth *)auth {
return [[ARTAuth alloc] initWithInternal:_internal.auth queuedDealloc:_dealloc];
}
- (ARTPush *)push {
return [[ARTPush alloc] initWithInternal:_internal.push queuedDealloc:_dealloc];
}
#if TARGET_OS_IOS
- (ARTLocalDevice *)device {
return _internal.device;
}
#endif
- (NSString *)clientId {
return _internal.clientId;
}
- (void)initCommon {
_dealloc = [[ARTQueuedDealloc alloc] init:_internal queue:_internal.queue];
}
- (instancetype)initWithOptions:(ARTClientOptions *)options {
self = [super init];
if (self) {
_internal = [[ARTRealtimeInternal alloc] initWithOptions:options];
[self initCommon];
}
return self;
}
- (instancetype)initWithKey:(NSString *)key {
self = [super init];
if (self) {
_internal = [[ARTRealtimeInternal alloc] initWithKey:key];
[self initCommon];
}
return self;
}
- (instancetype)initWithToken:(NSString *)token {
self = [super init];
if (self) {
_internal = [[ARTRealtimeInternal alloc] initWithToken:token];
[self initCommon];
}
return self;
}
+ (instancetype)createWithOptions:(ARTClientOptions *)options {
return [[ARTRealtime alloc] initWithOptions:options];
}
+ (instancetype)createWithKey:(NSString *)key {
return [[ARTRealtime alloc] initWithKey:key];
}
+ (instancetype)createWithToken:(NSString *)tokenId {
return [[ARTRealtime alloc] initWithToken:tokenId];
}
- (void)time:(ARTDateTimeCallback)cb {
[_internal time:cb];
}
- (void)ping:(ARTCallback)cb {
[_internal ping:cb];
}
- (BOOL)stats:(ARTPaginatedStatsCallback)callback {
return [_internal stats:callback];
}
- (BOOL)stats:(nullable ARTStatsQuery *)query callback:(ARTPaginatedStatsCallback)callback error:(NSError **)errorPtr {
return [_internal stats:query callback:callback error:errorPtr];
}
- (void)connect {
[_internal connect];
}
- (void)close {
[_internal close];
}
@end
@implementation ARTRealtimeInternal {
BOOL _resuming;
BOOL _renewingToken;
BOOL _shouldImmediatelyReconnect;
ARTEventEmitter<ARTEvent *, ARTErrorInfo *> *_pingEventEmitter;
NSDate *_connectionLostAt;
NSDate *_lastActivity;
Class _transportClass;
Class _reachabilityClass;
id<ARTRealtimeTransport> _transport;
ARTFallback *_fallbacks;
__weak ARTEventListener *_connectionRetryFromSuspendedListener;
__weak ARTEventListener *_connectionRetryFromDisconnectedListener;
__weak ARTEventListener *_connectingTimeoutListener;
ARTScheduledBlockHandle *_authenitcatingTimeoutWork;
NSObject<ARTCancellable> *_authTask;
ARTScheduledBlockHandle *_idleTimer;
dispatch_queue_t _userQueue;
dispatch_queue_t _queue;
}
- (instancetype)initWithOptions:(ARTClientOptions *)options {
self = [super init];
if (self) {
NSAssert(options, @"ARTRealtime: No options provided");
_rest = [[ARTRestInternal alloc] initWithOptions:options realtime:self];
_userQueue = _rest.userQueue;
_queue = _rest.queue;
_internalEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_rest.queue];
_connectedEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_rest.queue];
_pingEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_rest.queue];
_channels = [[ARTRealtimeChannelsInternal alloc] initWithRealtime:self];
_transport = nil;
_transportClass = [ARTWebSocketTransport class];
_reachabilityClass = [ARTOSReachability class];
_msgSerial = 0;
_queuedMessages = [NSMutableArray array];
_pendingMessages = [NSMutableArray array];
_pendingMessageStartSerial = 0;
_pendingAuthorizations = [NSMutableArray array];
_connection = [[ARTConnectionInternal alloc] initWithRealtime:self];
_connectionStateTtl = [ARTDefault connectionStateTtl];
_shouldImmediatelyReconnect = true;
self.auth.delegate = self;
[self.connection setState:ARTRealtimeInitialized];
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p initialized with RS:%p", self, _rest];
self.rest.prioritizedHost = nil;
if (options.autoConnect) {
[self _connect];
}
}
return self;
}
#pragma mark - ARTAuthDelegate
- (void)auth:(ARTAuthInternal *)auth didAuthorize:(ARTTokenDetails *)tokenDetails completion:(void (^)(ARTAuthorizationState, ARTErrorInfo *_Nullable))completion {
void (^waitForResponse)(void) = ^{
[self.pendingAuthorizations art_enqueue:^(ARTRealtimeConnectionState state, ARTErrorInfo *_Nullable error){
switch (state) {
case ARTRealtimeConnected:
completion(ARTAuthorizationSucceeded, nil);
break;
case ARTRealtimeFailed:
completion(ARTAuthorizationFailed, error);
break;
case ARTRealtimeSuspended:
completion(ARTAuthorizationFailed, [ARTErrorInfo createWithCode:ARTStateAuthorizationFailed message:@"Connection has been suspended"]);
break;
case ARTRealtimeClosed:
completion(ARTAuthorizationFailed, [ARTErrorInfo createWithCode:ARTStateAuthorizationFailed message:@"Connection has been closed"]);
break;
case ARTRealtimeDisconnected:
completion(ARTAuthorizationCancelled, nil);
break;
case ARTRealtimeInitialized:
case ARTRealtimeConnecting:
case ARTRealtimeClosing:
[self.logger debug:__FILE__ line:__LINE__ message:@"RS:%p authorize completion has been ignored because the connection state is unexpected (%@)", self.rest, ARTRealtimeConnectionStateToStr(state)];
break;
}
}];
};
void (^haltCurrentConnectionAndReconnect)(void) = ^{
// Halt the current connection and reconnect with the most recent token
[self.logger debug:__FILE__ line:__LINE__ message:@"RS:%p halt current connection and reconnect with %@", self.rest, tokenDetails];
[self abortAndReleaseTransport:[ARTStatus state:ARTStateOk]];
[self setTransportWithResumeKey:self->_transport.resumeKey connectionSerial:self->_transport.connectionSerial];
[self->_transport connectWithToken:tokenDetails.token];
[self cancelAllPendingAuthorizations];
waitForResponse();
};
switch (self.connection.state_nosync) {
case ARTRealtimeConnected: {
// Update (send AUTH message)
[self.logger debug:__FILE__ line:__LINE__ message:@"RS:%p AUTH message using %@", self.rest, tokenDetails];
ARTProtocolMessage *msg = [[ARTProtocolMessage alloc] init];
msg.action = ARTProtocolMessageAuth;
msg.auth = [[ARTAuthDetails alloc] initWithToken:tokenDetails.token];
[self send:msg sentCallback:nil ackCallback:nil];
waitForResponse();
break;
}
case ARTRealtimeConnecting: {
[_transport.stateEmitter once:[ARTEvent newWithTransportState:ARTRealtimeTransportStateOpened] callback:^(id sender) {
haltCurrentConnectionAndReconnect();
}];
break;
}
case ARTRealtimeClosing: {
// Should ignore because the connection is being closed
[self.logger debug:__FILE__ line:__LINE__ message:@"RS:%p authorize has been cancelled because the connection is closing", self.rest];
[self cancelAllPendingAuthorizations];
break;
}
default: {
// Client state is NOT Connecting or Connected, so it should start a new connection
[self.logger debug:__FILE__ line:__LINE__ message:@"RS:%p new connection from successfull authorize %@", self.rest, tokenDetails];
[self transition:ARTRealtimeConnecting];
waitForResponse();
break;
}
}
}
- (void)performPendingAuthorizationWithState:(ARTRealtimeConnectionState)state error:(nullable ARTErrorInfo *)error {
void (^pendingAuthorization)(ARTRealtimeConnectionState, ARTErrorInfo *_Nullable) = [self.pendingAuthorizations art_dequeue];
if (!pendingAuthorization) {
return;
}
switch (state) {
case ARTRealtimeConnected:
pendingAuthorization(state, nil);
break;
case ARTRealtimeFailed:
pendingAuthorization(state, error);
break;
default:
[self discardPendingAuthorizations];
pendingAuthorization(state, error);
break;
}
}
- (void)cancelAllPendingAuthorizations {
[self.pendingAuthorizations enumerateObjectsUsingBlock:^(void (^pendingAuthorization)(ARTRealtimeConnectionState, ARTErrorInfo * _Nullable), NSUInteger idx, BOOL * _Nonnull stop) {
pendingAuthorization(ARTRealtimeDisconnected, nil);
}];
[self.pendingAuthorizations removeAllObjects];
}
- (void)discardPendingAuthorizations {
[self.pendingAuthorizations removeAllObjects];
}
#pragma mark - Realtime
- (instancetype)initWithKey:(NSString *)key {
return [self initWithOptions:[[ARTClientOptions alloc] initWithKey:key]];
}
- (instancetype)initWithToken:(NSString *)token {
return [self initWithOptions:[[ARTClientOptions alloc] initWithToken:token]];
}
- (id<ARTRealtimeTransport>)transport {
return _transport;
}
- (ARTLog *)getLogger {
return _rest.logger;
}
- (ARTClientOptions *)getClientOptions {
return _rest.options;
}
- (NSString *)clientId {
// Doesn't need synchronization since it's immutable.
return _rest.options.clientId;
}
- (NSString *)description {
NSString *info;
if (self.options.token) {
info = [NSString stringWithFormat:@"token: %@", self.options.token];
}
else if (self.options.authUrl) {
info = [NSString stringWithFormat:@"authUrl: %@", self.options.authUrl];
}
else if (self.options.authCallback) {
info = [NSString stringWithFormat:@"authCallback: %@", self.options.authCallback];
}
else {
info = [NSString stringWithFormat:@"key: %@", self.options.key];
}
return [NSString stringWithFormat:@"%@ - \n\t %@;", [super description], info];
}
- (ARTAuthInternal *)auth {
return self.rest.auth;
}
- (ARTPushInternal *)push {
return self.rest.push;
}
- (void)dealloc {
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p dealloc", self];
self.rest.prioritizedHost = nil;
}
- (void)connect {
dispatch_sync(_queue, ^{
[self _connect];
});
}
- (void)_connect {
if(self.connection.state_nosync == ARTRealtimeClosing) {
// New connection
_transport = nil;
}
[self transition:ARTRealtimeConnecting];
}
- (void)close {
dispatch_sync(_queue, ^{
[self _close];
});
}
- (void)_close {
[_reachability off];
[self cancelTimers];
switch (self.connection.state_nosync) {
case ARTRealtimeInitialized:
case ARTRealtimeClosing:
case ARTRealtimeClosed:
case ARTRealtimeFailed:
return;
case ARTRealtimeConnecting: {
[_internalEventEmitter once:^(ARTConnectionStateChange *change) {
[self _close];
}];
return;
}
case ARTRealtimeDisconnected:
case ARTRealtimeSuspended:
[self transition:ARTRealtimeClosed];
break;
case ARTRealtimeConnected:
[self transition:ARTRealtimeClosing];
break;
}
}
- (void)time:(ARTDateTimeCallback)cb {
[self.rest time:cb];
}
- (void)ping:(ARTCallback) cb {
if (cb) {
ARTCallback userCallback = cb;
cb = ^(ARTErrorInfo *_Nullable error) {
dispatch_async(self->_userQueue, ^{
userCallback(error);
});
};
}
dispatch_async(_queue, ^{
switch (self.connection.state_nosync) {
case ARTRealtimeInitialized:
case ARTRealtimeSuspended:
case ARTRealtimeClosing:
case ARTRealtimeClosed:
case ARTRealtimeFailed:
cb([ARTErrorInfo createWithCode:0 status:ARTStateConnectionFailed message:[NSString stringWithFormat:@"Can't ping a %@ connection", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)]]);
return;
case ARTRealtimeConnecting:
case ARTRealtimeDisconnected:
case ARTRealtimeConnected:
if (![self shouldSendEvents]) {
[self->_connectedEventEmitter once:^(NSNull *n) {
[self ping:cb];
}];
return;
}
[[[self->_pingEventEmitter once:cb] setTimer:[ARTDefault realtimeRequestTimeout] onTimeout:^{
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p ping timed out", self];
cb([ARTErrorInfo createWithCode:ARTErrorConnectionTimedOut status:ARTStateConnectionFailed message:@"timed out"]);
}] startTimer];
[self.transport sendPing];
}
});
}
- (BOOL)stats:(ARTPaginatedStatsCallback)callback {
return [self stats:[[ARTStatsQuery alloc] init] callback:callback error:nil];
}
- (BOOL)stats:(ARTStatsQuery *)query callback:(ARTPaginatedStatsCallback)callback error:(NSError **)errorPtr {
return [self.rest stats:query callback:callback error:errorPtr];
}
- (void)transition:(ARTRealtimeConnectionState)state {
[self transition:state withErrorInfo:nil];
}
- (void)transition:(ARTRealtimeConnectionState)state withErrorInfo:(ARTErrorInfo *)errorInfo {
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p realtime state transitions to %tu - %@", self, state, ARTRealtimeConnectionStateToStr(state)];
ARTConnectionStateChange *stateChange = [[ARTConnectionStateChange alloc] initWithCurrent:state previous:self.connection.state_nosync event:(ARTRealtimeConnectionEvent)state reason:errorInfo retryIn:0];
[self.connection setState:state];
[self.connection setErrorReason:errorInfo];
ARTEventListener *stateChangeEventListener = [self transitionSideEffects:stateChange];
[_internalEventEmitter emit:[ARTEvent newWithConnectionEvent:(ARTRealtimeConnectionEvent)state] with:stateChange];
// stateChangeEventListener may be nil if we're in a failed state
if (stateChangeEventListener != nil) {
[stateChangeEventListener startTimer];
}
}
- (void)transitionToDisconnectedOrSuspended {
[self transitionToDisconnectedOrSuspendedWithError:nil];
}
- (void)transitionToDisconnectedOrSuspendedWithError:(nullable ARTErrorInfo *)errorInfo {
if ([self isSuspendMode]) {
[self transition:ARTRealtimeSuspended withErrorInfo:errorInfo];
}
else {
[self transition:ARTRealtimeDisconnected withErrorInfo:errorInfo];
}
}
- (void)updateWithErrorInfo:(nullable ARTErrorInfo *)errorInfo {
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p update requested", self];
if (self.connection.state_nosync != ARTRealtimeConnected) {
[self.logger warn:@"R:%p update ignored because connection is not connected", self];
return;
}
ARTConnectionStateChange *stateChange = [[ARTConnectionStateChange alloc] initWithCurrent:self.connection.state_nosync previous:self.connection.state_nosync event:ARTRealtimeConnectionEventUpdate reason:errorInfo retryIn:0];
ARTEventListener *stateChangeEventListener = [self transitionSideEffects:stateChange];
// stateChangeEventListener may be nil if we're in a failed state
if (stateChangeEventListener != nil) {
[stateChangeEventListener startTimer];
}
}
- (ARTEventListener *)transitionSideEffects:(ARTConnectionStateChange *)stateChange {
ARTStatus *status = nil;
ARTEventListener *stateChangeEventListener = nil;
[self.logger debug:@"RT:%p realtime is transitioning from %tu - %@ to %tu - %@", self, stateChange.previous, ARTRealtimeConnectionStateToStr(stateChange.previous), stateChange.current, ARTRealtimeConnectionStateToStr(stateChange.current)];
switch (stateChange.current) {
case ARTRealtimeConnecting: {
// RTN15g We want to enforce a new connection also when there hasn't been activity for longer than (idle interval + TTL)
if (stateChange.previous == ARTRealtimeDisconnected || stateChange.previous == ARTRealtimeSuspended) {
NSTimeInterval intervalSinceLast = [[NSDate date] timeIntervalSinceDate:_lastActivity];
if (intervalSinceLast > (_maxIdleInterval + _connectionStateTtl)) {
[self.connection setId:nil];
[self.connection setKey:nil];
[self.connection setSerial:0];
}
}
stateChangeEventListener = [self unlessStateChangesBefore:[ARTDefault realtimeRequestTimeout] do:^{
[self onConnectionTimeOut];
}];
_connectingTimeoutListener = stateChangeEventListener;
if (!_reachability) {
_reachability = [[_reachabilityClass alloc] initWithLogger:self.logger queue:_queue];
}
if (!_transport) {
NSString *resumeKey = nil;
NSNumber *connectionSerial = nil;
if (stateChange.previous == ARTRealtimeFailed ||
stateChange.previous == ARTRealtimeDisconnected ||
stateChange.previous == ARTRealtimeSuspended) {
resumeKey = self.connection.key_nosync;
connectionSerial = [NSNumber numberWithLongLong:self.connection.serial_nosync];
_resuming = true;
}
[self setTransportWithResumeKey:resumeKey connectionSerial:connectionSerial];
[self transportConnectForcingNewToken:_renewingToken newConnection:true];
}
if (self.connection.state_nosync != ARTRealtimeFailed &&
self.connection.state_nosync != ARTRealtimeClosed &&
self.connection.state_nosync != ARTRealtimeDisconnected) {
[_reachability listenForHost:[_transport host] callback:^(BOOL reachable) {
// The ref cycle creating by taking self here is resolved on close
// when [_reachability off] is called.
if (reachable) {
switch (self.connection.state_nosync) {
case ARTRealtimeDisconnected:
case ARTRealtimeSuspended:
[self transition:ARTRealtimeConnecting];
default:
break;
}
} else {
switch (self.connection.state_nosync) {
case ARTRealtimeConnecting:
case ARTRealtimeConnected: {
ARTErrorInfo *unreachable = [ARTErrorInfo createWithCode:-1003 message:@"unreachable host"];
[self transitionToDisconnectedOrSuspendedWithError:unreachable];
break;
}
default:
break;
}
}
}];
}
break;
}
case ARTRealtimeClosing: {
[self stopIdleTimer];
[_reachability off];
stateChangeEventListener = [self unlessStateChangesBefore:[ARTDefault realtimeRequestTimeout] do:^{
[self transition:ARTRealtimeClosed];
}];
[self.transport sendClose];
break;
}
case ARTRealtimeClosed:
[self stopIdleTimer];
[_reachability off];
[self closeAndReleaseTransport];
_connection.key = nil;
_connection.id = nil;
_transport = nil;
self.rest.prioritizedHost = nil;
[self.auth cancelAuthorization:nil];
[self failPendingMessages:[ARTStatus state:ARTStateError info:[ARTErrorInfo createWithCode:ARTErrorConnectionClosed message:@"connection broken before receiving publishing acknowledgment"]]];
break;
case ARTRealtimeFailed:
status = [ARTStatus state:ARTStateConnectionFailed info:stateChange.reason];
[self abortAndReleaseTransport:status];
self.rest.prioritizedHost = nil;
[self.auth cancelAuthorization:stateChange.reason];
[self failPendingMessages:[ARTStatus state:ARTStateError info:[ARTErrorInfo createWithCode:ARTErrorConnectionFailed message:@"connection broken before receiving publishing acknowledgment"]]];
break;
case ARTRealtimeDisconnected: {
[self closeAndReleaseTransport];
if (!_connectionLostAt) {
_connectionLostAt = [NSDate date];
[self.logger verbose:@"RT:%p set connection lost time; expected suspension at %@ (ttl=%f)", self, [self suspensionTime], self.connectionStateTtl];
}
NSTimeInterval retryInterval = self.options.disconnectedRetryTimeout;
// RTN15a - retry immediately if client was connected
if (stateChange.previous == ARTRealtimeConnected && _shouldImmediatelyReconnect) {
retryInterval = 0.1;
}
[stateChange setRetryIn:retryInterval];
stateChangeEventListener = [self unlessStateChangesBefore:stateChange.retryIn do:^{
self->_connectionRetryFromDisconnectedListener = nil;
[self transition:ARTRealtimeConnecting];
}];
_connectionRetryFromDisconnectedListener = stateChangeEventListener;
break;
}
case ARTRealtimeSuspended: {
[_connectionRetryFromDisconnectedListener stopTimer];
_connectionRetryFromDisconnectedListener = nil;
[self.auth cancelAuthorization:nil];
[self closeAndReleaseTransport];
[stateChange setRetryIn:self.options.suspendedRetryTimeout];
stateChangeEventListener = [self unlessStateChangesBefore:stateChange.retryIn do:^{
self->_connectionRetryFromSuspendedListener = nil;
[self transition:ARTRealtimeConnecting];
}];
_connectionRetryFromSuspendedListener = stateChangeEventListener;
break;
}
case ARTRealtimeConnected: {
_fallbacks = nil;
_connectionLostAt = nil;
if (stateChange.reason) {
ARTStatus *status = [ARTStatus state:ARTStateError info:[stateChange.reason copy]];
[self failPendingMessages:status];
}
else {
[self resendPendingMessages];
}
[_connectedEventEmitter emit:nil with:nil];
break;
}
case ARTRealtimeInitialized:
break;
}
// If there's a channels.release() going on waiting on this channel
// to detach, doing those operations on it here would fire its event listener and
// immediately remove the channel from the channels dictionary, thus
// invalidating the iterator and causing a crashing.
//
// So copy the channels and operate on them later, when we're done using the iterator.
NSMutableArray<ARTRealtimeChannelInternal *> * const channels = [[NSMutableArray alloc] init];
for (ARTRealtimeChannelInternal *channel in self.channels.nosyncIterable) {
[channels addObject:channel];
}
if ([self shouldSendEvents]) {
[self sendQueuedMessages];
// Channels
for (ARTRealtimeChannelInternal *channel in channels) {
if (stateChange.previous == ARTRealtimeInitialized ||
stateChange.previous == ARTRealtimeConnecting ||
stateChange.previous == ARTRealtimeDisconnected) {
// RTL4i
[channel _attach:nil];
}
}
} else if (![self shouldQueueEvents]) {
ARTStatus *channelStatus = status;
if (!channelStatus) {
channelStatus = stateChange.reason ? [ARTStatus state:ARTStateError info:stateChange.reason] : [self defaultError];
}
[self failQueuedMessages:channelStatus];
// Channels
for (ARTRealtimeChannelInternal *channel in channels) {
switch (stateChange.current) {
case ARTRealtimeClosing:
//do nothing. Closed state is coming.
break;
case ARTRealtimeClosed:
[channel detachChannel:[ARTStatus state:ARTStateOk]];
break;
case ARTRealtimeSuspended:
[channel setSuspended:channelStatus];
break;
case ARTRealtimeFailed:
[channel setFailed:channelStatus];
break;
default:
break;
}
}
}
[self.connection emit:stateChange.event with:stateChange];
[self performPendingAuthorizationWithState:stateChange.current error:stateChange.reason];
return stateChangeEventListener;
}
- (void)abortAndReleaseTransport:(ARTStatus *)status {
[_transport abort:status];
_transport = nil;
}
- (void)closeAndReleaseTransport {
if (_transport) {
[_transport close];
_transport = nil;
}
}
- (void)resetTransportWithResumeKey:(NSString *)resumeKey connectionSerial:(NSNumber *)connectionSerial {
[self closeAndReleaseTransport];
[self setTransportWithResumeKey:resumeKey connectionSerial:connectionSerial];
}
- (void)setTransportWithResumeKey:(NSString *)resumeKey connectionSerial:(NSNumber *)connectionSerial {
_transport = [[_transportClass alloc] initWithRest:self.rest options:self.options resumeKey:resumeKey connectionSerial:connectionSerial];
_transport.delegate = self;
}
- (ARTEventListener *)unlessStateChangesBefore:(NSTimeInterval)deadline do:(void(^)(void))callback __attribute__((warn_unused_result)) {
return [[_internalEventEmitter once:^(ARTConnectionStateChange *change) {
// Any state change cancels the timeout.
}] setTimer:deadline onTimeout:^{
if (callback) {
callback();
}
}];
}
- (void)onHeartbeat {
[self.logger verbose:@"R:%p heartbeat received", self];
if(self.connection.state_nosync != ARTRealtimeConnected) {
NSString *msg = [NSString stringWithFormat:@"received a ping when in state %@", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)];
[self.logger warn:@"R:%p %@", self, msg];
}
[_pingEventEmitter emit:nil with:nil];
}
- (void)onConnected:(ARTProtocolMessage *)message {
_renewingToken = false;
// Resuming
if (_resuming) {
if (![message.connectionId isEqualToString:self.connection.id_nosync]) { // RTN15c3
[self.logger warn:@"RT:%p connection \"%@\" has reconnected, but resume failed. Reattaching any attached channels", self, message.connectionId];
// Reattach all channels
for (ARTRealtimeChannelInternal *channel in self.channels.nosyncIterable) {
[channel reattachWithReason:message.error callback:nil];
}
_resuming = false;
}
else if (message.error) {
[self.logger warn:@"RT:%p connection \"%@\" has resumed with non-fatal error \"%@\"", self, message.connectionId, message.error.message];
// The error will be emitted on `transition`
}
else {
[self.logger debug:@"RT:%p connection \"%@\" has reconnected and resumed successfully", self, message.connectionId];
}
for (ARTRealtimeChannelInternal *channel in self.channels.nosyncIterable) {
if (channel.presenceMap.syncInProgress) {
// FIXME or not, regarding https://github.com/ably/docs/issues/349
//[channel requestContinueSync];
}
}
}
switch (self.connection.state_nosync) {
case ARTRealtimeConnecting: {
// If there's no previous connectionId, then don't reset the msgSerial
//as it may have been set by recover data (unless the recover failed).
NSString *prevConnId = self.connection.id_nosync;
BOOL connIdChanged = prevConnId && ![message.connectionId isEqualToString:prevConnId];
BOOL recoverFailure = !prevConnId && message.error;
if (connIdChanged || recoverFailure) {
[self.logger debug:@"RT:%p msgSerial of connection \"%@\" has been reset", self, self.connection.id_nosync];
self.msgSerial = 0;
self.pendingMessageStartSerial = 0;
}
[self.connection setId:message.connectionId];
[self.connection setKey:message.connectionKey];
[self.connection setMaxMessageSize:message.connectionDetails.maxMessageSize];
[self.connection setSerial:message.connectionSerial];
if (message.connectionDetails && message.connectionDetails.connectionStateTtl) {
_connectionStateTtl = message.connectionDetails.connectionStateTtl;
}
if (message.connectionDetails && message.connectionDetails.maxIdleInterval) {
_maxIdleInterval = message.connectionDetails.maxIdleInterval;
_lastActivity = [NSDate date];
[self setIdleTimer];
}
[self transition:ARTRealtimeConnected withErrorInfo:message.error];
break;
}
case ARTRealtimeConnected: {
// Renewing token.
[self updateWithErrorInfo:message.error];
}
default:
break;
}
_resuming = false;
}
- (void)onDisconnected {
[self onDisconnected:nil];
}
- (void)onDisconnected:(ARTProtocolMessage *)message {
[self.logger info:@"R:%p Realtime disconnected", self];
ARTErrorInfo * const error = message.error;
if (
[self isTokenError:error]
&& !_renewingToken // If already reconnecting, give up.
) {
if (![self.auth tokenIsRenewable]) {
[self transition:ARTRealtimeFailed withErrorInfo:error];
return;
}
[self transitionToDisconnectedOrSuspendedWithError:error];
[self.connection setErrorReason:nil];
_renewingToken = true;
[self transition:ARTRealtimeConnecting withErrorInfo:nil];
return;
}
[self transitionToDisconnectedOrSuspendedWithError:error];
}
- (void)onClosed {
[self.logger info:@"R:%p Realtime closed", self];
switch (self.connection.state_nosync) {
case ARTRealtimeClosed:
break;
case ARTRealtimeClosing:
[self.connection setId:nil];
[self transition:ARTRealtimeClosed];
break;
default:
NSAssert(false, @"Invalid Realtime state transitioning to Closed: expected Closing or Closed, has %@", ARTRealtimeConnectionStateToStr(self.connection.state_nosync));
break;
}
}
- (void)onAuth {
[self.logger info:@"R:%p server has requested an authorize", self];
switch (self.connection.state_nosync) {
case ARTRealtimeConnecting:
case ARTRealtimeConnected:
[self transportConnectForcingNewToken:true newConnection:false];
break;
default:
[self.logger error:@"Invalid Realtime state: expected Connecting or Connected, has %@", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)];
break;
}
}
- (void)onError:(ARTProtocolMessage *)message {
if (message.channel) {
[self onChannelMessage:message];
} else {
ARTErrorInfo *error = message.error;
if ([self isTokenError:error] && [self.auth tokenIsRenewable]) {
if (_renewingToken) {
// Already retrying; give up.
[self.connection setErrorReason:error];
[self transitionToDisconnectedOrSuspendedWithError:error];
return;
}
[self transportReconnectWithRenewedToken];
return;
}
[self.connection setId:nil];
[self transition:ARTRealtimeFailed withErrorInfo:message.error];
}
}
- (void)cancelTimers {
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p cancel timers", self];
[_connectionRetryFromSuspendedListener stopTimer];
_connectionRetryFromSuspendedListener = nil;
[_connectionRetryFromDisconnectedListener stopTimer];
_connectionRetryFromDisconnectedListener = nil;
// Cancel connecting scheduled work
[_connectingTimeoutListener stopTimer];
_connectingTimeoutListener = nil;
// Cancel auth scheduled work
artDispatchCancel(_authenitcatingTimeoutWork);
_authenitcatingTimeoutWork = nil;
[_authTask cancel];
_authTask = nil;
// Idle timer
[self stopIdleTimer];
// Ping timer
[_pingEventEmitter off];
}
- (void)onConnectionTimeOut {
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p connection timed out", self];
// Cancel connecting scheduled work
[_connectingTimeoutListener stopTimer];
_connectingTimeoutListener = nil;
// Cancel auth scheduled work
artDispatchCancel(_authenitcatingTimeoutWork);
_authenitcatingTimeoutWork = nil;
[_authTask cancel];
_authTask = nil;
ARTErrorInfo *error;
if (self.auth.authorizing_nosync && (self.options.authUrl || self.options.authCallback)) {
error = [ARTErrorInfo createWithCode:ARTErrorAuthConfiguredProviderFailure status:ARTStateConnectionFailed message:@"timed out"];
}
else {
error = [ARTErrorInfo createWithCode:ARTErrorConnectionTimedOut status:ARTStateConnectionFailed message:@"timed out"];
}
switch (self.connection.state_nosync) {
case ARTRealtimeConnected:
[self transition:ARTRealtimeConnected withErrorInfo:error];
break;
default:
[self transitionToDisconnectedOrSuspendedWithError:error];
break;
}
}
- (BOOL)isTokenError:(nullable ARTErrorInfo *)error {
return error != nil && error.statusCode == 401 && error.code >= ARTErrorTokenErrorUnspecified && error.code < ARTErrorConnectionLimitsExceeded;
}
- (void)transportReconnectWithHost:(NSString *)host {
[self resetTransportWithResumeKey:_transport.resumeKey connectionSerial:_transport.connectionSerial];
[self.transport setHost:host];
[self transportConnectForcingNewToken:false newConnection:true];
}
- (void)transportReconnectWithRenewedToken {
_renewingToken = true;
[self resetTransportWithResumeKey:_transport.resumeKey connectionSerial:_transport.connectionSerial];
[_connectingTimeoutListener restartTimer];
[self transportConnectForcingNewToken:true newConnection:true];
}
- (void)transportConnectForcingNewToken:(BOOL)forceNewToken newConnection:(BOOL)newConnection {
ARTClientOptions *options = [self.options copy];
if ([options isBasicAuth]) {
// Basic
[self.transport connectWithKey:options.key];
}
else {
// Token
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p connecting with token auth; authorising (timeout of %f)", self, [ARTDefault realtimeRequestTimeout]];
if (!forceNewToken && [self.auth tokenRemainsValid]) {
// Reuse token
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p reusing token for auth", self];
[self.transport connectWithToken:self.auth.tokenDetails.token];
}
else {
// New Token
[self.auth setTokenDetails:nil];
// Schedule timeout handler
_authenitcatingTimeoutWork = artDispatchScheduled([ARTDefault realtimeRequestTimeout], _rest.queue, ^{
[self onConnectionTimeOut];
});
id<ARTAuthDelegate> delegate = self.auth.delegate;
if (newConnection) {
// Deactivate use of `ARTAuthDelegate`: `authorize` should complete without waiting for a CONNECTED state.
self.auth.delegate = nil;
}
@try {
_authTask = [self.auth _authorize:nil options:options callback:^(ARTTokenDetails *tokenDetails, NSError *error) {
// Cancel scheduled work
artDispatchCancel(self->_authenitcatingTimeoutWork);
self->_authenitcatingTimeoutWork = nil;
self->_authTask = nil;
// It's still valid?
switch (self.connection.state_nosync) {
case ARTRealtimeClosing:
case ARTRealtimeClosed:
return;
default:
break;
}
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p authorized: %@ error: %@", self, tokenDetails, error];
if (error) {
[self handleTokenAuthError:error];
return;
}
if (forceNewToken && newConnection) {
[self resetTransportWithResumeKey:self->_transport.resumeKey connectionSerial:self->_transport.connectionSerial];
}
if (newConnection) {
[self.transport connectWithToken:tokenDetails.token];
}
}];
}
@finally {
self.auth.delegate = delegate;
}
}
}
}
- (void)handleTokenAuthError:(NSError *)error {
[self.logger error:@"R:%p token auth failed with %@", self, error.description];
if (error.code == ARTErrorIncompatibleCredentials) {
// RSA15c
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createFromNSError:error]];
}
else if (self.options.authUrl || self.options.authCallback) {
if (error.code == ARTErrorForbidden /* RSA4d */) {
ARTErrorInfo *errorInfo = [ARTErrorInfo createWithCode:ARTErrorAuthConfiguredProviderFailure
status:error.artStatusCode
message:error.description];
[self transition:ARTRealtimeFailed withErrorInfo:errorInfo];
} else {
ARTErrorInfo *errorInfo = [ARTErrorInfo createWithCode:ARTErrorAuthConfiguredProviderFailure status:ARTStateConnectionFailed message:error.description];
switch (self.connection.state_nosync) {
case ARTRealtimeConnected:
// RSA4c3
[self.connection setErrorReason:errorInfo];
break;
default:
// RSA4c
[self transitionToDisconnectedOrSuspendedWithError:errorInfo];
break;
}
}
}
else {
// RSA4b
[self transitionToDisconnectedOrSuspendedWithError:[ARTErrorInfo createFromNSError:error]];
}
}
- (void)onAck:(ARTProtocolMessage *)message {
[self ack:message];
}
- (void)onNack:(ARTProtocolMessage *)message {
[self nack:message];
}
- (void)onChannelMessage:(ARTProtocolMessage *)message {
if (message.channel == nil) {
return;
}
ARTRealtimeChannelInternal *channel = [self.channels _getChannel:message.channel options:nil addPrefix:false];
[channel onChannelMessage:message];
}
- (void)onSuspended {
[self transition:ARTRealtimeSuspended];
}
- (NSDate *)suspensionTime {
return [_connectionLostAt dateByAddingTimeInterval:self.connectionStateTtl];
}
- (BOOL)isSuspendMode {
NSDate *currentTime = [NSDate date];
return [currentTime timeIntervalSinceDate:[self suspensionTime]] > 0;
}
- (BOOL)shouldSendEvents {
switch (self.connection.state_nosync) {
case ARTRealtimeConnected:
return !_renewingToken;
default:
return false;
}
}
- (BOOL)shouldQueueEvents {
if(!self.options.queueMessages) {
return false;
}
switch (self.connection.state_nosync) {
case ARTRealtimeInitialized:
case ARTRealtimeConnecting:
case ARTRealtimeDisconnected:
return true;
case ARTRealtimeConnected:
return _renewingToken;
default:
return false;
}
}
- (ARTStatus *)defaultError {
return [ARTStatus state:ARTStateError];
}
- (BOOL)isActive {
return [self shouldQueueEvents] || [self shouldSendEvents];
}
- (void)sendImpl:(ARTProtocolMessage *)pm sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
if (pm.ackRequired) {
pm.msgSerial = [NSNumber numberWithLongLong:self.msgSerial];
}
for (ARTMessage *msg in pm.messages) {
msg.connectionId = self.connection.id_nosync;
}
NSError *error = nil;
NSData *data = [self.rest.defaultEncoder encodeProtocolMessage:pm error:&error];
if (error) {
ARTErrorInfo *e = [ARTErrorInfo createFromNSError:error];
if (sentCallback) sentCallback(e);
if (ackCallback) ackCallback([ARTStatus state:ARTStateError info:e]);
return;
}
else if (!data) {
ARTErrorInfo *e = [ARTErrorInfo createWithCode:ARTClientCodeErrorInvalidType message:@"Encoder as failed without error."];
if (sentCallback) sentCallback(e);
if (ackCallback) ackCallback([ARTStatus state:ARTStateError info:e]);
return;
}
if (pm.ackRequired) {
self.msgSerial++;
ARTPendingMessage *pendingMessage = [[ARTPendingMessage alloc] initWithProtocolMessage:pm ackCallback:ackCallback];
[self.pendingMessages addObject:pendingMessage];
}
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p sending action %tu - %@", self, pm.action, ARTProtocolMessageActionToStr(pm.action)];
if ([self.transport send:data withSource:pm]) {
if (sentCallback) sentCallback(nil);
// `ackCallback()` is called with ACK/NACK action
}
}
- (void)send:(ARTProtocolMessage *)msg sentCallback:(ARTCallback)sentCallback ackCallback:(ARTStatusCallback)ackCallback {
if ([self shouldSendEvents]) {
[self sendImpl:msg sentCallback:sentCallback ackCallback:ackCallback];
}
else if ([self shouldQueueEvents]) {
ARTQueuedMessage *lastQueuedMessage = self.queuedMessages.lastObject; //RTL6d5
BOOL merged = [lastQueuedMessage mergeFrom:msg sentCallback:nil ackCallback:ackCallback];
if (!merged) {
ARTQueuedMessage *qm = [[ARTQueuedMessage alloc] initWithProtocolMessage:msg sentCallback:nil ackCallback:ackCallback];
[self.queuedMessages addObject:qm];
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p (channel: %@) protocol message with action '%lu - %@' has been queued (%@)", self, msg.channel, (unsigned long)msg.action, ARTProtocolMessageActionToStr(msg.action), msg.messages];
}
else {
[self.logger verbose:__FILE__ line:__LINE__ message:@"RT:%p (channel: %@) message %@ has been bundled to %@", self, msg.channel, msg, lastQueuedMessage.msg];
}
}
else if (ackCallback) {
ARTErrorInfo *error = self.connection.errorReason_nosync;
if (!error) error = [ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed status:400 message:[NSString stringWithFormat:@"not possile to send message (state is %@)", ARTRealtimeConnectionStateToStr(self.connection.state_nosync)]];
ackCallback([ARTStatus state:ARTStateError info:error]);
}
}
- (void)resendPendingMessages {
NSArray<ARTPendingMessage *> *pms = self.pendingMessages;
if (pms.count > 0) {
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p resending messages waiting for acknowledgment", self];
}
self.pendingMessages = [NSMutableArray array];
for (ARTPendingMessage *pendingMessage in pms) {
[self send:pendingMessage.msg sentCallback:nil ackCallback:^(ARTStatus *status) {
pendingMessage.ackCallback(status);
}];
}
}
- (void)failPendingMessages:(ARTStatus *)status {
NSArray<ARTPendingMessage *> *pms = self.pendingMessages;
self.pendingMessages = [NSMutableArray array];
for (ARTPendingMessage *pendingMessage in pms) {
pendingMessage.ackCallback(status);
}
}
- (void)sendQueuedMessages {
NSArray *qms = self.queuedMessages;
self.queuedMessages = [NSMutableArray array];
for (ARTQueuedMessage *message in qms) {
[self sendImpl:message.msg sentCallback:message.sentCallback ackCallback:message.ackCallback];
}
}
- (void)failQueuedMessages:(ARTStatus *)status {
NSArray *qms = self.queuedMessages;
self.queuedMessages = [NSMutableArray array];
for (ARTQueuedMessage *message in qms) {
message.sentCallback(status.errorInfo);
message.ackCallback(status);
}
}
- (void)ack:(ARTProtocolMessage *)message {
int64_t serial = [message.msgSerial longLongValue];
int count = message.count;
NSArray *nackMessages = nil;
NSArray *ackMessages = nil;
[self.logger verbose:@"R:%p ACK: msgSerial=%lld, count=%d", self, serial, count];
[self.logger verbose:@"R:%p ACK (before processing): pendingMessageStartSerial=%lld, pendingMessages=%lu", self, self.pendingMessageStartSerial, (unsigned long)self.pendingMessages.count];
if (serial < self.pendingMessageStartSerial) {
// This is an error condition and shouldn't happen but
// we can handle it gracefully by only processing the
// relevant portion of the response
count -= (int)(self.pendingMessageStartSerial - serial);
serial = self.pendingMessageStartSerial;
}
if (serial > self.pendingMessageStartSerial) {
// This counts as a nack of the messages earlier than serial,
// as well as an ack
int nCount = (int)(serial - self.pendingMessageStartSerial);
NSRange nackRange;
if (nCount > self.pendingMessages.count) {
NSString *message = [NSString stringWithFormat:@"R:%p ACK: receiving a serial greater than expected", self];
[self.logger error:@"%@", message];
// Process all the available pending messages as nack
nackRange = NSMakeRange(0, self.pendingMessages.count);
}
else {
nackRange = NSMakeRange(0, nCount);
}
nackMessages = [self.pendingMessages subarrayWithRange:nackRange];
[self.pendingMessages removeObjectsInRange:nackRange];
self.pendingMessageStartSerial = serial;
}
if (serial == self.pendingMessageStartSerial) {
NSRange ackRange;
if (count > self.pendingMessages.count) {
[self.logger error:@"R:%p ACK: count response is greater than the total of pending messages", self];
// Process all the available pending messages
ackRange = NSMakeRange(0, self.pendingMessages.count);
}
else {
ackRange = NSMakeRange(0, count);
}
ackMessages = [self.pendingMessages subarrayWithRange:ackRange];
[self.pendingMessages removeObjectsInRange:ackRange];
self.pendingMessageStartSerial += count;
}
for (ARTPendingMessage *msg in nackMessages) {
msg.ackCallback([ARTStatus state:ARTStateError info:message.error]);
}
for (ARTPendingMessage *msg in ackMessages) {
msg.ackCallback([ARTStatus state:ARTStateOk]);
}
[self.logger verbose:@"R:%p ACK (after processing): pendingMessageStartSerial=%lld, pendingMessages=%lu", self, self.pendingMessageStartSerial, (unsigned long)self.pendingMessages.count];
}
- (void)nack:(ARTProtocolMessage *)message {
int64_t serial = [message.msgSerial longLongValue];
int count = message.count;
[self.logger verbose:@"R:%p NACK: msgSerial=%lld, count=%d", self, serial, count];
[self.logger verbose:@"R:%p NACK (before processing): pendingMessageStartSerial=%lld, pendingMessages=%lu", self, self.pendingMessageStartSerial, (unsigned long)self.pendingMessages.count];
if (serial != self.pendingMessageStartSerial) {
// This is an error condition and it shouldn't happen but
// we can handle it gracefully by only processing the
// relevant portion of the response
count -= (int)(self.pendingMessageStartSerial - serial);
}
NSRange nackRange;
if (count > self.pendingMessages.count) {
[self.logger error:@"R:%p NACK: count response is greater than the total of pending messages", self];
// Process all the available pending messages
nackRange = NSMakeRange(0, self.pendingMessages.count);
}
else {
nackRange = NSMakeRange(0, count);
}
NSArray *nackMessages = [self.pendingMessages subarrayWithRange:nackRange];
[self.pendingMessages removeObjectsInRange:nackRange];
self.pendingMessageStartSerial += count;
for (ARTPendingMessage *msg in nackMessages) {
msg.ackCallback([ARTStatus state:ARTStateError info:message.error]);
}
[self.logger verbose:@"R:%p NACK (after processing): pendingMessageStartSerial=%lld, pendingMessages=%lu", self, self.pendingMessageStartSerial, (unsigned long)self.pendingMessages.count];
}
- (BOOL)reconnectWithFallback {
NSString *host = [_fallbacks popFallbackHost];
if (host != nil) {
[self.rest internetIsUp:^void(BOOL isUp) {
if (!isUp) {
[self transition:ARTRealtimeDisconnected withErrorInfo:[ARTErrorInfo createWithCode:0 message:@"no Internet connection"]];
return;
}
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p host is down; retrying realtime connection at %@", self, host];
self.rest.prioritizedHost = host;
[self transportReconnectWithHost:host];
}];
return true;
} else {
_fallbacks = nil;
return false;
}
}
- (BOOL)shouldRetryWithFallback:(ARTRealtimeTransportError *)error {
if (
(error.type == ARTRealtimeTransportErrorTypeBadResponse && error.badResponseCode >= 500 && error.badResponseCode <= 504) ||
error.type == ARTRealtimeTransportErrorTypeHostUnreachable ||
error.type == ARTRealtimeTransportErrorTypeTimeout
) {
return YES;
}
return NO;
}
- (void)onActivity {
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p activity", self];
_lastActivity = [NSDate date];
[self setIdleTimer];
}
- (void)setIdleTimer {
if (self.maxIdleInterval <= 0) {
[self.logger verbose:@"R:%p set idle timer had been ignored", self];
return;
}
artDispatchCancel(_idleTimer);
_idleTimer = artDispatchScheduled([ARTDefault realtimeRequestTimeout] + self.maxIdleInterval, _rest.queue, ^{
[self.logger error:@"R:%p No activity seen from realtime in %f seconds; assuming connection has dropped", self, [[NSDate date] timeIntervalSinceDate:self->_lastActivity]];
ARTErrorInfo *idleTimerExpired = [ARTErrorInfo createWithCode:ARTErrorDisconnected status:408 message:@"Idle timer expired"];
[self transitionToDisconnectedOrSuspendedWithError:idleTimerExpired];
});
}
- (void)stopIdleTimer {
artDispatchCancel(_idleTimer);
_idleTimer = nil;
}
- (void)setTransportClass:(Class)transportClass {
_transportClass = transportClass;
}
- (void)setReachabilityClass:(Class)reachabilityClass {
_reachabilityClass = reachabilityClass;
}
#pragma mark - ARTRealtimeTransportDelegate implementation
- (void)realtimeTransport:(id)transport didReceiveMessage:(ARTProtocolMessage *)message {
[self onActivity];
if (!message) {
// Invalid data
return;
}
if (transport != self.transport) {
// Old connection
return;
}
if (self.connection.state_nosync == ARTRealtimeDisconnected) {
// Already disconnected
return;
}
[self.logger verbose:@"R:%p did receive Protocol Message %@ (connection state is %@)", self, ARTProtocolMessageActionToStr(message.action), ARTRealtimeConnectionStateToStr(self.connection.state_nosync)];
if (message.error) {
[self.logger verbose:@"R:%p Protocol Message with error %@", self, message.error];
}
NSAssert(transport == self.transport, @"Unexpected transport");
if (message.hasConnectionSerial) {
[self.connection setSerial:message.connectionSerial];
}
switch (message.action) {
case ARTProtocolMessageHeartbeat:
[self onHeartbeat];
break;
case ARTProtocolMessageError:
[self onError:message];
break;
case ARTProtocolMessageConnected:
// Set Auth#clientId
if (message.connectionDetails) {
[self.auth setProtocolClientId:message.connectionDetails.clientId];
}
// Event
[self onConnected:message];
break;
case ARTProtocolMessageDisconnect:
case ARTProtocolMessageDisconnected:
[self onDisconnected:message];
break;
case ARTProtocolMessageAck:
[self onAck:message];
break;
case ARTProtocolMessageNack:
[self onNack:message];
break;
case ARTProtocolMessageClosed:
[self onClosed];
break;
case ARTProtocolMessageAuth:
[self onAuth];
break;
default:
[self onChannelMessage:message];
break;
}
}
- (void)realtimeTransportAvailable:(id<ARTRealtimeTransport>)transport {
// Do nothing
}
- (void)realtimeTransportClosed:(id<ARTRealtimeTransport>)transport {
if (transport != self.transport) {
// Old connection
return;
}
if (self.connection.state_nosync == ARTRealtimeClosing) {
// Close succeeded. Nothing more to do.
[self transition:ARTRealtimeClosed];
} else if (self.connection.state_nosync != ARTRealtimeClosed && self.connection.state_nosync != ARTRealtimeFailed) {
// Unexpected closure; recover.
[self transitionToDisconnectedOrSuspended];
}
}
- (void)realtimeTransportDisconnected:(id<ARTRealtimeTransport>)transport withError:(ARTRealtimeTransportError *)error {
if (transport != self.transport) {
// Old connection
return;
}
if (self.connection.state_nosync == ARTRealtimeClosing) {
[self transition:ARTRealtimeClosed];
} else {
[self transitionToDisconnectedOrSuspendedWithError:[ARTErrorInfo createFromNSError:error.error]];
}
}
- (void)realtimeTransportFailed:(id<ARTRealtimeTransport>)transport withError:(ARTRealtimeTransportError *)transportError {
if (transport != self.transport) {
// Old connection
return;
}
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p realtime transport failed: %@", self, transportError];
if ([self shouldRetryWithFallback:transportError]) {
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p host is down; can retry with fallback host", self];
if (!_fallbacks && [transportError.url.host isEqualToString:[ARTDefault realtimeHost]]) {
NSArray *hosts = [ARTFallbackHosts hostsFromOptions:[self getClientOptions]];
self->_fallbacks = [[ARTFallback alloc] initWithFallbackHosts:hosts];
if (self->_fallbacks != nil) {
[self reconnectWithFallback];
} else {
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createFromNSError:transportError.error]];
}
return;
} else if (_fallbacks && [self reconnectWithFallback]) {
return;
}
}
switch (transportError.type) {
case ARTRealtimeTransportErrorTypeBadResponse:
case ARTRealtimeTransportErrorTypeOther:
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createFromNSError:transportError.error]];
break;
default: {
ARTErrorInfo *error = [ARTErrorInfo createFromNSError:transportError.error];
[self transitionToDisconnectedOrSuspendedWithError:error];
}
}
}
- (void)realtimeTransportNeverConnected:(id<ARTRealtimeTransport>)transport {
if (transport != self.transport) {
// Old connection
return;
}
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createWithCode:ARTClientCodeErrorTransport message:@"Transport never connected"]];
}
- (void)realtimeTransportRefused:(id<ARTRealtimeTransport>)transport withError:(ARTRealtimeTransportError *)error {
if (transport != self.transport) {
// Old connection
return;
}
if (error && error.type == ARTRealtimeTransportErrorTypeRefused) {
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createWithCode:ARTClientCodeErrorTransport message:[NSString stringWithFormat:@"Connection refused using %@", error.url]]];
}
else if (error) {
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createFromNSError:error.error]];
}
else {
[self transition:ARTRealtimeFailed];
}
}
- (void)realtimeTransportTooBig:(id<ARTRealtimeTransport>)transport {
if (transport != self.transport) {
// Old connection
return;
}
[self transition:ARTRealtimeFailed withErrorInfo:[ARTErrorInfo createWithCode:ARTClientCodeErrorTransport message:@"Transport too big"]];
}
- (void)realtimeTransportSetMsgSerial:(id<ARTRealtimeTransport>)transport msgSerial:(int64_t)msgSerial {
if (transport != self.transport) {
// Old connection
return;
}
self.msgSerial = msgSerial;
}
#if TARGET_OS_IOS
- (ARTLocalDevice *)device {
return _rest.device;
}
#endif
@end