From 80edcffbfe140a6d19c65deca24e1ba1c0f49b99 Mon Sep 17 00:00:00 2001 From: Devendra Date: Mon, 24 Mar 2014 19:17:48 +0530 Subject: support for python3 --- python/Pubnub.py | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 59a38af..5dd4c7d 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -98,7 +98,6 @@ except ImportError: import simplejson as json import time import hashlib -import urllib2 import uuid class PubnubBase(object): @@ -146,7 +145,7 @@ class PubnubBase(object): self.uuid = UUID or str(uuid.uuid4()) - if not isinstance(self.uuid, basestring): + if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") def sign(self, channel, message): @@ -207,7 +206,7 @@ class PubnubBase(object): channel = str(args['channel']) ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -291,7 +290,7 @@ class PubnubBase(object): channel = str(args['channel']) ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -329,7 +328,7 @@ class PubnubBase(object): """ ## Capture User Input - limit = args.has_key('limit') and int(args['limit']) or 10 + limit = 'limit' in args and int(args['limit']) or 10 channel = str(args['channel']) ## Fail if bad input. @@ -338,7 +337,7 @@ class PubnubBase(object): return False ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -377,18 +376,18 @@ class PubnubBase(object): params = dict() count = 100 - if args.has_key('count'): + if 'count' in args: count = int(args['count']) params['count'] = str(count) - if args.has_key('reverse'): + if 'reverse' in args: params['reverse'] = str(args['reverse']).lower() - if args.has_key('start'): + if 'start' in args: params['start'] = str(args['start']) - if args.has_key('end'): + if 'end' in args: params['end'] = str(args['end']) ## Fail if bad input. @@ -397,7 +396,7 @@ class PubnubBase(object): return False ## Capture Callback - if args.has_key('callback') : + if 'callback' in args : callback = args['callback'] else : callback = None @@ -428,7 +427,7 @@ class PubnubBase(object): """ ## Capture Callback - if args and args.has_key('callback') : + if args and 'callback' in args : callback = args['callback'] else : callback = None @@ -454,8 +453,8 @@ class PubnubBase(object): hex(ord(ch)).replace( '0x', '%' ).upper() or ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) - if (request.has_key("urlparams")): - url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].iteritems()]) + if ("urlparams" in request): + url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()]) return url @@ -576,6 +575,7 @@ class PubnubCore(PubnubBase): return True +import urllib3 class Pubnub(PubnubCore): def __init__( @@ -596,7 +596,8 @@ class Pubnub(PubnubCore): ssl_on = ssl_on, origin = origin, uuid = pres_uuid - ) + ) + self.http = urllib3.PoolManager(timeout=310) def _request( self, request, callback = None ) : ## Build URL @@ -604,11 +605,8 @@ class Pubnub(PubnubCore): ## Send Request Expecting JSONP Response try: - try: usock = urllib2.urlopen( url, None, 310 ) - except TypeError: usock = urllib2.urlopen( url, None ) - response = usock.read() - usock.close() - resp_json = json.loads(response) + response = self.http.request('GET', url) + resp_json = json.loads(response.data.decode("utf-8")) except: return None -- cgit v1.2.3 From b8d1dd86a0d8c4261d4f3765f3ca227d7b555c84 Mon Sep 17 00:00:00 2001 From: Devendra Date: Tue, 25 Mar 2014 10:50:39 +0530 Subject: fixing encryption, and changing from urllib3 to urllib --- python/Pubnub.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 5dd4c7d..aa7068b 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -40,7 +40,7 @@ class PubnubCrypto() : #** """ padding = block_size - (len(msg) % block_size) - return msg + chr(padding)*padding + return msg + (chr(padding)*padding).encode('utf-8') def depad( self, msg ): """ @@ -62,7 +62,7 @@ class PubnubCrypto() : #* @return key in MD5 format #** """ - return hashlib.sha256(key).hexdigest() + return hashlib.sha256(key.encode("utf-8")).hexdigest() def encrypt( self, key, msg ): """ @@ -76,8 +76,7 @@ class PubnubCrypto() : secret = self.getSecret(key) Initial16bytes='0123456789012345' cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) - enc = encodestring(cipher.encrypt(self.pad(msg))) - return enc + return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') def decrypt( self, key, msg ): """ #** @@ -90,7 +89,7 @@ class PubnubCrypto() : secret = self.getSecret(key) Initial16bytes='0123456789012345' cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) - return self.depad((cipher.decrypt(decodestring(msg)))) + return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8') try: import json @@ -575,7 +574,8 @@ class PubnubCore(PubnubBase): return True -import urllib3 + +import urllib.request class Pubnub(PubnubCore): def __init__( @@ -597,17 +597,16 @@ class Pubnub(PubnubCore): origin = origin, uuid = pres_uuid ) - self.http = urllib3.PoolManager(timeout=310) def _request( self, request, callback = None ) : ## Build URL url = self.getUrl(request) - + print(url) ## Send Request Expecting JSONP Response try: - response = self.http.request('GET', url) - resp_json = json.loads(response.data.decode("utf-8")) - except: + response = urllib.request.urlopen(url,timeout=310) + resp_json = json.loads(response.read().decode("utf-8")) + except Exception as e: return None if (callback): -- cgit v1.2.3 From 9ac3ccf6283772b404a0c80945e3cdf3406ac5bf Mon Sep 17 00:00:00 2001 From: Devendra Date: Tue, 25 Mar 2014 11:47:03 +0530 Subject: making version 2 and version 3 work same time --- python/Pubnub.py | 148 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 134 insertions(+), 14 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index aa7068b..91f67ad 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -16,7 +16,84 @@ from base64 import encodestring, decodestring import hashlib import hmac -class PubnubCrypto() : +class PubnubCrypto2() : + """ + #** + #* PubnubCrypto + #* + #** + + ## Initiate Class + pc = PubnubCrypto + + """ + + def pad( self, msg, block_size=16 ): + """ + #** + #* pad + #* + #* pad the text to be encrypted + #* appends a padding character to the end of the String + #* until the string has block_size length + #* @return msg with padding. + #** + """ + padding = block_size - (len(msg) % block_size) + return msg + chr(padding)*padding + + def depad( self, msg ): + """ + #** + #* depad + #* + #* depad the decryptet message" + #* @return msg without padding. + #** + """ + return msg[0:-ord(msg[-1])] + + def getSecret( self, key ): + """ + #** + #* getSecret + #* + #* hases the key to MD5 + #* @return key in MD5 format + #** + """ + return hashlib.sha256(key).hexdigest() + + def encrypt( self, key, msg ): + """ + #** + #* encrypt + #* + #* encrypts the message + #* @return message in encrypted format + #** + """ + secret = self.getSecret(key) + Initial16bytes='0123456789012345' + cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + enc = encodestring(cipher.encrypt(self.pad(msg))) + return enc + def decrypt( self, key, msg ): + """ + #** + #* decrypt + #* + #* decrypts the message + #* @return message in decryped format + #** + """ + secret = self.getSecret(key) + Initial16bytes='0123456789012345' + cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + return self.depad((cipher.decrypt(decodestring(msg)))) + + +class PubnubCrypto3() : """ #** #* PubnubCrypto @@ -97,7 +174,8 @@ except ImportError: import simplejson as json import time import hashlib -import uuid +import uuid +import sys class PubnubBase(object): def __init__( @@ -128,14 +206,14 @@ class PubnubBase(object): pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - self.origin = origin - self.limit = 1800 - self.publish_key = publish_key - self.subscribe_key = subscribe_key - self.secret_key = secret_key - self.cipher_key = cipher_key - self.ssl = ssl_on - self.pc = PubnubCrypto() + self.origin = origin + self.limit = 1800 + self.publish_key = publish_key + self.subscribe_key = subscribe_key + self.secret_key = secret_key + self.cipher_key = cipher_key + self.ssl = ssl_on + if self.ssl : self.origin = 'https://' + self.origin @@ -143,6 +221,14 @@ class PubnubBase(object): self.origin = 'http://' + self.origin self.uuid = UUID or str(uuid.uuid4()) + + if type(sys.version_info) is tuple: + self.python_version = 2 + self.pc = PubnubCrypto2() + else: + self.python_version = 3 + self.pc = PubnubCrypto3() + if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -574,8 +660,10 @@ class PubnubCore(PubnubBase): return True - -import urllib.request +try: + import urllib.request +except: + import urllib2 class Pubnub(PubnubCore): def __init__( @@ -597,19 +685,51 @@ class Pubnub(PubnubCore): origin = origin, uuid = pres_uuid ) + if self.python_version == 2: + self._request = self._request2 + else: + self._request = self._request3 - def _request( self, request, callback = None ) : + def _request2( self, request, callback = None ) : + ## Build URL + url = self.getUrl(request) + + ## Send Request Expecting JSONP Response + try: + try: usock = urllib2.urlopen( url, None, 310 ) + except TypeError: usock = urllib2.urlopen( url, None ) + response = usock.read() + usock.close() + resp_json = json.loads(response) + except: + return None + + if (callback): + callback(resp_json) + else: + return resp_json + + + def _request3( self, request, callback = None ) : ## Build URL url = self.getUrl(request) - print(url) ## Send Request Expecting JSONP Response try: response = urllib.request.urlopen(url,timeout=310) resp_json = json.loads(response.read().decode("utf-8")) except Exception as e: + print(e) return None if (callback): callback(resp_json) else: return resp_json + + ''' + def _request(self, request, callback = None): + if self.python_version == 2: + return self._request2(request,callback) + else: + return self._request3(request, callback) + ''' -- cgit v1.2.3 From 765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 2 Apr 2014 02:44:29 +0530 Subject: multiplexing support --- python/Pubnub.py | 326 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 257 insertions(+), 69 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 91f67ad..a449c2d 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -206,13 +206,13 @@ class PubnubBase(object): pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - self.origin = origin - self.limit = 1800 - self.publish_key = publish_key - self.subscribe_key = subscribe_key - self.secret_key = secret_key - self.cipher_key = cipher_key - self.ssl = ssl_on + self.origin = origin + self.limit = 1800 + self.publish_key = publish_key + self.subscribe_key = subscribe_key + self.secret_key = secret_key + self.cipher_key = cipher_key + self.ssl = ssl_on if self.ssl : @@ -261,6 +261,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -310,7 +318,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -350,7 +358,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : @@ -543,7 +551,19 @@ class PubnubBase(object): return url -class PubnubCore(PubnubBase): +try: + from hashlib import sha256 + digestmod = sha256 +except ImportError: + import Crypto.Hash.SHA256 as digestmod + sha256 = digestmod.new +import hmac + +class PubnubCoreAsync(PubnubBase): + + def start(self): pass + def stop(self): pass + def __init__( self, publish_key, @@ -562,17 +582,16 @@ class PubnubCore(PubnubBase): #* #* @param string publish_key required key to send messages. #* @param string subscribe_key required key to receive messages. - #* @param string secret_key optional key to sign messages. + #* @param string secret_key required key to sign messages. #* @param boolean ssl required for 2048 bit encrypted messages. #* @param string origin PUBNUB Server Origin. - #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied) #** ## Initiat Class pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - super(PubnubCore, self).__init__( + super(PubnubCoreAsync, self).__init__( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key, @@ -584,20 +603,33 @@ class PubnubCore(PubnubBase): self.subscriptions = {} self.timetoken = 0 - self.version = '3.4' + self.version = '3.3.4' self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel - - def subscribe( self, args ) : + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe #* - #* This is BLOCKING. + #* This is NON-BLOCKING. #* Listen for a message on a channel. #* - #* @param array args with channel and callback. + #* @param array args with channel and message. #* @return false on fail, array on success. #** @@ -606,58 +638,158 @@ class PubnubCore(PubnubBase): print(message) return True + ## On Connect Callback + def connected() : + pubnub.publish({ + 'channel' : 'hello_world', + 'message' : { 'some_var' : 'text' } + }) + + ## Subscribe pubnub.subscribe({ 'channel' : 'hello_world', - 'callback' : receive + 'connect' : connected, + 'callback' : receive }) """ - ## Fail if missing channel - if not 'channel' in args : - raise Exception('Missing Channel.') - return False + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return + + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + + + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj + + + ## New Channel? + if not channel in self.subscriptions: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } - ## Fail if missing callback - if not 'callback' in args : - raise Exception('Missing Callback.') - return False + ## return if already connected to channel + if self.subscriptions[channel]['connected'] : + _invoke(error, "Already Connected") + return + - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - subscribe_key = args.get('subscribe_key') or self.subscribe_key + ## SUBSCRIPTION RECURSION + def _connect(): + + self._reset_offline() + + def sub_callback(response): + print response + ## ERROR ? + if not response or error in response: + _invoke_error() + + _invoke_connect() + + + self.timetoken = response[1] - ## Begin Subscribe - while True : + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - timetoken = 'timetoken' in args and args['timetoken'] or 0 - try : - ## Wait for Message - response = self._request({"urlcomponents" : [ + + _connect() + + + + channel_list = self.get_channel_list(self.subscriptions) + print channel_list + ## CONNECT TO PUBNUB SUBSCRIBE SERVERS + try: + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', - subscribe_key, - channel, + self.subscribe_key, + channel_list, '0', - str(timetoken) - ],"urlparams" : {"uuid" : self.uuid }}) + str(self.timetoken) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) + return + + self._connect = _connect - messages = response[0] - args['timetoken'] = response[1] - ## If it was a timeout - if not len(messages) : - continue + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) + _connect() - ## Run user Callback and Reconnect if user permits. - for message in messages : - if not callback(self.decrypt(message)) : - return + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None - except Exception: - time.sleep(1) + def CONNECT(self): + self._reset_offline() + self._connect() + + + def unsubscribe( self, args ): + #print(args['channel']) + channel = str(args['channel']) + if not (channel in self.subscriptions): + return False - return True + ## DISCONNECT + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False + self.CONNECT() try: @@ -665,6 +797,34 @@ try: except: import urllib2 +import threading +import json +import time + +current_req_id = -1 + +class HTTPClient: + def __init__(self, url, callback, id=None): + self.url = url + self.id = id + self.callback = callback + self.stop = False + + def cancel(self): + self.stop = True + self.callback = None + + def run(self): + global current_req_id + data = urllib2.urlopen(self.url, timeout=310).read() + if self.stop is True: + return + if self.id is not None and current_req_id != self.id: + return + if self.callback is not None: + self.callback(json.loads(data)) + + class Pubnub(PubnubCore): def __init__( self, @@ -690,7 +850,33 @@ class Pubnub(PubnubCore): else: self._request = self._request3 - def _request2( self, request, callback = None ) : + def timeout(self, interval, func): + def cb(): + time.sleep(interval) + func() + thread = threading.Thread(target=cb) + thread.start() + + def _request2_async( self, request, callback, single=False ) : + global current_req_id + ## Build URL + url = self.getUrl(request) + if single is True: + id = time.time() + client = HTTPClient(url, callback, id) + current_req_id = id + else: + client = HTTPClient(url, callback) + + thread = threading.Thread(target=client.run) + thread.start() + def abort(): + client.cancel(); + return abort + + + def _request2_sync( self, request) : + ## Build URL url = self.getUrl(request) @@ -704,13 +890,18 @@ class Pubnub(PubnubCore): except: return None - if (callback): - callback(resp_json) - else: return resp_json - def _request3( self, request, callback = None ) : + def _request2(self, request, callback=None, single=False): + if callback is None: + return self._request2_sync(request,single=single) + else: + self._request2_async(request, callback, single=single) + + + + def _request3_sync( self, request) : ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response @@ -718,18 +909,15 @@ class Pubnub(PubnubCore): response = urllib.request.urlopen(url,timeout=310) resp_json = json.loads(response.read().decode("utf-8")) except Exception as e: - print(e) return None - if (callback): - callback(resp_json) - else: - return resp_json + return resp_json - ''' - def _request(self, request, callback = None): - if self.python_version == 2: - return self._request2(request,callback) + def _request3_async( self, request, callback, single=False ) : + pass + + def _request3(self, request, callback=None, single=False): + if callback is None: + return self._request3_sync(request,single=single) else: - return self._request3(request, callback) - ''' + self._request3_async(request, callback, single=single) -- cgit v1.2.3 From 99096b8c11b9a541f6350639e8735495cf90091c Mon Sep 17 00:00:00 2001 From: Devendra Date: Fri, 11 Apr 2014 14:49:43 +0530 Subject: v1 MX and async code for python, twisted, tornado --- python/Pubnub.py | 286 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 224 insertions(+), 62 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index a449c2d..f3c518c 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -176,6 +176,14 @@ import time import hashlib import uuid import sys +from urllib import quote + +from base64 import urlsafe_b64encode +from hashlib import sha256 +from urllib import quote +from urllib import urlopen + +import hmac class PubnubBase(object): def __init__( @@ -184,6 +192,7 @@ class PubnubBase(object): subscribe_key, secret_key = False, cipher_key = False, + auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', UUID = None @@ -213,6 +222,7 @@ class PubnubBase(object): self.secret_key = secret_key self.cipher_key = cipher_key self.ssl = ssl_on + self.auth_key = auth_key if self.ssl : @@ -247,6 +257,86 @@ class PubnubBase(object): signature = '0' return signature + def _pam_sign( self, msg ): + """Calculate a signature by secret key and message.""" + + return urlsafe_b64encode(hmac.new( + self.secret_key.encode("utf-8"), + msg.encode("utf-8"), + sha256 + ).digest()) + + def _pam_auth( self, query , apicode=0, callback=None): + """Issue an authenticated request.""" + + if 'timestamp' not in query: + query['timestamp'] = int(time.time()) + + ## Global Grant? + if 'auth' in query and not query['auth']: + del query['auth'] + + if 'channel' in query and not query['channel']: + del query['channel'] + + params = "&".join([ + x + "=" + quote( + str(query[x]), safe="" + ) for x in sorted(query) + ]) + sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( + subkey=self.subscribe_key, + pubkey=self.publish_key, + apitype="audit" if (apicode) else "grant", + params=params + ) + + signature = self._pam_sign(sign_input) + + ''' + url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + + self.subscribe_key + "?" + + params + "&signature=" + + quote(signature, safe="")) + ''' + + return self._request({"urlcomponents": [ + 'v1', 'auth', "audit" if (apicode) else "grant" , + 'sub-key', + self.subscribe_key + ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}}, + self._return_wrapped_callback(callback)) + + def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): + """Grant Access on a Channel.""" + + return self._pam_auth({ + "channel" : channel, + "auth" : authkey, + "r" : read and 1 or 0, + "w" : write and 1 or 0, + "ttl" : ttl + }, callback=callback) + + def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): + """Revoke Access on a Channel.""" + + return self._pam_auth({ + "channel" : channel, + "auth" : authkey, + "r" : read and 1 or 0, + "w" : write and 1 or 0, + "ttl" : ttl + }, callback=callback) + + def audit(self, channel=False, authkey=False, callback=None): + return self._pam_auth({ + "channel" : channel, + "auth" : authkey + },1, callback=callback) + + + def encrypt(self, message): if self.cipher_key: message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -318,7 +408,7 @@ class PubnubBase(object): channel, '0', message - ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -520,7 +610,7 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args : + if args and 'callback' in args: callback = args['callback'] else : callback = None @@ -547,7 +637,7 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) return url @@ -558,6 +648,9 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac +import threading +from threading import current_thread +import threading class PubnubCoreAsync(PubnubBase): @@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase): subscribe_key, secret_key = False, cipher_key = False, + auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', uuid = None @@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase): subscribe_key=subscribe_key, secret_key=secret_key, cipher_key=cipher_key, + auth_key=auth_key, ssl_on=ssl_on, origin=origin, UUID=uuid @@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase): self.subscriptions = {} self.timetoken = 0 + self.last_timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' self.SUB_RECEIVER = None self._connect = None + self._tt_lock = threading.RLock() def get_channel_list(self, channels): channel = '' first = True - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch + if self._channel_list_lock: + with self._channel_list_lock: + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + else: + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel def subscribe( self, args=None, sync=False ) : @@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase): }) """ + if args is None: + _invoke(error, "Arguments Missing") + return + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None + + with self._tt_lock: + self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken + self.timetoken = 0 + + if channel is None: + _invoke(error, "Channel Missing") + return + if callback is None: + _invoke(error, "Callback Missing") + return if sync is True and self.susbcribe_sync is not None: self.susbcribe_sync(args) @@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase): func() def _invoke_connect(): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - if chobj['connected'] is False: - chobj['connected'] = True - _invoke(chobj['connect']) + if self._channel_list_lock: + with self._channel_list_lock: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect'],chobj['name']) def _invoke_error(err=None): for ch in self.subscriptions: chobj = self.subscriptions[ch] _invoke(chobj.error,err) - + ''' if callback is None: _invoke(error, "Callback Missing") return @@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase): if channel is None: _invoke(error, "Channel Missing") return + ''' def _get_channel(): for ch in self.subscriptions: @@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } + if self._channel_list_lock: + with self._channel_list_lock: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } + else: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } ## return if already connected to channel - if self.subscriptions[channel]['connected'] : + if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") return + ## SUBSCRIPTION RECURSION def _connect(): @@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase): self._reset_offline() def sub_callback(response): - print response ## ERROR ? if not response or error in response: _invoke_error() _invoke_connect() - - self.timetoken = response[1] - - if len(response) > 2: - channel_list = response[2].split(',') - response_list = response[0] - for ch in enumerate(channel_list): - if ch[1] in self.subscriptions: - chobj = self.subscriptions[ch[1]] - _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) - else: - response_list = response[0] - chobj = _get_channel() - for r in response_list: - if chobj: - _invoke(chobj['callback'], self.decrypt(r)) - - - _connect() + with self._tt_lock: + #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken + self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) + + #with self._tt_lock: + # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + _connect() channel_list = self.get_channel_list(self.subscriptions) - print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase): str(self.timetoken) ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) except Exception as e: + print e self.timeout( 1, _connect) return @@ -800,8 +947,11 @@ except: import threading import json import time +import threading +from threading import current_thread -current_req_id = -1 +latest_sub_callback_lock = threading.RLock() +latest_sub_callback = {'id' : None, 'callback' : None} class HTTPClient: def __init__(self, url, callback, id=None): @@ -815,23 +965,32 @@ class HTTPClient: self.callback = None def run(self): - global current_req_id data = urllib2.urlopen(self.url, timeout=310).read() if self.stop is True: return - if self.id is not None and current_req_id != self.id: - return - if self.callback is not None: + if self.callback is None: + global latest_sub_callback + global latest_sub_callback_lock + with latest_sub_callback_lock: + if latest_sub_callback['id'] != self.id: + return + else: + print(data) + if latest_sub_callback['callback'] is not None: + latest_sub_callback['id'] = 0 + latest_sub_callback['callback'](json.loads(data)) + else: self.callback(json.loads(data)) -class Pubnub(PubnubCore): +class Pubnub(PubnubCoreAsync): def __init__( self, publish_key, subscribe_key, secret_key = False, cipher_key = False, + auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', pres_uuid = None @@ -841,6 +1000,7 @@ class Pubnub(PubnubCore): subscribe_key = subscribe_key, secret_key = secret_key, cipher_key = cipher_key, + auth_key = auth_key, ssl_on = ssl_on, origin = origin, uuid = pres_uuid @@ -849,6 +1009,7 @@ class Pubnub(PubnubCore): self._request = self._request2 else: self._request = self._request3 + self._channel_list_lock = threading.RLock() def timeout(self, interval, func): def cb(): @@ -858,13 +1019,14 @@ class Pubnub(PubnubCore): thread.start() def _request2_async( self, request, callback, single=False ) : - global current_req_id ## Build URL url = self.getUrl(request) if single is True: id = time.time() - client = HTTPClient(url, callback, id) - current_req_id = id + client = HTTPClient(url, None, id) + with latest_sub_callback_lock: + latest_sub_callback['id'] = id + latest_sub_callback['callback'] = callback else: client = HTTPClient(url, callback) @@ -879,7 +1041,6 @@ class Pubnub(PubnubCore): ## Build URL url = self.getUrl(request) - ## Send Request Expecting JSONP Response try: try: usock = urllib2.urlopen( url, None, 310 ) @@ -887,15 +1048,16 @@ class Pubnub(PubnubCore): response = usock.read() usock.close() resp_json = json.loads(response) - except: + except Exception as e: + print e return None - return resp_json + return resp_json def _request2(self, request, callback=None, single=False): if callback is None: - return self._request2_sync(request,single=single) + return self._request2_sync(request) else: self._request2_async(request, callback, single=single) -- cgit v1.2.3 From 150ae1566d813acbb773839e919db2c0f467931c Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 16 Apr 2014 00:00:40 +0530 Subject: adding code to support async and pam client capabilities with python v2 and v3 --- python/Pubnub.py | 376 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 198 insertions(+), 178 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index f3c518c..95eafd0 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -176,12 +176,13 @@ import time import hashlib import uuid import sys -from urllib import quote + +try: from urllib.parse import quote +except: from urllib2 import quote from base64 import urlsafe_b64encode from hashlib import sha256 -from urllib import quote -from urllib import urlopen + import hmac @@ -233,12 +234,11 @@ class PubnubBase(object): self.uuid = UUID or str(uuid.uuid4()) if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: self.python_version = 3 self.pc = PubnubCrypto3() - if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object): if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) else: if (callback != None):callback(response) - if (callback != None): return _new_format_callback + if (callback != None): + return _new_format_callback + else: + return None def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object): if 'callback' in args : callback = args['callback'] else : - callback = None + callback = None + + if 'error' in args : + error = args['error'] + else : + error = None - #message = json.dumps(args['message'], separators=(',',':')) message = self.encrypt(args['message']) - signature = self.sign(channel, message) + #signature = self.sign(channel, message) ## Send Message return self._request({"urlcomponents": [ 'publish', self.publish_key, self.subscribe_key, - signature, + '0', channel, '0', message - ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def presence( self, args ) : """ @@ -472,12 +480,10 @@ class PubnubBase(object): """ channel = str(args['channel']) - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - + + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None + ## Fail if bad input. if not channel : raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object): 'v2','presence', 'sub_key', self.subscribe_key, 'channel', channel - ]}, callback); - - - def history( self, args ) : + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) + + def history(self, args) : """ #** #* History #* #* Load history from a channel. #* - #* @param array args with 'channel' and 'limit'. - #* @return mixed false on fail, array on success. - #* - - ## History Example - history = pubnub.history({ - 'channel' : 'hello_world', - 'limit' : 1 - }) - print(history) - - """ - ## Capture User Input - limit = 'limit' in args and int(args['limit']) or 10 - channel = str(args['channel']) - - ## Fail if bad input. - if not channel : - raise Exception('Missing Channel') - return False - - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - - ## Get History - return self._request({ "urlcomponents" : [ - 'history', - self.subscribe_key, - channel, - '0', - str(limit) - ] }, callback); - - def detailedHistory(self, args) : - """ - #** - #* Detailed History - #* - #* Load Detailed history from a channel. - #* #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count' #* @return mixed false on fail, array on success. #* @@ -556,34 +519,21 @@ class PubnubBase(object): ## Capture User Input channel = str(args['channel']) - params = dict() - count = 100 - - if 'count' in args: - count = int(args['count']) + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None - params['count'] = str(count) - - if 'reverse' in args: - params['reverse'] = str(args['reverse']).lower() - - if 'start' in args: - params['start'] = str(args['start']) + params = dict() - if 'end' in args: - params['end'] = str(args['end']) + params['count'] = str(args['count']) if 'count' in args else 100 + params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false' + params['start'] = str(args['start']) if 'start' in args else None + params['end'] = str(args['end']) if 'end' in args else None ## Fail if bad input. if not channel : raise Exception('Missing Channel') return False - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - ## Get History return self._request({ 'urlcomponents' : [ 'v2', @@ -592,7 +542,8 @@ class PubnubBase(object): self.subscribe_key, 'channel', channel, - ],'urlparams' : params }, callback=callback); + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def time(self, args = None) : """ @@ -610,10 +561,9 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args: - callback = args['callback'] - else : - callback = None + + callback = callback if args and 'callback' in args else None + time = self._request({'urlcomponents' : [ 'time', '0' @@ -637,7 +587,8 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) + #print(url) return url @@ -648,9 +599,14 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): + def __enter__(self): + pass + def __exit__(self,a,b,c): + pass + +empty_lock = EmptyLock() class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase): auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', - uuid = None + uuid = None, + _tt_lock=empty_lock, + _channel_list_lock=empty_lock ) : """ #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase): UUID=uuid ) - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = threading.RLock() + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock def get_channel_list(self, channels): channel = '' first = True - if self._channel_list_lock: - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - else: + with self._channel_list_lock: for ch in channels: if not channels[ch]['subscribed']: continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase): else: first = False channel += ch - return channel + + def each(l, func): + if func is None: + return + for i in l: + func(i) + def subscribe( self, args=None, sync=False ) : """ #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase): if args is None: _invoke(error, "Arguments Missing") return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase): chobj['connected'] = True _invoke(chobj['connect'],chobj['name']) - def _invoke_error(err=None): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj.error,err) + def _invoke_error(channel_list=None, err=None): + if channel_list is None: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) + else: + for ch in channel_list: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) ''' if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - if self._channel_list_lock: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } - else: + with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, 'first' : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase): 'callback' : callback, 'connect' : connect, 'disconnect' : disconnect, - 'reconnect' : reconnect + 'reconnect' : reconnect, + 'error' : error } + ## return if already connected to channel if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - if not response or error in response: - _invoke_error() + #print response + if not response or ('message' in response and response['message'] == 'Forbidden'): + _invoke_error(response['payload']['channels'], response['message']) + _connect() + return _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) except Exception as e: - print e + print(e) self.timeout( 1, _connect) return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase): def unsubscribe( self, args ): - #print(args['channel']) - channel = str(args['channel']) - if not (channel in self.subscriptions): + + if 'channel' in self.subscriptions is False: return False + channel = str(args['channel']) + + ## DISCONNECT - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + with self._channel_list_lock: + if channel in self.subscriptions: + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() @@ -953,19 +909,43 @@ from threading import current_thread latest_sub_callback_lock = threading.RLock() latest_sub_callback = {'id' : None, 'callback' : None} + + + class HTTPClient: - def __init__(self, url, callback, id=None): + def __init__(self, url, urllib_func=None, callback=None, error=None, id=None): self.url = url self.id = id self.callback = callback + self.error = error self.stop = False + self._urllib_func = urllib_func def cancel(self): self.stop = True self.callback = None + self.error = None + def run(self): - data = urllib2.urlopen(self.url, timeout=310).read() + + def _invoke(func, data): + if func is not None: + func(data) + + if self._urllib_func is None: + return + + ''' + try: + resp = urllib2.urlopen(self.url, timeout=320) + except urllib2.HTTPError as http_error: + resp = http_error + ''' + resp = self._urllib_func(self.url, timeout=320) + data = resp[0] + code = resp[1] + if self.stop is True: return if self.callback is None: @@ -975,14 +955,49 @@ class HTTPClient: if latest_sub_callback['id'] != self.id: return else: - print(data) if latest_sub_callback['callback'] is not None: latest_sub_callback['id'] = 0 - latest_sub_callback['callback'](json.loads(data)) + try: + data = json.loads(data) + except: + _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'}) + return + if code != 200: + _invoke(latest_sub_callback['error'],data) + else: + _invoke(latest_sub_callback['callback'],data) else: - self.callback(json.loads(data)) + try: + data = json.loads(data) + except: + _invoke(self.error, {'error' : 'json decoding error'}) + return + + if code != 200: + _invoke(self.error,data) + else: + _invoke(self.callback,data) +def _urllib_request_2(url, timeout=320): + try: + resp = urllib2.urlopen(url,timeout=timeout) + except urllib2.HTTPError as http_error: + resp = http_error + return (resp.read(),resp.code) + +def _urllib_request_3(url, timeout=320): + #print(url) + try: + resp = urllib.request.urlopen(url,timeout=timeout) + except urllib.request.HTTPError as http_error: + resp = http_error + r = resp.read().decode("utf-8") + #print(r) + return (r,resp.code) + +_urllib_request = None + class Pubnub(PubnubCoreAsync): def __init__( self, @@ -1003,13 +1018,15 @@ class Pubnub(PubnubCoreAsync): auth_key = auth_key, ssl_on = ssl_on, origin = origin, - uuid = pres_uuid + uuid = pres_uuid, + _tt_lock=threading.RLock(), + _channel_list_lock=threading.RLock() ) + global _urllib_request if self.python_version == 2: - self._request = self._request2 + _urllib_request = _urllib_request_2 else: - self._request = self._request3 - self._channel_list_lock = threading.RLock() + _urllib_request = _urllib_request_3 def timeout(self, interval, func): def cb(): @@ -1018,17 +1035,20 @@ class Pubnub(PubnubCoreAsync): thread = threading.Thread(target=cb) thread.start() - def _request2_async( self, request, callback, single=False ) : + + def _request_async( self, request, callback=None, error=None, single=False ) : + global _urllib_request ## Build URL url = self.getUrl(request) if single is True: id = time.time() - client = HTTPClient(url, None, id) + client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id) with latest_sub_callback_lock: latest_sub_callback['id'] = id latest_sub_callback['callback'] = callback + latest_sub_callback['error'] = error else: - client = HTTPClient(url, callback) + client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error) thread = threading.Thread(target=client.run) thread.start() @@ -1037,31 +1057,30 @@ class Pubnub(PubnubCoreAsync): return abort - def _request2_sync( self, request) : - + def _request_sync( self, request) : + global _urllib_request ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response + response = _urllib_request(url, timeout=320) try: - try: usock = urllib2.urlopen( url, None, 310 ) - except TypeError: usock = urllib2.urlopen( url, None ) - response = usock.read() - usock.close() - resp_json = json.loads(response) - except Exception as e: - print e - return None - + resp_json = json.loads(response[0]) + except: + return [0,"JSON Error"] + + if response[1] != 200 and 'status' in resp_json: + return {'message' : resp_json['message'], 'payload' : resp_json['payload']} + return resp_json - def _request2(self, request, callback=None, single=False): + def _request(self, request, callback=None, error=None, single=False): if callback is None: - return self._request2_sync(request) + return self._request_sync(request) else: - self._request2_async(request, callback, single=single) - + self._request_async(request, callback, error, single=single) +''' def _request3_sync( self, request) : ## Build URL @@ -1083,3 +1102,4 @@ class Pubnub(PubnubCoreAsync): return self._request3_sync(request,single=single) else: self._request3_async(request, callback, single=single) + ''' -- cgit v1.2.3 From 9dc2555746adf717da0db808f4096af2167a1580 Mon Sep 17 00:00:00 2001 From: Devendra Date: Thu, 17 Apr 2014 00:28:05 +0530 Subject: adding ver 1 of dev console --- python/Pubnub.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 95eafd0..169bd54 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -291,7 +291,7 @@ class PubnubBase(object): params=params ) - signature = self._pam_sign(sign_input) + query['signature'] = self._pam_sign(sign_input) ''' url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + @@ -304,7 +304,7 @@ class PubnubBase(object): 'v1', 'auth', "audit" if (apicode) else "grant" , 'sub-key', self.subscribe_key - ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}}, + ], 'urlparams' : query}, self._return_wrapped_callback(callback)) def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): -- cgit v1.2.3 From 85416ee30a4f54183e2be30f9ef3cc1363f98c4d Mon Sep 17 00:00:00 2001 From: Devendra Date: Thu, 17 Apr 2014 01:06:51 +0530 Subject: adding more options to dev console --- python/Pubnub.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 169bd54..64bc14b 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -318,14 +318,14 @@ class PubnubBase(object): "ttl" : ttl }, callback=callback) - def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): + def revoke( self, channel, authkey=False, ttl=1, callback=None): """Revoke Access on a Channel.""" return self._pam_auth({ "channel" : channel, "auth" : authkey, - "r" : read and 1 or 0, - "w" : write and 1 or 0, + "r" : 0, + "w" : 0, "ttl" : ttl }, callback=callback) -- cgit v1.2.3 From 5d6a3e1356182663b03d62f9258d38459d49017e Mon Sep 17 00:00:00 2001 From: Devendra Date: Fri, 18 Apr 2014 01:23:19 +0530 Subject: fixing wrong version detection with python 2.7 --- python/Pubnub.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 64bc14b..7af2e92 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -237,8 +237,12 @@ class PubnubBase(object): self.python_version = 2 self.pc = PubnubCrypto2() else: - self.python_version = 3 - self.pc = PubnubCrypto3() + if sys.version_info.major == 2: + self.python_version = 2 + self.pc = PubnubCrypto2() + else: + self.python_version = 3 + self.pc = PubnubCrypto3() if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") -- cgit v1.2.3 From 4926061ebebbc060d14feac9c9d6b13880205724 Mon Sep 17 00:00:00 2001 From: Devendra Date: Tue, 22 Apr 2014 23:12:05 +0530 Subject: improvements to dev console --- python/Pubnub.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 7af2e92..bca3ee5 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -592,7 +592,6 @@ class PubnubBase(object): ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) - #print(url) return url @@ -682,6 +681,15 @@ class PubnubCoreAsync(PubnubBase): channel += ch return channel + def get_channel_array(self): + channels = self.subscriptions + channel = [] + with self._channel_list_lock: + for ch in channels: + if not channels[ch]['subscribed']: + continue + channel.append(ch) + return channel def each(l, func): if func is None: @@ -790,7 +798,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? - if not channel in self.subscriptions: + if not channel in self.subscriptions or self.subscriptions[channel]['subscribed'] is False: with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, @@ -819,7 +827,6 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - #print response if not response or ('message' in response and response['message'] == 'Forbidden'): _invoke_error(response['payload']['channels'], response['message']) _connect() @@ -828,9 +835,7 @@ class PubnubCoreAsync(PubnubBase): _invoke_connect() with self._tt_lock: - #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] - #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken if len(response) > 2: channel_list = response[2].split(',') response_list = response[0] @@ -845,12 +850,13 @@ class PubnubCoreAsync(PubnubBase): if chobj: _invoke(chobj['callback'], self.decrypt(r)) - #with self._tt_lock: - # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] _connect() channel_list = self.get_channel_list(self.subscriptions) + if len(channel_list) <= 0: + return + ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -867,7 +873,6 @@ class PubnubCoreAsync(PubnubBase): self._connect = _connect - ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) _connect() -- cgit v1.2.3 From b2f1e0ed2337b61073a595eaf36afd718a21a3fe Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 23 Apr 2014 02:10:55 +0530 Subject: fixing issues reported by Dara in dev console --- python/Pubnub.py | 1 + 1 file changed, 1 insertion(+) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index bca3ee5..d41f626 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -666,6 +666,7 @@ class PubnubCoreAsync(PubnubBase): self._connect = None self._tt_lock = _tt_lock self._channel_list_lock = _channel_list_lock + self._connect = lambda: None def get_channel_list(self, channels): channel = '' -- cgit v1.2.3 From fdb46e56fa6794940f9fbe51a2863d58e927e655 Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 23 Apr 2014 12:54:16 +0530 Subject: switching to positional arguments --- python/Pubnub.py | 108 +++++++++---------------------------------------------- 1 file changed, 17 insertions(+), 91 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index d41f626..230915c 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -247,7 +247,9 @@ class PubnubBase(object): if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") - def sign(self, channel, message): + ''' + + def _sign(self, channel, message): ## Sign Message if self.secret_key: signature = hashlib.md5('/'.join([ @@ -260,6 +262,7 @@ class PubnubBase(object): else: signature = '0' return signature + ''' def _pam_sign( self, msg ): """Calculate a signature by secret key and message.""" @@ -367,7 +370,7 @@ class PubnubBase(object): return None - def publish( self, args ) : + def publish(channel, message, callback=None, error=None): """ #** #* Publish @@ -388,28 +391,9 @@ class PubnubBase(object): print(info) """ - ## Fail if bad input. - if not (args['channel'] and args['message']) : - return [ 0, 'Missing Channel or Message' ] - - ## Capture User Input - channel = str(args['channel']) - - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - - if 'error' in args : - error = args['error'] - else : - error = None message = self.encrypt(args['message']) - #signature = self.sign(channel, message) - ## Send Message return self._request({"urlcomponents": [ 'publish', @@ -422,7 +406,7 @@ class PubnubBase(object): ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), error=self._return_wrapped_callback(error)) - def presence( self, args ) : + def presence( self, channel, callback, error=None) : """ #** #* presence @@ -444,26 +428,10 @@ class PubnubBase(object): 'callback' : receive }) """ - - ## Fail if missing channel - if not 'channel' in args : - raise Exception('Missing Channel.') - return False - - ## Fail if missing callback - if not 'callback' in args : - raise Exception('Missing Callback.') - return False - - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - subscribe_key = args.get('subscribe_key') or self.subscribe_key - - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':self.subscribe_key, 'callback': self._return_wrapped_callback(callback)}) - def here_now( self, args ) : + def here_now( self, channel, callback, error=None) : """ #** #* Here Now @@ -501,7 +469,7 @@ class PubnubBase(object): ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), error=self._return_wrapped_callback(error)) - def history(self, args) : + def history(self, channel, count=100, reverse=False, start=None, end=None, callback=None, error=None) : """ #** #* History @@ -520,23 +488,13 @@ class PubnubBase(object): print(history) """ - ## Capture User Input - channel = str(args['channel']) - - callback = args['callback'] if 'callback' in args else None - error = args['error'] if 'error' in args else None params = dict() - params['count'] = str(args['count']) if 'count' in args else 100 - params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false' - params['start'] = str(args['start']) if 'start' in args else None - params['end'] = str(args['end']) if 'end' in args else None - - ## Fail if bad input. - if not channel : - raise Exception('Missing Channel') - return False + params['count'] = count + params['reverse'] = reverse + params['start'] = start + params['end'] = end ## Get History return self._request({ 'urlcomponents' : [ @@ -549,7 +507,7 @@ class PubnubBase(object): ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), error=self._return_wrapped_callback(error)) - def time(self, args = None) : + def time(self,callback=None) : """ #** #* Time @@ -564,9 +522,6 @@ class PubnubBase(object): print(timestamp) """ - ## Capture Callback - - callback = callback if args and 'callback' in args else None time = self._request({'urlcomponents' : [ 'time', @@ -698,7 +653,7 @@ class PubnubCoreAsync(PubnubBase): for i in l: func(i) - def subscribe( self, args=None, sync=False ) : + def subscribe( self, channel, callback, error=None, connect=None, disconnect=None, reconnect=None, sync=False ) : """ #** #* Subscribe @@ -730,27 +685,11 @@ class PubnubCoreAsync(PubnubBase): }) """ - if args is None: - _invoke(error, "Arguments Missing") - return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken self.timetoken = 0 - if channel is None: - _invoke(error, "Channel Missing") - return - if callback is None: - _invoke(error, "Callback Missing") - return - if sync is True and self.susbcribe_sync is not None: self.susbcribe_sync(args) return @@ -781,16 +720,6 @@ class PubnubCoreAsync(PubnubBase): chobj = self.subscriptions[ch] _invoke(chobj['error'],err) - ''' - if callback is None: - _invoke(error, "Callback Missing") - return - - if channel is None: - _invoke(error, "Channel Missing") - return - ''' - def _get_channel(): for ch in self.subscriptions: chobj = self.subscriptions[ch] @@ -887,14 +816,11 @@ class PubnubCoreAsync(PubnubBase): self._connect() - def unsubscribe( self, args ): + def unsubscribe( self, channel ): - if 'channel' in self.subscriptions is False: + if channel in self.subscriptions is False: return False - channel = str(args['channel']) - - ## DISCONNECT with self._channel_list_lock: if channel in self.subscriptions: -- cgit v1.2.3 From 09cd0c015ae276aa849297a6a976065b2b3f247b Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 23 Apr 2014 14:03:13 +0530 Subject: modifying code for pep 8 compliance --- python/Pubnub.py | 521 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 272 insertions(+), 249 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 230915c..93f416b 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -12,11 +12,12 @@ from Crypto.Cipher import AES from Crypto.Hash import MD5 -from base64 import encodestring, decodestring +from base64 import encodestring, decodestring import hashlib import hmac -class PubnubCrypto2() : + +class PubnubCrypto2(): """ #** #* PubnubCrypto @@ -27,8 +28,8 @@ class PubnubCrypto2() : pc = PubnubCrypto """ - - def pad( self, msg, block_size=16 ): + + def pad(self, msg, block_size=16): """ #** #* pad @@ -40,9 +41,9 @@ class PubnubCrypto2() : #** """ padding = block_size - (len(msg) % block_size) - return msg + chr(padding)*padding - - def depad( self, msg ): + return msg + chr(padding) * padding + + def depad(self, msg): """ #** #* depad @@ -53,7 +54,7 @@ class PubnubCrypto2() : """ return msg[0:-ord(msg[-1])] - def getSecret( self, key ): + def getSecret(self, key): """ #** #* getSecret @@ -64,7 +65,7 @@ class PubnubCrypto2() : """ return hashlib.sha256(key).hexdigest() - def encrypt( self, key, msg ): + def encrypt(self, key, msg): """ #** #* encrypt @@ -74,11 +75,12 @@ class PubnubCrypto2() : #** """ secret = self.getSecret(key) - Initial16bytes='0123456789012345' - cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + Initial16bytes = '0123456789012345' + cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) enc = encodestring(cipher.encrypt(self.pad(msg))) return enc - def decrypt( self, key, msg ): + + def decrypt(self, key, msg): """ #** #* decrypt @@ -88,12 +90,12 @@ class PubnubCrypto2() : #** """ secret = self.getSecret(key) - Initial16bytes='0123456789012345' - cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) + Initial16bytes = '0123456789012345' + cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) return self.depad((cipher.decrypt(decodestring(msg)))) -class PubnubCrypto3() : +class PubnubCrypto3(): """ #** #* PubnubCrypto @@ -104,8 +106,8 @@ class PubnubCrypto3() : pc = PubnubCrypto """ - - def pad( self, msg, block_size=16 ): + + def pad(self, msg, block_size=16): """ #** #* pad @@ -117,9 +119,9 @@ class PubnubCrypto3() : #** """ padding = block_size - (len(msg) % block_size) - return msg + (chr(padding)*padding).encode('utf-8') - - def depad( self, msg ): + return msg + (chr(padding) * padding).encode('utf-8') + + def depad(self, msg): """ #** #* depad @@ -130,7 +132,7 @@ class PubnubCrypto3() : """ return msg[0:-ord(msg[-1])] - def getSecret( self, key ): + def getSecret(self, key): """ #** #* getSecret @@ -141,7 +143,7 @@ class PubnubCrypto3() : """ return hashlib.sha256(key.encode("utf-8")).hexdigest() - def encrypt( self, key, msg ): + def encrypt(self, key, msg): """ #** #* encrypt @@ -151,10 +153,12 @@ class PubnubCrypto3() : #** """ secret = self.getSecret(key) - Initial16bytes='0123456789012345' - cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) - return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') - def decrypt( self, key, msg ): + Initial16bytes = '0123456789012345' + cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) + return encodestring( + cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') + + def decrypt(self, key, msg): """ #** #* decrypt @@ -164,40 +168,46 @@ class PubnubCrypto3() : #** """ secret = self.getSecret(key) - Initial16bytes='0123456789012345' - cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) - return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8') + Initial16bytes = '0123456789012345' + cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) + return (cipher.decrypt( + decodestring(msg.encode('utf-8')))).decode('utf-8') -try: import json -except ImportError: import simplejson as json +try: + import json +except ImportError: + import simplejson as json import time import hashlib import uuid import sys -try: from urllib.parse import quote -except: from urllib2 import quote +try: + from urllib.parse import quote +except: + from urllib2 import quote -from base64 import urlsafe_b64encode +from base64 import urlsafe_b64encode from hashlib import sha256 import hmac + class PubnubBase(object): def __init__( self, publish_key, subscribe_key, - secret_key = False, - cipher_key = False, - auth_key = None, - ssl_on = False, - origin = 'pubsub.pubnub.com', - UUID = None - ) : + secret_key=False, + cipher_key=False, + auth_key=None, + ssl_on=False, + origin='pubsub.pubnub.com', + UUID=None + ): """ #** #* Pubnub @@ -209,41 +219,41 @@ class PubnubBase(object): #* @param string secret_key optional key to sign messages. #* @param boolean ssl required for 2048 bit encrypted messages. #* @param string origin PUBNUB Server Origin. - #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied) + #* @param string pres_uuid optional identifier + #* for presence (auto-generated if not supplied) #** ## Initiat Class pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) """ - self.origin = origin - self.limit = 1800 - self.publish_key = publish_key + self.origin = origin + self.limit = 1800 + self.publish_key = publish_key self.subscribe_key = subscribe_key - self.secret_key = secret_key - self.cipher_key = cipher_key - self.ssl = ssl_on - self.auth_key = auth_key + self.secret_key = secret_key + self.cipher_key = cipher_key + self.ssl = ssl_on + self.auth_key = auth_key - - if self.ssl : + if self.ssl: self.origin = 'https://' + self.origin - else : - self.origin = 'http://' + self.origin - + else: + self.origin = 'http://' + self.origin + self.uuid = UUID or str(uuid.uuid4()) if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: if sys.version_info.major == 2: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: self.python_version = 3 - self.pc = PubnubCrypto3() - + self.pc = PubnubCrypto3() + if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -264,7 +274,7 @@ class PubnubBase(object): return signature ''' - def _pam_sign( self, msg ): + def _pam_sign(self, msg): """Calculate a signature by secret key and message.""" return urlsafe_b64encode(hmac.new( @@ -273,7 +283,7 @@ class PubnubBase(object): sha256 ).digest()) - def _pam_auth( self, query , apicode=0, callback=None): + def _pam_auth(self, query, apicode=0, callback=None): """Issue an authenticated request.""" if 'timestamp' not in query: @@ -300,57 +310,50 @@ class PubnubBase(object): query['signature'] = self._pam_sign(sign_input) - ''' - url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + - self.subscribe_key + "?" + - params + "&signature=" + - quote(signature, safe="")) - ''' - return self._request({"urlcomponents": [ - 'v1', 'auth', "audit" if (apicode) else "grant" , + 'v1', 'auth', "audit" if (apicode) else "grant", 'sub-key', self.subscribe_key - ], 'urlparams' : query}, - self._return_wrapped_callback(callback)) + ], 'urlparams': query}, + self._return_wrapped_callback(callback)) - def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): + def grant(self, channel, authkey=False, read=True, + write=True, ttl=5, callback=None): """Grant Access on a Channel.""" return self._pam_auth({ - "channel" : channel, - "auth" : authkey, - "r" : read and 1 or 0, - "w" : write and 1 or 0, - "ttl" : ttl + "channel": channel, + "auth": authkey, + "r": read and 1 or 0, + "w": write and 1 or 0, + "ttl": ttl }, callback=callback) - def revoke( self, channel, authkey=False, ttl=1, callback=None): + def revoke(self, channel, authkey=False, ttl=1, callback=None): """Revoke Access on a Channel.""" return self._pam_auth({ - "channel" : channel, - "auth" : authkey, - "r" : 0, - "w" : 0, - "ttl" : ttl + "channel": channel, + "auth": authkey, + "r": 0, + "w": 0, + "ttl": ttl }, callback=callback) def audit(self, channel=False, authkey=False, callback=None): return self._pam_auth({ - "channel" : channel, - "auth" : authkey - },1, callback=callback) - - + "channel": channel, + "auth": authkey + }, 1, callback=callback) def encrypt(self, message): if self.cipher_key: - message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) - else : + message = json.dumps(self.pc.encrypt( + self.cipher_key, json.dumps(message)).replace('\n', '')) + else: message = json.dumps(message) - return message; + return message def decrypt(self, message): if self.cipher_key: @@ -361,15 +364,17 @@ class PubnubBase(object): def _return_wrapped_callback(self, callback=None): def _new_format_callback(response): if 'payload' in response: - if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + if (callback is not None): + callback({'message': response['message'], + 'payload': response['payload']}) else: - if (callback != None):callback(response) - if (callback != None): + if (callback is not None): + callback(response) + if (callback is not None): return _new_format_callback else: return None - def publish(channel, message, callback=None, error=None): """ #** @@ -403,10 +408,11 @@ class PubnubBase(object): channel, '0', message - ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), - error=self._return_wrapped_callback(error)) - - def presence( self, channel, callback, error=None) : + ], 'urlparams': {'auth': self.auth_key}}, + callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) + + def presence(self, channel, callback, error=None): """ #** #* presence @@ -425,13 +431,15 @@ class PubnubBase(object): pubnub.presence({ 'channel' : 'hello_world', - 'callback' : receive + 'callback' : receive }) """ - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':self.subscribe_key, 'callback': self._return_wrapped_callback(callback)}) - - - def here_now( self, channel, callback, error=None) : + return self.subscribe({ + 'channel': channel + '-pnpres', + 'subscribe_key': self.subscribe_key, + 'callback': self._return_wrapped_callback(callback)}) + + def here_now(self, channel, callback, error=None): """ #** #* Here Now @@ -452,33 +460,31 @@ class PubnubBase(object): """ channel = str(args['channel']) - - callback = args['callback'] if 'callback' in args else None - error = args['error'] if 'error' in args else None + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None ## Fail if bad input. - if not channel : + if not channel: raise Exception('Missing Channel') return False - + ## Get Presence Here Now return self._request({"urlcomponents": [ - 'v2','presence', + 'v2', 'presence', 'sub_key', self.subscribe_key, 'channel', channel - ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), - error=self._return_wrapped_callback(error)) + ], 'urlparams': {'auth': self.auth_key}}, + callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) - def history(self, channel, count=100, reverse=False, start=None, end=None, callback=None, error=None) : + def history(self, channel, count=100, reverse=False, + start=None, end=None, callback=None, error=None): """ #** #* History #* #* Load history from a channel. #* - #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count' - #* @return mixed false on fail, array on success. - #* ## History Example history = pubnub.detailedHistory({ @@ -489,25 +495,26 @@ class PubnubBase(object): """ - params = dict() + params = dict() - params['count'] = count - params['reverse'] = reverse - params['start'] = start - params['end'] = end + params['count'] = count + params['reverse'] = reverse + params['start'] = start + params['end'] = end ## Get History - return self._request({ 'urlcomponents' : [ + return self._request({'urlcomponents': [ 'v2', 'history', 'sub-key', self.subscribe_key, 'channel', channel, - ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), - error=self._return_wrapped_callback(error)) + ], 'urlparams': {'auth': self.auth_key}}, + callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) - def time(self,callback=None) : + def time(self, callback=None): """ #** #* Time @@ -523,30 +530,30 @@ class PubnubBase(object): """ - time = self._request({'urlcomponents' : [ + time = self._request({'urlcomponents': [ 'time', '0' ]}, callback) - if time != None: + if time is not None: return time[0] - - def _encode( self, request ) : + def _encode(self, request): return [ - "".join([ ' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and - hex(ord(ch)).replace( '0x', '%' ).upper() or - ch for ch in list(bit) - ]) for bit in request] - - def getUrl(self,request): + "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and + hex(ord(ch)).replace('0x', '%').upper() or + ch for ch in list(bit) + ]) for bit in request] + + def getUrl(self, request): ## Build URL url = self.origin + '/' + "/".join([ - "".join([ ' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and - hex(ord(ch)).replace( '0x', '%' ).upper() or - ch for ch in list(bit) - ]) for bit in request["urlcomponents"]]) + "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and + hex(ord(ch)).replace('0x', '%').upper() or + ch for ch in list(bit) + ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) + url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[ + "urlparams"].items() if y is not None]) return url @@ -558,32 +565,38 @@ except ImportError: sha256 = digestmod.new import hmac + class EmptyLock(): def __enter__(self): pass - def __exit__(self,a,b,c): + + def __exit__(self, a, b, c): pass empty_lock = EmptyLock() + class PubnubCoreAsync(PubnubBase): - def start(self): pass - def stop(self): pass + def start(self): + pass + + def stop(self): + pass def __init__( self, publish_key, subscribe_key, - secret_key = False, - cipher_key = False, - auth_key = None, - ssl_on = False, - origin = 'pubsub.pubnub.com', - uuid = None, + secret_key=False, + cipher_key=False, + auth_key=None, + ssl_on=False, + origin='pubsub.pubnub.com', + uuid=None, _tt_lock=empty_lock, _channel_list_lock=empty_lock - ) : + ): """ #** #* Pubnub @@ -610,18 +623,18 @@ class PubnubCoreAsync(PubnubBase): ssl_on=ssl_on, origin=origin, UUID=uuid - ) - - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = _tt_lock - self._channel_list_lock = _channel_list_lock - self._connect = lambda: None + ) + + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock + self._connect = lambda: None def get_channel_list(self, channels): channel = '' @@ -653,7 +666,8 @@ class PubnubCoreAsync(PubnubBase): for i in l: func(i) - def subscribe( self, channel, callback, error=None, connect=None, disconnect=None, reconnect=None, sync=False ) : + def subscribe(self, channel, callback, error=None, + connect=None, disconnect=None, reconnect=None, sync=False): """ #** #* Subscribe @@ -687,14 +701,15 @@ class PubnubCoreAsync(PubnubBase): """ with self._tt_lock: - self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken + self.last_timetoken = self.timetoken if self.timetoken != 0 \ + else self.last_timetoken self.timetoken = 0 if sync is True and self.susbcribe_sync is not None: self.susbcribe_sync(args) return - def _invoke(func,msg=None): + def _invoke(func, msg=None): if func is not None: if msg is not None: func(msg) @@ -708,17 +723,17 @@ class PubnubCoreAsync(PubnubBase): chobj = self.subscriptions[ch] if chobj['connected'] is False: chobj['connected'] = True - _invoke(chobj['connect'],chobj['name']) + _invoke(chobj['connect'], chobj['name']) def _invoke_error(channel_list=None, err=None): if channel_list is None: for ch in self.subscriptions: chobj = self.subscriptions[ch] - _invoke(chobj['error'],err) + _invoke(chobj['error'], err) else: for ch in channel_list: chobj = self.subscriptions[ch] - _invoke(chobj['error'],err) + _invoke(chobj['error'], err) def _get_channel(): for ch in self.subscriptions: @@ -726,53 +741,58 @@ class PubnubCoreAsync(PubnubBase): if chobj['subscribed'] is True: return chobj - ## New Channel? - if not channel in self.subscriptions or self.subscriptions[channel]['subscribed'] is False: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect, - 'error' : error - } - + if not channel in self.subscriptions or \ + self.subscriptions[channel]['subscribed'] is False: + with self._channel_list_lock: + self.subscriptions[channel] = { + 'name': channel, + 'first': False, + 'connected': False, + 'subscribed': True, + 'callback': callback, + 'connect': connect, + 'disconnect': disconnect, + 'reconnect': reconnect, + 'error': error + } ## return if already connected to channel - if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: - _invoke(error, "Already Connected") - return - - + if channel in self.subscriptions and \ + 'connected' in self.subscriptions[channel] and \ + self.subscriptions[channel]['connected'] is True: + _invoke(error, "Already Connected") + return - ## SUBSCRIPTION RECURSION + ## SUBSCRIPTION RECURSION def _connect(): - + self._reset_offline() def sub_callback(response): ## ERROR ? - if not response or ('message' in response and response['message'] == 'Forbidden'): - _invoke_error(response['payload']['channels'], response['message']) - _connect() - return + if not response or \ + ('message' in response and + response['message'] == 'Forbidden'): + _invoke_error(response['payload'][ + 'channels'], response['message']) + _connect() + return _invoke_connect() with self._tt_lock: - self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + self.timetoken = \ + self.last_timetoken if self.timetoken == 0 and \ + self.last_timetoken != 0 else response[1] if len(response) > 2: channel_list = response[2].split(',') response_list = response[0] for ch in enumerate(channel_list): if ch[1] in self.subscriptions: chobj = self.subscriptions[ch[1]] - _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + _invoke(chobj['callback'], + self.decrypt(response_list[ch[0]])) else: response_list = response[0] chobj = _get_channel() @@ -782,23 +802,25 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) if len(channel_list) <= 0: return ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: - self.SUB_RECEIVER = self._request( { "urlcomponents" : [ + self.SUB_RECEIVER = self._request({"urlcomponents": [ 'subscribe', self.subscribe_key, channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) + ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key}}, + sub_callback, + sub_callback, + single=True) except Exception as e: print(e) - self.timeout( 1, _connect) + self.timeout(1, _connect) return self._connect = _connect @@ -815,8 +837,7 @@ class PubnubCoreAsync(PubnubBase): self._reset_offline() self._connect() - - def unsubscribe( self, channel ): + def unsubscribe(self, channel): if channel in self.subscriptions is False: return False @@ -824,10 +845,10 @@ class PubnubCoreAsync(PubnubBase): ## DISCONNECT with self._channel_list_lock: if channel in self.subscriptions: - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() @@ -843,13 +864,12 @@ import threading from threading import current_thread latest_sub_callback_lock = threading.RLock() -latest_sub_callback = {'id' : None, 'callback' : None} - - +latest_sub_callback = {'id': None, 'callback': None} class HTTPClient: - def __init__(self, url, urllib_func=None, callback=None, error=None, id=None): + def __init__(self, url, urllib_func=None, + callback=None, error=None, id=None): self.url = url self.id = id self.callback = callback @@ -862,7 +882,6 @@ class HTTPClient: self.callback = None self.error = None - def run(self): def _invoke(func, data): @@ -896,65 +915,68 @@ class HTTPClient: try: data = json.loads(data) except: - _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'}) + _invoke(latest_sub_callback['error'], + {'error': 'json decoding error'}) return if code != 200: - _invoke(latest_sub_callback['error'],data) + _invoke(latest_sub_callback['error'], data) else: - _invoke(latest_sub_callback['callback'],data) + _invoke(latest_sub_callback['callback'], data) else: try: data = json.loads(data) except: - _invoke(self.error, {'error' : 'json decoding error'}) + _invoke(self.error, {'error': 'json decoding error'}) return if code != 200: - _invoke(self.error,data) + _invoke(self.error, data) else: - _invoke(self.callback,data) + _invoke(self.callback, data) def _urllib_request_2(url, timeout=320): try: - resp = urllib2.urlopen(url,timeout=timeout) + resp = urllib2.urlopen(url, timeout=timeout) except urllib2.HTTPError as http_error: resp = http_error - return (resp.read(),resp.code) + return (resp.read(), resp.code) + def _urllib_request_3(url, timeout=320): #print(url) try: - resp = urllib.request.urlopen(url,timeout=timeout) + resp = urllib.request.urlopen(url, timeout=timeout) except urllib.request.HTTPError as http_error: resp = http_error - r = resp.read().decode("utf-8") + r = resp.read().decode("utf-8") #print(r) - return (r,resp.code) + return (r, resp.code) _urllib_request = None + class Pubnub(PubnubCoreAsync): def __init__( self, publish_key, subscribe_key, - secret_key = False, - cipher_key = False, - auth_key = None, - ssl_on = False, - origin = 'pubsub.pubnub.com', - pres_uuid = None - ) : + secret_key=False, + cipher_key=False, + auth_key=None, + ssl_on=False, + origin='pubsub.pubnub.com', + pres_uuid=None + ): super(Pubnub, self).__init__( - publish_key = publish_key, - subscribe_key = subscribe_key, - secret_key = secret_key, - cipher_key = cipher_key, - auth_key = auth_key, - ssl_on = ssl_on, - origin = origin, - uuid = pres_uuid, + publish_key=publish_key, + subscribe_key=subscribe_key, + secret_key=secret_key, + cipher_key=cipher_key, + auth_key=auth_key, + ssl_on=ssl_on, + origin=origin, + uuid=pres_uuid, _tt_lock=threading.RLock(), _channel_list_lock=threading.RLock() ) @@ -971,29 +993,30 @@ class Pubnub(PubnubCoreAsync): thread = threading.Thread(target=cb) thread.start() - - def _request_async( self, request, callback=None, error=None, single=False ) : + def _request_async(self, request, callback=None, error=None, single=False): global _urllib_request ## Build URL url = self.getUrl(request) if single is True: id = time.time() - client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id) + client = HTTPClient(url=url, urllib_func=_urllib_request, + callback=None, error=None, id=id) with latest_sub_callback_lock: latest_sub_callback['id'] = id latest_sub_callback['callback'] = callback latest_sub_callback['error'] = error else: - client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error) + client = HTTPClient(url=url, urllib_func=_urllib_request, + callback=callback, error=error) thread = threading.Thread(target=client.run) thread.start() + def abort(): - client.cancel(); + client.cancel() return abort - - def _request_sync( self, request) : + def _request_sync(self, request): global _urllib_request ## Build URL url = self.getUrl(request) @@ -1002,14 +1025,14 @@ class Pubnub(PubnubCoreAsync): try: resp_json = json.loads(response[0]) except: - return [0,"JSON Error"] + return [0, "JSON Error"] if response[1] != 200 and 'status' in resp_json: - return {'message' : resp_json['message'], 'payload' : resp_json['payload']} + return {'message': resp_json['message'], + 'payload': resp_json['payload']} return resp_json - def _request(self, request, callback=None, error=None, single=False): if callback is None: return self._request_sync(request) @@ -1027,7 +1050,7 @@ class Pubnub(PubnubCoreAsync): resp_json = json.loads(response.read().decode("utf-8")) except Exception as e: return None - + return resp_json def _request3_async( self, request, callback, single=False ) : -- cgit v1.2.3 From f7b89bfafae34fa22509c1d1c59d1284ec62c5df Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 23 Apr 2014 21:35:06 +0530 Subject: exception handling changes --- python/Pubnub.py | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 93f416b..3bf02a3 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -186,7 +186,7 @@ import sys try: from urllib.parse import quote -except: +except ImportError: from urllib2 import quote from base64 import urlsafe_b64encode @@ -397,7 +397,7 @@ class PubnubBase(object): """ - message = self.encrypt(args['message']) + message = self.encrypt(message) ## Send Message return self._request({"urlcomponents": [ @@ -458,15 +458,6 @@ class PubnubBase(object): print(here_now['uuids']) """ - channel = str(args['channel']) - - callback = args['callback'] if 'callback' in args else None - error = args['error'] if 'error' in args else None - - ## Fail if bad input. - if not channel: - raise Exception('Missing Channel') - return False ## Get Presence Here Now return self._request({"urlcomponents": [ @@ -807,21 +798,23 @@ class PubnubCoreAsync(PubnubBase): return ## CONNECT TO PUBNUB SUBSCRIBE SERVERS - try: - self.SUB_RECEIVER = self._request({"urlcomponents": [ - 'subscribe', - self.subscribe_key, - channel_list, - '0', - str(self.timetoken) - ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key}}, - sub_callback, - sub_callback, - single=True) + #try: + self.SUB_RECEIVER = self._request({"urlcomponents": [ + 'subscribe', + self.subscribe_key, + channel_list, + '0', + str(self.timetoken) + ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key}}, + sub_callback, + sub_callback, + single=True) + ''' except Exception as e: print(e) self.timeout(1, _connect) return + ''' self._connect = _connect -- cgit v1.2.3 From 493e29a108255eb3ae3166dc920f40e2f4e5c4c4 Mon Sep 17 00:00:00 2001 From: Devendra Date: Thu, 24 Apr 2014 00:16:57 +0530 Subject: adding single file for all platforms --- python/Pubnub.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py index 3bf02a3..a52c46e 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -760,6 +760,19 @@ class PubnubCoreAsync(PubnubBase): self._reset_offline() + def error_callback(response): + ## ERROR ? + if not response or \ + ('message' in response and + response['message'] == 'Forbidden'): + _invoke_error(response['payload'][ + 'channels'], response['message']) + _connect() + return + if 'message' in response: + _invoke_error(err=response['message']) + + def sub_callback(response): ## ERROR ? if not response or \ @@ -807,7 +820,7 @@ class PubnubCoreAsync(PubnubBase): str(self.timetoken) ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key}}, sub_callback, - sub_callback, + error_callback, single=True) ''' except Exception as e: @@ -847,7 +860,7 @@ class PubnubCoreAsync(PubnubBase): try: import urllib.request -except: +except ImportError: import urllib2 import threading @@ -905,20 +918,24 @@ class HTTPClient: else: if latest_sub_callback['callback'] is not None: latest_sub_callback['id'] = 0 + print data try: data = json.loads(data) - except: + except ValueError as e: _invoke(latest_sub_callback['error'], {'error': 'json decoding error'}) return + print code if code != 200: + print 'ERROR' _invoke(latest_sub_callback['error'], data) else: + print 'CALLBACK' _invoke(latest_sub_callback['callback'], data) else: try: data = json.loads(data) - except: + except ValueError: _invoke(self.error, {'error': 'json decoding error'}) return @@ -933,6 +950,12 @@ def _urllib_request_2(url, timeout=320): resp = urllib2.urlopen(url, timeout=timeout) except urllib2.HTTPError as http_error: resp = http_error + except urllib2.URLError as error: + #print error.reason + msg = { "message" : str(error.reason)} + #print str(msg) + return (json.dumps(msg),0) + return (resp.read(), resp.code) @@ -940,7 +963,7 @@ def _urllib_request_3(url, timeout=320): #print(url) try: resp = urllib.request.urlopen(url, timeout=timeout) - except urllib.request.HTTPError as http_error: + except (urllib.request.HTTPError, urllib.request.URLError) as http_error: resp = http_error r = resp.read().decode("utf-8") #print(r) @@ -1017,7 +1040,7 @@ class Pubnub(PubnubCoreAsync): response = _urllib_request(url, timeout=320) try: resp_json = json.loads(response[0]) - except: + except ValueError: return [0, "JSON Error"] if response[1] != 200 and 'status' in resp_json: -- cgit v1.2.3 From c5d2fb446378e78e9e164dbea969edd57314dc4b Mon Sep 17 00:00:00 2001 From: Devendra Date: Thu, 24 Apr 2014 00:59:50 +0530 Subject: removing files --- python/Pubnub.py | 1080 ------------------------------------------------------ 1 file changed, 1080 deletions(-) delete mode 100644 python/Pubnub.py (limited to 'python/Pubnub.py') diff --git a/python/Pubnub.py b/python/Pubnub.py deleted file mode 100644 index a52c46e..0000000 --- a/python/Pubnub.py +++ /dev/null @@ -1,1080 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud. -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## ----------------------------------- -## PubNub 3.3.5 Real-time Push Cloud API -## ----------------------------------- - - -from Crypto.Cipher import AES -from Crypto.Hash import MD5 -from base64 import encodestring, decodestring -import hashlib -import hmac - - -class PubnubCrypto2(): - """ - #** - #* PubnubCrypto - #* - #** - - ## Initiate Class - pc = PubnubCrypto - - """ - - def pad(self, msg, block_size=16): - """ - #** - #* pad - #* - #* pad the text to be encrypted - #* appends a padding character to the end of the String - #* until the string has block_size length - #* @return msg with padding. - #** - """ - padding = block_size - (len(msg) % block_size) - return msg + chr(padding) * padding - - def depad(self, msg): - """ - #** - #* depad - #* - #* depad the decryptet message" - #* @return msg without padding. - #** - """ - return msg[0:-ord(msg[-1])] - - def getSecret(self, key): - """ - #** - #* getSecret - #* - #* hases the key to MD5 - #* @return key in MD5 format - #** - """ - return hashlib.sha256(key).hexdigest() - - def encrypt(self, key, msg): - """ - #** - #* encrypt - #* - #* encrypts the message - #* @return message in encrypted format - #** - """ - secret = self.getSecret(key) - Initial16bytes = '0123456789012345' - cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) - enc = encodestring(cipher.encrypt(self.pad(msg))) - return enc - - def decrypt(self, key, msg): - """ - #** - #* decrypt - #* - #* decrypts the message - #* @return message in decryped format - #** - """ - secret = self.getSecret(key) - Initial16bytes = '0123456789012345' - cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) - return self.depad((cipher.decrypt(decodestring(msg)))) - - -class PubnubCrypto3(): - """ - #** - #* PubnubCrypto - #* - #** - - ## Initiate Class - pc = PubnubCrypto - - """ - - def pad(self, msg, block_size=16): - """ - #** - #* pad - #* - #* pad the text to be encrypted - #* appends a padding character to the end of the String - #* until the string has block_size length - #* @return msg with padding. - #** - """ - padding = block_size - (len(msg) % block_size) - return msg + (chr(padding) * padding).encode('utf-8') - - def depad(self, msg): - """ - #** - #* depad - #* - #* depad the decryptet message" - #* @return msg without padding. - #** - """ - return msg[0:-ord(msg[-1])] - - def getSecret(self, key): - """ - #** - #* getSecret - #* - #* hases the key to MD5 - #* @return key in MD5 format - #** - """ - return hashlib.sha256(key.encode("utf-8")).hexdigest() - - def encrypt(self, key, msg): - """ - #** - #* encrypt - #* - #* encrypts the message - #* @return message in encrypted format - #** - """ - secret = self.getSecret(key) - Initial16bytes = '0123456789012345' - cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) - return encodestring( - cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') - - def decrypt(self, key, msg): - """ - #** - #* decrypt - #* - #* decrypts the message - #* @return message in decryped format - #** - """ - secret = self.getSecret(key) - Initial16bytes = '0123456789012345' - cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) - return (cipher.decrypt( - decodestring(msg.encode('utf-8')))).decode('utf-8') - - -try: - import json -except ImportError: - import simplejson as json - -import time -import hashlib -import uuid -import sys - -try: - from urllib.parse import quote -except ImportError: - from urllib2 import quote - -from base64 import urlsafe_b64encode -from hashlib import sha256 - - -import hmac - - -class PubnubBase(object): - def __init__( - self, - publish_key, - subscribe_key, - secret_key=False, - cipher_key=False, - auth_key=None, - ssl_on=False, - origin='pubsub.pubnub.com', - UUID=None - ): - """ - #** - #* Pubnub - #* - #* Init the Pubnub Client API - #* - #* @param string publish_key required key to send messages. - #* @param string subscribe_key required key to receive messages. - #* @param string secret_key optional key to sign messages. - #* @param boolean ssl required for 2048 bit encrypted messages. - #* @param string origin PUBNUB Server Origin. - #* @param string pres_uuid optional identifier - #* for presence (auto-generated if not supplied) - #** - - ## Initiat Class - pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) - - """ - self.origin = origin - self.limit = 1800 - self.publish_key = publish_key - self.subscribe_key = subscribe_key - self.secret_key = secret_key - self.cipher_key = cipher_key - self.ssl = ssl_on - self.auth_key = auth_key - - if self.ssl: - self.origin = 'https://' + self.origin - else: - self.origin = 'http://' + self.origin - - self.uuid = UUID or str(uuid.uuid4()) - - if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() - else: - if sys.version_info.major == 2: - self.python_version = 2 - self.pc = PubnubCrypto2() - else: - self.python_version = 3 - self.pc = PubnubCrypto3() - - if not isinstance(self.uuid, str): - raise AttributeError("pres_uuid must be a string") - - ''' - - def _sign(self, channel, message): - ## Sign Message - if self.secret_key: - signature = hashlib.md5('/'.join([ - self.publish_key, - self.subscribe_key, - self.secret_key, - channel, - message - ])).hexdigest() - else: - signature = '0' - return signature - ''' - - def _pam_sign(self, msg): - """Calculate a signature by secret key and message.""" - - return urlsafe_b64encode(hmac.new( - self.secret_key.encode("utf-8"), - msg.encode("utf-8"), - sha256 - ).digest()) - - def _pam_auth(self, query, apicode=0, callback=None): - """Issue an authenticated request.""" - - if 'timestamp' not in query: - query['timestamp'] = int(time.time()) - - ## Global Grant? - if 'auth' in query and not query['auth']: - del query['auth'] - - if 'channel' in query and not query['channel']: - del query['channel'] - - params = "&".join([ - x + "=" + quote( - str(query[x]), safe="" - ) for x in sorted(query) - ]) - sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( - subkey=self.subscribe_key, - pubkey=self.publish_key, - apitype="audit" if (apicode) else "grant", - params=params - ) - - query['signature'] = self._pam_sign(sign_input) - - return self._request({"urlcomponents": [ - 'v1', 'auth', "audit" if (apicode) else "grant", - 'sub-key', - self.subscribe_key - ], 'urlparams': query}, - self._return_wrapped_callback(callback)) - - def grant(self, channel, authkey=False, read=True, - write=True, ttl=5, callback=None): - """Grant Access on a Channel.""" - - return self._pam_auth({ - "channel": channel, - "auth": authkey, - "r": read and 1 or 0, - "w": write and 1 or 0, - "ttl": ttl - }, callback=callback) - - def revoke(self, channel, authkey=False, ttl=1, callback=None): - """Revoke Access on a Channel.""" - - return self._pam_auth({ - "channel": channel, - "auth": authkey, - "r": 0, - "w": 0, - "ttl": ttl - }, callback=callback) - - def audit(self, channel=False, authkey=False, callback=None): - return self._pam_auth({ - "channel": channel, - "auth": authkey - }, 1, callback=callback) - - def encrypt(self, message): - if self.cipher_key: - message = json.dumps(self.pc.encrypt( - self.cipher_key, json.dumps(message)).replace('\n', '')) - else: - message = json.dumps(message) - - return message - - def decrypt(self, message): - if self.cipher_key: - message = self.pc.decrypt(self.cipher_key, message) - - return message - - def _return_wrapped_callback(self, callback=None): - def _new_format_callback(response): - if 'payload' in response: - if (callback is not None): - callback({'message': response['message'], - 'payload': response['payload']}) - else: - if (callback is not None): - callback(response) - if (callback is not None): - return _new_format_callback - else: - return None - - def publish(channel, message, callback=None, error=None): - """ - #** - #* Publish - #* - #* Send a message to a channel. - #* - #* @param array args with channel and message. - #* @return array success information. - #** - - ## Publish Example - info = pubnub.publish({ - 'channel' : 'hello_world', - 'message' : { - 'some_text' : 'Hello my World' - } - }) - print(info) - - """ - - message = self.encrypt(message) - - ## Send Message - return self._request({"urlcomponents": [ - 'publish', - self.publish_key, - self.subscribe_key, - '0', - channel, - '0', - message - ], 'urlparams': {'auth': self.auth_key}}, - callback=self._return_wrapped_callback(callback), - error=self._return_wrapped_callback(error)) - - def presence(self, channel, callback, error=None): - """ - #** - #* presence - #* - #* This is BLOCKING. - #* Listen for presence events on a channel. - #* - #* @param array args with channel and callback. - #* @return false on fail, array on success. - #** - - ## Presence Example - def pres_event(message) : - print(message) - return True - - pubnub.presence({ - 'channel' : 'hello_world', - 'callback' : receive - }) - """ - return self.subscribe({ - 'channel': channel + '-pnpres', - 'subscribe_key': self.subscribe_key, - 'callback': self._return_wrapped_callback(callback)}) - - def here_now(self, channel, callback, error=None): - """ - #** - #* Here Now - #* - #* Load current occupancy from a channel. - #* - #* @param array args with 'channel'. - #* @return mixed false on fail, array on success. - #* - - ## Presence Example - here_now = pubnub.here_now({ - 'channel' : 'hello_world', - }) - print(here_now['occupancy']) - print(here_now['uuids']) - - """ - - ## Get Presence Here Now - return self._request({"urlcomponents": [ - 'v2', 'presence', - 'sub_key', self.subscribe_key, - 'channel', channel - ], 'urlparams': {'auth': self.auth_key}}, - callback=self._return_wrapped_callback(callback), - error=self._return_wrapped_callback(error)) - - def history(self, channel, count=100, reverse=False, - start=None, end=None, callback=None, error=None): - """ - #** - #* History - #* - #* Load history from a channel. - #* - - ## History Example - history = pubnub.detailedHistory({ - 'channel' : 'hello_world', - 'count' : 5 - }) - print(history) - - """ - - params = dict() - - params['count'] = count - params['reverse'] = reverse - params['start'] = start - params['end'] = end - - ## Get History - return self._request({'urlcomponents': [ - 'v2', - 'history', - 'sub-key', - self.subscribe_key, - 'channel', - channel, - ], 'urlparams': {'auth': self.auth_key}}, - callback=self._return_wrapped_callback(callback), - error=self._return_wrapped_callback(error)) - - def time(self, callback=None): - """ - #** - #* Time - #* - #* Timestamp from PubNub Cloud. - #* - #* @return int timestamp. - #* - - ## PubNub Server Time Example - timestamp = pubnub.time() - print(timestamp) - - """ - - time = self._request({'urlcomponents': [ - 'time', - '0' - ]}, callback) - if time is not None: - return time[0] - - def _encode(self, request): - return [ - "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and - hex(ord(ch)).replace('0x', '%').upper() or - ch for ch in list(bit) - ]) for bit in request] - - def getUrl(self, request): - ## Build URL - url = self.origin + '/' + "/".join([ - "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and - hex(ord(ch)).replace('0x', '%').upper() or - ch for ch in list(bit) - ]) for bit in request["urlcomponents"]]) - if ("urlparams" in request): - url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[ - "urlparams"].items() if y is not None]) - return url - - -try: - from hashlib import sha256 - digestmod = sha256 -except ImportError: - import Crypto.Hash.SHA256 as digestmod - sha256 = digestmod.new -import hmac - - -class EmptyLock(): - def __enter__(self): - pass - - def __exit__(self, a, b, c): - pass - -empty_lock = EmptyLock() - - -class PubnubCoreAsync(PubnubBase): - - def start(self): - pass - - def stop(self): - pass - - def __init__( - self, - publish_key, - subscribe_key, - secret_key=False, - cipher_key=False, - auth_key=None, - ssl_on=False, - origin='pubsub.pubnub.com', - uuid=None, - _tt_lock=empty_lock, - _channel_list_lock=empty_lock - ): - """ - #** - #* Pubnub - #* - #* Init the Pubnub Client API - #* - #* @param string publish_key required key to send messages. - #* @param string subscribe_key required key to receive messages. - #* @param string secret_key required key to sign messages. - #* @param boolean ssl required for 2048 bit encrypted messages. - #* @param string origin PUBNUB Server Origin. - #** - - ## Initiat Class - pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False ) - - """ - super(PubnubCoreAsync, self).__init__( - publish_key=publish_key, - subscribe_key=subscribe_key, - secret_key=secret_key, - cipher_key=cipher_key, - auth_key=auth_key, - ssl_on=ssl_on, - origin=origin, - UUID=uuid - ) - - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = _tt_lock - self._channel_list_lock = _channel_list_lock - self._connect = lambda: None - - def get_channel_list(self, channels): - channel = '' - first = True - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - return channel - - def get_channel_array(self): - channels = self.subscriptions - channel = [] - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - channel.append(ch) - return channel - - def each(l, func): - if func is None: - return - for i in l: - func(i) - - def subscribe(self, channel, callback, error=None, - connect=None, disconnect=None, reconnect=None, sync=False): - """ - #** - #* Subscribe - #* - #* This is NON-BLOCKING. - #* Listen for a message on a channel. - #* - #* @param array args with channel and message. - #* @return false on fail, array on success. - #** - - ## Subscribe Example - def receive(message) : - print(message) - return True - - ## On Connect Callback - def connected() : - pubnub.publish({ - 'channel' : 'hello_world', - 'message' : { 'some_var' : 'text' } - }) - - ## Subscribe - pubnub.subscribe({ - 'channel' : 'hello_world', - 'connect' : connected, - 'callback' : receive - }) - - """ - - with self._tt_lock: - self.last_timetoken = self.timetoken if self.timetoken != 0 \ - else self.last_timetoken - self.timetoken = 0 - - if sync is True and self.susbcribe_sync is not None: - self.susbcribe_sync(args) - return - - def _invoke(func, msg=None): - if func is not None: - if msg is not None: - func(msg) - else: - func() - - def _invoke_connect(): - if self._channel_list_lock: - with self._channel_list_lock: - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - if chobj['connected'] is False: - chobj['connected'] = True - _invoke(chobj['connect'], chobj['name']) - - def _invoke_error(channel_list=None, err=None): - if channel_list is None: - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj['error'], err) - else: - for ch in channel_list: - chobj = self.subscriptions[ch] - _invoke(chobj['error'], err) - - def _get_channel(): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - if chobj['subscribed'] is True: - return chobj - - ## New Channel? - if not channel in self.subscriptions or \ - self.subscriptions[channel]['subscribed'] is False: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name': channel, - 'first': False, - 'connected': False, - 'subscribed': True, - 'callback': callback, - 'connect': connect, - 'disconnect': disconnect, - 'reconnect': reconnect, - 'error': error - } - - ## return if already connected to channel - if channel in self.subscriptions and \ - 'connected' in self.subscriptions[channel] and \ - self.subscriptions[channel]['connected'] is True: - _invoke(error, "Already Connected") - return - - ## SUBSCRIPTION RECURSION - def _connect(): - - self._reset_offline() - - def error_callback(response): - ## ERROR ? - if not response or \ - ('message' in response and - response['message'] == 'Forbidden'): - _invoke_error(response['payload'][ - 'channels'], response['message']) - _connect() - return - if 'message' in response: - _invoke_error(err=response['message']) - - - def sub_callback(response): - ## ERROR ? - if not response or \ - ('message' in response and - response['message'] == 'Forbidden'): - _invoke_error(response['payload'][ - 'channels'], response['message']) - _connect() - return - - _invoke_connect() - - with self._tt_lock: - self.timetoken = \ - self.last_timetoken if self.timetoken == 0 and \ - self.last_timetoken != 0 else response[1] - if len(response) > 2: - channel_list = response[2].split(',') - response_list = response[0] - for ch in enumerate(channel_list): - if ch[1] in self.subscriptions: - chobj = self.subscriptions[ch[1]] - _invoke(chobj['callback'], - self.decrypt(response_list[ch[0]])) - else: - response_list = response[0] - chobj = _get_channel() - for r in response_list: - if chobj: - _invoke(chobj['callback'], self.decrypt(r)) - - _connect() - - channel_list = self.get_channel_list(self.subscriptions) - if len(channel_list) <= 0: - return - - ## CONNECT TO PUBNUB SUBSCRIBE SERVERS - #try: - self.SUB_RECEIVER = self._request({"urlcomponents": [ - 'subscribe', - self.subscribe_key, - channel_list, - '0', - str(self.timetoken) - ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key}}, - sub_callback, - error_callback, - single=True) - ''' - except Exception as e: - print(e) - self.timeout(1, _connect) - return - ''' - - self._connect = _connect - - ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) - _connect() - - def _reset_offline(self): - if self.SUB_RECEIVER is not None: - self.SUB_RECEIVER() - self.SUB_RECEIVER = None - - def CONNECT(self): - self._reset_offline() - self._connect() - - def unsubscribe(self, channel): - - if channel in self.subscriptions is False: - return False - - ## DISCONNECT - with self._channel_list_lock: - if channel in self.subscriptions: - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False - self.CONNECT() - - -try: - import urllib.request -except ImportError: - import urllib2 - -import threading -import json -import time -import threading -from threading import current_thread - -latest_sub_callback_lock = threading.RLock() -latest_sub_callback = {'id': None, 'callback': None} - - -class HTTPClient: - def __init__(self, url, urllib_func=None, - callback=None, error=None, id=None): - self.url = url - self.id = id - self.callback = callback - self.error = error - self.stop = False - self._urllib_func = urllib_func - - def cancel(self): - self.stop = True - self.callback = None - self.error = None - - def run(self): - - def _invoke(func, data): - if func is not None: - func(data) - - if self._urllib_func is None: - return - - ''' - try: - resp = urllib2.urlopen(self.url, timeout=320) - except urllib2.HTTPError as http_error: - resp = http_error - ''' - resp = self._urllib_func(self.url, timeout=320) - data = resp[0] - code = resp[1] - - if self.stop is True: - return - if self.callback is None: - global latest_sub_callback - global latest_sub_callback_lock - with latest_sub_callback_lock: - if latest_sub_callback['id'] != self.id: - return - else: - if latest_sub_callback['callback'] is not None: - latest_sub_callback['id'] = 0 - print data - try: - data = json.loads(data) - except ValueError as e: - _invoke(latest_sub_callback['error'], - {'error': 'json decoding error'}) - return - print code - if code != 200: - print 'ERROR' - _invoke(latest_sub_callback['error'], data) - else: - print 'CALLBACK' - _invoke(latest_sub_callback['callback'], data) - else: - try: - data = json.loads(data) - except ValueError: - _invoke(self.error, {'error': 'json decoding error'}) - return - - if code != 200: - _invoke(self.error, data) - else: - _invoke(self.callback, data) - - -def _urllib_request_2(url, timeout=320): - try: - resp = urllib2.urlopen(url, timeout=timeout) - except urllib2.HTTPError as http_error: - resp = http_error - except urllib2.URLError as error: - #print error.reason - msg = { "message" : str(error.reason)} - #print str(msg) - return (json.dumps(msg),0) - - return (resp.read(), resp.code) - - -def _urllib_request_3(url, timeout=320): - #print(url) - try: - resp = urllib.request.urlopen(url, timeout=timeout) - except (urllib.request.HTTPError, urllib.request.URLError) as http_error: - resp = http_error - r = resp.read().decode("utf-8") - #print(r) - return (r, resp.code) - -_urllib_request = None - - -class Pubnub(PubnubCoreAsync): - def __init__( - self, - publish_key, - subscribe_key, - secret_key=False, - cipher_key=False, - auth_key=None, - ssl_on=False, - origin='pubsub.pubnub.com', - pres_uuid=None - ): - super(Pubnub, self).__init__( - publish_key=publish_key, - subscribe_key=subscribe_key, - secret_key=secret_key, - cipher_key=cipher_key, - auth_key=auth_key, - ssl_on=ssl_on, - origin=origin, - uuid=pres_uuid, - _tt_lock=threading.RLock(), - _channel_list_lock=threading.RLock() - ) - global _urllib_request - if self.python_version == 2: - _urllib_request = _urllib_request_2 - else: - _urllib_request = _urllib_request_3 - - def timeout(self, interval, func): - def cb(): - time.sleep(interval) - func() - thread = threading.Thread(target=cb) - thread.start() - - def _request_async(self, request, callback=None, error=None, single=False): - global _urllib_request - ## Build URL - url = self.getUrl(request) - if single is True: - id = time.time() - client = HTTPClient(url=url, urllib_func=_urllib_request, - callback=None, error=None, id=id) - with latest_sub_callback_lock: - latest_sub_callback['id'] = id - latest_sub_callback['callback'] = callback - latest_sub_callback['error'] = error - else: - client = HTTPClient(url=url, urllib_func=_urllib_request, - callback=callback, error=error) - - thread = threading.Thread(target=client.run) - thread.start() - - def abort(): - client.cancel() - return abort - - def _request_sync(self, request): - global _urllib_request - ## Build URL - url = self.getUrl(request) - ## Send Request Expecting JSONP Response - response = _urllib_request(url, timeout=320) - try: - resp_json = json.loads(response[0]) - except ValueError: - return [0, "JSON Error"] - - if response[1] != 200 and 'status' in resp_json: - return {'message': resp_json['message'], - 'payload': resp_json['payload']} - - return resp_json - - def _request(self, request, callback=None, error=None, single=False): - if callback is None: - return self._request_sync(request) - else: - self._request_async(request, callback, error, single=single) - -''' - - def _request3_sync( self, request) : - ## Build URL - url = self.getUrl(request) - ## Send Request Expecting JSONP Response - try: - response = urllib.request.urlopen(url,timeout=310) - resp_json = json.loads(response.read().decode("utf-8")) - except Exception as e: - return None - - return resp_json - - def _request3_async( self, request, callback, single=False ) : - pass - - def _request3(self, request, callback=None, single=False): - if callback is None: - return self._request3_sync(request,single=single) - else: - self._request3_async(request, callback, single=single) - ''' -- cgit v1.2.3