diff options
Diffstat (limited to 'python-tornado')
| -rw-r--r-- | python-tornado/Pubnub.py | 286 | ||||
| -rw-r--r-- | python-tornado/examples/publish-example.py | 35 | ||||
| -rwxr-xr-x | python-tornado/tests/subscribe-test.py | 19 | ||||
| -rw-r--r-- | python-tornado/unassembled/Platform.py | 28 | 
4 files changed, 180 insertions, 188 deletions
| diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index 61f7c3d..ccff021 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -176,12 +176,13 @@ import time  import hashlib  import uuid  import sys -from urllib  import quote + +try: from urllib.parse  import quote +except: from urllib2 import quote  from base64  import urlsafe_b64encode  from hashlib import sha256 -from urllib  import quote -from urllib  import urlopen +  import hmac @@ -233,12 +234,11 @@ class PubnubBase(object):          self.uuid = UUID or str(uuid.uuid4())          if type(sys.version_info) is tuple: -            self.python_version = 2 -            self.pc             = PubnubCrypto2() +            self.python_version  = 2 +            self.pc              = PubnubCrypto2()          else:              self.python_version = 3              self.pc             = PubnubCrypto3() -          if not isinstance(self.uuid, str):              raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object):                  if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})              else:                  if (callback != None):callback(response) -        if (callback != None): return _new_format_callback +        if (callback != None): +            return _new_format_callback +        else: +            return None      def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object):          if 'callback' in args :              callback = args['callback']          else : -            callback = None  +            callback = None + +        if 'error' in args : +            error = args['error'] +        else : +            error = None -        #message = json.dumps(args['message'], separators=(',',':'))          message = self.encrypt(args['message']) -        signature = self.sign(channel, message) +        #signature = self.sign(channel, message)          ## Send Message          return self._request({"urlcomponents": [              'publish',              self.publish_key,              self.subscribe_key, -            signature, +            '0',              channel,              '0',              message -        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error))      def presence( self, args ) :          """ @@ -472,12 +480,10 @@ class PubnubBase(object):          """          channel = str(args['channel']) -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None -         + +        callback    = args['callback']  if 'callback'  in args else None +        error       = args['error']     if 'error'     in args else None +          ## Fail if bad input.          if not channel :              raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object):              'v2','presence',              'sub_key', self.subscribe_key,              'channel', channel -        ]}, callback); -         -         -    def history( self, args ) : +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error)) + +    def history(self, args) :          """          #**          #* History          #*          #* Load history from a channel.          #* -        #* @param array args with 'channel' and 'limit'. -        #* @return mixed false on fail, array on success. -        #* - -        ## History Example -        history = pubnub.history({ -            'channel' : 'hello_world', -            'limit'   : 1 -        }) -        print(history) - -        """ -        ## Capture User Input -        limit   = 'limit' in args and int(args['limit']) or 10 -        channel = str(args['channel']) - -        ## Fail if bad input. -        if not channel : -            raise Exception('Missing Channel') -            return False - -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None - -        ## Get History -        return self._request({ "urlcomponents" : [ -            'history', -            self.subscribe_key, -            channel, -            '0', -            str(limit) -        ] }, callback); - -    def detailedHistory(self, args) : -        """ -        #** -        #* Detailed History -        #* -        #* Load Detailed history from a channel. -        #*          #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'          #* @return mixed false on fail, array on success.          #* @@ -556,34 +519,21 @@ class PubnubBase(object):          ## Capture User Input          channel = str(args['channel']) -        params = dict()  -        count = 100     -         -        if 'count' in args: -            count = int(args['count']) - -        params['count'] = str(count)     -         -        if 'reverse' in args: -            params['reverse'] = str(args['reverse']).lower() +        callback            = args['callback']      if 'callback'  in args else None +        error               = args['error']         if 'error'     in args else None -        if 'start' in args: -            params['start'] = str(args['start']) +        params = dict()  -        if 'end' in args: -            params['end'] = str(args['end']) +        params['count']     = str(args['count'])           if 'count'   in args else 100 +        params['reverse']   = str(args['reverse']).lower() if 'reverse' in args else 'false' +        params['start']     = str(args['start'])           if 'start'   in args else None +        params['end']       = str(args['end'])             if 'end'     in args else None          ## Fail if bad input.          if not channel :              raise Exception('Missing Channel')              return False -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None  -          ## Get History          return self._request({ 'urlcomponents' : [              'v2', @@ -592,7 +542,8 @@ class PubnubBase(object):              self.subscribe_key,              'channel',              channel, -        ],'urlparams' : params }, callback=callback); +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error))      def time(self, args = None) :          """ @@ -610,10 +561,9 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args: -            callback = args['callback'] -        else : -            callback = None  + +        callback = callback if args and 'callback' in args else None +          time = self._request({'urlcomponents' : [              'time',              '0' @@ -637,7 +587,8 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items() if y is not None]) +        #print(url)          return url @@ -648,9 +599,14 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): +    def __enter__(self): +        pass +    def __exit__(self,a,b,c): +        pass + +empty_lock = EmptyLock()  class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase):          auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com', -        uuid = None +        uuid = None, +        _tt_lock=empty_lock, +        _channel_list_lock=empty_lock      ) :          """          #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase):              UUID=uuid          )         -        self.subscriptions = {} -        self.timetoken     = 0 -        self.last_timetoken = 0 -        self.version       = '3.3.4' -        self.accept_encoding = 'gzip' -        self.SUB_RECEIVER  = None -        self._connect    = None -        self._tt_lock    = threading.RLock() +        self.subscriptions              = {} +        self.timetoken                  = 0 +        self.last_timetoken             = 0 +        self.version                    = '3.3.4' +        self.accept_encoding            = 'gzip' +        self.SUB_RECEIVER               = None +        self._connect                   = None +        self._tt_lock                   = _tt_lock +        self._channel_list_lock         = _channel_list_lock      def get_channel_list(self, channels):          channel = ''          first = True -        if self._channel_list_lock: -            with self._channel_list_lock: -                for ch in channels: -                    if not channels[ch]['subscribed']: -                        continue -                    if not first: -                        channel += ',' -                    else: -                        first = False -                    channel += ch -        else: +        with self._channel_list_lock:              for ch in channels:                  if not channels[ch]['subscribed']:                      continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase):                  else:                      first = False                  channel += ch -          return channel + +    def each(l, func): +        if func is None: +            return +        for i in l: +            func(i) +      def subscribe( self, args=None, sync=False ) :          """          #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase):          if args is None:              _invoke(error, "Arguments Missing")              return -        channel         = args['channel'] if 'channel' in args else None -        callback        = args['callback'] if 'callback' in args else None -        connect         = args['connect'] if 'connect' in args else None -        disconnect      = args['disconnect'] if 'disconnect' in args else None -        reconnect       = args['reconnect'] if 'reconnect' in args else None -        error           = args['error'] if 'error' in args else None +        channel         = args['channel']       if 'channel'    in args else None +        callback        = args['callback']      if 'callback'   in args else None +        connect         = args['connect']       if 'connect'    in args else None +        disconnect      = args['disconnect']    if 'disconnect' in args else None +        reconnect       = args['reconnect']     if 'reconnect'  in args else None +        error           = args['error']         if 'error'      in args else None          with self._tt_lock:              self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase):                              chobj['connected'] = True                              _invoke(chobj['connect'],chobj['name']) -        def _invoke_error(err=None): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                _invoke(chobj.error,err) +        def _invoke_error(channel_list=None, err=None): +            if channel_list is None: +                for ch in self.subscriptions: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err) +            else: +                for ch in channel_list: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err)          '''          if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            if self._channel_list_lock: -                with self._channel_list_lock: -                    self.subscriptions[channel] = { -                        'name'          : channel, -                        'first'         : False, -                        'connected'     : False, -                        'subscribed'    : True, -                        'callback'      : callback, -                        'connect'       : connect, -                        'disconnect'    : disconnect, -                        'reconnect'     : reconnect -                    } -            else: +            with self._channel_list_lock:                  self.subscriptions[channel] = {                      'name'          : channel,                      'first'         : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase):                      'callback'      : callback,                      'connect'       : connect,                      'disconnect'    : disconnect, -                    'reconnect'     : reconnect +                    'reconnect'     : reconnect, +                    'error'         : error                  } +          ## return if already connected to channel          if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase):              def sub_callback(response):                  ## ERROR ? -                if not response or error in response: -                    _invoke_error() +                #print response +                if not response or ('message' in response and response['message'] == 'Forbidden'): +                    _invoke_error(response['payload']['channels'], response['message']) +                    _connect() +                    return                  _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase):                      _connect() -              channel_list = self.get_channel_list(self.subscriptions)              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase):                      channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +                ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )              except Exception as e: -                print e +                print(e)                  self.timeout( 1, _connect)                  return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase):      def unsubscribe( self, args ): -        #print(args['channel']) -        channel = str(args['channel']) -        if not (channel in self.subscriptions): + +        if 'channel' in self.subscriptions is False:              return False +        channel = str(args['channel']) + +          ## DISCONNECT -        self.subscriptions[channel]['connected'] = 0 -        self.subscriptions[channel]['subscribed'] = False -        self.subscriptions[channel]['timetoken'] = 0 -        self.subscriptions[channel]['first']     = False +        with self._channel_list_lock: +            if channel in self.subscriptions: +                self.subscriptions[channel]['connected']    = 0 +                self.subscriptions[channel]['subscribed']   = False +                self.subscriptions[channel]['timetoken']    = 0 +                self.subscriptions[channel]['first']        = False          self.CONNECT() @@ -984,9 +940,13 @@ class Pubnub(PubnubCoreAsync):          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)          self.id = None -        self._channel_list_lock = None +         +    def _request( self, request, callback=None, error=None, single=False ) : + +        def _invoke(func, data): +            if func is not None: +                func(data) -    def _request( self, request, callback, single=False ) :          url = self.getUrl(request)          request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )          if single is True: @@ -997,18 +957,30 @@ class Pubnub(PubnubCoreAsync):              if single is True:                  if not id == self.id:                      return None  - +                                  body = response._get_body() +              if body is None:                  return - +            #print(body)              def handle_exc(*args):                  return True              if response.error is not None:                  with ExceptionStackContext(handle_exc):                      response.rethrow() -            elif callback: -                callback(eval(response._get_body())) +                    return +            try: +                data = json.loads(body) +            except TypeError as e: +                try: +                    data = json.loads(body.decode("utf-8")) +                except: +                    _invoke(error, {'error' : 'json decode error'}) + +            if 'error' in data and 'status' in data and 'status' != 200: +                _invoke(error, data) +            else: +                _invoke(callback, data)          self.http.fetch(              request=request, diff --git a/python-tornado/examples/publish-example.py b/python-tornado/examples/publish-example.py index b9eaa15..bb8b199 100644 --- a/python-tornado/examples/publish-example.py +++ b/python-tornado/examples/publish-example.py @@ -10,54 +10,59 @@  ## -----------------------------------  import sys -import tornado +from twisted.internet import reactor  sys.path.append('../') -sys.path.append('../..') -sys.path.append('../../common') +sys.path.append('../../')  from Pubnub import Pubnub  publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo'  subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'  secret_key    = len(sys.argv) > 3 and sys.argv[3] or 'demo' -cipher_key    = len(sys.argv) > 4 and sys.argv[4] or 'demo' ##(Cipher key is Optional) +cipher_key    = len(sys.argv) > 4 and sys.argv[4] or ''     ##(Cipher key is Optional)  ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False  ## -----------------------------------------------------------------------  ## Initiate Pubnub State  ## ----------------------------------------------------------------------- -pubnub = Pubnub( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key,cipher_key=cipher_key, ssl_on=ssl_on ) -#pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on ) +pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )  crazy  = 'hello_world' +## ----------------------------------------------------------------------- +## Publish Example +## -----------------------------------------------------------------------  def publish_complete(info):      print(info) +def publish_error(info): +    print('ERROR : ' +  str(info)) +  ## Publish string  pubnub.publish({      'channel' : crazy,      'message' : 'Hello World!', -    'callback' : publish_complete +    'callback' : publish_complete, +    'error' : publish_error  })  ## Publish list  li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] +  pubnub.publish({      'channel' : crazy,      'message' : li, -    'callback' : publish_complete +    'callback' : publish_complete, +    'error' : publish_error  })  def done_cb(info):      publish_complete(info) -    tornado.ioloop.IOLoop.instance().stop() -## Publish Dictionary Object  pubnub.publish({      'channel' : crazy,      'message' : { 'some_key' : 'some_val' }, -    'callback' : done_cb +    'callback' : done_cb, +    'error' : publish_error  }) -## ----------------------------------------------------------------------- -## IO Event Loop -## ----------------------------------------------------------------------- -tornado.ioloop.IOLoop.instance().start() + + +pubnub.start() diff --git a/python-tornado/tests/subscribe-test.py b/python-tornado/tests/subscribe-test.py index 0d4c65e..be4a416 100755 --- a/python-tornado/tests/subscribe-test.py +++ b/python-tornado/tests/subscribe-test.py @@ -38,31 +38,31 @@ received = 0  ## Subscribe Example  ## -----------------------------------------------------------------------  def message_received(message): -    print message +    print(message)  def check_received(message):      global current      global errors      global received -    print message -    print current +    print(message) +    print(current)      if message <= current: -        print 'ERROR' +        print('ERROR')          #sys.exit()          errors += 1      else:          received += 1 -    print 'active thread count : ', threading.activeCount() -    print 'errors = ' , errors -    print current_thread().getName(), ' , ', 'received = ', received +    print('active thread count : ' + str( threading.activeCount())) +    print('errors = ' + str(errors)) +    print(current_thread().getName() +  ' , ' +  'received = ' +  str(received))      if received != message: -        print '********** MISSED **************** ', message - received  +        print('********** MISSED **************** ' + str( message - received ))      current = message  def connected_test(ch) : -    print 'Connected' , ch +    print('Connected '  +  ch)  def connected(ch) :      pass @@ -103,7 +103,6 @@ def subscribe(channel):  	}) -print threading.activeCount()  pubnub.timeout(15,cb1) diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py index 501993e..5200136 100644 --- a/python-tornado/unassembled/Platform.py +++ b/python-tornado/unassembled/Platform.py @@ -43,9 +43,13 @@ class Pubnub(PubnubCoreAsync):          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)          self.id = None -        self._channel_list_lock = None +         +    def _request( self, request, callback=None, error=None, single=False ) : + +        def _invoke(func, data): +            if func is not None: +                func(data) -    def _request( self, request, callback, single=False ) :          url = self.getUrl(request)          request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )          if single is True: @@ -56,18 +60,30 @@ class Pubnub(PubnubCoreAsync):              if single is True:                  if not id == self.id:                      return None  - +                                  body = response._get_body() +              if body is None:                  return - +            #print(body)              def handle_exc(*args):                  return True              if response.error is not None:                  with ExceptionStackContext(handle_exc):                      response.rethrow() -            elif callback: -                callback(eval(response._get_body())) +                    return +            try: +                data = json.loads(body) +            except TypeError as e: +                try: +                    data = json.loads(body.decode("utf-8")) +                except: +                    _invoke(error, {'error' : 'json decode error'}) + +            if 'error' in data and 'status' in data and 'status' != 200: +                _invoke(error, data) +            else: +                _invoke(callback, data)          self.http.fetch(              request=request, | 
