1273 lines
47 KiB
Objective-C
1273 lines
47 KiB
Objective-C
#import "ARTRealtimeChannel+Private.h"
|
|
#import "ARTChannel+Private.h"
|
|
#import "ARTDataQuery+Private.h"
|
|
|
|
#import "ARTRealtime+Private.h"
|
|
#import "ARTMessage.h"
|
|
#import "ARTBaseMessage+Private.h"
|
|
#import "ARTAuth.h"
|
|
#import "ARTRealtimePresence+Private.h"
|
|
#import "ARTChannel.h"
|
|
#import "ARTChannelOptions.h"
|
|
#import "ARTRealtimeChannelOptions.h"
|
|
#import "ARTProtocolMessage.h"
|
|
#import "ARTProtocolMessage+Private.h"
|
|
#import "ARTPresenceMap.h"
|
|
#import "ARTNSArray+ARTFunctional.h"
|
|
#import "ARTStatus.h"
|
|
#import "ARTDefault.h"
|
|
#import "ARTRest.h"
|
|
#import "ARTClientOptions.h"
|
|
#import "ARTTypes.h"
|
|
#import "ARTGCD.h"
|
|
#import "ARTConnection+Private.h"
|
|
#import "ARTRestChannels+Private.h"
|
|
#import "ARTEventEmitter+Private.h"
|
|
#if TARGET_OS_IPHONE
|
|
#import "ARTPushChannel+Private.h"
|
|
#endif
|
|
|
|
@implementation ARTRealtimeChannel {
|
|
ARTQueuedDealloc *_dealloc;
|
|
}
|
|
|
|
- (void)internalAsync:(void (^)(ARTRealtimeChannelInternal * _Nonnull))use {
|
|
dispatch_async(_internal.queue, ^{
|
|
use(self->_internal);
|
|
});
|
|
}
|
|
|
|
- (void)internalSync:(void (^)(ARTRealtimeChannelInternal * _Nonnull))use {
|
|
dispatch_sync(_internal.queue, ^{
|
|
use(self->_internal);
|
|
});
|
|
}
|
|
|
|
- (instancetype)initWithInternal:(ARTRealtimeChannelInternal *)internal queuedDealloc:(ARTQueuedDealloc *)dealloc {
|
|
self = [super init];
|
|
if (self) {
|
|
_internal = internal;
|
|
_dealloc = dealloc;
|
|
}
|
|
return self;
|
|
}
|
|
|
|
- (NSString *)name {
|
|
return _internal.name;
|
|
}
|
|
|
|
- (ARTRealtimeChannelState)state {
|
|
return _internal.state;
|
|
}
|
|
|
|
- (ARTErrorInfo *)errorReason {
|
|
return _internal.errorReason;
|
|
}
|
|
|
|
- (ARTRealtimePresence *)presence {
|
|
return [[ARTRealtimePresence alloc] initWithInternal:_internal.presence queuedDealloc:_dealloc];
|
|
}
|
|
|
|
#if TARGET_OS_IPHONE
|
|
|
|
- (ARTPushChannel *)push {
|
|
return [[ARTPushChannel alloc] initWithInternal:_internal.push queuedDealloc:_dealloc];
|
|
}
|
|
|
|
#endif
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data {
|
|
[_internal publish:name data:data];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data callback:(nullable ARTCallback)callback {
|
|
[_internal publish:name data:data callback:callback];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data clientId:(NSString *)clientId {
|
|
[_internal publish:name data:data clientId:clientId];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data clientId:(NSString *)clientId callback:(nullable ARTCallback)callback {
|
|
[_internal publish:name data:data clientId:clientId callback:callback];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data extras:(nullable id<ARTJsonCompatible>)extras {
|
|
[_internal publish:name data:data extras:extras];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data extras:(nullable id<ARTJsonCompatible>)extras callback:(nullable ARTCallback)callback {
|
|
[_internal publish:name data:data extras:extras callback:callback];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data clientId:(NSString *)clientId extras:(nullable id<ARTJsonCompatible>)extras {
|
|
[_internal publish:name data:data clientId:clientId extras:extras];
|
|
}
|
|
|
|
- (void)publish:(nullable NSString *)name data:(nullable id)data clientId:(NSString *)clientId extras:(nullable id<ARTJsonCompatible>)extras callback:(nullable ARTCallback)callback {
|
|
[_internal publish:name data:data clientId:clientId extras:extras callback:callback];
|
|
}
|
|
|
|
- (void)publish:(NSArray<ARTMessage *> *)messages {
|
|
[_internal publish:messages];
|
|
}
|
|
|
|
- (void)publish:(NSArray<ARTMessage *> *)messages callback:(nullable ARTCallback)callback {
|
|
[_internal publish:messages callback:callback];
|
|
}
|
|
|
|
- (void)history:(ARTPaginatedMessagesCallback)callback {
|
|
[_internal history:callback];
|
|
}
|
|
|
|
- (BOOL)exceedMaxSize:(NSArray<ARTBaseMessage *> *)messages {
|
|
return [_internal exceedMaxSize:messages];
|
|
}
|
|
|
|
- (void)attach {
|
|
[_internal attach];
|
|
}
|
|
|
|
- (void)attach:(nullable ARTCallback)callback {
|
|
[_internal attach:callback];
|
|
}
|
|
|
|
- (void)detach {
|
|
[_internal detach];
|
|
}
|
|
|
|
- (void)detach:(nullable ARTCallback)callback {
|
|
[_internal detach:callback];
|
|
}
|
|
|
|
- (ARTEventListener *_Nullable)subscribe:(ARTMessageCallback)callback {
|
|
return [_internal subscribe:callback];
|
|
}
|
|
|
|
- (ARTEventListener *_Nullable)subscribeWithAttachCallback:(nullable ARTCallback)onAttach callback:(ARTMessageCallback)cb {
|
|
return [_internal subscribeWithAttachCallback:onAttach callback:cb];
|
|
}
|
|
|
|
- (ARTEventListener *_Nullable)subscribe:(NSString *)name callback:(ARTMessageCallback)cb {
|
|
return [_internal subscribe:name callback:cb];
|
|
}
|
|
|
|
- (ARTEventListener *_Nullable)subscribe:(NSString *)name onAttach:(nullable ARTCallback)onAttach callback:(ARTMessageCallback)cb {
|
|
return [_internal subscribe:name onAttach:onAttach callback:cb];
|
|
}
|
|
|
|
- (void)unsubscribe {
|
|
[_internal unsubscribe];
|
|
}
|
|
|
|
- (void)unsubscribe:(ARTEventListener *_Nullable)listener {
|
|
[_internal unsubscribe:listener];
|
|
}
|
|
|
|
- (void)unsubscribe:(NSString *)name listener:(ARTEventListener *_Nullable)listener {
|
|
[_internal unsubscribe:name listener:listener];
|
|
}
|
|
|
|
- (BOOL)history:(ARTRealtimeHistoryQuery *_Nullable)query callback:(ARTPaginatedMessagesCallback)callback error:(NSError *_Nullable *_Nullable)errorPtr {
|
|
return [_internal history:query callback:callback error:errorPtr];
|
|
}
|
|
|
|
- (ARTEventListener *)on:(ARTChannelStateCallback)cb {
|
|
return [_internal on:cb];
|
|
}
|
|
|
|
- (ARTEventListener *)once:(ARTChannelEvent)event callback:(ARTChannelStateCallback)cb {
|
|
return [_internal once:event callback:cb];
|
|
}
|
|
|
|
- (ARTEventListener *)once:(ARTChannelStateCallback)cb {
|
|
return [_internal once:cb];
|
|
}
|
|
|
|
- (void)off:(ARTChannelEvent)event listener:(ARTEventListener *)listener {
|
|
[_internal off:event listener:listener];
|
|
}
|
|
|
|
- (void)off:(ARTEventListener *)listener {
|
|
[_internal off:listener];
|
|
}
|
|
|
|
- (void)off {
|
|
[_internal off];
|
|
}
|
|
|
|
- (nonnull ARTEventListener *)on:(ARTChannelEvent)event callback:(nonnull ARTChannelStateCallback)cb {
|
|
return [_internal on:event callback:cb];
|
|
}
|
|
|
|
- (ARTRealtimeChannelOptions *)getOptions {
|
|
return [_internal getOptions];
|
|
}
|
|
|
|
- (void)setOptions:(ARTRealtimeChannelOptions *_Nullable)options callback:(nullable ARTCallback)cb {
|
|
[_internal setOptions:options callback:cb];
|
|
}
|
|
|
|
@end
|
|
|
|
@interface ARTRealtimeChannelInternal () {
|
|
ARTRealtimePresenceInternal *_realtimePresence;
|
|
#if TARGET_OS_IPHONE
|
|
ARTPushChannelInternal *_pushChannel;
|
|
#endif
|
|
CFRunLoopTimerRef _attachTimer;
|
|
CFRunLoopTimerRef _detachTimer;
|
|
__GENERIC(ARTEventEmitter, ARTEvent *, ARTErrorInfo *) *_attachedEventEmitter;
|
|
__GENERIC(ARTEventEmitter, ARTEvent *, ARTErrorInfo *) *_detachedEventEmitter;
|
|
NSString * _Nullable _lastPayloadMessageId;
|
|
NSString * _Nullable _lastPayloadProtocolMessageChannelSerial;
|
|
BOOL _decodeFailureRecoveryInProgress;
|
|
}
|
|
|
|
@end
|
|
|
|
@implementation ARTRealtimeChannelInternal {
|
|
dispatch_queue_t _queue;
|
|
dispatch_queue_t _userQueue;
|
|
ARTErrorInfo *_errorReason;
|
|
}
|
|
|
|
- (instancetype)initWithRealtime:(ARTRealtimeInternal *)realtime andName:(NSString *)name withOptions:(ARTRealtimeChannelOptions *)options {
|
|
self = [super initWithName:name andOptions:options rest:realtime.rest];
|
|
if (self) {
|
|
_realtime = realtime;
|
|
_queue = realtime.rest.queue;
|
|
_userQueue = realtime.rest.userQueue;
|
|
_restChannel = [_realtime.rest.channels _getChannel:self.name options:options addPrefix:true];
|
|
_state = ARTRealtimeChannelInitialized;
|
|
_attachSerial = nil;
|
|
_presenceMap = [[ARTPresenceMap alloc] initWithQueue:_queue logger:self.logger];
|
|
_presenceMap.delegate = self;
|
|
_statesEventEmitter = [[ARTPublicEventEmitter alloc] initWithRest:_realtime.rest];
|
|
_messagesEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueues:_queue userQueue:_userQueue];
|
|
_presenceEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_queue];
|
|
_attachedEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_queue];
|
|
_detachedEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_queue];
|
|
_internalEventEmitter = [[ARTInternalEventEmitter alloc] initWithQueue:_queue];
|
|
}
|
|
return self;
|
|
}
|
|
|
|
+ (instancetype)channelWithRealtime:(ARTRealtimeInternal *)realtime andName:(NSString *)name withOptions:(ARTRealtimeChannelOptions *)options {
|
|
return [[ARTRealtimeChannelInternal alloc] initWithRealtime:realtime andName:name withOptions:options];
|
|
}
|
|
|
|
- (ARTRealtimeChannelState)state {
|
|
__block ARTRealtimeChannelState ret;
|
|
dispatch_sync(_queue, ^{
|
|
ret = [self state_nosync];
|
|
});
|
|
return ret;
|
|
}
|
|
|
|
- (ARTErrorInfo *)errorReason {
|
|
__block ARTErrorInfo * ret;
|
|
dispatch_sync(_queue, ^{
|
|
ret = [self errorReason_nosync];
|
|
});
|
|
return ret;
|
|
}
|
|
|
|
- (ARTRealtimeChannelState)state_nosync {
|
|
return _state;
|
|
}
|
|
|
|
- (BOOL)canBeReattached {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelAttaching:
|
|
case ARTRealtimeChannelAttached:
|
|
case ARTRealtimeChannelSuspended:
|
|
return YES;
|
|
default:
|
|
return NO;
|
|
}
|
|
}
|
|
|
|
- (ARTErrorInfo *)errorReason_nosync {
|
|
return _errorReason;
|
|
}
|
|
|
|
- (ARTLog *)getLogger {
|
|
return _realtime.logger;
|
|
}
|
|
|
|
- (ARTRealtimePresenceInternal *)presence {
|
|
if (!_realtimePresence) {
|
|
_realtimePresence = [[ARTRealtimePresenceInternal alloc] initWithChannel:self];
|
|
}
|
|
return _realtimePresence;
|
|
}
|
|
|
|
#if TARGET_OS_IPHONE
|
|
- (ARTPushChannelInternal *)push {
|
|
if (!_pushChannel) {
|
|
_pushChannel = [[ARTPushChannelInternal alloc] init:self.realtime.rest withChannel:self];
|
|
}
|
|
return _pushChannel;
|
|
}
|
|
#endif
|
|
|
|
- (void)internalPostMessages:(id)data callback:(ARTCallback)callback {
|
|
if (callback) {
|
|
ARTCallback userCallback = callback;
|
|
callback = ^(ARTErrorInfo *__art_nullable error) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(error);
|
|
});
|
|
};
|
|
}
|
|
|
|
if (![data isKindOfClass:[NSArray class]]) {
|
|
data = @[data];
|
|
}
|
|
|
|
dispatch_sync(_queue, ^{
|
|
if ([data isKindOfClass:[ARTMessage class]]) {
|
|
ARTMessage *message = (ARTMessage *)data;
|
|
if (message.clientId && self->_realtime.rest.auth.clientId_nosync && ![message.clientId isEqualToString:self->_realtime.rest.auth.clientId_nosync]) {
|
|
if (callback)
|
|
callback([ARTErrorInfo createWithCode:ARTStateMismatchedClientId message:@"attempted to publish message with an invalid clientId"]);
|
|
return;
|
|
}
|
|
}
|
|
else if ([data isKindOfClass:[NSArray class]]) {
|
|
NSArray<ARTMessage *> *messages = (NSArray *)data;
|
|
for (ARTMessage *message in messages) {
|
|
if (message.clientId && self->_realtime.rest.auth.clientId_nosync && ![message.clientId isEqualToString:self->_realtime.rest.auth.clientId_nosync]) {
|
|
if (callback)
|
|
callback([ARTErrorInfo createWithCode:ARTStateMismatchedClientId message:@"attempted to publish message with an invalid clientId"]);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!self.realtime.connection.isActive_nosync) {
|
|
if (callback)
|
|
callback([self.realtime.connection error_nosync]);
|
|
return;
|
|
}
|
|
|
|
ARTProtocolMessage *msg = [[ARTProtocolMessage alloc] init];
|
|
msg.action = ARTProtocolMessageMessage;
|
|
msg.channel = self.name;
|
|
msg.messages = data;
|
|
|
|
[self publishProtocolMessage:msg callback:^void(ARTStatus *status) {
|
|
if (callback)
|
|
callback(status.errorInfo);
|
|
}];
|
|
});
|
|
}
|
|
|
|
- (void)sync {
|
|
[self sync:nil];
|
|
}
|
|
|
|
- (void)sync:(ARTCallback)callback {
|
|
if (callback) {
|
|
ARTCallback userCallback = callback;
|
|
callback = ^(ARTErrorInfo *__art_nullable error) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(error);
|
|
});
|
|
};
|
|
}
|
|
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelInitialized:
|
|
case ARTRealtimeChannelDetaching:
|
|
case ARTRealtimeChannelDetached: {
|
|
ARTErrorInfo *error = [ARTErrorInfo createWithCode:ARTErrorBadRequest
|
|
message:@"Unable to sync to channel; not attached."];
|
|
[self.logger logWithError:error];
|
|
if (callback) callback(error);
|
|
return;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
[self.logger verbose:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) requesting a sync operation", _realtime, self, self.name];
|
|
|
|
ARTProtocolMessage *msg = [[ARTProtocolMessage alloc] init];
|
|
msg.action = ARTProtocolMessageSync;
|
|
msg.channel = self.name;
|
|
msg.channelSerial = self.presenceMap.syncChannelSerial;
|
|
|
|
[self.presenceMap startSync];
|
|
[self.realtime send:msg sentCallback:^(ARTErrorInfo *error) {
|
|
if (error) {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) SYNC request failed with %@", self->_realtime, self, self.name, error];
|
|
[self.presenceMap endSync];
|
|
if (callback) callback(error);
|
|
}
|
|
else {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) SYNC requested with success", self->_realtime, self, self.name];
|
|
if (callback) callback(nil);
|
|
}
|
|
} ackCallback:nil];
|
|
}
|
|
|
|
- (void)requestContinueSync {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) requesting to continue sync operation after reconnect using msgSerial %lld and channelSerial %@", _realtime, self, self.name, self.presenceMap.syncMsgSerial, self.presenceMap.syncChannelSerial];
|
|
|
|
ARTProtocolMessage * msg = [[ARTProtocolMessage alloc] init];
|
|
msg.action = ARTProtocolMessageSync;
|
|
msg.msgSerial = [NSNumber numberWithLongLong:self.presenceMap.syncMsgSerial];
|
|
msg.channelSerial = self.presenceMap.syncChannelSerial;
|
|
msg.channel = self.name;
|
|
|
|
[self.presenceMap startSync];
|
|
[self.realtime send:msg sentCallback:^(ARTErrorInfo *error) {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) continue sync, error is %@", self->_realtime, self, self.name, error];
|
|
if (error) {
|
|
[self.presenceMap endSync];
|
|
}
|
|
} ackCallback:nil];
|
|
}
|
|
|
|
- (void)publishProtocolMessage:(ARTProtocolMessage *)pm callback:(ARTStatusCallback)cb {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelSuspended:
|
|
case ARTRealtimeChannelFailed: {
|
|
if (cb) {
|
|
ARTStatus *statusInvalidChannelState = [ARTStatus state:ARTStateError info:[ARTErrorInfo createWithCode:ARTErrorChannelOperationFailedInvalidState message:[NSString stringWithFormat:@"channel operation failed (invalid channel state: %@)", ARTRealtimeChannelStateToStr(self.state_nosync)]]];
|
|
cb(statusInvalidChannelState);
|
|
}
|
|
break;
|
|
}
|
|
case ARTRealtimeChannelInitialized:
|
|
case ARTRealtimeChannelDetaching:
|
|
case ARTRealtimeChannelDetached:
|
|
case ARTRealtimeChannelAttaching:
|
|
case ARTRealtimeChannelAttached: {
|
|
[self.realtime send:pm sentCallback:nil ackCallback:^(ARTStatus *status) {
|
|
if (cb) cb(status);
|
|
}];
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
- (ARTPresenceMap *)presenceMap {
|
|
return _presenceMap;
|
|
}
|
|
|
|
- (void)throwOnDisconnectedOrFailed {
|
|
if (self.realtime.connection.state_nosync == ARTRealtimeFailed || self.realtime.connection.state_nosync == ARTRealtimeDisconnected) {
|
|
[ARTException raise:@"realtime cannot perform action in disconnected or failed state" format:@"state: %d", (int)self.realtime.connection.state_nosync];
|
|
}
|
|
}
|
|
|
|
- (ARTEventListener *)subscribe:(ARTMessageCallback)callback {
|
|
return [self subscribeWithAttachCallback:nil callback:callback];
|
|
}
|
|
|
|
- (ARTEventListener *)subscribeWithAttachCallback:(ARTCallback)onAttach callback:(ARTMessageCallback)cb {
|
|
if (cb) {
|
|
ARTMessageCallback userCallback = cb;
|
|
cb = ^(ARTMessage *_Nonnull m) {
|
|
if (self.state_nosync != ARTRealtimeChannelAttached) { //RTL17
|
|
return;
|
|
}
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(m);
|
|
});
|
|
};
|
|
}
|
|
if (onAttach) {
|
|
ARTCallback userOnAttach = onAttach;
|
|
onAttach = ^(ARTErrorInfo *_Nullable e) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userOnAttach(e);
|
|
});
|
|
};
|
|
}
|
|
|
|
__block ARTEventListener *listener = nil;
|
|
dispatch_sync(_queue, ^{
|
|
if (self.state_nosync == ARTRealtimeChannelFailed) {
|
|
if (onAttach) onAttach([ARTErrorInfo createWithCode:0 message:@"attempted to subscribe while channel is in FAILED state."]);
|
|
[self.logger warn:@"R:%p C:%p (%@) subscribe has been ignored (attempted to subscribe while channel is in FAILED state)", self->_realtime, self, self.name];
|
|
return;
|
|
}
|
|
if (self.state_nosync == ARTRealtimeChannelInitialized) { //RTL7c
|
|
[self _attach:onAttach];
|
|
}
|
|
listener = [self.messagesEventEmitter on:cb];
|
|
[self.logger verbose:@"R:%p C:%p (%@) subscribe to all events", self->_realtime, self, self.name];
|
|
});
|
|
return listener;
|
|
}
|
|
|
|
- (ARTEventListener *)subscribe:(NSString *)name callback:(ARTMessageCallback)cb {
|
|
return [self subscribe:name onAttach:nil callback:cb];
|
|
}
|
|
|
|
- (ARTEventListener *)subscribe:(NSString *)name onAttach:(ARTCallback)onAttach callback:(ARTMessageCallback)cb {
|
|
if (cb) {
|
|
ARTMessageCallback userCallback = cb;
|
|
cb = ^(ARTMessage *_Nonnull m) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(m);
|
|
});
|
|
};
|
|
}
|
|
if (onAttach) {
|
|
ARTCallback userOnAttach = onAttach;
|
|
onAttach = ^(ARTErrorInfo *_Nullable e) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userOnAttach(e);
|
|
});
|
|
};
|
|
}
|
|
|
|
__block ARTEventListener *listener = nil;
|
|
dispatch_sync(_queue, ^{
|
|
if (self.state_nosync == ARTRealtimeChannelFailed) {
|
|
if (onAttach) onAttach([ARTErrorInfo createWithCode:0 message:@"attempted to subscribe while channel is in FAILED state."]);
|
|
[self.logger warn:@"R:%p C:%p (%@) subscribe of '%@' has been ignored (attempted to subscribe while channel is in FAILED state)", self->_realtime, self, self.name, name];
|
|
return;
|
|
}
|
|
[self _attach:onAttach];
|
|
listener = [self.messagesEventEmitter on:name callback:cb];
|
|
[self.logger verbose:@"R:%p C:%p (%@) subscribe to event '%@'", self->_realtime, self, self.name, name];
|
|
});
|
|
return listener;
|
|
}
|
|
|
|
- (void)unsubscribe {
|
|
dispatch_sync(_queue, ^{
|
|
[self _unsubscribe];
|
|
[self.logger verbose:@"R:%p C:%p (%@) unsubscribe to all events", self->_realtime, self, self.name];
|
|
});
|
|
}
|
|
|
|
- (void)_unsubscribe {
|
|
[self.messagesEventEmitter off];
|
|
}
|
|
|
|
- (void)unsubscribe:(ARTEventListener *)listener {
|
|
dispatch_sync(_queue, ^{
|
|
[self.messagesEventEmitter off:listener];
|
|
[self.logger verbose:@"RT:%p C:%p (%@) unsubscribe to all events", self->_realtime, self, self.name];
|
|
});
|
|
}
|
|
|
|
- (void)unsubscribe:(NSString *)name listener:(ARTEventListener *)listener {
|
|
dispatch_sync(_queue, ^{
|
|
[self.messagesEventEmitter off:name listener:listener];
|
|
[self.logger verbose:@"RT:%p C:%p (%@) unsubscribe to event '%@'", self->_realtime, self, self.name, name];
|
|
});
|
|
}
|
|
|
|
- (ARTEventListener *)on:(ARTChannelEvent)event callback:(ARTChannelStateCallback)cb {
|
|
return [self.statesEventEmitter on:[ARTEvent newWithChannelEvent:event] callback:cb];
|
|
}
|
|
|
|
- (ARTEventListener *)on:(ARTChannelStateCallback)cb {
|
|
return [self.statesEventEmitter on:cb];
|
|
}
|
|
|
|
- (ARTEventListener *)once:(ARTChannelEvent)event callback:(ARTChannelStateCallback)cb {
|
|
return [self.statesEventEmitter once:[ARTEvent newWithChannelEvent:event] callback:cb];
|
|
}
|
|
|
|
- (ARTEventListener *)once:(ARTChannelStateCallback)cb {
|
|
return [self.statesEventEmitter once:cb];
|
|
}
|
|
|
|
- (void)off {
|
|
[self.statesEventEmitter off];
|
|
}
|
|
|
|
|
|
- (void)off_nosync {
|
|
[(ARTPublicEventEmitter *)self.statesEventEmitter off_nosync];
|
|
}
|
|
|
|
- (void)off:(ARTChannelEvent)event listener:listener {
|
|
[self.statesEventEmitter off:[ARTEvent newWithChannelEvent:event] listener:listener];
|
|
}
|
|
|
|
- (void)off:(ARTEventListener *)listener {
|
|
[self.statesEventEmitter off:listener];
|
|
}
|
|
|
|
- (void)emit:(ARTChannelEvent)event with:(ARTChannelStateChange *)data {
|
|
[self.statesEventEmitter emit:[ARTEvent newWithChannelEvent:event] with:data];
|
|
[self.internalEventEmitter emit:[ARTEvent newWithChannelEvent:event] with:data];
|
|
}
|
|
|
|
- (void)transition:(ARTRealtimeChannelState)state status:(ARTStatus *)status {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) channel state transitions from %tu - %@ to %tu - %@", _realtime, self, self.name, self.state_nosync, ARTRealtimeChannelStateToStr(self.state_nosync), state, ARTRealtimeChannelStateToStr(state)];
|
|
ARTChannelStateChange *stateChange = [[ARTChannelStateChange alloc] initWithCurrent:state previous:self.state_nosync event:(ARTChannelEvent)state reason:status.errorInfo];
|
|
self.state = state;
|
|
|
|
if (status.storeErrorInfo) {
|
|
_errorReason = status.errorInfo;
|
|
}
|
|
|
|
ARTEventListener *channelRetryListener = nil;
|
|
switch (state) {
|
|
case ARTRealtimeChannelAttached:
|
|
self.attachResume = true;
|
|
break;
|
|
case ARTRealtimeChannelSuspended:
|
|
[_attachedEventEmitter emit:nil with:status.errorInfo];
|
|
if (self.realtime.shouldSendEvents) {
|
|
channelRetryListener = [self unlessStateChangesBefore:self.realtime.options.channelRetryTimeout do:^{
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) reattach initiated by retry timeout", self->_realtime, self, self.name];
|
|
[self reattachWithReason:nil callback:^(ARTErrorInfo *errorInfo) {
|
|
if (errorInfo) {
|
|
ARTStatus *status = [ARTStatus state:ARTStateError info:errorInfo];
|
|
[self setSuspended:status];
|
|
}
|
|
}];
|
|
}];
|
|
}
|
|
break;
|
|
case ARTRealtimeChannelDetaching:
|
|
self.attachResume = false;
|
|
break;
|
|
case ARTRealtimeChannelDetached:
|
|
[self.presenceMap failsSync:status.errorInfo];
|
|
break;
|
|
case ARTRealtimeChannelFailed:
|
|
self.attachResume = false;
|
|
[_attachedEventEmitter emit:nil with:status.errorInfo];
|
|
[_detachedEventEmitter emit:nil with:status.errorInfo];
|
|
[self.presenceMap failsSync:status.errorInfo];
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
[self emit:stateChange.event with:stateChange];
|
|
|
|
if (channelRetryListener) {
|
|
[channelRetryListener startTimer];
|
|
}
|
|
}
|
|
|
|
- (ARTEventListener *)unlessStateChangesBefore:(NSTimeInterval)deadline do:(void(^)(void))callback {
|
|
return [[self.internalEventEmitter once:^(ARTChannelStateChange *stateChange) {
|
|
// Any state change cancels the timeout.
|
|
}] setTimer:deadline onTimeout:^{
|
|
if (callback) {
|
|
callback();
|
|
}
|
|
}];
|
|
}
|
|
|
|
/**
|
|
Checks that a channelSerial is the final serial in a sequence of sync messages,
|
|
by checking that there is nothing after the colon
|
|
*/
|
|
- (bool)isLastChannelSerial:(NSString *)channelSerial {
|
|
NSArray * a = [channelSerial componentsSeparatedByString:@":"];
|
|
if([a count] >1 && ![[a objectAtIndex:1] isEqualToString:@""] ) {
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
- (void)onChannelMessage:(ARTProtocolMessage *)message {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) received channel message %tu - %@", _realtime, self, self.name, message.action, ARTProtocolMessageActionToStr(message.action)];
|
|
switch (message.action) {
|
|
case ARTProtocolMessageAttached:
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) %@", _realtime, self, self.name, message.description];
|
|
[self setAttached:message];
|
|
break;
|
|
case ARTProtocolMessageDetach:
|
|
case ARTProtocolMessageDetached:
|
|
[self setDetached:message];
|
|
break;
|
|
case ARTProtocolMessageMessage:
|
|
if (_decodeFailureRecoveryInProgress) {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) message decode recovery in progress, message skipped: %@", _realtime, self, self.name, message.description];
|
|
break;
|
|
}
|
|
[self onMessage:message];
|
|
break;
|
|
case ARTProtocolMessagePresence:
|
|
[self onPresence:message];
|
|
break;
|
|
case ARTProtocolMessageError:
|
|
[self onError:message];
|
|
break;
|
|
case ARTProtocolMessageSync:
|
|
[self onSync:message];
|
|
break;
|
|
default:
|
|
[self.logger warn:@"R:%p C:%p (%@) unknown ARTProtocolMessage action: %tu", _realtime, self, self.name, message.action];
|
|
break;
|
|
}
|
|
}
|
|
|
|
- (void)setAttached:(ARTProtocolMessage *)message {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelDetaching:
|
|
case ARTRealtimeChannelFailed:
|
|
// Ignore
|
|
return;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (message.resumed) {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) channel has resumed", _realtime, self, self.name];
|
|
}
|
|
|
|
self.attachSerial = message.channelSerial;
|
|
|
|
if (message.hasPresence) {
|
|
[self.presenceMap startSync];
|
|
}
|
|
else if ([self.presenceMap.members count] > 0 || [self.presenceMap.localMembers count] > 0) {
|
|
if (!message.resumed) {
|
|
// When an ATTACHED message is received without a HAS_PRESENCE flag and PresenceMap has existing members
|
|
[self.presenceMap startSync];
|
|
[self.presenceMap endSync];
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"R:%p C:%p (%@) PresenceMap has been reset", _realtime, self, self.name];
|
|
}
|
|
}
|
|
|
|
if (self.state_nosync == ARTRealtimeChannelAttached) {
|
|
if (message.error != nil) {
|
|
_errorReason = message.error;
|
|
}
|
|
ARTChannelStateChange *stateChange = [[ARTChannelStateChange alloc] initWithCurrent:self.state_nosync previous:self.state_nosync event:ARTChannelEventUpdate reason:message.error resumed:message.resumed];
|
|
[self emit:stateChange.event with:stateChange];
|
|
return;
|
|
}
|
|
|
|
ARTStatus *status = message.error ? [ARTStatus state:ARTStateError info:message.error] : [ARTStatus state:ARTStateOk];
|
|
[self transition:ARTRealtimeChannelAttached status:status];
|
|
[_attachedEventEmitter emit:nil with:nil];
|
|
|
|
[self.presence sendPendingPresence];
|
|
}
|
|
|
|
- (void)setDetached:(ARTProtocolMessage *)message {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelAttached:
|
|
case ARTRealtimeChannelSuspended:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) reattach initiated by DETACHED message", _realtime, self, self.name];
|
|
[self reattachWithReason:message.error callback:nil];
|
|
return;
|
|
case ARTRealtimeChannelAttaching: {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) reattach initiated by DETACHED message but it is currently attaching", _realtime, self, self.name];
|
|
ARTStatus *status = message.error ? [ARTStatus state:ARTStateError info:message.error] : [ARTStatus state:ARTStateOk];
|
|
status.storeErrorInfo = false;
|
|
[self setSuspended:status];
|
|
return;
|
|
}
|
|
case ARTRealtimeChannelFailed:
|
|
return;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
self.attachSerial = nil;
|
|
|
|
ARTErrorInfo *errorInfo = message.error ? message.error : [ARTErrorInfo createWithCode:0 message:@"channel has detached"];
|
|
ARTStatus *reason = [ARTStatus state:ARTStateNotAttached info:errorInfo];
|
|
[self detachChannel:reason];
|
|
[_detachedEventEmitter emit:nil with:nil];
|
|
}
|
|
|
|
- (void)detachChannel:(ARTStatus *)status {
|
|
if (self.state_nosync == ARTRealtimeChannelDetached) {
|
|
return;
|
|
}
|
|
[self.presence failPendingPresence:status];
|
|
[self transition:ARTRealtimeChannelDetached status:status];
|
|
}
|
|
|
|
- (void)setFailed:(ARTStatus *)status {
|
|
[self.presence failPendingPresence:status];
|
|
[self transition:ARTRealtimeChannelFailed status:status];
|
|
}
|
|
|
|
- (void)setSuspended:(ARTStatus *)status {
|
|
[self.presence failPendingPresence:status];
|
|
[self transition:ARTRealtimeChannelSuspended status:status];
|
|
}
|
|
|
|
- (void)onMessage:(ARTProtocolMessage *)pm {
|
|
int i = 0;
|
|
|
|
ARTMessage *firstMessage = pm.messages.firstObject;
|
|
if (firstMessage.extras) {
|
|
NSError *extrasDecodeError;
|
|
NSDictionary *const extras = [firstMessage.extras toJSON:&extrasDecodeError];
|
|
if (extrasDecodeError) {
|
|
[self.logger error:@"R:%p C:%p (%@) message extras %@ decode error: %@", _realtime, self, self.name, firstMessage.extras, extrasDecodeError];
|
|
}
|
|
else {
|
|
NSString *const deltaFrom = [[extras objectForKey:@"delta"] objectForKey:@"from"];
|
|
if (deltaFrom && _lastPayloadMessageId && ![deltaFrom isEqualToString:_lastPayloadMessageId]) {
|
|
ARTErrorInfo *incompatibleIdError = [ARTErrorInfo createWithCode:ARTErrorUnableToDecodeMessage message:[NSString stringWithFormat:@"previous id '%@' is incompatible with message delta %@", _lastPayloadMessageId, firstMessage]];
|
|
[self.logger error:@"R:%p C:%p (%@) %@", _realtime, self, self.name, incompatibleIdError.message];
|
|
for (int j = i + 1; j < pm.messages.count; j++) {
|
|
[self.logger verbose:@"R:%p C:%p (%@) message skipped %@", _realtime, self, self.name, pm.messages[j]];
|
|
}
|
|
[self startDecodeFailureRecoveryWithChannelSerial:_lastPayloadProtocolMessageChannelSerial error:incompatibleIdError];
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
ARTDataEncoder *dataEncoder = self.dataEncoder;
|
|
for (ARTMessage *m in pm.messages) {
|
|
ARTMessage *msg = m;
|
|
|
|
if (msg.data && dataEncoder) {
|
|
NSError *decodeError = nil;
|
|
msg = [msg decodeWithEncoder:dataEncoder error:&decodeError];
|
|
if (decodeError) {
|
|
ARTErrorInfo *errorInfo = [ARTErrorInfo wrap:[ARTErrorInfo createWithCode:ARTErrorUnableToDecodeMessage message:decodeError.localizedFailureReason] prepend:@"Failed to decode data: "];
|
|
[self.logger error:@"R:%p C:%p (%@) %@", _realtime, self, self.name, errorInfo.message];
|
|
_errorReason = errorInfo;
|
|
ARTChannelStateChange *stateChange = [[ARTChannelStateChange alloc] initWithCurrent:self.state_nosync previous:self.state_nosync event:ARTChannelEventUpdate reason:errorInfo];
|
|
[self emit:stateChange.event with:stateChange];
|
|
|
|
if (decodeError.code == ARTErrorUnableToDecodeMessage) {
|
|
[self startDecodeFailureRecoveryWithChannelSerial:_lastPayloadProtocolMessageChannelSerial error:errorInfo];
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!msg.timestamp) {
|
|
msg.timestamp = pm.timestamp;
|
|
}
|
|
if (!msg.id) {
|
|
msg.id = [NSString stringWithFormat:@"%@:%d", pm.id, i];
|
|
}
|
|
|
|
_lastPayloadMessageId = msg.id;
|
|
|
|
[self.messagesEventEmitter emit:msg.name with:msg];
|
|
|
|
++i;
|
|
}
|
|
|
|
_lastPayloadProtocolMessageChannelSerial = pm.channelSerial;
|
|
}
|
|
|
|
- (void)onPresence:(ARTProtocolMessage *)message {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) handle PRESENCE message", _realtime, self, self.name];
|
|
int i = 0;
|
|
ARTDataEncoder *dataEncoder = self.dataEncoder;
|
|
for (ARTPresenceMessage *p in message.presence) {
|
|
ARTPresenceMessage *presence = p;
|
|
if (presence.data && dataEncoder) {
|
|
NSError *decodeError = nil;
|
|
presence = [p decodeWithEncoder:dataEncoder error:&decodeError];
|
|
if (decodeError != nil) {
|
|
ARTErrorInfo *errorInfo = [ARTErrorInfo wrap:[ARTErrorInfo createWithCode:ARTErrorUnableToDecodeMessage message:decodeError.localizedFailureReason] prepend:@"Failed to decode data: "];
|
|
[self.logger error:@"RT:%p C:%p (%@) %@", _realtime, self, self.name, errorInfo.message];
|
|
}
|
|
}
|
|
|
|
if (!presence.timestamp) {
|
|
presence.timestamp = message.timestamp;
|
|
}
|
|
|
|
if (!presence.id) {
|
|
presence.id = [NSString stringWithFormat:@"%@:%d", message.id, i];
|
|
}
|
|
|
|
if ([self.presenceMap add:presence]) {
|
|
[self broadcastPresence:presence];
|
|
}
|
|
|
|
++i;
|
|
}
|
|
}
|
|
|
|
- (void)onSync:(ARTProtocolMessage *)message {
|
|
self.presenceMap.syncMsgSerial = [message.msgSerial longLongValue];
|
|
self.presenceMap.syncChannelSerial = message.channelSerial;
|
|
|
|
if (!self.presenceMap.syncInProgress) {
|
|
[self.presenceMap startSync];
|
|
}
|
|
else {
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) PresenceMap sync is in progress", _realtime, self, self.name];
|
|
}
|
|
|
|
for (int i=0; i<[message.presence count]; i++) {
|
|
ARTPresenceMessage *presence = [message.presence objectAtIndex:i];
|
|
if ([self.presenceMap add:presence]) {
|
|
[self broadcastPresence:presence];
|
|
}
|
|
}
|
|
|
|
if ([self isLastChannelSerial:message.channelSerial]) {
|
|
[self.presenceMap endSync];
|
|
self.presenceMap.syncChannelSerial = nil;
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) PresenceMap sync ended", _realtime, self, self.name];
|
|
}
|
|
}
|
|
|
|
- (void)broadcastPresence:(ARTPresenceMessage *)pm {
|
|
[self.presenceEventEmitter emit:[ARTEvent newWithPresenceAction:pm.action] with:pm];
|
|
}
|
|
|
|
- (void)onError:(ARTProtocolMessage *)msg {
|
|
[self transition:ARTRealtimeChannelFailed status:[ARTStatus state:ARTStateError info:msg.error]];
|
|
[self.presence failPendingPresence:[ARTStatus state:ARTStateError info: msg.error]];
|
|
}
|
|
|
|
- (void)attach {
|
|
[self attach:nil];
|
|
}
|
|
|
|
- (void)attach:(ARTCallback)callback {
|
|
if (callback) {
|
|
ARTCallback userCallback = callback;
|
|
callback = ^(ARTErrorInfo *__art_nullable error) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(error);
|
|
});
|
|
};
|
|
}
|
|
dispatch_sync(_queue, ^{
|
|
[self _attach:callback];
|
|
});
|
|
}
|
|
|
|
- (void)_attach:(ARTCallback)callback {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelAttaching:
|
|
[self.realtime.logger verbose:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) already attaching", _realtime, self, self.name];
|
|
if (callback) [_attachedEventEmitter once:callback];
|
|
return;
|
|
case ARTRealtimeChannelAttached:
|
|
[self.realtime.logger verbose:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) already attached", _realtime, self, self.name];
|
|
if (callback) callback(nil);
|
|
return;
|
|
default:
|
|
break;
|
|
}
|
|
[self internalAttach:callback withReason:nil];
|
|
}
|
|
|
|
- (void)reattachWithReason:(ARTErrorInfo *)reason callback:(ARTCallback)callback {
|
|
if ([self canBeReattached]) {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) %@ and will reattach", _realtime, self, self.name, ARTRealtimeChannelStateToStr(self.state_nosync)];
|
|
[self internalAttach:callback withReason:reason];
|
|
} else {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) %@ should not reattach", _realtime, self, self.name, ARTRealtimeChannelStateToStr(self.state_nosync)];
|
|
}
|
|
}
|
|
|
|
- (void)internalAttach:(ARTCallback)callback withReason:(ARTErrorInfo *)reason {
|
|
[self internalAttach:callback reason:reason storeErrorInfo:false channelSerial:nil];
|
|
}
|
|
|
|
- (void)internalAttach:(ARTCallback)callback channelSerial:(NSString *)channelSerial reason:(ARTErrorInfo *)reason {
|
|
[self internalAttach:callback reason:reason storeErrorInfo:false channelSerial:channelSerial];
|
|
}
|
|
|
|
- (void)internalAttach:(ARTCallback)callback reason:(ARTErrorInfo *)reason storeErrorInfo:(BOOL)storeErrorInfo channelSerial:(NSString *)channelSerial {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelDetaching: {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) attach after the completion of Detaching", _realtime, self, self.name];
|
|
[_detachedEventEmitter once:^(ARTErrorInfo *error) {
|
|
[self _attach:callback];
|
|
}];
|
|
return;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
_errorReason = nil;
|
|
|
|
if (![self.realtime isActive]) {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) can't attach when not in an active state", _realtime, self, self.name];
|
|
if (callback) callback([ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed message:@"Can't attach when not in an active state"]);
|
|
return;
|
|
}
|
|
|
|
if (callback) [_attachedEventEmitter once:callback];
|
|
// Set state: Attaching
|
|
ARTStatus *status = reason ? [ARTStatus state:ARTStateError info:reason] : [ARTStatus state:ARTStateOk];
|
|
status.storeErrorInfo = storeErrorInfo;
|
|
[self transition:ARTRealtimeChannelAttaching status:status];
|
|
|
|
[self attachAfterChecks:callback channelSerial:channelSerial];
|
|
}
|
|
|
|
- (void)attachAfterChecks:(ARTCallback)callback channelSerial:(NSString *)channelSerial {
|
|
ARTProtocolMessage *attachMessage = [[ARTProtocolMessage alloc] init];
|
|
attachMessage.action = ARTProtocolMessageAttach;
|
|
attachMessage.channel = self.name;
|
|
attachMessage.channelSerial = channelSerial;
|
|
attachMessage.params = self.options_nosync.params;
|
|
attachMessage.flags = self.options_nosync.modes;
|
|
|
|
if (self.attachResume) {
|
|
attachMessage.flags = attachMessage.flags | ARTProtocolMessageFlagAttachResume;
|
|
}
|
|
|
|
[self.realtime send:attachMessage sentCallback:^(ARTErrorInfo *error) {
|
|
if (error) {
|
|
return;
|
|
}
|
|
// Set attach timer after the connection is active
|
|
[[self unlessStateChangesBefore:[ARTDefault realtimeRequestTimeout] do:^{
|
|
// Timeout
|
|
ARTErrorInfo *errorInfo = [ARTErrorInfo createWithCode:ARTStateAttachTimedOut message:@"attach timed out"];
|
|
ARTStatus *status = [ARTStatus state:ARTStateAttachTimedOut info:errorInfo];
|
|
[self setSuspended:status];
|
|
}] startTimer];
|
|
} ackCallback:nil];
|
|
|
|
if (![self.realtime shouldQueueEvents]) {
|
|
ARTEventListener *reconnectedListener = [self.realtime.connectedEventEmitter once:^(NSNull *n) {
|
|
// Disconnected and connected while attaching, re-attach.
|
|
[self attachAfterChecks:callback channelSerial:channelSerial];
|
|
}];
|
|
[_attachedEventEmitter once:^(ARTErrorInfo *err) {
|
|
[self.realtime.connectedEventEmitter off:reconnectedListener];
|
|
}];
|
|
}
|
|
}
|
|
|
|
- (void)detach:(ARTCallback)callback {
|
|
if (callback) {
|
|
ARTCallback userCallback = callback;
|
|
callback = ^(ARTErrorInfo *__art_nullable error) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(error);
|
|
});
|
|
};
|
|
}
|
|
dispatch_sync(_queue, ^{
|
|
[self _detach:callback];
|
|
});
|
|
}
|
|
|
|
- (void)_detach:(ARTCallback)callback {
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelInitialized:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) can't detach when not attached", _realtime, self, self.name];
|
|
if (callback) callback(nil);
|
|
return;
|
|
case ARTRealtimeChannelAttaching: {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) waiting for the completion of the attaching operation", _realtime, self, self.name];
|
|
[_attachedEventEmitter once:^(ARTErrorInfo *errorInfo) {
|
|
if (callback && errorInfo) {
|
|
callback(errorInfo);
|
|
return;
|
|
}
|
|
[self _detach:callback];
|
|
}];
|
|
return;
|
|
}
|
|
case ARTRealtimeChannelDetaching:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) already detaching", _realtime, self, self.name];
|
|
if (callback) [_detachedEventEmitter once:callback];
|
|
return;
|
|
case ARTRealtimeChannelDetached:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) already detached", _realtime, self, self.name];
|
|
if (callback) callback(nil);
|
|
return;
|
|
case ARTRealtimeChannelSuspended:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) transitions immediately to the detached", _realtime, self, self.name];
|
|
[self transition:ARTRealtimeChannelDetached status:[ARTStatus state:ARTStateOk]];
|
|
if (callback) callback(nil);
|
|
return;
|
|
case ARTRealtimeChannelFailed:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) can't detach when in a failed state", _realtime, self, self.name];
|
|
if (callback) callback([ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed message:@"can't detach when in a failed state"]);
|
|
return;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (![self.realtime isActive]) {
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) can't detach when not in an active state", _realtime, self, self.name];
|
|
if (callback) callback([ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed message:@"Can't detach when not in an active state"]);
|
|
return;
|
|
}
|
|
|
|
if (callback) [_detachedEventEmitter once:callback];
|
|
// Set state: Detaching
|
|
[self transition:ARTRealtimeChannelDetaching status:[ARTStatus state:ARTStateOk]];
|
|
|
|
[self detachAfterChecks:callback];
|
|
}
|
|
|
|
- (void)detachAfterChecks:(ARTCallback)callback {
|
|
ARTProtocolMessage *detachMessage = [[ARTProtocolMessage alloc] init];
|
|
detachMessage.action = ARTProtocolMessageDetach;
|
|
detachMessage.channel = self.name;
|
|
|
|
[self.realtime send:detachMessage sentCallback:nil ackCallback:nil];
|
|
|
|
[[self unlessStateChangesBefore:[ARTDefault realtimeRequestTimeout] do:^{
|
|
if (!self.realtime) {
|
|
return;
|
|
}
|
|
// Timeout
|
|
ARTErrorInfo *errorInfo = [ARTErrorInfo createWithCode:ARTStateDetachTimedOut message:@"detach timed out"];
|
|
ARTStatus *status = [ARTStatus state:ARTStateDetachTimedOut info:errorInfo];
|
|
[self transition:ARTRealtimeChannelAttached status:status];
|
|
[self->_detachedEventEmitter emit:nil with:errorInfo];
|
|
}] startTimer];
|
|
|
|
if (![self.realtime shouldQueueEvents]) {
|
|
ARTEventListener *reconnectedListener = [self.realtime.connectedEventEmitter once:^(NSNull *n) {
|
|
// Disconnected and connected while detaching, re-detach.
|
|
[self detachAfterChecks:callback];
|
|
}];
|
|
[_detachedEventEmitter once:^(ARTErrorInfo *err) {
|
|
[self.realtime.connectedEventEmitter off:reconnectedListener];
|
|
}];
|
|
}
|
|
|
|
if (self.presenceMap.syncInProgress) {
|
|
[self.presenceMap failsSync:[ARTErrorInfo createWithCode:ARTErrorChannelOperationFailed message:@"channel is being DETACHED"]];
|
|
}
|
|
}
|
|
|
|
- (void)detach {
|
|
[self detach:nil];
|
|
}
|
|
|
|
- (NSString *)getClientId {
|
|
return self.realtime.auth.clientId;
|
|
}
|
|
|
|
- (NSString *)clientId_nosync {
|
|
return self.realtime.auth.clientId_nosync;
|
|
}
|
|
|
|
- (void)history:(ARTPaginatedMessagesCallback)callback {
|
|
[self history:[[ARTRealtimeHistoryQuery alloc] init] callback:callback error:nil];
|
|
}
|
|
|
|
- (BOOL)history:(ARTRealtimeHistoryQuery *)query callback:(ARTPaginatedMessagesCallback)callback error:(NSError **)errorPtr {
|
|
query.realtimeChannel = self;
|
|
return [_restChannel history:query callback:callback error:errorPtr];
|
|
}
|
|
|
|
- (void)startDecodeFailureRecoveryWithChannelSerial:(NSString *)channelSerial error:(ARTErrorInfo *)error {
|
|
if (_decodeFailureRecoveryInProgress) {
|
|
return;
|
|
}
|
|
|
|
[self.logger warn:@"R:%p C:%p (%@) starting delta decode failure recovery process", _realtime, self, self.name];
|
|
_decodeFailureRecoveryInProgress = true;
|
|
[self internalAttach:^(ARTErrorInfo *e) {
|
|
self->_decodeFailureRecoveryInProgress = false;
|
|
} channelSerial:channelSerial reason:error];
|
|
}
|
|
|
|
#pragma mark - ARTPresenceMapDelegate
|
|
|
|
- (NSString *)connectionId {
|
|
return _realtime.connection.id_nosync;
|
|
}
|
|
|
|
- (void)map:(ARTPresenceMap *)map didRemovedMemberNoLongerPresent:(ARTPresenceMessage *)presence {
|
|
presence.action = ARTPresenceLeave;
|
|
presence.id = nil;
|
|
presence.timestamp = [NSDate date];
|
|
[self broadcastPresence:presence];
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) member \"%@\" no longer present", _realtime, self, self.name, presence.memberKey];
|
|
}
|
|
|
|
- (void)map:(ARTPresenceMap *)map shouldReenterLocalMember:(ARTPresenceMessage *)presence {
|
|
[self.presence enterClient:presence.clientId data:presence.data callback:^(ARTErrorInfo *error) {
|
|
NSString *message = [NSString stringWithFormat:@"Re-entering member \"%@\" as failed with code %ld (%@)", presence.clientId, (long)error.code, error.message];
|
|
ARTErrorInfo *reenterError = [ARTErrorInfo createWithCode:ARTErrorUnableToAutomaticallyReEnterPresenceChannel message:message];
|
|
ARTChannelStateChange *stateChange = [[ARTChannelStateChange alloc] initWithCurrent:self.state_nosync previous:self.state_nosync event:ARTChannelEventUpdate reason:reenterError resumed:true];
|
|
[self emit:stateChange.event with:stateChange];
|
|
}];
|
|
[self.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) re-entering local member \"%@\"", _realtime, self, self.name, presence.memberKey];
|
|
}
|
|
|
|
- (BOOL)exceedMaxSize:(NSArray<ARTBaseMessage *> *)messages {
|
|
NSInteger size = 0;
|
|
for (ARTMessage *message in messages) {
|
|
size += [message messageSize];
|
|
}
|
|
NSInteger maxSize = [ARTDefault maxMessageSize];
|
|
if (self.realtime.connection.maxMessageSize) {
|
|
maxSize = self.realtime.connection.maxMessageSize;
|
|
}
|
|
return size > maxSize;
|
|
}
|
|
|
|
- (ARTRealtimeChannelOptions *)getOptions {
|
|
return (ARTRealtimeChannelOptions *)[self options];
|
|
}
|
|
|
|
- (ARTRealtimeChannelOptions *)getOptions_nosync {
|
|
return (ARTRealtimeChannelOptions *)[self options_nosync];
|
|
}
|
|
|
|
- (void)setOptions:(ARTRealtimeChannelOptions *_Nullable)options callback:(nullable ARTCallback)callback {
|
|
if (callback) {
|
|
ARTCallback userCallback = callback;
|
|
callback = ^(ARTErrorInfo *_Nullable error) {
|
|
dispatch_async(self->_userQueue, ^{
|
|
userCallback(error);
|
|
});
|
|
};
|
|
}
|
|
dispatch_sync(_queue, ^{
|
|
[self setOptions_nosync:options callback:callback];
|
|
});
|
|
}
|
|
|
|
- (void)setOptions_nosync:(ARTRealtimeChannelOptions *_Nullable)options callback:(nullable ARTCallback)callback {
|
|
[self setOptions_nosync:options];
|
|
|
|
if (!options.modes && !options.params) {
|
|
if (callback)
|
|
callback(nil);
|
|
return;
|
|
}
|
|
|
|
switch (self.state_nosync) {
|
|
case ARTRealtimeChannelAttached:
|
|
case ARTRealtimeChannelAttaching:
|
|
[self.realtime.logger debug:__FILE__ line:__LINE__ message:@"RT:%p C:%p (%@) set options in %@ state", _realtime, self, self.name, ARTRealtimeChannelStateToStr(self.state_nosync)];
|
|
[self internalAttach:callback withReason:nil];
|
|
break;
|
|
default:
|
|
if (callback)
|
|
callback(nil);
|
|
break;
|
|
}
|
|
}
|
|
|
|
@end
|
|
|
|
#pragma mark - ARTEvent
|
|
|
|
@implementation ARTEvent (ChannelEvent)
|
|
|
|
- (instancetype)initWithChannelEvent:(ARTChannelEvent)value {
|
|
return [self initWithString:[NSString stringWithFormat:@"ARTChannelEvent%@",ARTChannelEventToStr(value)]];
|
|
}
|
|
|
|
+ (instancetype)newWithChannelEvent:(ARTChannelEvent)value {
|
|
return [[self alloc] initWithChannelEvent:value];
|
|
}
|
|
|
|
@end
|