diff options
| author | Devendra | 2014-04-16 00:00:40 +0530 | 
|---|---|---|
| committer | Devendra | 2014-04-16 00:00:40 +0530 | 
| commit | 150ae1566d813acbb773839e919db2c0f467931c (patch) | |
| tree | 6f74d6dcdcb0ecff6d8a51988d8a461b6f9a4668 /python | |
| parent | 99096b8c11b9a541f6350639e8735495cf90091c (diff) | |
| download | pubnub-python-150ae1566d813acbb773839e919db2c0f467931c.tar.bz2 | |
adding code to support async and pam client capabilities with python v2 and v3
Diffstat (limited to 'python')
| -rw-r--r-- | python/Pubnub.py | 376 | ||||
| -rwxr-xr-x | python/examples/publish-example.py | 87 | ||||
| -rwxr-xr-x | python/tests/subscribe-test.py | 19 | ||||
| -rw-r--r-- | python/unassembled/Platform.py | 120 | 
4 files changed, 355 insertions, 247 deletions
| diff --git a/python/Pubnub.py b/python/Pubnub.py index f3c518c..95eafd0 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -176,12 +176,13 @@ import time  import hashlib  import uuid  import sys -from urllib  import quote + +try: from urllib.parse  import quote +except: from urllib2 import quote  from base64  import urlsafe_b64encode  from hashlib import sha256 -from urllib  import quote -from urllib  import urlopen +  import hmac @@ -233,12 +234,11 @@ class PubnubBase(object):          self.uuid = UUID or str(uuid.uuid4())          if type(sys.version_info) is tuple: -            self.python_version = 2 -            self.pc             = PubnubCrypto2() +            self.python_version  = 2 +            self.pc              = PubnubCrypto2()          else:              self.python_version = 3              self.pc             = PubnubCrypto3() -          if not isinstance(self.uuid, str):              raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object):                  if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})              else:                  if (callback != None):callback(response) -        if (callback != None): return _new_format_callback +        if (callback != None): +            return _new_format_callback +        else: +            return None      def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object):          if 'callback' in args :              callback = args['callback']          else : -            callback = None  +            callback = None + +        if 'error' in args : +            error = args['error'] +        else : +            error = None -        #message = json.dumps(args['message'], separators=(',',':'))          message = self.encrypt(args['message']) -        signature = self.sign(channel, message) +        #signature = self.sign(channel, message)          ## Send Message          return self._request({"urlcomponents": [              'publish',              self.publish_key,              self.subscribe_key, -            signature, +            '0',              channel,              '0',              message -        ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error))      def presence( self, args ) :          """ @@ -472,12 +480,10 @@ class PubnubBase(object):          """          channel = str(args['channel']) -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None -         + +        callback    = args['callback']  if 'callback'  in args else None +        error       = args['error']     if 'error'     in args else None +          ## Fail if bad input.          if not channel :              raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object):              'v2','presence',              'sub_key', self.subscribe_key,              'channel', channel -        ]}, callback); -         -         -    def history( self, args ) : +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error)) + +    def history(self, args) :          """          #**          #* History          #*          #* Load history from a channel.          #* -        #* @param array args with 'channel' and 'limit'. -        #* @return mixed false on fail, array on success. -        #* - -        ## History Example -        history = pubnub.history({ -            'channel' : 'hello_world', -            'limit'   : 1 -        }) -        print(history) - -        """ -        ## Capture User Input -        limit   = 'limit' in args and int(args['limit']) or 10 -        channel = str(args['channel']) - -        ## Fail if bad input. -        if not channel : -            raise Exception('Missing Channel') -            return False - -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None - -        ## Get History -        return self._request({ "urlcomponents" : [ -            'history', -            self.subscribe_key, -            channel, -            '0', -            str(limit) -        ] }, callback); - -    def detailedHistory(self, args) : -        """ -        #** -        #* Detailed History -        #* -        #* Load Detailed history from a channel. -        #*          #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'          #* @return mixed false on fail, array on success.          #* @@ -556,34 +519,21 @@ class PubnubBase(object):          ## Capture User Input          channel = str(args['channel']) -        params = dict()  -        count = 100     -         -        if 'count' in args: -            count = int(args['count']) +        callback            = args['callback']      if 'callback'  in args else None +        error               = args['error']         if 'error'     in args else None -        params['count'] = str(count)     -         -        if 'reverse' in args: -            params['reverse'] = str(args['reverse']).lower() - -        if 'start' in args: -            params['start'] = str(args['start']) +        params = dict()  -        if 'end' in args: -            params['end'] = str(args['end']) +        params['count']     = str(args['count'])           if 'count'   in args else 100 +        params['reverse']   = str(args['reverse']).lower() if 'reverse' in args else 'false' +        params['start']     = str(args['start'])           if 'start'   in args else None +        params['end']       = str(args['end'])             if 'end'     in args else None          ## Fail if bad input.          if not channel :              raise Exception('Missing Channel')              return False -        ## Capture Callback -        if 'callback' in args : -            callback = args['callback'] -        else : -            callback = None  -          ## Get History          return self._request({ 'urlcomponents' : [              'v2', @@ -592,7 +542,8 @@ class PubnubBase(object):              self.subscribe_key,              'channel',              channel, -        ],'urlparams' : params }, callback=callback); +        ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),  +        error=self._return_wrapped_callback(error))      def time(self, args = None) :          """ @@ -610,10 +561,9 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and 'callback' in args: -            callback = args['callback'] -        else : -            callback = None  + +        callback = callback if args and 'callback' in args else None +          time = self._request({'urlcomponents' : [              'time',              '0' @@ -637,7 +587,8 @@ class PubnubBase(object):                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]])          if ("urlparams" in request): -            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items()]) +            url = url + '?' + "&".join([ x + "=" + str(y)  for x,y in request["urlparams"].items() if y is not None]) +        #print(url)          return url @@ -648,9 +599,14 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): +    def __enter__(self): +        pass +    def __exit__(self,a,b,c): +        pass + +empty_lock = EmptyLock()  class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase):          auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com', -        uuid = None +        uuid = None, +        _tt_lock=empty_lock, +        _channel_list_lock=empty_lock      ) :          """          #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase):              UUID=uuid          )         -        self.subscriptions = {} -        self.timetoken     = 0 -        self.last_timetoken = 0 -        self.version       = '3.3.4' -        self.accept_encoding = 'gzip' -        self.SUB_RECEIVER  = None -        self._connect    = None -        self._tt_lock    = threading.RLock() +        self.subscriptions              = {} +        self.timetoken                  = 0 +        self.last_timetoken             = 0 +        self.version                    = '3.3.4' +        self.accept_encoding            = 'gzip' +        self.SUB_RECEIVER               = None +        self._connect                   = None +        self._tt_lock                   = _tt_lock +        self._channel_list_lock         = _channel_list_lock      def get_channel_list(self, channels):          channel = ''          first = True -        if self._channel_list_lock: -            with self._channel_list_lock: -                for ch in channels: -                    if not channels[ch]['subscribed']: -                        continue -                    if not first: -                        channel += ',' -                    else: -                        first = False -                    channel += ch -        else: +        with self._channel_list_lock:              for ch in channels:                  if not channels[ch]['subscribed']:                      continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase):                  else:                      first = False                  channel += ch -          return channel + +    def each(l, func): +        if func is None: +            return +        for i in l: +            func(i) +      def subscribe( self, args=None, sync=False ) :          """          #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase):          if args is None:              _invoke(error, "Arguments Missing")              return -        channel         = args['channel'] if 'channel' in args else None -        callback        = args['callback'] if 'callback' in args else None -        connect         = args['connect'] if 'connect' in args else None -        disconnect      = args['disconnect'] if 'disconnect' in args else None -        reconnect       = args['reconnect'] if 'reconnect' in args else None -        error           = args['error'] if 'error' in args else None +        channel         = args['channel']       if 'channel'    in args else None +        callback        = args['callback']      if 'callback'   in args else None +        connect         = args['connect']       if 'connect'    in args else None +        disconnect      = args['disconnect']    if 'disconnect' in args else None +        reconnect       = args['reconnect']     if 'reconnect'  in args else None +        error           = args['error']         if 'error'      in args else None          with self._tt_lock:              self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase):                              chobj['connected'] = True                              _invoke(chobj['connect'],chobj['name']) -        def _invoke_error(err=None): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                _invoke(chobj.error,err) +        def _invoke_error(channel_list=None, err=None): +            if channel_list is None: +                for ch in self.subscriptions: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err) +            else: +                for ch in channel_list: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err)          '''          if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            if self._channel_list_lock: -                with self._channel_list_lock: -                    self.subscriptions[channel] = { -                        'name'          : channel, -                        'first'         : False, -                        'connected'     : False, -                        'subscribed'    : True, -                        'callback'      : callback, -                        'connect'       : connect, -                        'disconnect'    : disconnect, -                        'reconnect'     : reconnect -                    } -            else: +            with self._channel_list_lock:                  self.subscriptions[channel] = {                      'name'          : channel,                      'first'         : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase):                      'callback'      : callback,                      'connect'       : connect,                      'disconnect'    : disconnect, -                    'reconnect'     : reconnect +                    'reconnect'     : reconnect, +                    'error'         : error                  } +          ## return if already connected to channel          if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase):              def sub_callback(response):                  ## ERROR ? -                if not response or error in response: -                    _invoke_error() +                #print response +                if not response or ('message' in response and response['message'] == 'Forbidden'): +                    _invoke_error(response['payload']['channels'], response['message']) +                    _connect() +                    return                  _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase):                      _connect() -              channel_list = self.get_channel_list(self.subscriptions)              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase):                      channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +                ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )              except Exception as e: -                print e +                print(e)                  self.timeout( 1, _connect)                  return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase):      def unsubscribe( self, args ): -        #print(args['channel']) -        channel = str(args['channel']) -        if not (channel in self.subscriptions): + +        if 'channel' in self.subscriptions is False:              return False +        channel = str(args['channel']) + +          ## DISCONNECT -        self.subscriptions[channel]['connected'] = 0 -        self.subscriptions[channel]['subscribed'] = False -        self.subscriptions[channel]['timetoken'] = 0 -        self.subscriptions[channel]['first']     = False +        with self._channel_list_lock: +            if channel in self.subscriptions: +                self.subscriptions[channel]['connected']    = 0 +                self.subscriptions[channel]['subscribed']   = False +                self.subscriptions[channel]['timetoken']    = 0 +                self.subscriptions[channel]['first']        = False          self.CONNECT() @@ -953,19 +909,43 @@ from threading import current_thread  latest_sub_callback_lock = threading.RLock()  latest_sub_callback = {'id' : None, 'callback' : None} + + +  class HTTPClient: -    def __init__(self, url, callback, id=None): +    def __init__(self, url, urllib_func=None, callback=None, error=None, id=None):          self.url = url          self.id = id          self.callback = callback +        self.error = error          self.stop = False +        self._urllib_func = urllib_func      def cancel(self):          self.stop = True          self.callback = None +        self.error = None +      def run(self): -        data = urllib2.urlopen(self.url, timeout=310).read() + +        def _invoke(func, data): +            if func is not None: +                func(data) + +        if self._urllib_func is None: +            return + +        ''' +        try: +            resp = urllib2.urlopen(self.url, timeout=320) +        except urllib2.HTTPError as http_error: +            resp = http_error +        ''' +        resp = self._urllib_func(self.url, timeout=320) +        data = resp[0] +        code = resp[1] +          if self.stop is True:              return          if self.callback is None: @@ -975,14 +955,49 @@ class HTTPClient:                  if latest_sub_callback['id'] != self.id:                      return                  else: -                    print(data)                      if latest_sub_callback['callback'] is not None:                          latest_sub_callback['id'] = 0 -                        latest_sub_callback['callback'](json.loads(data)) +                        try: +                            data = json.loads(data) +                        except: +                            _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'}) +                            return +                        if code != 200: +                            _invoke(latest_sub_callback['error'],data) +                        else: +                            _invoke(latest_sub_callback['callback'],data)          else: -            self.callback(json.loads(data)) +            try: +                data = json.loads(data) +            except: +                _invoke(self.error, {'error' : 'json decoding error'}) +                return + +            if code != 200: +                _invoke(self.error,data) +            else: +                _invoke(self.callback,data) +def _urllib_request_2(url, timeout=320): +    try: +        resp = urllib2.urlopen(url,timeout=timeout) +    except urllib2.HTTPError as http_error: +        resp = http_error +    return (resp.read(),resp.code) + +def _urllib_request_3(url, timeout=320): +    #print(url) +    try: +        resp = urllib.request.urlopen(url,timeout=timeout) +    except urllib.request.HTTPError as http_error: +        resp = http_error +    r =   resp.read().decode("utf-8") +    #print(r) +    return (r,resp.code) + +_urllib_request = None +  class Pubnub(PubnubCoreAsync):      def __init__(          self, @@ -1003,13 +1018,15 @@ class Pubnub(PubnubCoreAsync):              auth_key = auth_key,              ssl_on = ssl_on,              origin = origin, -            uuid = pres_uuid +            uuid = pres_uuid, +            _tt_lock=threading.RLock(), +            _channel_list_lock=threading.RLock()          ) +        global _urllib_request          if self.python_version == 2: -            self._request = self._request2 +            _urllib_request = _urllib_request_2          else: -            self._request = self._request3 -        self._channel_list_lock = threading.RLock() +            _urllib_request = _urllib_request_3      def timeout(self, interval, func):          def cb(): @@ -1018,17 +1035,20 @@ class Pubnub(PubnubCoreAsync):          thread = threading.Thread(target=cb)          thread.start() -    def _request2_async( self, request, callback, single=False ) : + +    def _request_async( self, request, callback=None, error=None, single=False ) : +        global _urllib_request          ## Build URL          url = self.getUrl(request)          if single is True:              id = time.time() -            client = HTTPClient(url, None, id) +            client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id)              with latest_sub_callback_lock:                  latest_sub_callback['id'] = id                  latest_sub_callback['callback'] = callback +                latest_sub_callback['error'] = error          else: -            client = HTTPClient(url, callback) +            client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error)          thread = threading.Thread(target=client.run)          thread.start() @@ -1037,31 +1057,30 @@ class Pubnub(PubnubCoreAsync):          return abort -    def _request2_sync( self, request) : - +    def _request_sync( self, request) : +        global _urllib_request          ## Build URL          url = self.getUrl(request)          ## Send Request Expecting JSONP Response +        response = _urllib_request(url, timeout=320)          try: -            try: usock = urllib2.urlopen( url, None, 310 ) -            except TypeError: usock = urllib2.urlopen( url, None ) -            response = usock.read() -            usock.close() -            resp_json = json.loads(response) -        except Exception as e: -            print e -            return None -             +            resp_json = json.loads(response[0]) +        except: +            return [0,"JSON Error"] + +        if response[1] != 200 and 'status' in resp_json: +            return {'message' : resp_json['message'], 'payload' : resp_json['payload']} +          return resp_json -    def _request2(self, request, callback=None, single=False): +    def _request(self, request, callback=None, error=None, single=False):          if callback is None: -            return self._request2_sync(request) +            return self._request_sync(request)          else: -            self._request2_async(request, callback, single=single) - +            self._request_async(request, callback, error, single=single) +'''      def _request3_sync( self, request) :          ## Build URL @@ -1083,3 +1102,4 @@ class Pubnub(PubnubCoreAsync):              return self._request3_sync(request,single=single)          else:              self._request3_async(request, callback, single=single) +            ''' diff --git a/python/examples/publish-example.py b/python/examples/publish-example.py index 31ae198..bb8b199 100755 --- a/python/examples/publish-example.py +++ b/python/examples/publish-example.py @@ -1,43 +1,68 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud.  +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- +  import sys -sys.path.append('.') -sys.path.append('..') +from twisted.internet import reactor +sys.path.append('../') +sys.path.append('../../')  from Pubnub import Pubnub -## Initiate Class -pubnub = Pubnub( publish_key='demo', subscribe_key='demo', cipher_key='enigma', ssl_on=False ) -#pubnub = Pubnub( publish_key='demo', subscribe_key='demo', ssl_on=False ) +publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key    = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key    = len(sys.argv) > 4 and sys.argv[4] or ''     ##(Cipher key is Optional) +ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +crazy  = 'hello_world' + +## -----------------------------------------------------------------------  ## Publish Example -info = pubnub.publish({ -    'channel' : 'abcd', -    'message' : { -        'iam' : 'object' -    } -}) -print(info) +## ----------------------------------------------------------------------- +def publish_complete(info): +    print(info) -info = pubnub.publish({ -    'channel' : 'abcd', -    'message' : "hi I am string" -}) -print(info) +def publish_error(info): +    print('ERROR : ' +  str(info)) -info = pubnub.publish({ -    'channel' : 'abcd', -    'message' : 1234 +## Publish string +pubnub.publish({ +    'channel' : crazy, +    'message' : 'Hello World!', +    'callback' : publish_complete, +    'error' : publish_error  }) -print(info) -info = pubnub.publish({ -    'channel' : 'abcd', -    'message' : "1234" +## Publish list +li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] + +pubnub.publish({ +    'channel' : crazy, +    'message' : li, +    'callback' : publish_complete, +    'error' : publish_error  }) -print(info) -info = pubnub.publish({ -    'channel' : 'abcd', -    'message' : [ -        'i' , 'am', 'array' -    ] +def done_cb(info): +    publish_complete(info) + +pubnub.publish({ +    'channel' : crazy, +    'message' : { 'some_key' : 'some_val' }, +    'callback' : done_cb, +    'error' : publish_error  }) -print(info) + + +pubnub.start() diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py index 0d4c65e..be4a416 100755 --- a/python/tests/subscribe-test.py +++ b/python/tests/subscribe-test.py @@ -38,31 +38,31 @@ received = 0  ## Subscribe Example  ## -----------------------------------------------------------------------  def message_received(message): -    print message +    print(message)  def check_received(message):      global current      global errors      global received -    print message -    print current +    print(message) +    print(current)      if message <= current: -        print 'ERROR' +        print('ERROR')          #sys.exit()          errors += 1      else:          received += 1 -    print 'active thread count : ', threading.activeCount() -    print 'errors = ' , errors -    print current_thread().getName(), ' , ', 'received = ', received +    print('active thread count : ' + str( threading.activeCount())) +    print('errors = ' + str(errors)) +    print(current_thread().getName() +  ' , ' +  'received = ' +  str(received))      if received != message: -        print '********** MISSED **************** ', message - received  +        print('********** MISSED **************** ' + str( message - received ))      current = message  def connected_test(ch) : -    print 'Connected' , ch +    print('Connected '  +  ch)  def connected(ch) :      pass @@ -103,7 +103,6 @@ def subscribe(channel):  	}) -print threading.activeCount()  pubnub.timeout(15,cb1) diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py index 22893f8..0ffccbb 100644 --- a/python/unassembled/Platform.py +++ b/python/unassembled/Platform.py @@ -12,19 +12,43 @@ from threading import current_thread  latest_sub_callback_lock = threading.RLock()  latest_sub_callback = {'id' : None, 'callback' : None} + + +  class HTTPClient: -    def __init__(self, url, callback, id=None): +    def __init__(self, url, urllib_func=None, callback=None, error=None, id=None):          self.url = url          self.id = id          self.callback = callback +        self.error = error          self.stop = False +        self._urllib_func = urllib_func      def cancel(self):          self.stop = True          self.callback = None +        self.error = None +      def run(self): -        data = urllib2.urlopen(self.url, timeout=310).read() + +        def _invoke(func, data): +            if func is not None: +                func(data) + +        if self._urllib_func is None: +            return + +        ''' +        try: +            resp = urllib2.urlopen(self.url, timeout=320) +        except urllib2.HTTPError as http_error: +            resp = http_error +        ''' +        resp = self._urllib_func(self.url, timeout=320) +        data = resp[0] +        code = resp[1] +          if self.stop is True:              return          if self.callback is None: @@ -34,13 +58,48 @@ class HTTPClient:                  if latest_sub_callback['id'] != self.id:                      return                  else: -                    print(data)                      if latest_sub_callback['callback'] is not None:                          latest_sub_callback['id'] = 0 -                        latest_sub_callback['callback'](json.loads(data)) +                        try: +                            data = json.loads(data) +                        except: +                            _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'}) +                            return +                        if code != 200: +                            _invoke(latest_sub_callback['error'],data) +                        else: +                            _invoke(latest_sub_callback['callback'],data)          else: -            self.callback(json.loads(data)) - +            try: +                data = json.loads(data) +            except: +                _invoke(self.error, {'error' : 'json decoding error'}) +                return + +            if code != 200: +                _invoke(self.error,data) +            else: +                _invoke(self.callback,data) + + +def _urllib_request_2(url, timeout=320): +    try: +        resp = urllib2.urlopen(url,timeout=timeout) +    except urllib2.HTTPError as http_error: +        resp = http_error +    return (resp.read(),resp.code) + +def _urllib_request_3(url, timeout=320): +    #print(url) +    try: +        resp = urllib.request.urlopen(url,timeout=timeout) +    except urllib.request.HTTPError as http_error: +        resp = http_error +    r =   resp.read().decode("utf-8") +    #print(r) +    return (r,resp.code) + +_urllib_request = None  class Pubnub(PubnubCoreAsync):      def __init__( @@ -62,13 +121,15 @@ class Pubnub(PubnubCoreAsync):              auth_key = auth_key,              ssl_on = ssl_on,              origin = origin, -            uuid = pres_uuid +            uuid = pres_uuid, +            _tt_lock=threading.RLock(), +            _channel_list_lock=threading.RLock()          ) +        global _urllib_request          if self.python_version == 2: -            self._request = self._request2 +            _urllib_request = _urllib_request_2          else: -            self._request = self._request3 -        self._channel_list_lock = threading.RLock() +            _urllib_request = _urllib_request_3      def timeout(self, interval, func):          def cb(): @@ -77,17 +138,20 @@ class Pubnub(PubnubCoreAsync):          thread = threading.Thread(target=cb)          thread.start() -    def _request2_async( self, request, callback, single=False ) : + +    def _request_async( self, request, callback=None, error=None, single=False ) : +        global _urllib_request          ## Build URL          url = self.getUrl(request)          if single is True:              id = time.time() -            client = HTTPClient(url, None, id) +            client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id)              with latest_sub_callback_lock:                  latest_sub_callback['id'] = id                  latest_sub_callback['callback'] = callback +                latest_sub_callback['error'] = error          else: -            client = HTTPClient(url, callback) +            client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error)          thread = threading.Thread(target=client.run)          thread.start() @@ -96,31 +160,30 @@ class Pubnub(PubnubCoreAsync):          return abort -    def _request2_sync( self, request) : - +    def _request_sync( self, request) : +        global _urllib_request          ## Build URL          url = self.getUrl(request)          ## Send Request Expecting JSONP Response +        response = _urllib_request(url, timeout=320)          try: -            try: usock = urllib2.urlopen( url, None, 310 ) -            except TypeError: usock = urllib2.urlopen( url, None ) -            response = usock.read() -            usock.close() -            resp_json = json.loads(response) -        except Exception as e: -            print e -            return None -             +            resp_json = json.loads(response[0]) +        except: +            return [0,"JSON Error"] + +        if response[1] != 200 and 'status' in resp_json: +            return {'message' : resp_json['message'], 'payload' : resp_json['payload']} +          return resp_json -    def _request2(self, request, callback=None, single=False): +    def _request(self, request, callback=None, error=None, single=False):          if callback is None: -            return self._request2_sync(request) +            return self._request_sync(request)          else: -            self._request2_async(request, callback, single=single) - +            self._request_async(request, callback, error, single=single) +'''      def _request3_sync( self, request) :          ## Build URL @@ -142,3 +205,4 @@ class Pubnub(PubnubCoreAsync):              return self._request3_sync(request,single=single)          else:              self._request3_async(request, callback, single=single) +            ''' | 
