diff options
| author | Devendra | 2014-04-11 14:49:43 +0530 | 
|---|---|---|
| committer | Devendra | 2014-04-11 14:49:43 +0530 | 
| commit | 99096b8c11b9a541f6350639e8735495cf90091c (patch) | |
| tree | 446e63037f76cb98d7e3cc0f93316a8bce96e19e | |
| parent | 765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (diff) | |
| download | pubnub-python-99096b8c11b9a541f6350639e8735495cf90091c.tar.bz2 | |
v1 MX and async code for python, twisted, tornado
| -rw-r--r-- | common/PubnubBase.py | 96 | ||||
| -rw-r--r-- | common/PubnubCore.py | 2 | ||||
| -rw-r--r-- | common/PubnubCoreAsync.py | 149 | ||||
| -rw-r--r-- | python-tornado/Pubnub.py | 246 | ||||
| -rwxr-xr-x | python-tornado/tests/subscribe-test.py | 148 | ||||
| -rw-r--r-- | python-tornado/unassembled/Platform.py | 1 | ||||
| -rw-r--r-- | python-twisted/Pubnub.py | 246 | ||||
| -rwxr-xr-x | python-twisted/tests/subscribe-test.py | 148 | ||||
| -rw-r--r-- | python-twisted/unassembled/Platform.py | 1 | ||||
| -rw-r--r-- | python/Pubnub.py | 286 | ||||
| -rwxr-xr-x | python/tests/subscribe-test.py | 148 | ||||
| -rw-r--r-- | python/unassembled/Platform.py | 41 | 
12 files changed, 1290 insertions, 222 deletions
| diff --git a/common/PubnubBase.py b/common/PubnubBase.py index d287be3..ac41e0e 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -5,6 +5,14 @@ import time  import hashlib  import uuid  import sys +from urllib  import quote + +from base64  import urlsafe_b64encode +from hashlib import sha256 +from urllib  import quote +from urllib  import urlopen + +import hmac  class PubnubBase(object):      def __init__( @@ -13,6 +21,7 @@ class PubnubBase(object):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          UUID = None @@ -42,6 +51,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on +        self.auth_key      = auth_key          if self.ssl : @@ -76,6 +86,86 @@ class PubnubBase(object):              signature = '0'          return signature +    def _pam_sign( self, msg ): +        """Calculate a signature by secret key and message.""" + +        return urlsafe_b64encode(hmac.new( +            self.secret_key.encode("utf-8"), +            msg.encode("utf-8"), +            sha256 +        ).digest()) + +    def _pam_auth( self, query , apicode=0, callback=None): +        """Issue an authenticated request.""" + +        if 'timestamp' not in query: +            query['timestamp'] = int(time.time()) + +        ## Global Grant? +        if 'auth' in query and not query['auth']: +            del query['auth'] + +        if 'channel' in query and not query['channel']: +            del query['channel'] + +        params = "&".join([ +            x + "=" + quote( +                str(query[x]), safe="" +            ) for x in sorted(query) +        ]) +        sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( +            subkey=self.subscribe_key, +            pubkey=self.publish_key, +            apitype="audit" if (apicode) else "grant", +            params=params +        ) + +        signature = self._pam_sign(sign_input) + +        ''' +        url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + +            self.subscribe_key + "?" + +            params + "&signature=" + +            quote(signature, safe="")) +        ''' + +        return self._request({"urlcomponents": [ +            'v1', 'auth', "audit" if (apicode) else "grant" ,  +            'sub-key', +            self.subscribe_key +        ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},  +        self._return_wrapped_callback(callback)) + +    def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): +        """Grant Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): +        """Revoke Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def audit(self, channel=False, authkey=False, callback=None): +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey +        },1, callback=callback) +             + +      def encrypt(self, message):          if self.cipher_key:              message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -147,7 +237,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -349,7 +439,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args : +        if args and 'callback' in args:              callback = args['callback']          else :              callback = None  @@ -376,5 +466,5 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()])          return url diff --git a/common/PubnubCore.py b/common/PubnubCore.py index 3ed3a68..7fb67d6 100644 --- a/common/PubnubCore.py +++ b/common/PubnubCore.py @@ -5,6 +5,7 @@ class PubnubCore(PubnubCoreAsync):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -32,6 +33,7 @@ class PubnubCore(PubnubCoreAsync):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index 0038243..4251d47 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -5,6 +5,9 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac +import threading +from threading import current_thread +import threading  class PubnubCoreAsync(PubnubBase): @@ -17,6 +20,7 @@ class PubnubCoreAsync(PubnubBase):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -43,6 +47,7 @@ class PubnubCoreAsync(PubnubBase):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid @@ -50,22 +55,36 @@ class PubnubCoreAsync(PubnubBase):          self.subscriptions = {}          self.timetoken     = 0 +        self.last_timetoken = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip'          self.SUB_RECEIVER  = None          self._connect    = None +        self._tt_lock    = threading.RLock()      def get_channel_list(self, channels):          channel = ''          first = True -        for ch in channels: -            if not channels[ch]['subscribed']: -                continue -            if not first: -                channel += ',' -            else: -                first = False -            channel += ch +        if self._channel_list_lock: +            with self._channel_list_lock: +                for ch in channels: +                    if not channels[ch]['subscribed']: +                        continue +                    if not first: +                        channel += ',' +                    else: +                        first = False +                    channel += ch +        else: +            for ch in channels: +                if not channels[ch]['subscribed']: +                    continue +                if not first: +                    channel += ',' +                else: +                    first = False +                channel += ch +          return channel      def subscribe( self, args=None, sync=False ) : @@ -100,6 +119,26 @@ class PubnubCoreAsync(PubnubBase):          })          """ +        if args is None: +            _invoke(error, "Arguments Missing") +            return +        channel         = args['channel'] if 'channel' in args else None +        callback        = args['callback'] if 'callback' in args else None +        connect         = args['connect'] if 'connect' in args else None +        disconnect      = args['disconnect'] if 'disconnect' in args else None +        reconnect       = args['reconnect'] if 'reconnect' in args else None +        error           = args['error'] if 'error' in args else None + +        with self._tt_lock: +            self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken +            self.timetoken = 0 + +        if channel is None: +            _invoke(error, "Channel Missing") +            return +        if callback is None: +            _invoke(error, "Callback Missing") +            return          if sync is True and self.susbcribe_sync is not None:              self.susbcribe_sync(args) @@ -113,18 +152,20 @@ class PubnubCoreAsync(PubnubBase):                      func()          def _invoke_connect(): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                if chobj['connected'] is False: -                    chobj['connected'] = True -                    _invoke(chobj['connect']) +            if self._channel_list_lock: +                with self._channel_list_lock: +                    for ch in self.subscriptions: +                        chobj = self.subscriptions[ch] +                        if chobj['connected'] is False: +                            chobj['connected'] = True +                            _invoke(chobj['connect'],chobj['name'])          def _invoke_error(err=None):              for ch in self.subscriptions:                  chobj = self.subscriptions[ch]                  _invoke(chobj.error,err) - +        '''          if callback is None:              _invoke(error, "Callback Missing")              return @@ -132,6 +173,7 @@ class PubnubCoreAsync(PubnubBase):          if channel is None:              _invoke(error, "Channel Missing")              return +        '''          def _get_channel():              for ch in self.subscriptions: @@ -142,22 +184,36 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            self.subscriptions[channel] = { -                'name'          : channel, -                'first'         : False, -                'connected'     : False, -                'subscribed'    : True, -                'callback'      : callback, -                'connect'       : connect, -                'disconnect'    : disconnect, -                'reconnect'     : reconnect -            } +            if self._channel_list_lock: +                with self._channel_list_lock: +                    self.subscriptions[channel] = { +                        'name'          : channel, +                        'first'         : False, +                        'connected'     : False, +                        'subscribed'    : True, +                        'callback'      : callback, +                        'connect'       : connect, +                        'disconnect'    : disconnect, +                        'reconnect'     : reconnect +                    } +            else: +                self.subscriptions[channel] = { +                    'name'          : channel, +                    'first'         : False, +                    'connected'     : False, +                    'subscribed'    : True, +                    'callback'      : callback, +                    'connect'       : connect, +                    'disconnect'    : disconnect, +                    'reconnect'     : reconnect +                }          ## return if already connected to channel -        if self.subscriptions[channel]['connected'] : +        if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected")              return +                      ## SUBSCRIPTION RECURSION           def _connect(): @@ -165,37 +221,37 @@ class PubnubCoreAsync(PubnubBase):              self._reset_offline()              def sub_callback(response): -                print response                  ## ERROR ?                  if not response or error in response:                      _invoke_error()                  _invoke_connect() - -                self.timetoken = response[1] - -                if len(response) > 2: -                    channel_list = response[2].split(',') -                    response_list = response[0] -                    for ch in enumerate(channel_list): -                        if ch[1] in self.subscriptions: -                            chobj = self.subscriptions[ch[1]] -                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) -                else: -                    response_list = response[0] -                    chobj = _get_channel() -                    for r in response_list: -                        if chobj: -                            _invoke(chobj['callback'], self.decrypt(r)) - - -                _connect() +                with self._tt_lock: +                    #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    if len(response) > 2: +                        channel_list = response[2].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscriptions: +                                chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                    else: +                        response_list = response[0] +                        chobj = _get_channel() +                        for r in response_list: +                            if chobj: +                                _invoke(chobj['callback'], self.decrypt(r)) + +                    #with self._tt_lock: +                    #    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    _connect()              channel_list = self.get_channel_list(self.subscriptions) -            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try:                  self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -206,6 +262,7 @@ class PubnubCoreAsync(PubnubBase):                      str(self.timetoken)                  ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )              except Exception as e: +                print e                  self.timeout( 1, _connect)                  return diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index ee66619..61f7c3d 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -176,6 +176,14 @@ import time  import hashlib  import uuid  import sys +from urllib  import quote + +from base64  import urlsafe_b64encode +from hashlib import sha256 +from urllib  import quote +from urllib  import urlopen + +import hmac  class PubnubBase(object):      def __init__( @@ -184,6 +192,7 @@ class PubnubBase(object):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          UUID = None @@ -213,6 +222,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on +        self.auth_key      = auth_key          if self.ssl : @@ -247,6 +257,86 @@ class PubnubBase(object):              signature = '0'          return signature +    def _pam_sign( self, msg ): +        """Calculate a signature by secret key and message.""" + +        return urlsafe_b64encode(hmac.new( +            self.secret_key.encode("utf-8"), +            msg.encode("utf-8"), +            sha256 +        ).digest()) + +    def _pam_auth( self, query , apicode=0, callback=None): +        """Issue an authenticated request.""" + +        if 'timestamp' not in query: +            query['timestamp'] = int(time.time()) + +        ## Global Grant? +        if 'auth' in query and not query['auth']: +            del query['auth'] + +        if 'channel' in query and not query['channel']: +            del query['channel'] + +        params = "&".join([ +            x + "=" + quote( +                str(query[x]), safe="" +            ) for x in sorted(query) +        ]) +        sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( +            subkey=self.subscribe_key, +            pubkey=self.publish_key, +            apitype="audit" if (apicode) else "grant", +            params=params +        ) + +        signature = self._pam_sign(sign_input) + +        ''' +        url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + +            self.subscribe_key + "?" + +            params + "&signature=" + +            quote(signature, safe="")) +        ''' + +        return self._request({"urlcomponents": [ +            'v1', 'auth', "audit" if (apicode) else "grant" ,  +            'sub-key', +            self.subscribe_key +        ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},  +        self._return_wrapped_callback(callback)) + +    def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): +        """Grant Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): +        """Revoke Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def audit(self, channel=False, authkey=False, callback=None): +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey +        },1, callback=callback) +             + +      def encrypt(self, message):          if self.cipher_key:              message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -318,7 +408,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -520,7 +610,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args : +        if args and 'callback' in args:              callback = args['callback']          else :              callback = None  @@ -547,7 +637,7 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()])          return url @@ -558,6 +648,9 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac +import threading +from threading import current_thread +import threading  class PubnubCoreAsync(PubnubBase): @@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid @@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase):          self.subscriptions = {}          self.timetoken     = 0 +        self.last_timetoken = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip'          self.SUB_RECEIVER  = None          self._connect    = None +        self._tt_lock    = threading.RLock()      def get_channel_list(self, channels):          channel = ''          first = True -        for ch in channels: -            if not channels[ch]['subscribed']: -                continue -            if not first: -                channel += ',' -            else: -                first = False -            channel += ch +        if self._channel_list_lock: +            with self._channel_list_lock: +                for ch in channels: +                    if not channels[ch]['subscribed']: +                        continue +                    if not first: +                        channel += ',' +                    else: +                        first = False +                    channel += ch +        else: +            for ch in channels: +                if not channels[ch]['subscribed']: +                    continue +                if not first: +                    channel += ',' +                else: +                    first = False +                channel += ch +          return channel      def subscribe( self, args=None, sync=False ) : @@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase):          })          """ +        if args is None: +            _invoke(error, "Arguments Missing") +            return +        channel         = args['channel'] if 'channel' in args else None +        callback        = args['callback'] if 'callback' in args else None +        connect         = args['connect'] if 'connect' in args else None +        disconnect      = args['disconnect'] if 'disconnect' in args else None +        reconnect       = args['reconnect'] if 'reconnect' in args else None +        error           = args['error'] if 'error' in args else None + +        with self._tt_lock: +            self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken +            self.timetoken = 0 + +        if channel is None: +            _invoke(error, "Channel Missing") +            return +        if callback is None: +            _invoke(error, "Callback Missing") +            return          if sync is True and self.susbcribe_sync is not None:              self.susbcribe_sync(args) @@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase):                      func()          def _invoke_connect(): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                if chobj['connected'] is False: -                    chobj['connected'] = True -                    _invoke(chobj['connect']) +            if self._channel_list_lock: +                with self._channel_list_lock: +                    for ch in self.subscriptions: +                        chobj = self.subscriptions[ch] +                        if chobj['connected'] is False: +                            chobj['connected'] = True +                            _invoke(chobj['connect'],chobj['name'])          def _invoke_error(err=None):              for ch in self.subscriptions:                  chobj = self.subscriptions[ch]                  _invoke(chobj.error,err) - +        '''          if callback is None:              _invoke(error, "Callback Missing")              return @@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase):          if channel is None:              _invoke(error, "Channel Missing")              return +        '''          def _get_channel():              for ch in self.subscriptions: @@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            self.subscriptions[channel] = { -                'name'          : channel, -                'first'         : False, -                'connected'     : False, -                'subscribed'    : True, -                'callback'      : callback, -                'connect'       : connect, -                'disconnect'    : disconnect, -                'reconnect'     : reconnect -            } +            if self._channel_list_lock: +                with self._channel_list_lock: +                    self.subscriptions[channel] = { +                        'name'          : channel, +                        'first'         : False, +                        'connected'     : False, +                        'subscribed'    : True, +                        'callback'      : callback, +                        'connect'       : connect, +                        'disconnect'    : disconnect, +                        'reconnect'     : reconnect +                    } +            else: +                self.subscriptions[channel] = { +                    'name'          : channel, +                    'first'         : False, +                    'connected'     : False, +                    'subscribed'    : True, +                    'callback'      : callback, +                    'connect'       : connect, +                    'disconnect'    : disconnect, +                    'reconnect'     : reconnect +                }          ## return if already connected to channel -        if self.subscriptions[channel]['connected'] : +        if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected")              return +                      ## SUBSCRIPTION RECURSION           def _connect(): @@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase):              self._reset_offline()              def sub_callback(response): -                print response                  ## ERROR ?                  if not response or error in response:                      _invoke_error()                  _invoke_connect() - -                self.timetoken = response[1] - -                if len(response) > 2: -                    channel_list = response[2].split(',') -                    response_list = response[0] -                    for ch in enumerate(channel_list): -                        if ch[1] in self.subscriptions: -                            chobj = self.subscriptions[ch[1]] -                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) -                else: -                    response_list = response[0] -                    chobj = _get_channel() -                    for r in response_list: -                        if chobj: -                            _invoke(chobj['callback'], self.decrypt(r)) - - -                _connect() +                with self._tt_lock: +                    #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    if len(response) > 2: +                        channel_list = response[2].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscriptions: +                                chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                    else: +                        response_list = response[0] +                        chobj = _get_channel() +                        for r in response_list: +                            if chobj: +                                _invoke(chobj['callback'], self.decrypt(r)) + +                    #with self._tt_lock: +                    #    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    _connect()              channel_list = self.get_channel_list(self.subscriptions) -            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try:                  self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase):                      str(self.timetoken)                  ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )              except Exception as e: +                print e                  self.timeout( 1, _connect)                  return @@ -837,6 +984,7 @@ class Pubnub(PubnubCoreAsync):          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)          self.id = None +        self._channel_list_lock = None      def _request( self, request, callback, single=False ) :          url = self.getUrl(request) diff --git a/python-tornado/tests/subscribe-test.py b/python-tornado/tests/subscribe-test.py new file mode 100755 index 0000000..0d4c65e --- /dev/null +++ b/python-tornado/tests/subscribe-test.py @@ -0,0 +1,148 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud.  +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + +import sys +sys.path.append('../') +import datetime +from Pubnub import Pubnub +from functools import partial +from threading import current_thread +import threading +publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key    = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key    = len(sys.argv) > 4 and sys.argv[4] or None +ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on ) +crazy  = 'hello_world' + +current = -1 + +errors = 0 +received = 0 + +## ----------------------------------------------------------------------- +## Subscribe Example +## ----------------------------------------------------------------------- +def message_received(message): +    print message + +def check_received(message): +    global current +    global errors +    global received +    print message +    print current +    if message <= current: +        print 'ERROR' +        #sys.exit() +        errors += 1 +    else: +        received += 1 +    print 'active thread count : ', threading.activeCount() +    print 'errors = ' , errors +    print current_thread().getName(), ' , ', 'received = ', received + +    if received != message: +        print '********** MISSED **************** ', message - received  +    current = message +     + +def connected_test(ch) : +    print 'Connected' , ch + +def connected(ch) : +    pass + + +''' +pubnub.subscribe({ +    'channel'  : 'abcd1', +    'connect'  : connected, +    'callback' : message_received +}) +''' +def cb1(): +	pubnub.subscribe({ +	    'channel'  : 'efgh1', +	    'connect'  : connected, +	    'callback' : message_received +	}) + +def cb2(): +	pubnub.subscribe({ +	    'channel'  : 'dsm-test', +	    'connect'  : connected_test, +	    'callback' : check_received +	}) + +def cb3(): +    pubnub.unsubscribe({'channel' : 'efgh1'}) + +def cb4(): +    pubnub.unsubscribe({'channel' : 'abcd1'}) + +def subscribe(channel): +	pubnub.subscribe({ +	    'channel'  : channel, +	    'connect'  : connected, +	    'callback' : message_received +	}) + + +print threading.activeCount() + + +pubnub.timeout(15,cb1) + +pubnub.timeout(30,cb2) + + +pubnub.timeout(45,cb3) + +pubnub.timeout(60,cb4) + +#''' +for x in range(1,1000): +    #print x +    def y(t): +        subscribe('channel-' + str(t)) + +    def z(t): +        pubnub.unsubscribe({'channel' : 'channel-' + str(t)}) + +    pubnub.timeout(x + 5, partial(y,x)) +    pubnub.timeout(x + 25, partial(z, x))  +    x += 10 +#''' + +''' +for x in range(1,1000): +    def cb(r): print r , ' : ', threading.activeCount() +    def y(t): +        pubnub.publish({ +            'message' : t, +            'callback' : cb, +            'channel' : 'dsm-test' +        }) + + +    pubnub.timeout(x + 1, partial(y,x)) +    x += 1 +''' + + +pubnub.start() diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py index f98befb..501993e 100644 --- a/python-tornado/unassembled/Platform.py +++ b/python-tornado/unassembled/Platform.py @@ -43,6 +43,7 @@ class Pubnub(PubnubCoreAsync):          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)          self.id = None +        self._channel_list_lock = None      def _request( self, request, callback, single=False ) :          url = self.getUrl(request) diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py index 3bc2d35..7171efe 100644 --- a/python-twisted/Pubnub.py +++ b/python-twisted/Pubnub.py @@ -176,6 +176,14 @@ import time  import hashlib  import uuid  import sys +from urllib  import quote + +from base64  import urlsafe_b64encode +from hashlib import sha256 +from urllib  import quote +from urllib  import urlopen + +import hmac  class PubnubBase(object):      def __init__( @@ -184,6 +192,7 @@ class PubnubBase(object):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          UUID = None @@ -213,6 +222,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on +        self.auth_key      = auth_key          if self.ssl : @@ -247,6 +257,86 @@ class PubnubBase(object):              signature = '0'          return signature +    def _pam_sign( self, msg ): +        """Calculate a signature by secret key and message.""" + +        return urlsafe_b64encode(hmac.new( +            self.secret_key.encode("utf-8"), +            msg.encode("utf-8"), +            sha256 +        ).digest()) + +    def _pam_auth( self, query , apicode=0, callback=None): +        """Issue an authenticated request.""" + +        if 'timestamp' not in query: +            query['timestamp'] = int(time.time()) + +        ## Global Grant? +        if 'auth' in query and not query['auth']: +            del query['auth'] + +        if 'channel' in query and not query['channel']: +            del query['channel'] + +        params = "&".join([ +            x + "=" + quote( +                str(query[x]), safe="" +            ) for x in sorted(query) +        ]) +        sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( +            subkey=self.subscribe_key, +            pubkey=self.publish_key, +            apitype="audit" if (apicode) else "grant", +            params=params +        ) + +        signature = self._pam_sign(sign_input) + +        ''' +        url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + +            self.subscribe_key + "?" + +            params + "&signature=" + +            quote(signature, safe="")) +        ''' + +        return self._request({"urlcomponents": [ +            'v1', 'auth', "audit" if (apicode) else "grant" ,  +            'sub-key', +            self.subscribe_key +        ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},  +        self._return_wrapped_callback(callback)) + +    def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): +        """Grant Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): +        """Revoke Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def audit(self, channel=False, authkey=False, callback=None): +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey +        },1, callback=callback) +             + +      def encrypt(self, message):          if self.cipher_key:              message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -318,7 +408,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -520,7 +610,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args : +        if args and 'callback' in args:              callback = args['callback']          else :              callback = None  @@ -547,7 +637,7 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()])          return url @@ -558,6 +648,9 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac +import threading +from threading import current_thread +import threading  class PubnubCoreAsync(PubnubBase): @@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid @@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase):          self.subscriptions = {}          self.timetoken     = 0 +        self.last_timetoken = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip'          self.SUB_RECEIVER  = None          self._connect    = None +        self._tt_lock    = threading.RLock()      def get_channel_list(self, channels):          channel = ''          first = True -        for ch in channels: -            if not channels[ch]['subscribed']: -                continue -            if not first: -                channel += ',' -            else: -                first = False -            channel += ch +        if self._channel_list_lock: +            with self._channel_list_lock: +                for ch in channels: +                    if not channels[ch]['subscribed']: +                        continue +                    if not first: +                        channel += ',' +                    else: +                        first = False +                    channel += ch +        else: +            for ch in channels: +                if not channels[ch]['subscribed']: +                    continue +                if not first: +                    channel += ',' +                else: +                    first = False +                channel += ch +          return channel      def subscribe( self, args=None, sync=False ) : @@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase):          })          """ +        if args is None: +            _invoke(error, "Arguments Missing") +            return +        channel         = args['channel'] if 'channel' in args else None +        callback        = args['callback'] if 'callback' in args else None +        connect         = args['connect'] if 'connect' in args else None +        disconnect      = args['disconnect'] if 'disconnect' in args else None +        reconnect       = args['reconnect'] if 'reconnect' in args else None +        error           = args['error'] if 'error' in args else None + +        with self._tt_lock: +            self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken +            self.timetoken = 0 + +        if channel is None: +            _invoke(error, "Channel Missing") +            return +        if callback is None: +            _invoke(error, "Callback Missing") +            return          if sync is True and self.susbcribe_sync is not None:              self.susbcribe_sync(args) @@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase):                      func()          def _invoke_connect(): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                if chobj['connected'] is False: -                    chobj['connected'] = True -                    _invoke(chobj['connect']) +            if self._channel_list_lock: +                with self._channel_list_lock: +                    for ch in self.subscriptions: +                        chobj = self.subscriptions[ch] +                        if chobj['connected'] is False: +                            chobj['connected'] = True +                            _invoke(chobj['connect'],chobj['name'])          def _invoke_error(err=None):              for ch in self.subscriptions:                  chobj = self.subscriptions[ch]                  _invoke(chobj.error,err) - +        '''          if callback is None:              _invoke(error, "Callback Missing")              return @@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase):          if channel is None:              _invoke(error, "Channel Missing")              return +        '''          def _get_channel():              for ch in self.subscriptions: @@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            self.subscriptions[channel] = { -                'name'          : channel, -                'first'         : False, -                'connected'     : False, -                'subscribed'    : True, -                'callback'      : callback, -                'connect'       : connect, -                'disconnect'    : disconnect, -                'reconnect'     : reconnect -            } +            if self._channel_list_lock: +                with self._channel_list_lock: +                    self.subscriptions[channel] = { +                        'name'          : channel, +                        'first'         : False, +                        'connected'     : False, +                        'subscribed'    : True, +                        'callback'      : callback, +                        'connect'       : connect, +                        'disconnect'    : disconnect, +                        'reconnect'     : reconnect +                    } +            else: +                self.subscriptions[channel] = { +                    'name'          : channel, +                    'first'         : False, +                    'connected'     : False, +                    'subscribed'    : True, +                    'callback'      : callback, +                    'connect'       : connect, +                    'disconnect'    : disconnect, +                    'reconnect'     : reconnect +                }          ## return if already connected to channel -        if self.subscriptions[channel]['connected'] : +        if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected")              return +                      ## SUBSCRIPTION RECURSION           def _connect(): @@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase):              self._reset_offline()              def sub_callback(response): -                print response                  ## ERROR ?                  if not response or error in response:                      _invoke_error()                  _invoke_connect() - -                self.timetoken = response[1] - -                if len(response) > 2: -                    channel_list = response[2].split(',') -                    response_list = response[0] -                    for ch in enumerate(channel_list): -                        if ch[1] in self.subscriptions: -                            chobj = self.subscriptions[ch[1]] -                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) -                else: -                    response_list = response[0] -                    chobj = _get_channel() -                    for r in response_list: -                        if chobj: -                            _invoke(chobj['callback'], self.decrypt(r)) - - -                _connect() +                with self._tt_lock: +                    #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    if len(response) > 2: +                        channel_list = response[2].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscriptions: +                                chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                    else: +                        response_list = response[0] +                        chobj = _get_channel() +                        for r in response_list: +                            if chobj: +                                _invoke(chobj['callback'], self.decrypt(r)) + +                    #with self._tt_lock: +                    #    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    _connect()              channel_list = self.get_channel_list(self.subscriptions) -            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try:                  self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase):                      str(self.timetoken)                  ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )              except Exception as e: +                print e                  self.timeout( 1, _connect)                  return @@ -838,6 +985,7 @@ class Pubnub(PubnubCoreAsync):          self.headers['User-Agent'] = ['Python-Twisted']          #self.headers['Accept-Encoding'] = [self.accept_encoding]          self.headers['V'] = [self.version] +        self._channel_list_lock = None      def _request( self, request, callback, single=False ) :          global pnconn_pool diff --git a/python-twisted/tests/subscribe-test.py b/python-twisted/tests/subscribe-test.py new file mode 100755 index 0000000..0d4c65e --- /dev/null +++ b/python-twisted/tests/subscribe-test.py @@ -0,0 +1,148 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud.  +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + +import sys +sys.path.append('../') +import datetime +from Pubnub import Pubnub +from functools import partial +from threading import current_thread +import threading +publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key    = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key    = len(sys.argv) > 4 and sys.argv[4] or None +ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on ) +crazy  = 'hello_world' + +current = -1 + +errors = 0 +received = 0 + +## ----------------------------------------------------------------------- +## Subscribe Example +## ----------------------------------------------------------------------- +def message_received(message): +    print message + +def check_received(message): +    global current +    global errors +    global received +    print message +    print current +    if message <= current: +        print 'ERROR' +        #sys.exit() +        errors += 1 +    else: +        received += 1 +    print 'active thread count : ', threading.activeCount() +    print 'errors = ' , errors +    print current_thread().getName(), ' , ', 'received = ', received + +    if received != message: +        print '********** MISSED **************** ', message - received  +    current = message +     + +def connected_test(ch) : +    print 'Connected' , ch + +def connected(ch) : +    pass + + +''' +pubnub.subscribe({ +    'channel'  : 'abcd1', +    'connect'  : connected, +    'callback' : message_received +}) +''' +def cb1(): +	pubnub.subscribe({ +	    'channel'  : 'efgh1', +	    'connect'  : connected, +	    'callback' : message_received +	}) + +def cb2(): +	pubnub.subscribe({ +	    'channel'  : 'dsm-test', +	    'connect'  : connected_test, +	    'callback' : check_received +	}) + +def cb3(): +    pubnub.unsubscribe({'channel' : 'efgh1'}) + +def cb4(): +    pubnub.unsubscribe({'channel' : 'abcd1'}) + +def subscribe(channel): +	pubnub.subscribe({ +	    'channel'  : channel, +	    'connect'  : connected, +	    'callback' : message_received +	}) + + +print threading.activeCount() + + +pubnub.timeout(15,cb1) + +pubnub.timeout(30,cb2) + + +pubnub.timeout(45,cb3) + +pubnub.timeout(60,cb4) + +#''' +for x in range(1,1000): +    #print x +    def y(t): +        subscribe('channel-' + str(t)) + +    def z(t): +        pubnub.unsubscribe({'channel' : 'channel-' + str(t)}) + +    pubnub.timeout(x + 5, partial(y,x)) +    pubnub.timeout(x + 25, partial(z, x))  +    x += 10 +#''' + +''' +for x in range(1,1000): +    def cb(r): print r , ' : ', threading.activeCount() +    def y(t): +        pubnub.publish({ +            'message' : t, +            'callback' : cb, +            'channel' : 'dsm-test' +        }) + + +    pubnub.timeout(x + 1, partial(y,x)) +    x += 1 +''' + + +pubnub.start() diff --git a/python-twisted/unassembled/Platform.py b/python-twisted/unassembled/Platform.py index 3b84b30..5268446 100644 --- a/python-twisted/unassembled/Platform.py +++ b/python-twisted/unassembled/Platform.py @@ -44,6 +44,7 @@ class Pubnub(PubnubCoreAsync):          self.headers['User-Agent'] = ['Python-Twisted']          #self.headers['Accept-Encoding'] = [self.accept_encoding]          self.headers['V'] = [self.version] +        self._channel_list_lock = None      def _request( self, request, callback, single=False ) :          global pnconn_pool diff --git a/python/Pubnub.py b/python/Pubnub.py index a449c2d..f3c518c 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -176,6 +176,14 @@ import time  import hashlib  import uuid  import sys +from urllib  import quote + +from base64  import urlsafe_b64encode +from hashlib import sha256 +from urllib  import quote +from urllib  import urlopen + +import hmac  class PubnubBase(object):      def __init__( @@ -184,6 +192,7 @@ class PubnubBase(object):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          UUID = None @@ -213,6 +222,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on +        self.auth_key      = auth_key          if self.ssl : @@ -247,6 +257,86 @@ class PubnubBase(object):              signature = '0'          return signature +    def _pam_sign( self, msg ): +        """Calculate a signature by secret key and message.""" + +        return urlsafe_b64encode(hmac.new( +            self.secret_key.encode("utf-8"), +            msg.encode("utf-8"), +            sha256 +        ).digest()) + +    def _pam_auth( self, query , apicode=0, callback=None): +        """Issue an authenticated request.""" + +        if 'timestamp' not in query: +            query['timestamp'] = int(time.time()) + +        ## Global Grant? +        if 'auth' in query and not query['auth']: +            del query['auth'] + +        if 'channel' in query and not query['channel']: +            del query['channel'] + +        params = "&".join([ +            x + "=" + quote( +                str(query[x]), safe="" +            ) for x in sorted(query) +        ]) +        sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( +            subkey=self.subscribe_key, +            pubkey=self.publish_key, +            apitype="audit" if (apicode) else "grant", +            params=params +        ) + +        signature = self._pam_sign(sign_input) + +        ''' +        url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") + +            self.subscribe_key + "?" + +            params + "&signature=" + +            quote(signature, safe="")) +        ''' + +        return self._request({"urlcomponents": [ +            'v1', 'auth', "audit" if (apicode) else "grant" ,  +            'sub-key', +            self.subscribe_key +        ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},  +        self._return_wrapped_callback(callback)) + +    def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None): +        """Grant Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None): +        """Revoke Access on a Channel.""" + +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey, +            "r"       : read  and 1 or 0, +            "w"       : write and 1 or 0, +            "ttl"     : ttl +        }, callback=callback) + +    def audit(self, channel=False, authkey=False, callback=None): +        return self._pam_auth({ +            "channel" : channel, +            "auth"    : authkey +        },1, callback=callback) +             + +      def encrypt(self, message):          if self.cipher_key:              message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n','')) @@ -318,7 +408,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -520,7 +610,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args : +        if args and 'callback' in args:              callback = args['callback']          else :              callback = None  @@ -547,7 +637,7 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()])          return url @@ -558,6 +648,9 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac +import threading +from threading import current_thread +import threading  class PubnubCoreAsync(PubnubBase): @@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase):          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          uuid = None @@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase):              subscribe_key=subscribe_key,              secret_key=secret_key,              cipher_key=cipher_key, +            auth_key=auth_key,              ssl_on=ssl_on,              origin=origin,              UUID=uuid @@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase):          self.subscriptions = {}          self.timetoken     = 0 +        self.last_timetoken = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip'          self.SUB_RECEIVER  = None          self._connect    = None +        self._tt_lock    = threading.RLock()      def get_channel_list(self, channels):          channel = ''          first = True -        for ch in channels: -            if not channels[ch]['subscribed']: -                continue -            if not first: -                channel += ',' -            else: -                first = False -            channel += ch +        if self._channel_list_lock: +            with self._channel_list_lock: +                for ch in channels: +                    if not channels[ch]['subscribed']: +                        continue +                    if not first: +                        channel += ',' +                    else: +                        first = False +                    channel += ch +        else: +            for ch in channels: +                if not channels[ch]['subscribed']: +                    continue +                if not first: +                    channel += ',' +                else: +                    first = False +                channel += ch +          return channel      def subscribe( self, args=None, sync=False ) : @@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase):          })          """ +        if args is None: +            _invoke(error, "Arguments Missing") +            return +        channel         = args['channel'] if 'channel' in args else None +        callback        = args['callback'] if 'callback' in args else None +        connect         = args['connect'] if 'connect' in args else None +        disconnect      = args['disconnect'] if 'disconnect' in args else None +        reconnect       = args['reconnect'] if 'reconnect' in args else None +        error           = args['error'] if 'error' in args else None + +        with self._tt_lock: +            self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken +            self.timetoken = 0 + +        if channel is None: +            _invoke(error, "Channel Missing") +            return +        if callback is None: +            _invoke(error, "Callback Missing") +            return          if sync is True and self.susbcribe_sync is not None:              self.susbcribe_sync(args) @@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase):                      func()          def _invoke_connect(): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                if chobj['connected'] is False: -                    chobj['connected'] = True -                    _invoke(chobj['connect']) +            if self._channel_list_lock: +                with self._channel_list_lock: +                    for ch in self.subscriptions: +                        chobj = self.subscriptions[ch] +                        if chobj['connected'] is False: +                            chobj['connected'] = True +                            _invoke(chobj['connect'],chobj['name'])          def _invoke_error(err=None):              for ch in self.subscriptions:                  chobj = self.subscriptions[ch]                  _invoke(chobj.error,err) - +        '''          if callback is None:              _invoke(error, "Callback Missing")              return @@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase):          if channel is None:              _invoke(error, "Channel Missing")              return +        '''          def _get_channel():              for ch in self.subscriptions: @@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            self.subscriptions[channel] = { -                'name'          : channel, -                'first'         : False, -                'connected'     : False, -                'subscribed'    : True, -                'callback'      : callback, -                'connect'       : connect, -                'disconnect'    : disconnect, -                'reconnect'     : reconnect -            } +            if self._channel_list_lock: +                with self._channel_list_lock: +                    self.subscriptions[channel] = { +                        'name'          : channel, +                        'first'         : False, +                        'connected'     : False, +                        'subscribed'    : True, +                        'callback'      : callback, +                        'connect'       : connect, +                        'disconnect'    : disconnect, +                        'reconnect'     : reconnect +                    } +            else: +                self.subscriptions[channel] = { +                    'name'          : channel, +                    'first'         : False, +                    'connected'     : False, +                    'subscribed'    : True, +                    'callback'      : callback, +                    'connect'       : connect, +                    'disconnect'    : disconnect, +                    'reconnect'     : reconnect +                }          ## return if already connected to channel -        if self.subscriptions[channel]['connected'] : +        if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected")              return +                      ## SUBSCRIPTION RECURSION           def _connect(): @@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase):              self._reset_offline()              def sub_callback(response): -                print response                  ## ERROR ?                  if not response or error in response:                      _invoke_error()                  _invoke_connect() - -                self.timetoken = response[1] - -                if len(response) > 2: -                    channel_list = response[2].split(',') -                    response_list = response[0] -                    for ch in enumerate(channel_list): -                        if ch[1] in self.subscriptions: -                            chobj = self.subscriptions[ch[1]] -                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) -                else: -                    response_list = response[0] -                    chobj = _get_channel() -                    for r in response_list: -                        if chobj: -                            _invoke(chobj['callback'], self.decrypt(r)) - - -                _connect() +                with self._tt_lock: +                    #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken +                    if len(response) > 2: +                        channel_list = response[2].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscriptions: +                                chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                    else: +                        response_list = response[0] +                        chobj = _get_channel() +                        for r in response_list: +                            if chobj: +                                _invoke(chobj['callback'], self.decrypt(r)) + +                    #with self._tt_lock: +                    #    self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] +                    _connect()              channel_list = self.get_channel_list(self.subscriptions) -            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try:                  self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase):                      str(self.timetoken)                  ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )              except Exception as e: +                print e                  self.timeout( 1, _connect)                  return @@ -800,8 +947,11 @@ except:  import threading  import json  import time +import threading +from threading import current_thread -current_req_id = -1 +latest_sub_callback_lock = threading.RLock() +latest_sub_callback = {'id' : None, 'callback' : None}  class HTTPClient:      def __init__(self, url, callback, id=None): @@ -815,23 +965,32 @@ class HTTPClient:          self.callback = None      def run(self): -        global current_req_id          data = urllib2.urlopen(self.url, timeout=310).read()          if self.stop is True:              return -        if self.id is not None and current_req_id != self.id: -            return -        if self.callback is not None: +        if self.callback is None: +            global latest_sub_callback +            global latest_sub_callback_lock +            with latest_sub_callback_lock: +                if latest_sub_callback['id'] != self.id: +                    return +                else: +                    print(data) +                    if latest_sub_callback['callback'] is not None: +                        latest_sub_callback['id'] = 0 +                        latest_sub_callback['callback'](json.loads(data)) +        else:              self.callback(json.loads(data)) -class Pubnub(PubnubCore): +class Pubnub(PubnubCoreAsync):      def __init__(          self,          publish_key,          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          pres_uuid = None @@ -841,6 +1000,7 @@ class Pubnub(PubnubCore):              subscribe_key = subscribe_key,              secret_key = secret_key,              cipher_key = cipher_key, +            auth_key = auth_key,              ssl_on = ssl_on,              origin = origin,              uuid = pres_uuid @@ -849,6 +1009,7 @@ class Pubnub(PubnubCore):              self._request = self._request2          else:              self._request = self._request3 +        self._channel_list_lock = threading.RLock()      def timeout(self, interval, func):          def cb(): @@ -858,13 +1019,14 @@ class Pubnub(PubnubCore):          thread.start()      def _request2_async( self, request, callback, single=False ) : -        global current_req_id          ## Build URL          url = self.getUrl(request)          if single is True:              id = time.time() -            client = HTTPClient(url, callback, id) -            current_req_id = id +            client = HTTPClient(url, None, id) +            with latest_sub_callback_lock: +                latest_sub_callback['id'] = id +                latest_sub_callback['callback'] = callback          else:              client = HTTPClient(url, callback) @@ -879,7 +1041,6 @@ class Pubnub(PubnubCore):          ## Build URL          url = self.getUrl(request) -          ## Send Request Expecting JSONP Response          try:              try: usock = urllib2.urlopen( url, None, 310 ) @@ -887,15 +1048,16 @@ class Pubnub(PubnubCore):              response = usock.read()              usock.close()              resp_json = json.loads(response) -        except: +        except Exception as e: +            print e              return None -            return resp_json +        return resp_json      def _request2(self, request, callback=None, single=False):          if callback is None: -            return self._request2_sync(request,single=single) +            return self._request2_sync(request)          else:              self._request2_async(request, callback, single=single) diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py new file mode 100755 index 0000000..0d4c65e --- /dev/null +++ b/python/tests/subscribe-test.py @@ -0,0 +1,148 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud.  +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + +import sys +sys.path.append('../') +import datetime +from Pubnub import Pubnub +from functools import partial +from threading import current_thread +import threading +publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key    = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key    = len(sys.argv) > 4 and sys.argv[4] or None +ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on ) +crazy  = 'hello_world' + +current = -1 + +errors = 0 +received = 0 + +## ----------------------------------------------------------------------- +## Subscribe Example +## ----------------------------------------------------------------------- +def message_received(message): +    print message + +def check_received(message): +    global current +    global errors +    global received +    print message +    print current +    if message <= current: +        print 'ERROR' +        #sys.exit() +        errors += 1 +    else: +        received += 1 +    print 'active thread count : ', threading.activeCount() +    print 'errors = ' , errors +    print current_thread().getName(), ' , ', 'received = ', received + +    if received != message: +        print '********** MISSED **************** ', message - received  +    current = message +     + +def connected_test(ch) : +    print 'Connected' , ch + +def connected(ch) : +    pass + + +''' +pubnub.subscribe({ +    'channel'  : 'abcd1', +    'connect'  : connected, +    'callback' : message_received +}) +''' +def cb1(): +	pubnub.subscribe({ +	    'channel'  : 'efgh1', +	    'connect'  : connected, +	    'callback' : message_received +	}) + +def cb2(): +	pubnub.subscribe({ +	    'channel'  : 'dsm-test', +	    'connect'  : connected_test, +	    'callback' : check_received +	}) + +def cb3(): +    pubnub.unsubscribe({'channel' : 'efgh1'}) + +def cb4(): +    pubnub.unsubscribe({'channel' : 'abcd1'}) + +def subscribe(channel): +	pubnub.subscribe({ +	    'channel'  : channel, +	    'connect'  : connected, +	    'callback' : message_received +	}) + + +print threading.activeCount() + + +pubnub.timeout(15,cb1) + +pubnub.timeout(30,cb2) + + +pubnub.timeout(45,cb3) + +pubnub.timeout(60,cb4) + +#''' +for x in range(1,1000): +    #print x +    def y(t): +        subscribe('channel-' + str(t)) + +    def z(t): +        pubnub.unsubscribe({'channel' : 'channel-' + str(t)}) + +    pubnub.timeout(x + 5, partial(y,x)) +    pubnub.timeout(x + 25, partial(z, x))  +    x += 10 +#''' + +''' +for x in range(1,1000): +    def cb(r): print r , ' : ', threading.activeCount() +    def y(t): +        pubnub.publish({ +            'message' : t, +            'callback' : cb, +            'channel' : 'dsm-test' +        }) + + +    pubnub.timeout(x + 1, partial(y,x)) +    x += 1 +''' + + +pubnub.start() diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py index f0f9327..22893f8 100644 --- a/python/unassembled/Platform.py +++ b/python/unassembled/Platform.py @@ -6,8 +6,11 @@ except:  import threading  import json  import time +import threading +from threading import current_thread -current_req_id = -1 +latest_sub_callback_lock = threading.RLock() +latest_sub_callback = {'id' : None, 'callback' : None}  class HTTPClient:      def __init__(self, url, callback, id=None): @@ -21,23 +24,32 @@ class HTTPClient:          self.callback = None      def run(self): -        global current_req_id          data = urllib2.urlopen(self.url, timeout=310).read()          if self.stop is True:              return -        if self.id is not None and current_req_id != self.id: -            return -        if self.callback is not None: +        if self.callback is None: +            global latest_sub_callback +            global latest_sub_callback_lock +            with latest_sub_callback_lock: +                if latest_sub_callback['id'] != self.id: +                    return +                else: +                    print(data) +                    if latest_sub_callback['callback'] is not None: +                        latest_sub_callback['id'] = 0 +                        latest_sub_callback['callback'](json.loads(data)) +        else:              self.callback(json.loads(data)) -class Pubnub(PubnubCore): +class Pubnub(PubnubCoreAsync):      def __init__(          self,          publish_key,          subscribe_key,          secret_key = False,          cipher_key = False, +        auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com',          pres_uuid = None @@ -47,6 +59,7 @@ class Pubnub(PubnubCore):              subscribe_key = subscribe_key,              secret_key = secret_key,              cipher_key = cipher_key, +            auth_key = auth_key,              ssl_on = ssl_on,              origin = origin,              uuid = pres_uuid @@ -55,6 +68,7 @@ class Pubnub(PubnubCore):              self._request = self._request2          else:              self._request = self._request3 +        self._channel_list_lock = threading.RLock()      def timeout(self, interval, func):          def cb(): @@ -64,13 +78,14 @@ class Pubnub(PubnubCore):          thread.start()      def _request2_async( self, request, callback, single=False ) : -        global current_req_id          ## Build URL          url = self.getUrl(request)          if single is True:              id = time.time() -            client = HTTPClient(url, callback, id) -            current_req_id = id +            client = HTTPClient(url, None, id) +            with latest_sub_callback_lock: +                latest_sub_callback['id'] = id +                latest_sub_callback['callback'] = callback          else:              client = HTTPClient(url, callback) @@ -85,7 +100,6 @@ class Pubnub(PubnubCore):          ## Build URL          url = self.getUrl(request) -          ## Send Request Expecting JSONP Response          try:              try: usock = urllib2.urlopen( url, None, 310 ) @@ -93,15 +107,16 @@ class Pubnub(PubnubCore):              response = usock.read()              usock.close()              resp_json = json.loads(response) -        except: +        except Exception as e: +            print e              return None -            return resp_json +        return resp_json      def _request2(self, request, callback=None, single=False):          if callback is None: -            return self._request2_sync(request,single=single) +            return self._request2_sync(request)          else:              self._request2_async(request, callback, single=single) | 
