diff options
| author | Devendra | 2014-04-16 00:00:40 +0530 | 
|---|---|---|
| committer | Devendra | 2014-04-16 00:00:40 +0530 | 
| commit | 150ae1566d813acbb773839e919db2c0f467931c (patch) | |
| tree | 6f74d6dcdcb0ecff6d8a51988d8a461b6f9a4668 /python-tornado/Pubnub.py | |
| parent | 99096b8c11b9a541f6350639e8735495cf90091c (diff) | |
| download | pubnub-python-150ae1566d813acbb773839e919db2c0f467931c.tar.bz2 | |
adding code to support async and pam client capabilities with python v2 and v3
Diffstat (limited to 'python-tornado/Pubnub.py')
| -rw-r--r-- | python-tornado/Pubnub.py | 286 | 
1 files changed, 129 insertions, 157 deletions
| diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index 61f7c3d..ccff021 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -176,12 +176,13 @@ import time  import hashlib  import uuid  import sys -from urllib  import quote + +try: from urllib.parse  import quote +except: from urllib2 import quote  from base64  import urlsafe_b64encode  from hashlib import sha256 -from urllib  import quote -from urllib  import urlopen +  import hmac @@ -233,12 +234,11 @@ class PubnubBase(object):          self.uuid = UUID or str(uuid.uuid4())          if type(sys.version_info) is tuple: -            self.python_version = 2 -            self.pc             = PubnubCrypto2() +            self.python_version  = 2 +            self.pc              = PubnubCrypto2()          else:              self.python_version = 3              self.pc             = PubnubCrypto3() -          if not isinstance(self.uuid, str):              raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object):                  if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})              else:                  if (callback != None):callback(response) -        if (callback != None): return _new_format_callback +        if (callback != None): +            return _new_format_callback +        else: +            return None      def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object):          if 'callback' in args :              callback = args['callback']          else : -            callback = None  +            callback = None + +        if 'error' in args : +            error = args['error'] +        else : +            error = None -        #message = json.dumps(args['message'], separators=(',',':'))          message = self.encrypt(args['message']) -        signature = self.sign(channel, message) +        #signature = self.sign(channel, message)          ## Send Message          return self._request({"urlcomponents": [              'publish',              self.publish_key,              self.subscribe_key, -            signature, +            '0',              channel,              '0',              message -        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error))      def presence( self, args ) :          """ @@ -472,12 +480,10 @@ class PubnubBase(object):          """          channel = str(args['channel']) -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None -         + +        callback    = args['callback']  if 'callback'  in args else None +        error       = args['error']     if 'error'     in args else None +          ## Fail if bad input.          if not channel :              raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object):              'v2','presence',              'sub_key', self.subscribe_key,              'channel', channel -        ]}, callback); -         -         -    def history( self, args ) : +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error)) + +    def history(self, args) :          """          #**          #* History          #*          #* Load history from a channel.          #* -        #* @param array args with 'channel' and 'limit'. -        #* @return mixed false on fail, array on success. -        #* - -        ## History Example -        history = pubnub.history({ -            'channel' : 'hello_world', -            'limit'   : 1 -        }) -        print(history) - -        """ -        ## Capture User Input -        limit   = 'limit' in args and int(args['limit']) or 10 -        channel = str(args['channel']) - -        ## Fail if bad input. -        if not channel : -            raise Exception('Missing Channel') -            return False - -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None - -        ## Get History -        return self._request({ "urlcomponents" : [ -            'history', -            self.subscribe_key, -            channel, -            '0', -            str(limit) -        ] }, callback); - -    def detailedHistory(self, args) : -        """ -        #** -        #* Detailed History -        #* -        #* Load Detailed history from a channel. -        #*          #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'          #* @return mixed false on fail, array on success.          #* @@ -556,34 +519,21 @@ class PubnubBase(object):          ## Capture User Input          channel = str(args['channel']) -        params = dict()  -        count = 100     -         -        if 'count' in args: -            count = int(args['count']) - -        params['count'] = str(count)     -         -        if 'reverse' in args: -            params['reverse'] = str(args['reverse']).lower() +        callback            = args['callback']      if 'callback'  in args else None +        error               = args['error']         if 'error'     in args else None -        if 'start' in args: -            params['start'] = str(args['start']) +        params = dict()  -        if 'end' in args: -            params['end'] = str(args['end']) +        params['count']     = str(args['count'])           if 'count'   in args else 100 +        params['reverse']   = str(args['reverse']).lower() if 'reverse' in args else 'false' +        params['start']     = str(args['start'])           if 'start'   in args else None +        params['end']       = str(args['end'])             if 'end'     in args else None          ## Fail if bad input.          if not channel :              raise Exception('Missing Channel')              return False -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None  -          ## Get History          return self._request({ 'urlcomponents' : [              'v2', @@ -592,7 +542,8 @@ class PubnubBase(object):              self.subscribe_key,              'channel',              channel, -        ],'urlparams' : params }, callback=callback); +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error))      def time(self, args = None) :          """ @@ -610,10 +561,9 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args: -            callback = args['callback'] -        else : -            callback = None  + +        callback = callback if args and 'callback' in args else None +          time = self._request({'urlcomponents' : [              'time',              '0' @@ -637,7 +587,8 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items() if y is not None]) +        #print(url)          return url @@ -648,9 +599,14 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): +    def __enter__(self): +        pass +    def __exit__(self,a,b,c): +        pass + +empty_lock = EmptyLock()  class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase):          auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com', -        uuid = None +        uuid = None, +        _tt_lock=empty_lock, +        _channel_list_lock=empty_lock      ) :          """          #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase):              UUID=uuid          )         -        self.subscriptions = {} -        self.timetoken     = 0 -        self.last_timetoken = 0 -        self.version       = '3.3.4' -        self.accept_encoding = 'gzip' -        self.SUB_RECEIVER  = None -        self._connect    = None -        self._tt_lock    = threading.RLock() +        self.subscriptions              = {} +        self.timetoken                  = 0 +        self.last_timetoken             = 0 +        self.version                    = '3.3.4' +        self.accept_encoding            = 'gzip' +        self.SUB_RECEIVER               = None +        self._connect                   = None +        self._tt_lock                   = _tt_lock +        self._channel_list_lock         = _channel_list_lock      def get_channel_list(self, channels):          channel = ''          first = True -        if self._channel_list_lock: -            with self._channel_list_lock: -                for ch in channels: -                    if not channels[ch]['subscribed']: -                        continue -                    if not first: -                        channel += ',' -                    else: -                        first = False -                    channel += ch -        else: +        with self._channel_list_lock:              for ch in channels:                  if not channels[ch]['subscribed']:                      continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase):                  else:                      first = False                  channel += ch -          return channel + +    def each(l, func): +        if func is None: +            return +        for i in l: +            func(i) +      def subscribe( self, args=None, sync=False ) :          """          #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase):          if args is None:              _invoke(error, "Arguments Missing")              return -        channel         = args['channel'] if 'channel' in args else None -        callback        = args['callback'] if 'callback' in args else None -        connect         = args['connect'] if 'connect' in args else None -        disconnect      = args['disconnect'] if 'disconnect' in args else None -        reconnect       = args['reconnect'] if 'reconnect' in args else None -        error           = args['error'] if 'error' in args else None +        channel         = args['channel']       if 'channel'    in args else None +        callback        = args['callback']      if 'callback'   in args else None +        connect         = args['connect']       if 'connect'    in args else None +        disconnect      = args['disconnect']    if 'disconnect' in args else None +        reconnect       = args['reconnect']     if 'reconnect'  in args else None +        error           = args['error']         if 'error'      in args else None          with self._tt_lock:              self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase):                              chobj['connected'] = True                              _invoke(chobj['connect'],chobj['name']) -        def _invoke_error(err=None): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                _invoke(chobj.error,err) +        def _invoke_error(channel_list=None, err=None): +            if channel_list is None: +                for ch in self.subscriptions: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err) +            else: +                for ch in channel_list: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err)          '''          if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            if self._channel_list_lock: -                with self._channel_list_lock: -                    self.subscriptions[channel] = { -                        'name'          : channel, -                        'first'         : False, -                        'connected'     : False, -                        'subscribed'    : True, -                        'callback'      : callback, -                        'connect'       : connect, -                        'disconnect'    : disconnect, -                        'reconnect'     : reconnect -                    } -            else: +            with self._channel_list_lock:                  self.subscriptions[channel] = {                      'name'          : channel,                      'first'         : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase):                      'callback'      : callback,                      'connect'       : connect,                      'disconnect'    : disconnect, -                    'reconnect'     : reconnect +                    'reconnect'     : reconnect, +                    'error'         : error                  } +          ## return if already connected to channel          if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase):              def sub_callback(response):                  ## ERROR ? -                if not response or error in response: -                    _invoke_error() +                #print response +                if not response or ('message' in response and response['message'] == 'Forbidden'): +                    _invoke_error(response['payload']['channels'], response['message']) +                    _connect() +                    return                  _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase):                      _connect() -              channel_list = self.get_channel_list(self.subscriptions)              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase):                      channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +                ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )              except Exception as e: -                print e +                print(e)                  self.timeout( 1, _connect)                  return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase):      def unsubscribe( self, args ): -        #print(args['channel']) -        channel = str(args['channel']) -        if not (channel in self.subscriptions): + +        if 'channel' in self.subscriptions is False:              return False +        channel = str(args['channel']) + +          ## DISCONNECT -        self.subscriptions[channel]['connected'] = 0 -        self.subscriptions[channel]['subscribed'] = False -        self.subscriptions[channel]['timetoken'] = 0 -        self.subscriptions[channel]['first']     = False +        with self._channel_list_lock: +            if channel in self.subscriptions: +                self.subscriptions[channel]['connected']    = 0 +                self.subscriptions[channel]['subscribed']   = False +                self.subscriptions[channel]['timetoken']    = 0 +                self.subscriptions[channel]['first']        = False          self.CONNECT() @@ -984,9 +940,13 @@ class Pubnub(PubnubCoreAsync):          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)          self.id = None -        self._channel_list_lock = None +         +    def _request( self, request, callback=None, error=None, single=False ) : + +        def _invoke(func, data): +            if func is not None: +                func(data) -    def _request( self, request, callback, single=False ) :          url = self.getUrl(request)          request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )          if single is True: @@ -997,18 +957,30 @@ class Pubnub(PubnubCoreAsync):              if single is True:                  if not id == self.id:                      return None  - +                                  body = response._get_body() +              if body is None:                  return - +            #print(body)              def handle_exc(*args):                  return True              if response.error is not None:                  with ExceptionStackContext(handle_exc):                      response.rethrow() -            elif callback: -                callback(eval(response._get_body())) +                    return +            try: +                data = json.loads(body) +            except TypeError as e: +                try: +                    data = json.loads(body.decode("utf-8")) +                except: +                    _invoke(error, {'error' : 'json decode error'}) + +            if 'error' in data and 'status' in data and 'status' != 200: +                _invoke(error, data) +            else: +                _invoke(callback, data)          self.http.fetch(              request=request, | 
