From 99096b8c11b9a541f6350639e8735495cf90091c Mon Sep 17 00:00:00 2001 From: Devendra Date: Fri, 11 Apr 2014 14:49:43 +0530 Subject: v1 MX and async code for python, twisted, tornado --- python-tornado/Pubnub.py | 246 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 197 insertions(+), 49 deletions(-) (limited to 'python-tornado/Pubnub.py') diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index ee66619..61f7c3d 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -176,6 +176,14 @@ import time import hashlib import uuid import sys +from urllib import quote + +from base64 import urlsafe_b64encode +from hashlib import sha256 +from urllib import quote +from urllib import urlopen + +import hmac class PubnubBase(object): def __init__( @@ -184,6 +192,7 @@ class PubnubBase(object): subscribe_key, secret_key = False, cipher_key = False, + auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', UUID = None @@ -213,6 +222,7 @@ class PubnubBase(object): self.secret_key = secret_key self.cipher_key = cipher_key self.ssl = ssl_on + self.auth_key = auth_key if self.ssl : @@ -247,6 +257,86 @@ class PubnubBase(object): signature = '0' return signature + def _pam_sign( self, msg ): + """Calculate a signature by secret key and message.""" + + return urlsafe_b64encode(hmac.new( + self.secret_key.encode("utf-8"), + msg.encode("utf-8"), + sha256 + ).digest()) + + def _pam_auth( self, query , apicode=0, callback=None): + """Issue an authenticated request.""" + + if 'timestamp' not in query: + query['timestamp'] = int(time.time()) + + ## Global Grant? + if 'auth' in query and not query['auth']: + del query['auth'] + + if 'channel' in query and not query['channel']: + del query['channel'] + + params = "&".join([ + x + "=" + quote( + str(query[x]), safe="" + ) for x in sorted(query) + ]) + sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( + subkey=self.subscribe_key, + pubkey=self.publish_key, + apitype="audit" if (apicode) else "grant", + params=params + ) + + signature = self._pam_sign(sign_input) + + ''' + url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + + self.subscribe_key + "?" + + params + "&signature=" + + quote(signature, safe="")) + ''' + + return self._request({"urlcomponents": [ + 'v1', 'auth', "audit" if (apicode) else "grant" , + 'sub-key', + self.subscribe_key + ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}}, + self._return_wrapped_callback(callback)) + + def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): + """Grant Access on a Channel.""" + + return self._pam_auth({ + "channel" : channel, + "auth" : authkey, + "r" : read and 1 or 0, + "w" : write and 1 or 0, + "ttl" : ttl + }, callback=callback) + + def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): + """Revoke Access on a Channel.""" + + return self._pam_auth({ + "channel" : channel, + "auth" : authkey, + "r" : read and 1 or 0, + "w" : write and 1 or 0, + "ttl" : ttl + }, callback=callback) + + def audit(self, channel=False, authkey=False, callback=None): + return self._pam_auth({ + "channel" : channel, + "auth" : authkey + },1, callback=callback) + + + def encrypt(self, message): if self.cipher_key: message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -318,7 +408,7 @@ class PubnubBase(object): channel, '0', message - ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -520,7 +610,7 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args : + if args and 'callback' in args: callback = args['callback'] else : callback = None @@ -547,7 +637,7 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) return url @@ -558,6 +648,9 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac +import threading +from threading import current_thread +import threading class PubnubCoreAsync(PubnubBase): @@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase): subscribe_key, secret_key = False, cipher_key = False, + auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', uuid = None @@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase): subscribe_key=subscribe_key, secret_key=secret_key, cipher_key=cipher_key, + auth_key=auth_key, ssl_on=ssl_on, origin=origin, UUID=uuid @@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase): self.subscriptions = {} self.timetoken = 0 + self.last_timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' self.SUB_RECEIVER = None self._connect = None + self._tt_lock = threading.RLock() def get_channel_list(self, channels): channel = '' first = True - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch + if self._channel_list_lock: + with self._channel_list_lock: + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + else: + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel def subscribe( self, args=None, sync=False ) : @@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase): }) """ + if args is None: + _invoke(error, "Arguments Missing") + return + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None + + with self._tt_lock: + self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken + self.timetoken = 0 + + if channel is None: + _invoke(error, "Channel Missing") + return + if callback is None: + _invoke(error, "Callback Missing") + return if sync is True and self.susbcribe_sync is not None: self.susbcribe_sync(args) @@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase): func() def _invoke_connect(): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - if chobj['connected'] is False: - chobj['connected'] = True - _invoke(chobj['connect']) + if self._channel_list_lock: + with self._channel_list_lock: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect'],chobj['name']) def _invoke_error(err=None): for ch in self.subscriptions: chobj = self.subscriptions[ch] _invoke(chobj.error,err) - + ''' if callback is None: _invoke(error, "Callback Missing") return @@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase): if channel is None: _invoke(error, "Channel Missing") return + ''' def _get_channel(): for ch in self.subscriptions: @@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } + if self._channel_list_lock: + with self._channel_list_lock: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } + else: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } ## return if already connected to channel - if self.subscriptions[channel]['connected'] : + if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") return + ## SUBSCRIPTION RECURSION def _connect(): @@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase): self._reset_offline() def sub_callback(response): - print response ## ERROR ? if not response or error in response: _invoke_error() _invoke_connect() - - self.timetoken = response[1] - - if len(response) > 2: - channel_list = response[2].split(',') - response_list = response[0] - for ch in enumerate(channel_list): - if ch[1] in self.subscriptions: - chobj = self.subscriptions[ch[1]] - _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) - else: - response_list = response[0] - chobj = _get_channel() - for r in response_list: - if chobj: - _invoke(chobj['callback'], self.decrypt(r)) - - - _connect() + with self._tt_lock: + #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken + self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) + + #with self._tt_lock: + # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + _connect() channel_list = self.get_channel_list(self.subscriptions) - print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase): str(self.timetoken) ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) except Exception as e: + print e self.timeout( 1, _connect) return @@ -837,6 +984,7 @@ class Pubnub(PubnubCoreAsync): self.headers['V'] = self.version self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) self.id = None + self._channel_list_lock = None def _request( self, request, callback, single=False ) : url = self.getUrl(request) -- cgit v1.2.3