diff options
Diffstat (limited to 'common')
| -rw-r--r-- | common/PubnubBase.py | 96 | ||||
| -rw-r--r-- | common/PubnubCore.py | 2 | ||||
| -rw-r--r-- | common/PubnubCoreAsync.py | 149 | 
3 files changed, 198 insertions, 49 deletions
| diff --git a/common/PubnubBase.py b/common/PubnubBase.py index d287be3..ac41e0e 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -5,6 +5,14 @@ import time  import hashlib  import uuid  import sys +from urllib  import quote + +from base64  import urlsafe_b64encode +from hashlib import sha256 +from urllib  import quote +from urllib  import urlopen + +import hmac  class PubnubBase(object):      def __init__( @@ -13,6 +21,7 @@ class PubnubBase(object):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          UUID = None @@ -42,6 +51,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on +        self.auth_key      = auth_key          if self.ssl : @@ -76,6 +86,86 @@ class PubnubBase(object):              signature = '0'          return signature +    def _pam_sign( self, msg ): +        """Calculate a signature by secret key and message.""" + +        return urlsafe_b64encode(hmac.new( +            self.secret_key.encode("utf-8"), +            msg.encode("utf-8"), +            sha256 +        ).digest()) + +    def _pam_auth( self, query , apicode=0, callback=None): +        """Issue an authenticated request.""" + +        if 'timestamp' not in query: +            query['timestamp'] = int(time.time()) + +        ## Global Grant? +        if 'auth' in query and not query['auth']: +            del query['auth'] + +        if 'channel' in query and not query['channel']: +            del query['channel'] + +        params = "&".join([ +            x + "=" + quote( +                str(query[x]), safe="" +            ) for x in sorted(query) +        ]) +        sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( +            subkey=self.subscribe_key, +            pubkey=self.publish_key, +            apitype="audit" if (apicode) else "grant", +            params=params +        ) + +        signature = self._pam_sign(sign_input) + +        ''' +        url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + +            self.subscribe_key + "?" + +            params + "&signature=" + +            quote(signature, safe="")) +        ''' + +        return self._request({"urlcomponents": [ +            'v1', 'auth', "audit" if (apicode) else "grant" ,  +            'sub-key', +            self.subscribe_key +        ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},  +        self._return_wrapped_callback(callback)) + +    def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): +        """Grant Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): +        """Revoke Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def audit(self, channel=False, authkey=False, callback=None): +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey +        },1, callback=callback) +             + +      def encrypt(self, message):          if self.cipher_key:              message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -147,7 +237,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -349,7 +439,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args : +        if args and 'callback' in args:              callback = args['callback']          else :              callback = None  @@ -376,5 +466,5 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()])          return url diff --git a/common/PubnubCore.py b/common/PubnubCore.py index 3ed3a68..7fb67d6 100644 --- a/common/PubnubCore.py +++ b/common/PubnubCore.py @@ -5,6 +5,7 @@ class PubnubCore(PubnubCoreAsync):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -32,6 +33,7 @@ class PubnubCore(PubnubCoreAsync):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index 0038243..4251d47 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -5,6 +5,9 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac +import threading +from threading import current_thread +import threading  class PubnubCoreAsync(PubnubBase): @@ -17,6 +20,7 @@ class PubnubCoreAsync(PubnubBase):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -43,6 +47,7 @@ class PubnubCoreAsync(PubnubBase):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid @@ -50,22 +55,36 @@ class PubnubCoreAsync(PubnubBase):          self.subscriptions = {}          self.timetoken     = 0 +        self.last_timetoken = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip'          self.SUB_RECEIVER  = None          self._connect    = None +        self._tt_lock    = threading.RLock()      def get_channel_list(self, channels):          channel = ''          first = True -        for ch in channels: -            if not channels[ch]['subscribed']: -                continue -            if not first: -                channel += ',' -            else: -                first = False -            channel += ch +        if self._channel_list_lock: +            with self._channel_list_lock: +                for ch in channels: +                    if not channels[ch]['subscribed']: +                        continue +                    if not first: +                        channel += ',' +                    else: +                        first = False +                    channel += ch +        else: +            for ch in channels: +                if not channels[ch]['subscribed']: +                    continue +                if not first: +                    channel += ',' +                else: +                    first = False +                channel += ch +          return channel      def subscribe( self, args=None, sync=False ) : @@ -100,6 +119,26 @@ class PubnubCoreAsync(PubnubBase):          })          """ +        if args is None: +            _invoke(error, "Arguments Missing") +            return +        channel         = args['channel'] if 'channel' in args else None +        callback        = args['callback'] if 'callback' in args else None +        connect         = args['connect'] if 'connect' in args else None +        disconnect      = args['disconnect'] if 'disconnect' in args else None +        reconnect       = args['reconnect'] if 'reconnect' in args else None +        error           = args['error'] if 'error' in args else None + +        with self._tt_lock: +            self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken +            self.timetoken = 0 + +        if channel is None: +            _invoke(error, "Channel Missing") +            return +        if callback is None: +            _invoke(error, "Callback Missing") +            return          if sync is True and self.susbcribe_sync is not None:              self.susbcribe_sync(args) @@ -113,18 +152,20 @@ class PubnubCoreAsync(PubnubBase):                      func()          def _invoke_connect(): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                if chobj['connected'] is False: -                    chobj['connected'] = True -                    _invoke(chobj['connect']) +            if self._channel_list_lock: +                with self._channel_list_lock: +                    for ch in self.subscriptions: +                        chobj = self.subscriptions[ch] +                        if chobj['connected'] is False: +                            chobj['connected'] = True +                            _invoke(chobj['connect'],chobj['name'])          def _invoke_error(err=None):              for ch in self.subscriptions:                  chobj = self.subscriptions[ch]                  _invoke(chobj.error,err) - +        '''          if callback is None:              _invoke(error, "Callback Missing")              return @@ -132,6 +173,7 @@ class PubnubCoreAsync(PubnubBase):          if channel is None:              _invoke(error, "Channel Missing")              return +        '''          def _get_channel():              for ch in self.subscriptions: @@ -142,22 +184,36 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            self.subscriptions[channel] = { -                'name'          : channel, -                'first'         : False, -                'connected'     : False, -                'subscribed'    : True, -                'callback'      : callback, -                'connect'       : connect, -                'disconnect'    : disconnect, -                'reconnect'     : reconnect -            } +            if self._channel_list_lock: +                with self._channel_list_lock: +                    self.subscriptions[channel] = { +                        'name'          : channel, +                        'first'         : False, +                        'connected'     : False, +                        'subscribed'    : True, +                        'callback'      : callback, +                        'connect'       : connect, +                        'disconnect'    : disconnect, +                        'reconnect'     : reconnect +                    } +            else: +                self.subscriptions[channel] = { +                    'name'          : channel, +                    'first'         : False, +                    'connected'     : False, +                    'subscribed'    : True, +                    'callback'      : callback, +                    'connect'       : connect, +                    'disconnect'    : disconnect, +                    'reconnect'     : reconnect +                }          ## return if already connected to channel -        if self.subscriptions[channel]['connected'] : +        if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected")              return +                      ## SUBSCRIPTION RECURSION           def _connect(): @@ -165,37 +221,37 @@ class PubnubCoreAsync(PubnubBase):              self._reset_offline()              def sub_callback(response): -                print response                  ## ERROR ?                  if not response or error in response:                      _invoke_error()                  _invoke_connect() - -                self.timetoken = response[1] - -                if len(response) > 2: -                    channel_list = response[2].split(',') -                    response_list = response[0] -                    for ch in enumerate(channel_list): -                        if ch[1] in self.subscriptions: -                            chobj = self.subscriptions[ch[1]] -                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) -                else: -                    response_list = response[0] -                    chobj = _get_channel() -                    for r in response_list: -                        if chobj: -                            _invoke(chobj['callback'], self.decrypt(r)) - - -                _connect() +                with self._tt_lock: +                    #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    if len(response) > 2: +                        channel_list = response[2].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscriptions: +                                chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                    else: +                        response_list = response[0] +                        chobj = _get_channel() +                        for r in response_list: +                            if chobj: +                                _invoke(chobj['callback'], self.decrypt(r)) + +                    #with self._tt_lock: +                    #    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    _connect()              channel_list = self.get_channel_list(self.subscriptions) -            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try:                  self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -206,6 +262,7 @@ class PubnubCoreAsync(PubnubBase):                      str(self.timetoken)                  ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )              except Exception as e: +                print e                  self.timeout( 1, _connect)                  return | 
