From 765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 2 Apr 2014 02:44:29 +0530 Subject: multiplexing support --- common/PubnubBase.py | 12 +- common/PubnubCore.py | 4 +- common/PubnubCoreAsync.py | 171 +++++++++++------ python-tornado/Pubnub.py | 329 +++++++++++++++++++++++++-------- python-tornado/unassembled/Platform.py | 28 ++- python-twisted/Pubnub.py | 329 +++++++++++++++++++++++++-------- python-twisted/unassembled/Platform.py | 28 ++- python/Makefile | 2 +- python/Pubnub.py | 326 +++++++++++++++++++++++++------- python/unassembled/Platform.py | 82 +++++++- 10 files changed, 1002 insertions(+), 309 deletions(-) diff --git a/common/PubnubBase.py b/common/PubnubBase.py index 4c5b422..d287be3 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -90,6 +90,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -139,7 +147,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -179,7 +187,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : diff --git a/common/PubnubCore.py b/common/PubnubCore.py index dcfd319..3ed3a68 100644 --- a/common/PubnubCore.py +++ b/common/PubnubCore.py @@ -1,4 +1,4 @@ -class PubnubCore(PubnubBase): +class PubnubCore(PubnubCoreAsync): def __init__( self, publish_key, @@ -44,7 +44,7 @@ class PubnubCore(PubnubBase): - def subscribe( self, args ) : + def subscribe_sync( self, args ) : """ #** #* Subscribe diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index a7fbb7d..0038243 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase): def start(self): pass def stop(self): pass - def timeout( self, delay, callback ): - pass def __init__( self, @@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' - - def subscribe( self, args ) : + self.SUB_RECEIVER = None + self._connect = None + + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel + + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe @@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase): }) """ - ## Fail if missing channel - if not 'channel' in args : - return 'Missing Channel.' - ## Fail if missing callback - if not 'callback' in args : - return 'Missing Callback.' + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return + + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - connectcb = args['connect'] + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj - if 'errorback' in args: - errorback = args['errorback'] - else: - errorback = lambda x: x ## New Channel? - if not (channel in self.subscriptions) : + if not channel in self.subscriptions: self.subscriptions[channel] = { - 'first' : False, - 'connected' : False, + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect } - ## Ensure Single Connection + ## return if already connected to channel if self.subscriptions[channel]['connected'] : - return "Already Connected" + _invoke(error, "Already Connected") + return + - self.subscriptions[channel]['connected'] = 1 ## SUBSCRIPTION RECURSION - def _subscribe(): - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + def _connect(): + self._reset_offline() + def sub_callback(response): - if not self.subscriptions[channel]['first'] : - self.subscriptions[channel]['first'] = True - connectcb() + print response + ## ERROR ? + if not response or error in response: + _invoke_error() - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + _invoke_connect() + self.timetoken = response[1] - ## PROBLEM? - if not response: - def time_callback(_time): - if not _time: - self.timeout( 1, _subscribe ) - return errorback("Lost Network Connection") - else: - self.timeout( 1, _subscribe) + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - ## ENSURE CONNECTED (Call Time Function) - return self.time({ 'callback' : time_callback }) - self.timetoken = response[1] - _subscribe() + _connect() + - pc = PubnubCrypto() - out = [] - for message in response[0]: - callback(self.decrypt(message)) + channel_list = self.get_channel_list(self.subscriptions) + print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: - self._request( { "urlcomponents" : [ + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', self.subscribe_key, - channel, + channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) - except : - self.timeout( 1, _subscribe) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) return + self._connect = _connect + + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) - _subscribe() + _connect() + + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None + + def CONNECT(self): + self._reset_offline() + self._connect() + + def unsubscribe( self, args ): + #print(args['channel']) channel = str(args['channel']) if not (channel in self.subscriptions): return False ## DISCONNECT self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False self.subscriptions[channel]['timetoken'] = 0 self.subscriptions[channel]['first'] = False + self.CONNECT() diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index 89c0d97..ee66619 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -16,7 +16,7 @@ from base64 import encodestring, decodestring import hashlib import hmac -class PubnubCrypto() : +class PubnubCrypto2() : """ #** #* PubnubCrypto @@ -93,13 +93,89 @@ class PubnubCrypto() : return self.depad((cipher.decrypt(decodestring(msg)))) +class PubnubCrypto3() : + """ + #** + #* PubnubCrypto + #* + #** + + ## Initiate Class + pc = PubnubCrypto + + """ + + def pad( self, msg, block_size=16 ): + """ + #** + #* pad + #* + #* pad the text to be encrypted + #* appends a padding character to the end of the String + #* until the string has block_size length + #* @return msg with padding. + #** + """ + padding = block_size - (len(msg) % block_size) + return msg + (chr(padding)*padding).encode('utf-8') + + def depad( self, msg ): + """ + #** + #* depad + #* + #* depad the decryptet message" + #* @return msg without padding. + #** + """ + return msg[0:-ord(msg[-1])] + + def getSecret( self, key ): + """ + #** + #* getSecret + #* + #* hases the key to MD5 + #* @return key in MD5 format + #** + """ + return hashlib.sha256(key.encode("utf-8")).hexdigest() + + def encrypt( self, key, msg ): + """ + #** + #* encrypt + #* + #* encrypts the message + #* @return message in encrypted format + #** + """ + secret = self.getSecret(key) + Initial16bytes='0123456789012345' + cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') + def decrypt( self, key, msg ): + """ + #** + #* decrypt + #* + #* decrypts the message + #* @return message in decryped format + #** + """ + secret = self.getSecret(key) + Initial16bytes='0123456789012345' + cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8') + + try: import json except ImportError: import simplejson as json import time import hashlib -import urllib2 -import uuid +import uuid +import sys class PubnubBase(object): def __init__( @@ -137,7 +213,7 @@ class PubnubBase(object): self.secret_key = secret_key self.cipher_key = cipher_key self.ssl = ssl_on - self.pc = PubnubCrypto() + if self.ssl : self.origin = 'https://' + self.origin @@ -145,8 +221,16 @@ class PubnubBase(object): self.origin = 'http://' + self.origin self.uuid = UUID or str(uuid.uuid4()) + + if type(sys.version_info) is tuple: + self.python_version = 2 + self.pc = PubnubCrypto2() + else: + self.python_version = 3 + self.pc = PubnubCrypto3() + - if not isinstance(self.uuid, basestring): + if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") def sign(self, channel, message): @@ -177,6 +261,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -207,7 +299,7 @@ class PubnubBase(object): channel = str(args['channel']) ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -226,7 +318,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -266,7 +358,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : @@ -291,7 +383,7 @@ class PubnubBase(object): channel = str(args['channel']) ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -329,7 +421,7 @@ class PubnubBase(object): """ ## Capture User Input - limit = args.has_key('limit') and int(args['limit']) or 10 + limit = 'limit' in args and int(args['limit']) or 10 channel = str(args['channel']) ## Fail if bad input. @@ -338,7 +430,7 @@ class PubnubBase(object): return False ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -377,18 +469,18 @@ class PubnubBase(object): params = dict() count = 100 - if args.has_key('count'): + if 'count' in args: count = int(args['count']) params['count'] = str(count) - if args.has_key('reverse'): + if 'reverse' in args: params['reverse'] = str(args['reverse']).lower() - if args.has_key('start'): + if 'start' in args: params['start'] = str(args['start']) - if args.has_key('end'): + if 'end' in args: params['end'] = str(args['end']) ## Fail if bad input. @@ -397,7 +489,7 @@ class PubnubBase(object): return False ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -428,7 +520,7 @@ class PubnubBase(object): """ ## Capture Callback - if args and args.has_key('callback') : + if args and 'callback' in args : callback = args['callback'] else : callback = None @@ -454,8 +546,8 @@ class PubnubBase(object): hex(ord(ch)).replace( '0x', '%' ).upper() or ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) - if (request.has_key("urlparams")): - url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].iteritems()]) + if ("urlparams" in request): + url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()]) return url @@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase): def start(self): pass def stop(self): pass - def timeout( self, delay, callback ): - pass def __init__( self, @@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' - - def subscribe( self, args ) : + self.SUB_RECEIVER = None + self._connect = None + + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel + + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe @@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase): }) """ - ## Fail if missing channel - if not 'channel' in args : - return 'Missing Channel.' - ## Fail if missing callback - if not 'callback' in args : - return 'Missing Callback.' + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - connectcb = args['connect'] + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + + + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj - if 'errorback' in args: - errorback = args['errorback'] - else: - errorback = lambda x: x ## New Channel? - if not (channel in self.subscriptions) : + if not channel in self.subscriptions: self.subscriptions[channel] = { - 'first' : False, - 'connected' : False, + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect } - ## Ensure Single Connection + ## return if already connected to channel if self.subscriptions[channel]['connected'] : - return "Already Connected" + _invoke(error, "Already Connected") + return + - self.subscriptions[channel]['connected'] = 1 ## SUBSCRIPTION RECURSION - def _subscribe(): - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + def _connect(): + self._reset_offline() + def sub_callback(response): - if not self.subscriptions[channel]['first'] : - self.subscriptions[channel]['first'] = True - connectcb() + print response + ## ERROR ? + if not response or error in response: + _invoke_error() - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + _invoke_connect() + self.timetoken = response[1] - ## PROBLEM? - if not response: - def time_callback(_time): - if not _time: - self.timeout( 1, _subscribe ) - return errorback("Lost Network Connection") - else: - self.timeout( 1, _subscribe) + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - ## ENSURE CONNECTED (Call Time Function) - return self.time({ 'callback' : time_callback }) - self.timetoken = response[1] - _subscribe() + _connect() + - pc = PubnubCrypto() - out = [] - for message in response[0]: - callback(self.decrypt(message)) + channel_list = self.get_channel_list(self.subscriptions) + print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: - self._request( { "urlcomponents" : [ + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', self.subscribe_key, - channel, + channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) - except : - self.timeout( 1, _subscribe) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) return + self._connect = _connect + + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) - _subscribe() + _connect() + + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None + + def CONNECT(self): + self._reset_offline() + self._connect() + + def unsubscribe( self, args ): + #print(args['channel']) channel = str(args['channel']) if not (channel in self.subscriptions): return False ## DISCONNECT self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False self.subscriptions[channel]['timetoken'] = 0 self.subscriptions[channel]['first'] = False + self.CONNECT() import tornado.httpclient @@ -685,15 +836,24 @@ class Pubnub(PubnubCoreAsync): self.headers['Accept-Encoding'] = self.accept_encoding self.headers['V'] = self.version self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) + self.id = None - def _request( self, request, callback ) : + def _request( self, request, callback, single=False ) : url = self.getUrl(request) - ## Send Request Expecting JSON Response - #print self.headers - request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 ) + if single is True: + id = time.time() + self.id = id def responseCallback(response): + if single is True: + if not id == self.id: + return None + + body = response._get_body() + if body is None: + return + def handle_exc(*args): return True if response.error is not None: @@ -701,9 +861,14 @@ class Pubnub(PubnubCoreAsync): response.rethrow() elif callback: callback(eval(response._get_body())) - + self.http.fetch( - request, - callback=responseCallback, + request=request, + callback=responseCallback ) + def abort(): + pass + + return abort + diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py index 62d3a26..f98befb 100644 --- a/python-tornado/unassembled/Platform.py +++ b/python-tornado/unassembled/Platform.py @@ -42,15 +42,24 @@ class Pubnub(PubnubCoreAsync): self.headers['Accept-Encoding'] = self.accept_encoding self.headers['V'] = self.version self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) + self.id = None - def _request( self, request, callback ) : + def _request( self, request, callback, single=False ) : url = self.getUrl(request) - ## Send Request Expecting JSON Response - #print self.headers - request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 ) + if single is True: + id = time.time() + self.id = id def responseCallback(response): + if single is True: + if not id == self.id: + return None + + body = response._get_body() + if body is None: + return + def handle_exc(*args): return True if response.error is not None: @@ -58,9 +67,14 @@ class Pubnub(PubnubCoreAsync): response.rethrow() elif callback: callback(eval(response._get_body())) - + self.http.fetch( - request, - callback=responseCallback, + request=request, + callback=responseCallback ) + def abort(): + pass + + return abort + diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py index 66534b5..3bc2d35 100644 --- a/python-twisted/Pubnub.py +++ b/python-twisted/Pubnub.py @@ -16,7 +16,7 @@ from base64 import encodestring, decodestring import hashlib import hmac -class PubnubCrypto() : +class PubnubCrypto2() : """ #** #* PubnubCrypto @@ -93,13 +93,89 @@ class PubnubCrypto() : return self.depad((cipher.decrypt(decodestring(msg)))) +class PubnubCrypto3() : + """ + #** + #* PubnubCrypto + #* + #** + + ## Initiate Class + pc = PubnubCrypto + + """ + + def pad( self, msg, block_size=16 ): + """ + #** + #* pad + #* + #* pad the text to be encrypted + #* appends a padding character to the end of the String + #* until the string has block_size length + #* @return msg with padding. + #** + """ + padding = block_size - (len(msg) % block_size) + return msg + (chr(padding)*padding).encode('utf-8') + + def depad( self, msg ): + """ + #** + #* depad + #* + #* depad the decryptet message" + #* @return msg without padding. + #** + """ + return msg[0:-ord(msg[-1])] + + def getSecret( self, key ): + """ + #** + #* getSecret + #* + #* hases the key to MD5 + #* @return key in MD5 format + #** + """ + return hashlib.sha256(key.encode("utf-8")).hexdigest() + + def encrypt( self, key, msg ): + """ + #** + #* encrypt + #* + #* encrypts the message + #* @return message in encrypted format + #** + """ + secret = self.getSecret(key) + Initial16bytes='0123456789012345' + cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') + def decrypt( self, key, msg ): + """ + #** + #* decrypt + #* + #* decrypts the message + #* @return message in decryped format + #** + """ + secret = self.getSecret(key) + Initial16bytes='0123456789012345' + cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8') + + try: import json except ImportError: import simplejson as json import time import hashlib -import urllib2 -import uuid +import uuid +import sys class PubnubBase(object): def __init__( @@ -137,7 +213,7 @@ class PubnubBase(object): self.secret_key = secret_key self.cipher_key = cipher_key self.ssl = ssl_on - self.pc = PubnubCrypto() + if self.ssl : self.origin = 'https://' + self.origin @@ -145,8 +221,16 @@ class PubnubBase(object): self.origin = 'http://' + self.origin self.uuid = UUID or str(uuid.uuid4()) + + if type(sys.version_info) is tuple: + self.python_version = 2 + self.pc = PubnubCrypto2() + else: + self.python_version = 3 + self.pc = PubnubCrypto3() + - if not isinstance(self.uuid, basestring): + if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") def sign(self, channel, message): @@ -177,6 +261,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -207,7 +299,7 @@ class PubnubBase(object): channel = str(args['channel']) ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -226,7 +318,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -266,7 +358,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : @@ -291,7 +383,7 @@ class PubnubBase(object): channel = str(args['channel']) ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -329,7 +421,7 @@ class PubnubBase(object): """ ## Capture User Input - limit = args.has_key('limit') and int(args['limit']) or 10 + limit = 'limit' in args and int(args['limit']) or 10 channel = str(args['channel']) ## Fail if bad input. @@ -338,7 +430,7 @@ class PubnubBase(object): return False ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -377,18 +469,18 @@ class PubnubBase(object): params = dict() count = 100 - if args.has_key('count'): + if 'count' in args: count = int(args['count']) params['count'] = str(count) - if args.has_key('reverse'): + if 'reverse' in args: params['reverse'] = str(args['reverse']).lower() - if args.has_key('start'): + if 'start' in args: params['start'] = str(args['start']) - if args.has_key('end'): + if 'end' in args: params['end'] = str(args['end']) ## Fail if bad input. @@ -397,7 +489,7 @@ class PubnubBase(object): return False ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -428,7 +520,7 @@ class PubnubBase(object): """ ## Capture Callback - if args and args.has_key('callback') : + if args and 'callback' in args : callback = args['callback'] else : callback = None @@ -454,8 +546,8 @@ class PubnubBase(object): hex(ord(ch)).replace( '0x', '%' ).upper() or ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) - if (request.has_key("urlparams")): - url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].iteritems()]) + if ("urlparams" in request): + url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()]) return url @@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase): def start(self): pass def stop(self): pass - def timeout( self, delay, callback ): - pass def __init__( self, @@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' - - def subscribe( self, args ) : + self.SUB_RECEIVER = None + self._connect = None + + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel + + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe @@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase): }) """ - ## Fail if missing channel - if not 'channel' in args : - return 'Missing Channel.' - ## Fail if missing callback - if not 'callback' in args : - return 'Missing Callback.' + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - connectcb = args['connect'] + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + + + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj - if 'errorback' in args: - errorback = args['errorback'] - else: - errorback = lambda x: x ## New Channel? - if not (channel in self.subscriptions) : + if not channel in self.subscriptions: self.subscriptions[channel] = { - 'first' : False, - 'connected' : False, + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect } - ## Ensure Single Connection + ## return if already connected to channel if self.subscriptions[channel]['connected'] : - return "Already Connected" + _invoke(error, "Already Connected") + return + - self.subscriptions[channel]['connected'] = 1 ## SUBSCRIPTION RECURSION - def _subscribe(): - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + def _connect(): + self._reset_offline() + def sub_callback(response): - if not self.subscriptions[channel]['first'] : - self.subscriptions[channel]['first'] = True - connectcb() + print response + ## ERROR ? + if not response or error in response: + _invoke_error() - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + _invoke_connect() + self.timetoken = response[1] - ## PROBLEM? - if not response: - def time_callback(_time): - if not _time: - self.timeout( 1, _subscribe ) - return errorback("Lost Network Connection") - else: - self.timeout( 1, _subscribe) + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - ## ENSURE CONNECTED (Call Time Function) - return self.time({ 'callback' : time_callback }) - self.timetoken = response[1] - _subscribe() + _connect() + - pc = PubnubCrypto() - out = [] - for message in response[0]: - callback(self.decrypt(message)) + channel_list = self.get_channel_list(self.subscriptions) + print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: - self._request( { "urlcomponents" : [ + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', self.subscribe_key, - channel, + channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) - except : - self.timeout( 1, _subscribe) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) return + self._connect = _connect + + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) - _subscribe() + _connect() + + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None + + def CONNECT(self): + self._reset_offline() + self._connect() + + def unsubscribe( self, args ): + #print(args['channel']) channel = str(args['channel']) if not (channel in self.subscriptions): return False ## DISCONNECT self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False self.subscriptions[channel]['timetoken'] = 0 self.subscriptions[channel]['first'] = False + self.CONNECT() from twisted.web.client import getPage @@ -650,10 +801,14 @@ from twisted.web.client import HTTPConnectionPool from twisted.web.http_headers import Headers from twisted.internet.ssl import ClientContextFactory from twisted.internet.task import LoopingCall +import twisted +from hashlib import sha256 +import time pnconn_pool = HTTPConnectionPool(reactor, persistent=True) -pnconn_pool.maxPersistentPerHost = 100 +pnconn_pool.maxPersistentPerHost = 100000 pnconn_pool.cachedConnectionTimeout = 310 +pnconn_pool.retryAutomatically = True class Pubnub(PubnubCoreAsync): @@ -684,7 +839,7 @@ class Pubnub(PubnubCoreAsync): #self.headers['Accept-Encoding'] = [self.accept_encoding] self.headers['V'] = [self.version] - def _request( self, request, callback ) : + def _request( self, request, callback, single=False ) : global pnconn_pool ## Build URL @@ -696,24 +851,42 @@ class Pubnub(PubnubCoreAsync): ]) for bit in request]) ''' url = self.getUrl(request) + agent = ContentDecoderAgent(RedirectAgent(Agent( reactor, contextFactory = WebClientContextFactory(), pool = self.ssl and None or pnconn_pool )), [('gzip', GzipDecoder)]) + request = agent.request( 'GET', url, Headers(self.headers), None ) + if single is True: + id = time.time() + self.id = id + def received(response): finished = Deferred() response.deliverBody(PubNubResponse(finished)) return finished def complete(data): - callback(eval(data)) + if single is True: + if not id == self.id: + return None + try: + callback(eval(data)) + except Exception as e: + pass + #need error handling here + + def abort(): + pass request.addCallback(received) request.addBoth(complete) + return abort + class WebClientContextFactory(ClientContextFactory): def getContext(self, hostname, port): return ClientContextFactory.getContext(self) diff --git a/python-twisted/unassembled/Platform.py b/python-twisted/unassembled/Platform.py index 7318703..3b84b30 100644 --- a/python-twisted/unassembled/Platform.py +++ b/python-twisted/unassembled/Platform.py @@ -7,10 +7,14 @@ from twisted.web.client import HTTPConnectionPool from twisted.web.http_headers import Headers from twisted.internet.ssl import ClientContextFactory from twisted.internet.task import LoopingCall +import twisted +from hashlib import sha256 +import time pnconn_pool = HTTPConnectionPool(reactor, persistent=True) -pnconn_pool.maxPersistentPerHost = 100 +pnconn_pool.maxPersistentPerHost = 100000 pnconn_pool.cachedConnectionTimeout = 310 +pnconn_pool.retryAutomatically = True class Pubnub(PubnubCoreAsync): @@ -41,7 +45,7 @@ class Pubnub(PubnubCoreAsync): #self.headers['Accept-Encoding'] = [self.accept_encoding] self.headers['V'] = [self.version] - def _request( self, request, callback ) : + def _request( self, request, callback, single=False ) : global pnconn_pool ## Build URL @@ -53,24 +57,42 @@ class Pubnub(PubnubCoreAsync): ]) for bit in request]) ''' url = self.getUrl(request) + agent = ContentDecoderAgent(RedirectAgent(Agent( reactor, contextFactory = WebClientContextFactory(), pool = self.ssl and None or pnconn_pool )), [('gzip', GzipDecoder)]) + request = agent.request( 'GET', url, Headers(self.headers), None ) + if single is True: + id = time.time() + self.id = id + def received(response): finished = Deferred() response.deliverBody(PubNubResponse(finished)) return finished def complete(data): - callback(eval(data)) + if single is True: + if not id == self.id: + return None + try: + callback(eval(data)) + except Exception as e: + pass + #need error handling here + + def abort(): + pass request.addCallback(received) request.addBoth(complete) + return abort + class WebClientContextFactory(ClientContextFactory): def getContext(self, hostname, port): return ClientContextFactory.getContext(self) diff --git a/python/Makefile b/python/Makefile index 5eb9e2f..b693cf8 100644 --- a/python/Makefile +++ b/python/Makefile @@ -12,7 +12,7 @@ build: echo "\n" >> ./Pubnub.py cat ../common/PubnubBase.py >> ./Pubnub.py echo "\n" >> ./Pubnub.py - cat ../common/PubnubCore.py >> ./Pubnub.py + cat ../common/PubnubCoreAsync.py >> ./Pubnub.py echo "\n" >> ./Pubnub.py cat ./unassembled/Platform.py >> ./Pubnub.py find -name "Pubnub*py" | xargs sed -i "s/PubNub\ [0-9]\.[0-9]\.[0-9]/PubNub\ $(VERSION)/g" diff --git a/python/Pubnub.py b/python/Pubnub.py index 91f67ad..a449c2d 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -206,13 +206,13 @@ class PubnubBase(object): pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - self.origin = origin - self.limit = 1800 - self.publish_key = publish_key - self.subscribe_key = subscribe_key - self.secret_key = secret_key - self.cipher_key = cipher_key - self.ssl = ssl_on + self.origin = origin + self.limit = 1800 + self.publish_key = publish_key + self.subscribe_key = subscribe_key + self.secret_key = secret_key + self.cipher_key = cipher_key + self.ssl = ssl_on if self.ssl : @@ -261,6 +261,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -310,7 +318,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -350,7 +358,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : @@ -543,7 +551,19 @@ class PubnubBase(object): return url -class PubnubCore(PubnubBase): +try: + from hashlib import sha256 + digestmod = sha256 +except ImportError: + import Crypto.Hash.SHA256 as digestmod + sha256 = digestmod.new +import hmac + +class PubnubCoreAsync(PubnubBase): + + def start(self): pass + def stop(self): pass + def __init__( self, publish_key, @@ -562,17 +582,16 @@ class PubnubCore(PubnubBase): #* #* @param string publish_key required key to send messages. #* @param string subscribe_key required key to receive messages. - #* @param string secret_key optional key to sign messages. + #* @param string secret_key required key to sign messages. #* @param boolean ssl required for 2048 bit encrypted messages. #* @param string origin PUBNUB Server Origin. - #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied) #** ## Initiat Class pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - super(PubnubCore, self).__init__( + super(PubnubCoreAsync, self).__init__( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key, @@ -584,20 +603,33 @@ class PubnubCore(PubnubBase): self.subscriptions = {} self.timetoken = 0 - self.version = '3.4' + self.version = '3.3.4' self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel - - def subscribe( self, args ) : + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe #* - #* This is BLOCKING. + #* This is NON-BLOCKING. #* Listen for a message on a channel. #* - #* @param array args with channel and callback. + #* @param array args with channel and message. #* @return false on fail, array on success. #** @@ -606,58 +638,158 @@ class PubnubCore(PubnubBase): print(message) return True + ## On Connect Callback + def connected() : + pubnub.publish({ + 'channel' : 'hello_world', + 'message' : { 'some_var' : 'text' } + }) + + ## Subscribe pubnub.subscribe({ 'channel' : 'hello_world', - 'callback' : receive + 'connect' : connected, + 'callback' : receive }) """ - ## Fail if missing channel - if not 'channel' in args : - raise Exception('Missing Channel.') - return False + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return + + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + + + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj + + + ## New Channel? + if not channel in self.subscriptions: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } - ## Fail if missing callback - if not 'callback' in args : - raise Exception('Missing Callback.') - return False + ## return if already connected to channel + if self.subscriptions[channel]['connected'] : + _invoke(error, "Already Connected") + return + - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - subscribe_key = args.get('subscribe_key') or self.subscribe_key + ## SUBSCRIPTION RECURSION + def _connect(): + + self._reset_offline() + + def sub_callback(response): + print response + ## ERROR ? + if not response or error in response: + _invoke_error() + + _invoke_connect() + + + self.timetoken = response[1] - ## Begin Subscribe - while True : + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - timetoken = 'timetoken' in args and args['timetoken'] or 0 - try : - ## Wait for Message - response = self._request({"urlcomponents" : [ + + _connect() + + + + channel_list = self.get_channel_list(self.subscriptions) + print channel_list + ## CONNECT TO PUBNUB SUBSCRIBE SERVERS + try: + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', - subscribe_key, - channel, + self.subscribe_key, + channel_list, '0', - str(timetoken) - ],"urlparams" : {"uuid" : self.uuid }}) + str(self.timetoken) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) + return + + self._connect = _connect - messages = response[0] - args['timetoken'] = response[1] - ## If it was a timeout - if not len(messages) : - continue + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) + _connect() - ## Run user Callback and Reconnect if user permits. - for message in messages : - if not callback(self.decrypt(message)) : - return + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None - except Exception: - time.sleep(1) + def CONNECT(self): + self._reset_offline() + self._connect() + + + def unsubscribe( self, args ): + #print(args['channel']) + channel = str(args['channel']) + if not (channel in self.subscriptions): + return False - return True + ## DISCONNECT + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False + self.CONNECT() try: @@ -665,6 +797,34 @@ try: except: import urllib2 +import threading +import json +import time + +current_req_id = -1 + +class HTTPClient: + def __init__(self, url, callback, id=None): + self.url = url + self.id = id + self.callback = callback + self.stop = False + + def cancel(self): + self.stop = True + self.callback = None + + def run(self): + global current_req_id + data = urllib2.urlopen(self.url, timeout=310).read() + if self.stop is True: + return + if self.id is not None and current_req_id != self.id: + return + if self.callback is not None: + self.callback(json.loads(data)) + + class Pubnub(PubnubCore): def __init__( self, @@ -690,7 +850,33 @@ class Pubnub(PubnubCore): else: self._request = self._request3 - def _request2( self, request, callback = None ) : + def timeout(self, interval, func): + def cb(): + time.sleep(interval) + func() + thread = threading.Thread(target=cb) + thread.start() + + def _request2_async( self, request, callback, single=False ) : + global current_req_id + ## Build URL + url = self.getUrl(request) + if single is True: + id = time.time() + client = HTTPClient(url, callback, id) + current_req_id = id + else: + client = HTTPClient(url, callback) + + thread = threading.Thread(target=client.run) + thread.start() + def abort(): + client.cancel(); + return abort + + + def _request2_sync( self, request) : + ## Build URL url = self.getUrl(request) @@ -704,13 +890,18 @@ class Pubnub(PubnubCore): except: return None - if (callback): - callback(resp_json) - else: return resp_json - def _request3( self, request, callback = None ) : + def _request2(self, request, callback=None, single=False): + if callback is None: + return self._request2_sync(request,single=single) + else: + self._request2_async(request, callback, single=single) + + + + def _request3_sync( self, request) : ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response @@ -718,18 +909,15 @@ class Pubnub(PubnubCore): response = urllib.request.urlopen(url,timeout=310) resp_json = json.loads(response.read().decode("utf-8")) except Exception as e: - print(e) return None - if (callback): - callback(resp_json) - else: - return resp_json + return resp_json - ''' - def _request(self, request, callback = None): - if self.python_version == 2: - return self._request2(request,callback) + def _request3_async( self, request, callback, single=False ) : + pass + + def _request3(self, request, callback=None, single=False): + if callback is None: + return self._request3_sync(request,single=single) else: - return self._request3(request, callback) - ''' + self._request3_async(request, callback, single=single) diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py index c60690f..f0f9327 100644 --- a/python/unassembled/Platform.py +++ b/python/unassembled/Platform.py @@ -3,6 +3,34 @@ try: except: import urllib2 +import threading +import json +import time + +current_req_id = -1 + +class HTTPClient: + def __init__(self, url, callback, id=None): + self.url = url + self.id = id + self.callback = callback + self.stop = False + + def cancel(self): + self.stop = True + self.callback = None + + def run(self): + global current_req_id + data = urllib2.urlopen(self.url, timeout=310).read() + if self.stop is True: + return + if self.id is not None and current_req_id != self.id: + return + if self.callback is not None: + self.callback(json.loads(data)) + + class Pubnub(PubnubCore): def __init__( self, @@ -28,7 +56,33 @@ class Pubnub(PubnubCore): else: self._request = self._request3 - def _request2( self, request, callback = None ) : + def timeout(self, interval, func): + def cb(): + time.sleep(interval) + func() + thread = threading.Thread(target=cb) + thread.start() + + def _request2_async( self, request, callback, single=False ) : + global current_req_id + ## Build URL + url = self.getUrl(request) + if single is True: + id = time.time() + client = HTTPClient(url, callback, id) + current_req_id = id + else: + client = HTTPClient(url, callback) + + thread = threading.Thread(target=client.run) + thread.start() + def abort(): + client.cancel(); + return abort + + + def _request2_sync( self, request) : + ## Build URL url = self.getUrl(request) @@ -42,13 +96,18 @@ class Pubnub(PubnubCore): except: return None - if (callback): - callback(resp_json) - else: return resp_json - def _request3( self, request, callback = None ) : + def _request2(self, request, callback=None, single=False): + if callback is None: + return self._request2_sync(request,single=single) + else: + self._request2_async(request, callback, single=single) + + + + def _request3_sync( self, request) : ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response @@ -56,10 +115,15 @@ class Pubnub(PubnubCore): response = urllib.request.urlopen(url,timeout=310) resp_json = json.loads(response.read().decode("utf-8")) except Exception as e: - print(e) return None - if (callback): - callback(resp_json) + return resp_json + + def _request3_async( self, request, callback, single=False ) : + pass + + def _request3(self, request, callback=None, single=False): + if callback is None: + return self._request3_sync(request,single=single) else: - return resp_json + self._request3_async(request, callback, single=single) -- cgit v1.2.3