From 75fd01cb41bcbef5aa3235a4d2c3d9766b656048 Mon Sep 17 00:00:00 2001 From: Mike Lewis Date: Sun, 16 Sep 2012 13:58:16 -0700 Subject: [PATCH] Scheduling improvements By default now, SRWebSocket will schedule the network on a shared background thread It can also be configured to call the delegate methods on a non-main queue --- SocketRocket.xcodeproj/project.pbxproj | 6 +- SocketRocket/SRWebSocket.h | 13 ++ SocketRocket/SRWebSocket.m | 189 +++++++++++++++++++------ 3 files changed, 160 insertions(+), 48 deletions(-) diff --git a/SocketRocket.xcodeproj/project.pbxproj b/SocketRocket.xcodeproj/project.pbxproj index 1502b4596..6f69eaa2d 100644 --- a/SocketRocket.xcodeproj/project.pbxproj +++ b/SocketRocket.xcodeproj/project.pbxproj @@ -7,6 +7,7 @@ objects = { /* Begin PBXBuildFile section */ + 27FF2C2916066747006EF077 /* Default-568h@2x.png in Resources */ = {isa = PBXBuildFile; fileRef = 27FF2C2816066746006EF077 /* Default-568h@2x.png */; }; F6016C7C146124B20037BB3D /* base64.c in Sources */ = {isa = PBXBuildFile; fileRef = F6016C7B146124B20037BB3D /* base64.c */; }; F6016C7F146124ED0037BB3D /* base64.h in Headers */ = {isa = PBXBuildFile; fileRef = F6016C7E146124ED0037BB3D /* base64.h */; }; F6016C8814620EC70037BB3D /* Security.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = F6A12CD3145122FC00C1D980 /* Security.framework */; }; @@ -59,6 +60,7 @@ /* End PBXContainerItemProxy section */ /* Begin PBXFileReference section */ + 27FF2C2816066746006EF077 /* Default-568h@2x.png */ = {isa = PBXFileReference; lastKnownFileType = image.png; name = "Default-568h@2x.png"; path = "../Default-568h@2x.png"; sourceTree = ""; }; F6016C7B146124B20037BB3D /* base64.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = base64.c; sourceTree = ""; }; F6016C7E146124ED0037BB3D /* base64.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = base64.h; sourceTree = ""; }; F60CC29F14D4EA0500A005E4 /* SRTWebSocketOperation.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SRTWebSocketOperation.h; sourceTree = ""; }; @@ -175,6 +177,7 @@ F62417EB14D52F3C003CE997 /* Supporting Files */ = { isa = PBXGroup; children = ( + 27FF2C2816066746006EF077 /* Default-568h@2x.png */, F62417EC14D52F3C003CE997 /* TestChat-Info.plist */, F62417ED14D52F3C003CE997 /* InfoPlist.strings */, F62417F014D52F3C003CE997 /* main.m */, @@ -397,7 +400,7 @@ F6B208241450F597009315AF /* Project object */ = { isa = PBXProject; attributes = { - LastUpgradeCheck = 0440; + LastUpgradeCheck = 0450; }; buildConfigurationList = F6B208271450F597009315AF /* Build configuration list for PBXProject "SocketRocket" */; compatibilityVersion = "Xcode 3.2"; @@ -426,6 +429,7 @@ files = ( F62417EF14D52F3C003CE997 /* InfoPlist.strings in Resources */, F62417F814D52F3C003CE997 /* MainStoryboard.storyboard in Resources */, + 27FF2C2916066747006EF077 /* Default-568h@2x.png in Resources */, ); runOnlyForDeploymentPostprocessing = 0; }; diff --git a/SocketRocket/SRWebSocket.h b/SocketRocket/SRWebSocket.h index a7e18516b..d533a731a 100644 --- a/SocketRocket/SRWebSocket.h +++ b/SocketRocket/SRWebSocket.h @@ -50,6 +50,13 @@ extern NSString *const SRWebSocketErrorDomain; - (id)initWithURL:(NSURL *)url protocols:(NSArray *)protocols; - (id)initWithURL:(NSURL *)url; +// Delegage queue will be +[NSOperationQueue mainQueue] by default +- (void)setDelegateQueue:(NSOperationQueue*) queue; + +// By default, it will schedule itself on +[NSRunLoop SR_networkRunLoop] using defaultModes +- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode; +- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode; + // SRWebSockets are intended one-time-use only. Open should be called once and only once - (void)open; @@ -88,3 +95,9 @@ extern NSString *const SRWebSocketErrorDomain; @property (nonatomic, retain) NSArray *SR_SSLPinnedCertificates; @end + +@interface NSRunLoop (SRWebSocket) + ++ (NSRunLoop *)SR_networkRunLoop; + +@end diff --git a/SocketRocket/SRWebSocket.m b/SocketRocket/SRWebSocket.m index ef7c65c5f..0e24ed7a2 100644 --- a/SocketRocket/SRWebSocket.m +++ b/SocketRocket/SRWebSocket.m @@ -98,6 +98,13 @@ - (NSString *)SR_origin; @end +@interface _SRRunLoopThread : NSThread + +@property (nonatomic, readonly) NSRunLoop *runLoop; + +@end + + static NSString *newSHA1String(const char *bytes, size_t length) { uint8_t md[CC_SHA1_DIGEST_LENGTH]; @@ -188,16 +195,20 @@ - (void)_sendFrameWithOpcode:(SROpCode)opcode data:(id)data; - (BOOL)_checkHandshake:(CFHTTPMessageRef)httpMessage; - (void)_SR_commonInit; -- (void)_connectToHost:(NSString *)host port:(NSInteger)port; +- (void)_initializeStreams; +- (void)_connect; @property (nonatomic) SRReadyState readyState; +@property (nonatomic) NSOperationQueue *delegateQueue; + + @end @implementation SRWebSocket { NSInteger _webSocketVersion; - dispatch_queue_t _callbackQueue; + NSOperationQueue *_delegateQueue; dispatch_queue_t _workQueue; NSMutableArray *_consumers; @@ -241,6 +252,8 @@ @implementation SRWebSocket { BOOL _isPumping; + BOOL _didSchedule; + // We use this to retain ourselves. __strong SRWebSocket *_selfRetain; @@ -265,16 +278,9 @@ - (id)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray *)protocols; if (self) { assert(request.URL); _url = request.URL; - NSString *scheme = [_url scheme]; - - _requestedProtocols = [protocols copy]; - - assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]); _urlRequest = request; - if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) { - _secure = YES; - } + _requestedProtocols = [protocols copy]; [self _SR_commonInit]; } @@ -300,6 +306,15 @@ - (id)initWithURL:(NSURL *)url protocols:(NSArray *)protocols; - (void)_SR_commonInit; { + + NSString *scheme = [_url scheme]; + assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]); + + if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) { + _secure = YES; + } + + _readyState = SR_CONNECTING; _consumerStopped = YES; @@ -308,8 +323,7 @@ - (void)_SR_commonInit; _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); - _callbackQueue = dispatch_get_main_queue(); - dispatch_retain(_callbackQueue); + _delegateQueue = [NSOperationQueue mainQueue]; _readBuffer = [[NSMutableData alloc] init]; _outputBuffer = [[NSMutableData alloc] init]; @@ -318,6 +332,8 @@ - (void)_SR_commonInit; _consumers = [[NSMutableArray alloc] init]; + [self _initializeStreams]; + // default handlers } @@ -329,7 +345,6 @@ - (void)dealloc [_inputStream close]; [_outputStream close]; - dispatch_release(_callbackQueue); dispatch_release(_workQueue); if (_receivedHTTPHeaders) { @@ -353,20 +368,11 @@ - (void)setReadyState:(SRReadyState)aReadyState; - (void)open; { assert(_url); - NSAssert(_readyState == SR_CONNECTING && _inputStream == nil && _outputStream == nil, @"Cannot call -(void)open on SRWebSocket more than once"); + NSAssert(_readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once"); _selfRetain = self; - NSInteger port = _url.port.integerValue; - if (port == 0) { - if (!_secure) { - port = 80; - } else { - port = 443; - } - } - - [self _connectToHost:_url.host port:port]; + [self _connect]; } @@ -418,11 +424,11 @@ - (void)_HTTPHeadersDidFinish; [self _readFrameNew]; } - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ if ([self.delegate respondsToSelector:@selector(webSocketDidOpen:)]) { [self.delegate webSocketDidOpen:self]; - } - }); + }; + }]; } @@ -480,8 +486,18 @@ - (void)didConnect [self _readHTTPHeader]; } -- (void)_connectToHost:(NSString *)host port:(NSInteger)port; -{ +- (void)_initializeStreams; +{ + NSInteger port = _url.port.integerValue; + if (port == 0) { + if (!_secure) { + port = 80; + } else { + port = 443; + } + } + NSString *host = _url.host; + CFReadStreamRef readStream = NULL; CFWriteStreamRef writeStream = NULL; @@ -493,7 +509,7 @@ - (void)_connectToHost:(NSString *)host port:(NSInteger)port; if (_secure) { NSMutableDictionary *SSLOptions = [[NSMutableDictionary alloc] init]; - + [_outputStream setProperty:(__bridge id)kCFStreamSocketSecurityLevelNegotiatedSSL forKey:(__bridge id)kCFStreamPropertySocketSecurityLevel]; // If we're using pinned certs, don't validate the certificate chain @@ -501,10 +517,10 @@ - (void)_connectToHost:(NSString *)host port:(NSInteger)port; [SSLOptions setValue:[NSNumber numberWithBool:NO] forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain]; } - #if DEBUG +#if DEBUG [SSLOptions setValue:[NSNumber numberWithBool:NO] forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain]; NSLog(@"SocketRocket: In debug mode. Allowing connection to any root cert"); - #endif +#endif [_outputStream setProperty:SSLOptions forKey:(__bridge id)kCFStreamPropertySSLSettings]; @@ -512,16 +528,34 @@ - (void)_connectToHost:(NSString *)host port:(NSInteger)port; _inputStream.delegate = self; _outputStream.delegate = self; +} + +- (void)_connect; +{ - // TODO schedule in a better run loop - [_outputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode]; - [_inputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode]; + if (!_didSchedule) { + [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode]; + } [_outputStream open]; [_inputStream open]; } +- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode; +{ + _didSchedule = YES; + + [_outputStream scheduleInRunLoop:aRunLoop forMode:mode]; + [_inputStream scheduleInRunLoop:aRunLoop forMode:mode]; +} + +- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode; +{ + [_outputStream removeFromRunLoop:aRunLoop forMode:mode]; + [_inputStream removeFromRunLoop:aRunLoop forMode:mode]; +} + - (void)close; { [self closeWithCode:-1 reason:nil]; @@ -574,12 +608,12 @@ - (void)closeWithCode:(NSInteger)code reason:(NSString *)reason; - (void)_closeWithProtocolError:(NSString *)message; { // Need to shunt this on the _callbackQueue first to see if they received any messages - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ [self closeWithCode:SRStatusCodeProtocolError reason:message]; dispatch_async(_workQueue, ^{ [self _disconnect]; }); - }); + }]; } - (void)_failWithError:(NSError *)error; @@ -587,11 +621,11 @@ - (void)_failWithError:(NSError *)error; dispatch_async(_workQueue, ^{ if (self.readyState != SR_CLOSED) { _failed = YES; - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ if ([self.delegate respondsToSelector:@selector(webSocket:didFailWithError:)]) { [self.delegate webSocket:self didFailWithError:error]; } - }); + }]; self.readyState = SR_CLOSED; _selfRetain = nil; @@ -634,11 +668,11 @@ - (void)send:(id)data; - (void)handlePing:(NSData *)pingData; { // Need to pingpong this off _callbackQueue first to make sure messages happen in order - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ dispatch_async(_workQueue, ^{ [self _sendFrameWithOpcode:SROpCodePong data:pingData]; }); - }); + }]; } - (void)handlePong; @@ -649,9 +683,9 @@ - (void)handlePong; - (void)_handleMessage:(id)message { SRFastLog(@"Received message"); - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ [self.delegate webSocket:self didReceiveMessage:message]; - }); + }]; } @@ -994,11 +1028,11 @@ - (void)_pumpWriting; [_inputStream close]; if (!_failed) { - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) { [self.delegate webSocket:self didCloseWithCode:_closeCode reason:_closeReason wasClean:YES]; } - }); + }]; } _selfRetain = nil; @@ -1339,11 +1373,11 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode; if (!_sentClose && !_failed) { _sentClose = YES; // If we get closed in this state it's probably not clean because we should be sending this when we send messages - dispatch_async(_callbackQueue, ^{ + [_delegateQueue addOperationWithBlock:^{ if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) { [self.delegate webSocket:self didCloseWithCode:0 reason:@"Stream end encountered" wasClean:NO]; } - }); + }]; } } @@ -1550,4 +1584,65 @@ static inline int32_t validate_dispatch_data_partial_string(NSData *data) { #endif +static _SRRunLoopThread *networkThread = nil; +static NSRunLoop *networkRunLoop = nil; + +@implementation NSRunLoop (SRWebSocket) + ++ (NSRunLoop *)SR_networkRunLoop { + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + networkThread = [[_SRRunLoopThread alloc] init]; + networkThread.name = @"com.squareup.SocketRocket.NetworkThread"; + [networkThread start]; + networkRunLoop = networkThread.runLoop; + }); + + return networkRunLoop; +} + +@end + + +@implementation _SRRunLoopThread { + dispatch_group_t _waitGroup; +} + +@synthesize runLoop = _runLoop; + +- (void)dealloc +{ + dispatch_release(_waitGroup); +} + +- (id)init +{ + self = [super init]; + if (self) { + _waitGroup = dispatch_group_create(); + dispatch_group_enter(_waitGroup); + } + return self; +} + +- (void)main; +{ + _runLoop = [NSRunLoop currentRunLoop]; + dispatch_group_leave(_waitGroup); + + NSTimer *timer = [[NSTimer alloc] initWithFireDate:[NSDate distantFuture] interval:0.0 target:nil selector:nil userInfo:nil repeats:NO]; + [_runLoop addTimer:timer forMode:NSDefaultRunLoopMode]; + + while ([_runLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]]) { + + } + assert(NO); +} + +- (NSRunLoop *)runLoop; +{ + dispatch_group_wait(_waitGroup, DISPATCH_TIME_FOREVER); + return _runLoop; +} +@end \ No newline at end of file