diff options
Diffstat (limited to 'python/Pubnub.py')
| -rw-r--r-- | python/Pubnub.py | 326 |
1 files changed, 257 insertions, 69 deletions
diff --git a/python/Pubnub.py b/python/Pubnub.py index 91f67ad..a449c2d 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -206,13 +206,13 @@ class PubnubBase(object): pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - self.origin = origin - self.limit = 1800 - self.publish_key = publish_key - self.subscribe_key = subscribe_key - self.secret_key = secret_key - self.cipher_key = cipher_key - self.ssl = ssl_on + self.origin = origin + self.limit = 1800 + self.publish_key = publish_key + self.subscribe_key = subscribe_key + self.secret_key = secret_key + self.cipher_key = cipher_key + self.ssl = ssl_on if self.ssl : @@ -261,6 +261,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -310,7 +318,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -350,7 +358,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : @@ -543,7 +551,19 @@ class PubnubBase(object): return url -class PubnubCore(PubnubBase): +try: + from hashlib import sha256 + digestmod = sha256 +except ImportError: + import Crypto.Hash.SHA256 as digestmod + sha256 = digestmod.new +import hmac + +class PubnubCoreAsync(PubnubBase): + + def start(self): pass + def stop(self): pass + def __init__( self, publish_key, @@ -562,17 +582,16 @@ class PubnubCore(PubnubBase): #* #* @param string publish_key required key to send messages. #* @param string subscribe_key required key to receive messages. - #* @param string secret_key optional key to sign messages. + #* @param string secret_key required key to sign messages. #* @param boolean ssl required for 2048 bit encrypted messages. #* @param string origin PUBNUB Server Origin. - #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied) #** ## Initiat Class pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - super(PubnubCore, self).__init__( + super(PubnubCoreAsync, self).__init__( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key, @@ -584,20 +603,33 @@ class PubnubCore(PubnubBase): self.subscriptions = {} self.timetoken = 0 - self.version = '3.4' + self.version = '3.3.4' self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel - - def subscribe( self, args ) : + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe #* - #* This is BLOCKING. + #* This is NON-BLOCKING. #* Listen for a message on a channel. #* - #* @param array args with channel and callback. + #* @param array args with channel and message. #* @return false on fail, array on success. #** @@ -606,58 +638,158 @@ class PubnubCore(PubnubBase): print(message) return True + ## On Connect Callback + def connected() : + pubnub.publish({ + 'channel' : 'hello_world', + 'message' : { 'some_var' : 'text' } + }) + + ## Subscribe pubnub.subscribe({ 'channel' : 'hello_world', - 'callback' : receive + 'connect' : connected, + 'callback' : receive }) """ - ## Fail if missing channel - if not 'channel' in args : - raise Exception('Missing Channel.') - return False + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return + + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + + + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj + + + ## New Channel? + if not channel in self.subscriptions: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } - ## Fail if missing callback - if not 'callback' in args : - raise Exception('Missing Callback.') - return False + ## return if already connected to channel + if self.subscriptions[channel]['connected'] : + _invoke(error, "Already Connected") + return + - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - subscribe_key = args.get('subscribe_key') or self.subscribe_key + ## SUBSCRIPTION RECURSION + def _connect(): + + self._reset_offline() + + def sub_callback(response): + print response + ## ERROR ? + if not response or error in response: + _invoke_error() + + _invoke_connect() + + + self.timetoken = response[1] - ## Begin Subscribe - while True : + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - timetoken = 'timetoken' in args and args['timetoken'] or 0 - try : - ## Wait for Message - response = self._request({"urlcomponents" : [ + + _connect() + + + + channel_list = self.get_channel_list(self.subscriptions) + print channel_list + ## CONNECT TO PUBNUB SUBSCRIBE SERVERS + try: + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', - subscribe_key, - channel, + self.subscribe_key, + channel_list, '0', - str(timetoken) - ],"urlparams" : {"uuid" : self.uuid }}) + str(self.timetoken) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) + return + + self._connect = _connect - messages = response[0] - args['timetoken'] = response[1] - ## If it was a timeout - if not len(messages) : - continue + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) + _connect() - ## Run user Callback and Reconnect if user permits. - for message in messages : - if not callback(self.decrypt(message)) : - return + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None - except Exception: - time.sleep(1) + def CONNECT(self): + self._reset_offline() + self._connect() + + + def unsubscribe( self, args ): + #print(args['channel']) + channel = str(args['channel']) + if not (channel in self.subscriptions): + return False - return True + ## DISCONNECT + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False + self.CONNECT() try: @@ -665,6 +797,34 @@ try: except: import urllib2 +import threading +import json +import time + +current_req_id = -1 + +class HTTPClient: + def __init__(self, url, callback, id=None): + self.url = url + self.id = id + self.callback = callback + self.stop = False + + def cancel(self): + self.stop = True + self.callback = None + + def run(self): + global current_req_id + data = urllib2.urlopen(self.url, timeout=310).read() + if self.stop is True: + return + if self.id is not None and current_req_id != self.id: + return + if self.callback is not None: + self.callback(json.loads(data)) + + class Pubnub(PubnubCore): def __init__( self, @@ -690,7 +850,33 @@ class Pubnub(PubnubCore): else: self._request = self._request3 - def _request2( self, request, callback = None ) : + def timeout(self, interval, func): + def cb(): + time.sleep(interval) + func() + thread = threading.Thread(target=cb) + thread.start() + + def _request2_async( self, request, callback, single=False ) : + global current_req_id + ## Build URL + url = self.getUrl(request) + if single is True: + id = time.time() + client = HTTPClient(url, callback, id) + current_req_id = id + else: + client = HTTPClient(url, callback) + + thread = threading.Thread(target=client.run) + thread.start() + def abort(): + client.cancel(); + return abort + + + def _request2_sync( self, request) : + ## Build URL url = self.getUrl(request) @@ -704,13 +890,18 @@ class Pubnub(PubnubCore): except: return None - if (callback): - callback(resp_json) - else: return resp_json - def _request3( self, request, callback = None ) : + def _request2(self, request, callback=None, single=False): + if callback is None: + return self._request2_sync(request,single=single) + else: + self._request2_async(request, callback, single=single) + + + + def _request3_sync( self, request) : ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response @@ -718,18 +909,15 @@ class Pubnub(PubnubCore): response = urllib.request.urlopen(url,timeout=310) resp_json = json.loads(response.read().decode("utf-8")) except Exception as e: - print(e) return None - if (callback): - callback(resp_json) - else: - return resp_json + return resp_json - ''' - def _request(self, request, callback = None): - if self.python_version == 2: - return self._request2(request,callback) + def _request3_async( self, request, callback, single=False ) : + pass + + def _request3(self, request, callback=None, single=False): + if callback is None: + return self._request3_sync(request,single=single) else: - return self._request3(request, callback) - ''' + self._request3_async(request, callback, single=single) |
