diff options
| -rw-r--r-- | common/PubnubBase.py | 12 | ||||
| -rw-r--r-- | common/PubnubCore.py | 4 | ||||
| -rw-r--r-- | common/PubnubCoreAsync.py | 171 | ||||
| -rw-r--r-- | python-tornado/Pubnub.py | 329 | ||||
| -rw-r--r-- | python-tornado/unassembled/Platform.py | 28 | ||||
| -rw-r--r-- | python-twisted/Pubnub.py | 329 | ||||
| -rw-r--r-- | python-twisted/unassembled/Platform.py | 28 | ||||
| -rw-r--r-- | python/Makefile | 2 | ||||
| -rw-r--r-- | python/Pubnub.py | 326 | ||||
| -rw-r--r-- | python/unassembled/Platform.py | 82 | 
10 files changed, 1002 insertions, 309 deletions
| diff --git a/common/PubnubBase.py b/common/PubnubBase.py index 4c5b422..d287be3 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -90,6 +90,14 @@ class PubnubBase(object):          return message +    def _return_wrapped_callback(self, callback=None): +        def _new_format_callback(response): +            if 'payload' in response: +                if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) +            else: +                if (callback != None):callback(response) +        if (callback != None): return _new_format_callback +      def publish( self, args ) :          """ @@ -139,7 +147,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]}, callback) +        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -179,7 +187,7 @@ class PubnubBase(object):          callback  = args['callback']          subscribe_key = args.get('subscribe_key') or self.subscribe_key -        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) +        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})      def here_now( self, args ) : diff --git a/common/PubnubCore.py b/common/PubnubCore.py index dcfd319..3ed3a68 100644 --- a/common/PubnubCore.py +++ b/common/PubnubCore.py @@ -1,4 +1,4 @@ -class PubnubCore(PubnubBase): +class PubnubCore(PubnubCoreAsync):      def __init__(          self,          publish_key, @@ -44,7 +44,7 @@ class PubnubCore(PubnubBase): -    def subscribe( self, args ) : +    def subscribe_sync( self, args ) :          """          #**          #* Subscribe diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index a7fbb7d..0038243 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase):      def start(self): pass       def stop(self):  pass -    def timeout( self, delay, callback ): -        pass      def __init__(          self, @@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase):          self.timetoken     = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip' - -    def subscribe( self, args ) : +        self.SUB_RECEIVER  = None +        self._connect    = None + +    def get_channel_list(self, channels): +        channel = '' +        first = True +        for ch in channels: +            if not channels[ch]['subscribed']: +                continue +            if not first: +                channel += ',' +            else: +                first = False +            channel += ch +        return channel + +    def subscribe( self, args=None, sync=False ) :          """          #**          #* Subscribe @@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase):          })          """ -        ## Fail if missing channel -        if not 'channel' in args : -            return 'Missing Channel.' -        ## Fail if missing callback -        if not 'callback' in args : -            return 'Missing Callback.' +        if sync is True and self.susbcribe_sync is not None: +            self.susbcribe_sync(args) +            return + +        def _invoke(func,msg=None): +            if func is not None: +                if msg is not None: +                    func(msg) +                else: +                    func() + +        def _invoke_connect(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['connected'] is False: +                    chobj['connected'] = True +                    _invoke(chobj['connect']) + +        def _invoke_error(err=None): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                _invoke(chobj.error,err) + -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        connectcb = args['connect'] +        if callback is None: +            _invoke(error, "Callback Missing") +            return + +        if channel is None: +            _invoke(error, "Channel Missing") +            return + +        def _get_channel(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['subscribed'] is True: +                    return chobj -        if 'errorback' in args: -            errorback = args['errorback'] -        else: -            errorback = lambda x: x          ## New Channel? -        if not (channel in self.subscriptions) : +        if not channel in self.subscriptions:              self.subscriptions[channel] = { -                'first'     : False, -                'connected' : False, +                'name'          : channel, +                'first'         : False, +                'connected'     : False, +                'subscribed'    : True, +                'callback'      : callback, +                'connect'       : connect, +                'disconnect'    : disconnect, +                'reconnect'     : reconnect              } -        ## Ensure Single Connection +        ## return if already connected to channel          if self.subscriptions[channel]['connected'] : -            return "Already Connected" +            _invoke(error, "Already Connected") +            return +             -        self.subscriptions[channel]['connected'] = 1          ## SUBSCRIPTION RECURSION  -        def _subscribe(): -            ## STOP CONNECTION? -            if not self.subscriptions[channel]['connected']: -                return +        def _connect(): +            self._reset_offline() +              def sub_callback(response): -                if not self.subscriptions[channel]['first'] : -                    self.subscriptions[channel]['first'] = True -                    connectcb() +                print response +                ## ERROR ? +                if not response or error in response: +                    _invoke_error() -                ## STOP CONNECTION? -                if not self.subscriptions[channel]['connected']: -                    return +                _invoke_connect() +                self.timetoken = response[1] -                ## PROBLEM? -                if not response: -                    def time_callback(_time): -                        if not _time: -                            self.timeout( 1, _subscribe ) -                            return errorback("Lost Network Connection") -                        else: -                            self.timeout( 1, _subscribe) +                if len(response) > 2: +                    channel_list = response[2].split(',') +                    response_list = response[0] +                    for ch in enumerate(channel_list): +                        if ch[1] in self.subscriptions: +                            chobj = self.subscriptions[ch[1]] +                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                else: +                    response_list = response[0] +                    chobj = _get_channel() +                    for r in response_list: +                        if chobj: +                            _invoke(chobj['callback'], self.decrypt(r)) -                    ## ENSURE CONNECTED (Call Time Function) -                    return self.time({ 'callback' : time_callback }) -                self.timetoken = response[1] -                _subscribe() +                _connect() + -                pc = PubnubCrypto() -                out = [] -                for message in response[0]: -                     callback(self.decrypt(message)) +            channel_list = self.get_channel_list(self.subscriptions) +            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: -                self._request( { "urlcomponents" : [ +                self.SUB_RECEIVER = self._request( { "urlcomponents" : [                      'subscribe',                      self.subscribe_key, -                    channel, +                    channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) -            except : -                self.timeout( 1, _subscribe) +                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +            except Exception as e: +                self.timeout( 1, _connect)                  return +        self._connect = _connect + +          ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        _subscribe() +        _connect() + +    def _reset_offline(self): +        if self.SUB_RECEIVER is not None: +            self.SUB_RECEIVER() +        self.SUB_RECEIVER = None + +    def CONNECT(self): +        self._reset_offline() +        self._connect() + +      def unsubscribe( self, args ): +        #print(args['channel'])          channel = str(args['channel'])          if not (channel in self.subscriptions):              return False          ## DISCONNECT          self.subscriptions[channel]['connected'] = 0 +        self.subscriptions[channel]['subscribed'] = False          self.subscriptions[channel]['timetoken'] = 0          self.subscriptions[channel]['first']     = False +        self.CONNECT() diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index 89c0d97..ee66619 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -16,7 +16,7 @@ from base64 import encodestring, decodestring  import hashlib  import hmac -class PubnubCrypto() : +class PubnubCrypto2() :      """      #**      #* PubnubCrypto @@ -93,13 +93,89 @@ class PubnubCrypto() :          return self.depad((cipher.decrypt(decodestring(msg)))) +class PubnubCrypto3() : +    """ +    #** +    #* PubnubCrypto +    #* +    #** + +    ## Initiate Class +    pc = PubnubCrypto + +    """ +    +    def pad( self, msg, block_size=16 ): +        """ +        #** +        #* pad +        #* +        #* pad the text to be encrypted +        #* appends a padding character to the end of the String +        #* until the string has block_size length +        #* @return msg with padding. +        #** +        """ +        padding = block_size - (len(msg) % block_size) +        return msg + (chr(padding)*padding).encode('utf-8') +        +    def depad( self, msg ): +        """ +        #** +        #* depad +        #* +        #* depad the decryptet message" +        #* @return msg without padding. +        #** +        """ +        return msg[0:-ord(msg[-1])] + +    def getSecret( self, key ): +        """ +        #** +        #* getSecret +        #* +        #* hases the key to MD5 +        #* @return key in MD5 format +        #** +        """ +        return hashlib.sha256(key.encode("utf-8")).hexdigest() + +    def encrypt( self, key, msg ): +        """ +        #** +        #* encrypt +        #* +        #* encrypts the message +        #* @return message in encrypted format +        #** +        """ +        secret = self.getSecret(key) +        Initial16bytes='0123456789012345' +        cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) +        return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') +    def decrypt( self, key, msg ): +        """ +        #** +        #* decrypt +        #* +        #* decrypts the message +        #* @return message in decryped format +        #** +        """ +        secret = self.getSecret(key) +        Initial16bytes='0123456789012345' +        cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) +        return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8') + +  try: import json  except ImportError: import simplejson as json  import time  import hashlib -import urllib2 -import uuid  +import uuid +import sys  class PubnubBase(object):      def __init__( @@ -137,7 +213,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on -        self.pc            = PubnubCrypto() +          if self.ssl :              self.origin = 'https://' + self.origin @@ -145,8 +221,16 @@ class PubnubBase(object):              self.origin = 'http://'  + self.origin          self.uuid = UUID or str(uuid.uuid4()) + +        if type(sys.version_info) is tuple: +            self.python_version = 2 +            self.pc             = PubnubCrypto2() +        else: +            self.python_version = 3 +            self.pc             = PubnubCrypto3() + -        if not isinstance(self.uuid, basestring): +        if not isinstance(self.uuid, str):              raise AttributeError("pres_uuid must be a string")      def sign(self, channel, message): @@ -177,6 +261,14 @@ class PubnubBase(object):          return message +    def _return_wrapped_callback(self, callback=None): +        def _new_format_callback(response): +            if 'payload' in response: +                if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) +            else: +                if (callback != None):callback(response) +        if (callback != None): return _new_format_callback +      def publish( self, args ) :          """ @@ -207,7 +299,7 @@ class PubnubBase(object):          channel = str(args['channel'])          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None  @@ -226,7 +318,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]}, callback) +        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -266,7 +358,7 @@ class PubnubBase(object):          callback  = args['callback']          subscribe_key = args.get('subscribe_key') or self.subscribe_key -        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) +        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})      def here_now( self, args ) : @@ -291,7 +383,7 @@ class PubnubBase(object):          channel = str(args['channel'])          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None @@ -329,7 +421,7 @@ class PubnubBase(object):          """          ## Capture User Input -        limit   = args.has_key('limit') and int(args['limit']) or 10 +        limit   = 'limit' in args and int(args['limit']) or 10          channel = str(args['channel'])          ## Fail if bad input. @@ -338,7 +430,7 @@ class PubnubBase(object):              return False          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None @@ -377,18 +469,18 @@ class PubnubBase(object):          params = dict()           count = 100     -        if args.has_key('count'): +        if 'count' in args:              count = int(args['count'])          params['count'] = str(count)     -        if args.has_key('reverse'): +        if 'reverse' in args:              params['reverse'] = str(args['reverse']).lower() -        if args.has_key('start'): +        if 'start' in args:              params['start'] = str(args['start']) -        if args.has_key('end'): +        if 'end' in args:              params['end'] = str(args['end'])          ## Fail if bad input. @@ -397,7 +489,7 @@ class PubnubBase(object):              return False          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None  @@ -428,7 +520,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and args.has_key('callback') : +        if args and 'callback' in args :              callback = args['callback']          else :              callback = None  @@ -454,8 +546,8 @@ class PubnubBase(object):                  hex(ord(ch)).replace( '0x', '%' ).upper() or                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]]) -        if (request.has_key("urlparams")): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].iteritems()]) +        if ("urlparams" in request): +            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()])          return url @@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase):      def start(self): pass       def stop(self):  pass -    def timeout( self, delay, callback ): -        pass      def __init__(          self, @@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase):          self.timetoken     = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip' - -    def subscribe( self, args ) : +        self.SUB_RECEIVER  = None +        self._connect    = None + +    def get_channel_list(self, channels): +        channel = '' +        first = True +        for ch in channels: +            if not channels[ch]['subscribed']: +                continue +            if not first: +                channel += ',' +            else: +                first = False +            channel += ch +        return channel + +    def subscribe( self, args=None, sync=False ) :          """          #**          #* Subscribe @@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase):          })          """ -        ## Fail if missing channel -        if not 'channel' in args : -            return 'Missing Channel.' -        ## Fail if missing callback -        if not 'callback' in args : -            return 'Missing Callback.' +        if sync is True and self.susbcribe_sync is not None: +            self.susbcribe_sync(args) +            return -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        connectcb = args['connect'] +        def _invoke(func,msg=None): +            if func is not None: +                if msg is not None: +                    func(msg) +                else: +                    func() + +        def _invoke_connect(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['connected'] is False: +                    chobj['connected'] = True +                    _invoke(chobj['connect']) + +        def _invoke_error(err=None): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                _invoke(chobj.error,err) + + +        if callback is None: +            _invoke(error, "Callback Missing") +            return + +        if channel is None: +            _invoke(error, "Channel Missing") +            return + +        def _get_channel(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['subscribed'] is True: +                    return chobj -        if 'errorback' in args: -            errorback = args['errorback'] -        else: -            errorback = lambda x: x          ## New Channel? -        if not (channel in self.subscriptions) : +        if not channel in self.subscriptions:              self.subscriptions[channel] = { -                'first'     : False, -                'connected' : False, +                'name'          : channel, +                'first'         : False, +                'connected'     : False, +                'subscribed'    : True, +                'callback'      : callback, +                'connect'       : connect, +                'disconnect'    : disconnect, +                'reconnect'     : reconnect              } -        ## Ensure Single Connection +        ## return if already connected to channel          if self.subscriptions[channel]['connected'] : -            return "Already Connected" +            _invoke(error, "Already Connected") +            return +             -        self.subscriptions[channel]['connected'] = 1          ## SUBSCRIPTION RECURSION  -        def _subscribe(): -            ## STOP CONNECTION? -            if not self.subscriptions[channel]['connected']: -                return +        def _connect(): +            self._reset_offline() +              def sub_callback(response): -                if not self.subscriptions[channel]['first'] : -                    self.subscriptions[channel]['first'] = True -                    connectcb() +                print response +                ## ERROR ? +                if not response or error in response: +                    _invoke_error() -                ## STOP CONNECTION? -                if not self.subscriptions[channel]['connected']: -                    return +                _invoke_connect() +                self.timetoken = response[1] -                ## PROBLEM? -                if not response: -                    def time_callback(_time): -                        if not _time: -                            self.timeout( 1, _subscribe ) -                            return errorback("Lost Network Connection") -                        else: -                            self.timeout( 1, _subscribe) +                if len(response) > 2: +                    channel_list = response[2].split(',') +                    response_list = response[0] +                    for ch in enumerate(channel_list): +                        if ch[1] in self.subscriptions: +                            chobj = self.subscriptions[ch[1]] +                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                else: +                    response_list = response[0] +                    chobj = _get_channel() +                    for r in response_list: +                        if chobj: +                            _invoke(chobj['callback'], self.decrypt(r)) -                    ## ENSURE CONNECTED (Call Time Function) -                    return self.time({ 'callback' : time_callback }) -                self.timetoken = response[1] -                _subscribe() +                _connect() + -                pc = PubnubCrypto() -                out = [] -                for message in response[0]: -                     callback(self.decrypt(message)) +            channel_list = self.get_channel_list(self.subscriptions) +            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: -                self._request( { "urlcomponents" : [ +                self.SUB_RECEIVER = self._request( { "urlcomponents" : [                      'subscribe',                      self.subscribe_key, -                    channel, +                    channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) -            except : -                self.timeout( 1, _subscribe) +                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +            except Exception as e: +                self.timeout( 1, _connect)                  return +        self._connect = _connect + +          ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        _subscribe() +        _connect() + +    def _reset_offline(self): +        if self.SUB_RECEIVER is not None: +            self.SUB_RECEIVER() +        self.SUB_RECEIVER = None + +    def CONNECT(self): +        self._reset_offline() +        self._connect() + +      def unsubscribe( self, args ): +        #print(args['channel'])          channel = str(args['channel'])          if not (channel in self.subscriptions):              return False          ## DISCONNECT          self.subscriptions[channel]['connected'] = 0 +        self.subscriptions[channel]['subscribed'] = False          self.subscriptions[channel]['timetoken'] = 0          self.subscriptions[channel]['first']     = False +        self.CONNECT()  import tornado.httpclient @@ -685,15 +836,24 @@ class Pubnub(PubnubCoreAsync):          self.headers['Accept-Encoding'] = self.accept_encoding          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) +        self.id = None -    def _request( self, request, callback ) : +    def _request( self, request, callback, single=False ) :          url = self.getUrl(request) -        ## Send Request Expecting JSON Response -        #print self.headers -          request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 ) +        if single is True: +            id = time.time() +            self.id = id          def responseCallback(response): +            if single is True: +                if not id == self.id: +                    return None  + +            body = response._get_body() +            if body is None: +                return +              def handle_exc(*args):                  return True              if response.error is not None: @@ -701,9 +861,14 @@ class Pubnub(PubnubCoreAsync):                      response.rethrow()              elif callback:                  callback(eval(response._get_body())) -         +          self.http.fetch( -            request, -            callback=responseCallback, +            request=request, +            callback=responseCallback          ) +        def abort(): +            pass + +        return abort + diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py index 62d3a26..f98befb 100644 --- a/python-tornado/unassembled/Platform.py +++ b/python-tornado/unassembled/Platform.py @@ -42,15 +42,24 @@ class Pubnub(PubnubCoreAsync):          self.headers['Accept-Encoding'] = self.accept_encoding          self.headers['V'] = self.version          self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) +        self.id = None -    def _request( self, request, callback ) : +    def _request( self, request, callback, single=False ) :          url = self.getUrl(request) -        ## Send Request Expecting JSON Response -        #print self.headers -          request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 ) +        if single is True: +            id = time.time() +            self.id = id          def responseCallback(response): +            if single is True: +                if not id == self.id: +                    return None  + +            body = response._get_body() +            if body is None: +                return +              def handle_exc(*args):                  return True              if response.error is not None: @@ -58,9 +67,14 @@ class Pubnub(PubnubCoreAsync):                      response.rethrow()              elif callback:                  callback(eval(response._get_body())) -         +          self.http.fetch( -            request, -            callback=responseCallback, +            request=request, +            callback=responseCallback          ) +        def abort(): +            pass + +        return abort + diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py index 66534b5..3bc2d35 100644 --- a/python-twisted/Pubnub.py +++ b/python-twisted/Pubnub.py @@ -16,7 +16,7 @@ from base64 import encodestring, decodestring  import hashlib  import hmac -class PubnubCrypto() : +class PubnubCrypto2() :      """      #**      #* PubnubCrypto @@ -93,13 +93,89 @@ class PubnubCrypto() :          return self.depad((cipher.decrypt(decodestring(msg)))) +class PubnubCrypto3() : +    """ +    #** +    #* PubnubCrypto +    #* +    #** + +    ## Initiate Class +    pc = PubnubCrypto + +    """ +    +    def pad( self, msg, block_size=16 ): +        """ +        #** +        #* pad +        #* +        #* pad the text to be encrypted +        #* appends a padding character to the end of the String +        #* until the string has block_size length +        #* @return msg with padding. +        #** +        """ +        padding = block_size - (len(msg) % block_size) +        return msg + (chr(padding)*padding).encode('utf-8') +        +    def depad( self, msg ): +        """ +        #** +        #* depad +        #* +        #* depad the decryptet message" +        #* @return msg without padding. +        #** +        """ +        return msg[0:-ord(msg[-1])] + +    def getSecret( self, key ): +        """ +        #** +        #* getSecret +        #* +        #* hases the key to MD5 +        #* @return key in MD5 format +        #** +        """ +        return hashlib.sha256(key.encode("utf-8")).hexdigest() + +    def encrypt( self, key, msg ): +        """ +        #** +        #* encrypt +        #* +        #* encrypts the message +        #* @return message in encrypted format +        #** +        """ +        secret = self.getSecret(key) +        Initial16bytes='0123456789012345' +        cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) +        return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') +    def decrypt( self, key, msg ): +        """ +        #** +        #* decrypt +        #* +        #* decrypts the message +        #* @return message in decryped format +        #** +        """ +        secret = self.getSecret(key) +        Initial16bytes='0123456789012345' +        cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes) +        return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8') + +  try: import json  except ImportError: import simplejson as json  import time  import hashlib -import urllib2 -import uuid  +import uuid +import sys  class PubnubBase(object):      def __init__( @@ -137,7 +213,7 @@ class PubnubBase(object):          self.secret_key    = secret_key          self.cipher_key    = cipher_key          self.ssl           = ssl_on -        self.pc            = PubnubCrypto() +          if self.ssl :              self.origin = 'https://' + self.origin @@ -145,8 +221,16 @@ class PubnubBase(object):              self.origin = 'http://'  + self.origin          self.uuid = UUID or str(uuid.uuid4()) + +        if type(sys.version_info) is tuple: +            self.python_version = 2 +            self.pc             = PubnubCrypto2() +        else: +            self.python_version = 3 +            self.pc             = PubnubCrypto3() + -        if not isinstance(self.uuid, basestring): +        if not isinstance(self.uuid, str):              raise AttributeError("pres_uuid must be a string")      def sign(self, channel, message): @@ -177,6 +261,14 @@ class PubnubBase(object):          return message +    def _return_wrapped_callback(self, callback=None): +        def _new_format_callback(response): +            if 'payload' in response: +                if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) +            else: +                if (callback != None):callback(response) +        if (callback != None): return _new_format_callback +      def publish( self, args ) :          """ @@ -207,7 +299,7 @@ class PubnubBase(object):          channel = str(args['channel'])          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None  @@ -226,7 +318,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]}, callback) +        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -266,7 +358,7 @@ class PubnubBase(object):          callback  = args['callback']          subscribe_key = args.get('subscribe_key') or self.subscribe_key -        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) +        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})      def here_now( self, args ) : @@ -291,7 +383,7 @@ class PubnubBase(object):          channel = str(args['channel'])          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None @@ -329,7 +421,7 @@ class PubnubBase(object):          """          ## Capture User Input -        limit   = args.has_key('limit') and int(args['limit']) or 10 +        limit   = 'limit' in args and int(args['limit']) or 10          channel = str(args['channel'])          ## Fail if bad input. @@ -338,7 +430,7 @@ class PubnubBase(object):              return False          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None @@ -377,18 +469,18 @@ class PubnubBase(object):          params = dict()           count = 100     -        if args.has_key('count'): +        if 'count' in args:              count = int(args['count'])          params['count'] = str(count)     -        if args.has_key('reverse'): +        if 'reverse' in args:              params['reverse'] = str(args['reverse']).lower() -        if args.has_key('start'): +        if 'start' in args:              params['start'] = str(args['start']) -        if args.has_key('end'): +        if 'end' in args:              params['end'] = str(args['end'])          ## Fail if bad input. @@ -397,7 +489,7 @@ class PubnubBase(object):              return False          ## Capture Callback -        if args.has_key('callback') : +        if 'callback' in args :              callback = args['callback']          else :              callback = None  @@ -428,7 +520,7 @@ class PubnubBase(object):          """          ## Capture Callback -        if args and args.has_key('callback') : +        if args and 'callback' in args :              callback = args['callback']          else :              callback = None  @@ -454,8 +546,8 @@ class PubnubBase(object):                  hex(ord(ch)).replace( '0x', '%' ).upper() or                  ch for ch in list(bit)              ]) for bit in request["urlcomponents"]]) -        if (request.has_key("urlparams")): -            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].iteritems()]) +        if ("urlparams" in request): +            url = url + '?' + "&".join([ x + "=" + y  for x,y in request["urlparams"].items()])          return url @@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase):      def start(self): pass       def stop(self):  pass -    def timeout( self, delay, callback ): -        pass      def __init__(          self, @@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase):          self.timetoken     = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip' - -    def subscribe( self, args ) : +        self.SUB_RECEIVER  = None +        self._connect    = None + +    def get_channel_list(self, channels): +        channel = '' +        first = True +        for ch in channels: +            if not channels[ch]['subscribed']: +                continue +            if not first: +                channel += ',' +            else: +                first = False +            channel += ch +        return channel + +    def subscribe( self, args=None, sync=False ) :          """          #**          #* Subscribe @@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase):          })          """ -        ## Fail if missing channel -        if not 'channel' in args : -            return 'Missing Channel.' -        ## Fail if missing callback -        if not 'callback' in args : -            return 'Missing Callback.' +        if sync is True and self.susbcribe_sync is not None: +            self.susbcribe_sync(args) +            return -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        connectcb = args['connect'] +        def _invoke(func,msg=None): +            if func is not None: +                if msg is not None: +                    func(msg) +                else: +                    func() + +        def _invoke_connect(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['connected'] is False: +                    chobj['connected'] = True +                    _invoke(chobj['connect']) + +        def _invoke_error(err=None): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                _invoke(chobj.error,err) + + +        if callback is None: +            _invoke(error, "Callback Missing") +            return + +        if channel is None: +            _invoke(error, "Channel Missing") +            return + +        def _get_channel(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['subscribed'] is True: +                    return chobj -        if 'errorback' in args: -            errorback = args['errorback'] -        else: -            errorback = lambda x: x          ## New Channel? -        if not (channel in self.subscriptions) : +        if not channel in self.subscriptions:              self.subscriptions[channel] = { -                'first'     : False, -                'connected' : False, +                'name'          : channel, +                'first'         : False, +                'connected'     : False, +                'subscribed'    : True, +                'callback'      : callback, +                'connect'       : connect, +                'disconnect'    : disconnect, +                'reconnect'     : reconnect              } -        ## Ensure Single Connection +        ## return if already connected to channel          if self.subscriptions[channel]['connected'] : -            return "Already Connected" +            _invoke(error, "Already Connected") +            return +             -        self.subscriptions[channel]['connected'] = 1          ## SUBSCRIPTION RECURSION  -        def _subscribe(): -            ## STOP CONNECTION? -            if not self.subscriptions[channel]['connected']: -                return +        def _connect(): +            self._reset_offline() +              def sub_callback(response): -                if not self.subscriptions[channel]['first'] : -                    self.subscriptions[channel]['first'] = True -                    connectcb() +                print response +                ## ERROR ? +                if not response or error in response: +                    _invoke_error() -                ## STOP CONNECTION? -                if not self.subscriptions[channel]['connected']: -                    return +                _invoke_connect() +                self.timetoken = response[1] -                ## PROBLEM? -                if not response: -                    def time_callback(_time): -                        if not _time: -                            self.timeout( 1, _subscribe ) -                            return errorback("Lost Network Connection") -                        else: -                            self.timeout( 1, _subscribe) +                if len(response) > 2: +                    channel_list = response[2].split(',') +                    response_list = response[0] +                    for ch in enumerate(channel_list): +                        if ch[1] in self.subscriptions: +                            chobj = self.subscriptions[ch[1]] +                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                else: +                    response_list = response[0] +                    chobj = _get_channel() +                    for r in response_list: +                        if chobj: +                            _invoke(chobj['callback'], self.decrypt(r)) -                    ## ENSURE CONNECTED (Call Time Function) -                    return self.time({ 'callback' : time_callback }) -                self.timetoken = response[1] -                _subscribe() +                _connect() + -                pc = PubnubCrypto() -                out = [] -                for message in response[0]: -                     callback(self.decrypt(message)) +            channel_list = self.get_channel_list(self.subscriptions) +            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: -                self._request( { "urlcomponents" : [ +                self.SUB_RECEIVER = self._request( { "urlcomponents" : [                      'subscribe',                      self.subscribe_key, -                    channel, +                    channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) -            except : -                self.timeout( 1, _subscribe) +                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +            except Exception as e: +                self.timeout( 1, _connect)                  return +        self._connect = _connect + +          ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        _subscribe() +        _connect() + +    def _reset_offline(self): +        if self.SUB_RECEIVER is not None: +            self.SUB_RECEIVER() +        self.SUB_RECEIVER = None + +    def CONNECT(self): +        self._reset_offline() +        self._connect() + +      def unsubscribe( self, args ): +        #print(args['channel'])          channel = str(args['channel'])          if not (channel in self.subscriptions):              return False          ## DISCONNECT          self.subscriptions[channel]['connected'] = 0 +        self.subscriptions[channel]['subscribed'] = False          self.subscriptions[channel]['timetoken'] = 0          self.subscriptions[channel]['first']     = False +        self.CONNECT()  from twisted.web.client import getPage @@ -650,10 +801,14 @@ from twisted.web.client import HTTPConnectionPool  from twisted.web.http_headers import Headers  from twisted.internet.ssl import ClientContextFactory  from twisted.internet.task import LoopingCall +import twisted +from hashlib import sha256 +import time  pnconn_pool = HTTPConnectionPool(reactor, persistent=True) -pnconn_pool.maxPersistentPerHost    = 100 +pnconn_pool.maxPersistentPerHost    = 100000  pnconn_pool.cachedConnectionTimeout = 310 +pnconn_pool.retryAutomatically = True  class Pubnub(PubnubCoreAsync): @@ -684,7 +839,7 @@ class Pubnub(PubnubCoreAsync):          #self.headers['Accept-Encoding'] = [self.accept_encoding]          self.headers['V'] = [self.version] -    def _request( self, request, callback ) : +    def _request( self, request, callback, single=False ) :          global pnconn_pool          ## Build URL @@ -696,24 +851,42 @@ class Pubnub(PubnubCoreAsync):              ]) for bit in request])          '''          url = self.getUrl(request) +          agent       = ContentDecoderAgent(RedirectAgent(Agent(              reactor,              contextFactory = WebClientContextFactory(),              pool = self.ssl and None or pnconn_pool          )), [('gzip', GzipDecoder)]) +          request     = agent.request( 'GET', url, Headers(self.headers), None ) +        if single is True: +            id = time.time() +            self.id = id +          def received(response):              finished = Deferred()              response.deliverBody(PubNubResponse(finished))              return finished          def complete(data): -            callback(eval(data)) +            if single is True: +                if not id == self.id: +                    return None +            try: +                callback(eval(data)) +            except Exception as e: +                pass +                #need error handling here + +        def abort(): +            pass          request.addCallback(received)          request.addBoth(complete) +        return abort +  class WebClientContextFactory(ClientContextFactory):      def getContext(self, hostname, port):          return ClientContextFactory.getContext(self) diff --git a/python-twisted/unassembled/Platform.py b/python-twisted/unassembled/Platform.py index 7318703..3b84b30 100644 --- a/python-twisted/unassembled/Platform.py +++ b/python-twisted/unassembled/Platform.py @@ -7,10 +7,14 @@ from twisted.web.client import HTTPConnectionPool  from twisted.web.http_headers import Headers  from twisted.internet.ssl import ClientContextFactory  from twisted.internet.task import LoopingCall +import twisted +from hashlib import sha256 +import time  pnconn_pool = HTTPConnectionPool(reactor, persistent=True) -pnconn_pool.maxPersistentPerHost    = 100 +pnconn_pool.maxPersistentPerHost    = 100000  pnconn_pool.cachedConnectionTimeout = 310 +pnconn_pool.retryAutomatically = True  class Pubnub(PubnubCoreAsync): @@ -41,7 +45,7 @@ class Pubnub(PubnubCoreAsync):          #self.headers['Accept-Encoding'] = [self.accept_encoding]          self.headers['V'] = [self.version] -    def _request( self, request, callback ) : +    def _request( self, request, callback, single=False ) :          global pnconn_pool          ## Build URL @@ -53,24 +57,42 @@ class Pubnub(PubnubCoreAsync):              ]) for bit in request])          '''          url = self.getUrl(request) +          agent       = ContentDecoderAgent(RedirectAgent(Agent(              reactor,              contextFactory = WebClientContextFactory(),              pool = self.ssl and None or pnconn_pool          )), [('gzip', GzipDecoder)]) +          request     = agent.request( 'GET', url, Headers(self.headers), None ) +        if single is True: +            id = time.time() +            self.id = id +          def received(response):              finished = Deferred()              response.deliverBody(PubNubResponse(finished))              return finished          def complete(data): -            callback(eval(data)) +            if single is True: +                if not id == self.id: +                    return None +            try: +                callback(eval(data)) +            except Exception as e: +                pass +                #need error handling here + +        def abort(): +            pass          request.addCallback(received)          request.addBoth(complete) +        return abort +  class WebClientContextFactory(ClientContextFactory):      def getContext(self, hostname, port):          return ClientContextFactory.getContext(self) diff --git a/python/Makefile b/python/Makefile index 5eb9e2f..b693cf8 100644 --- a/python/Makefile +++ b/python/Makefile @@ -12,7 +12,7 @@ build:  	echo "\n" >> ./Pubnub.py  	cat ../common/PubnubBase.py >> ./Pubnub.py  	echo "\n" >> ./Pubnub.py -	cat ../common/PubnubCore.py >> ./Pubnub.py +	cat ../common/PubnubCoreAsync.py >> ./Pubnub.py  	echo "\n" >> ./Pubnub.py  	cat ./unassembled/Platform.py >> ./Pubnub.py  	find -name "Pubnub*py" | xargs sed -i "s/PubNub\ [0-9]\.[0-9]\.[0-9]/PubNub\ $(VERSION)/g" diff --git a/python/Pubnub.py b/python/Pubnub.py index 91f67ad..a449c2d 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -206,13 +206,13 @@ class PubnubBase(object):          pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False )          """ -        self.origin         = origin -        self.limit          = 1800 -        self.publish_key    = publish_key -        self.subscribe_key  = subscribe_key -        self.secret_key     = secret_key -        self.cipher_key     = cipher_key -        self.ssl            = ssl_on +        self.origin        = origin +        self.limit         = 1800 +        self.publish_key   = publish_key +        self.subscribe_key = subscribe_key +        self.secret_key    = secret_key +        self.cipher_key    = cipher_key +        self.ssl           = ssl_on          if self.ssl : @@ -261,6 +261,14 @@ class PubnubBase(object):          return message +    def _return_wrapped_callback(self, callback=None): +        def _new_format_callback(response): +            if 'payload' in response: +                if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) +            else: +                if (callback != None):callback(response) +        if (callback != None): return _new_format_callback +      def publish( self, args ) :          """ @@ -310,7 +318,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]}, callback) +        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -350,7 +358,7 @@ class PubnubBase(object):          callback  = args['callback']          subscribe_key = args.get('subscribe_key') or self.subscribe_key -        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) +        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})      def here_now( self, args ) : @@ -543,7 +551,19 @@ class PubnubBase(object):          return url -class PubnubCore(PubnubBase): +try: +    from hashlib import sha256 +    digestmod = sha256 +except ImportError: +    import Crypto.Hash.SHA256 as digestmod +    sha256 = digestmod.new +import hmac + +class PubnubCoreAsync(PubnubBase): + +    def start(self): pass  +    def stop(self):  pass +      def __init__(          self,          publish_key, @@ -562,17 +582,16 @@ class PubnubCore(PubnubBase):          #*          #* @param string publish_key required key to send messages.          #* @param string subscribe_key required key to receive messages. -        #* @param string secret_key optional key to sign messages. +        #* @param string secret_key required key to sign messages.          #* @param boolean ssl required for 2048 bit encrypted messages.          #* @param string origin PUBNUB Server Origin. -        #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied)          #**          ## Initiat Class          pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False )          """ -        super(PubnubCore, self).__init__( +        super(PubnubCoreAsync, self).__init__(              publish_key=publish_key,              subscribe_key=subscribe_key,              secret_key=secret_key, @@ -584,20 +603,33 @@ class PubnubCore(PubnubBase):          self.subscriptions = {}          self.timetoken     = 0 -        self.version       = '3.4' +        self.version       = '3.3.4'          self.accept_encoding = 'gzip' +        self.SUB_RECEIVER  = None +        self._connect    = None +    def get_channel_list(self, channels): +        channel = '' +        first = True +        for ch in channels: +            if not channels[ch]['subscribed']: +                continue +            if not first: +                channel += ',' +            else: +                first = False +            channel += ch +        return channel - -    def subscribe( self, args ) : +    def subscribe( self, args=None, sync=False ) :          """          #**          #* Subscribe          #* -        #* This is BLOCKING. +        #* This is NON-BLOCKING.          #* Listen for a message on a channel.          #* -        #* @param array args with channel and callback. +        #* @param array args with channel and message.          #* @return false on fail, array on success.          #** @@ -606,58 +638,158 @@ class PubnubCore(PubnubBase):              print(message)              return True +        ## On Connect Callback +        def connected() : +            pubnub.publish({ +                'channel' : 'hello_world', +                'message' : { 'some_var' : 'text' } +            }) + +        ## Subscribe          pubnub.subscribe({              'channel'  : 'hello_world', -            'callback' : receive  +            'connect'  : connected, +            'callback' : receive          })          """ -        ## Fail if missing channel -        if not 'channel' in args : -            raise Exception('Missing Channel.') -            return False +        if sync is True and self.susbcribe_sync is not None: +            self.susbcribe_sync(args) +            return + +        def _invoke(func,msg=None): +            if func is not None: +                if msg is not None: +                    func(msg) +                else: +                    func() + +        def _invoke_connect(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['connected'] is False: +                    chobj['connected'] = True +                    _invoke(chobj['connect']) + +        def _invoke_error(err=None): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                _invoke(chobj.error,err) + + +        if callback is None: +            _invoke(error, "Callback Missing") +            return + +        if channel is None: +            _invoke(error, "Channel Missing") +            return + +        def _get_channel(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['subscribed'] is True: +                    return chobj + + +        ## New Channel? +        if not channel in self.subscriptions: +            self.subscriptions[channel] = { +                'name'          : channel, +                'first'         : False, +                'connected'     : False, +                'subscribed'    : True, +                'callback'      : callback, +                'connect'       : connect, +                'disconnect'    : disconnect, +                'reconnect'     : reconnect +            } -        ## Fail if missing callback -        if not 'callback' in args : -            raise Exception('Missing Callback.') -            return False +        ## return if already connected to channel +        if self.subscriptions[channel]['connected'] : +            _invoke(error, "Already Connected") +            return +             -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        subscribe_key = args.get('subscribe_key') or self.subscribe_key +        ## SUBSCRIPTION RECURSION  +        def _connect(): +           +            self._reset_offline() + +            def sub_callback(response): +                print response +                ## ERROR ? +                if not response or error in response: +                    _invoke_error() + +                _invoke_connect() + + +                self.timetoken = response[1] -        ## Begin Subscribe -        while True : +                if len(response) > 2: +                    channel_list = response[2].split(',') +                    response_list = response[0] +                    for ch in enumerate(channel_list): +                        if ch[1] in self.subscriptions: +                            chobj = self.subscriptions[ch[1]] +                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                else: +                    response_list = response[0] +                    chobj = _get_channel() +                    for r in response_list: +                        if chobj: +                            _invoke(chobj['callback'], self.decrypt(r)) -            timetoken = 'timetoken' in args and args['timetoken'] or 0 -            try : -                ## Wait for Message -                response = self._request({"urlcomponents" : [ + +                _connect() + + + +            channel_list = self.get_channel_list(self.subscriptions) +            print channel_list +            ## CONNECT TO PUBNUB SUBSCRIBE SERVERS +            try: +                self.SUB_RECEIVER = self._request( { "urlcomponents" : [                      'subscribe', -                    subscribe_key, -                    channel, +                    self.subscribe_key, +                    channel_list,                      '0', -                    str(timetoken) -                ],"urlparams" : {"uuid" : self.uuid }}) +                    str(self.timetoken) +                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +            except Exception as e: +                self.timeout( 1, _connect) +                return + +        self._connect = _connect -                messages          = response[0] -                args['timetoken'] = response[1] -                ## If it was a timeout -                if not len(messages) : -                    continue +        ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) +        _connect() -                ## Run user Callback and Reconnect if user permits. -                for message in messages : -                    if not callback(self.decrypt(message)) : -                        return +    def _reset_offline(self): +        if self.SUB_RECEIVER is not None: +            self.SUB_RECEIVER() +        self.SUB_RECEIVER = None -            except Exception: -                time.sleep(1) +    def CONNECT(self): +        self._reset_offline() +        self._connect() + + +    def unsubscribe( self, args ): +        #print(args['channel']) +        channel = str(args['channel']) +        if not (channel in self.subscriptions): +            return False -        return True +        ## DISCONNECT +        self.subscriptions[channel]['connected'] = 0 +        self.subscriptions[channel]['subscribed'] = False +        self.subscriptions[channel]['timetoken'] = 0 +        self.subscriptions[channel]['first']     = False +        self.CONNECT()  try: @@ -665,6 +797,34 @@ try:  except:      import urllib2 +import threading +import json +import time + +current_req_id = -1 + +class HTTPClient: +    def __init__(self, url, callback, id=None): +        self.url = url +        self.id = id +        self.callback = callback +        self.stop = False + +    def cancel(self): +        self.stop = True +        self.callback = None + +    def run(self): +        global current_req_id +        data = urllib2.urlopen(self.url, timeout=310).read() +        if self.stop is True: +            return +        if self.id is not None and current_req_id != self.id: +            return +        if self.callback is not None: +            self.callback(json.loads(data)) + +  class Pubnub(PubnubCore):      def __init__(          self, @@ -690,7 +850,33 @@ class Pubnub(PubnubCore):          else:              self._request = self._request3 -    def _request2( self, request, callback = None ) : +    def timeout(self, interval, func): +        def cb(): +            time.sleep(interval) +            func() +        thread = threading.Thread(target=cb) +        thread.start() + +    def _request2_async( self, request, callback, single=False ) : +        global current_req_id +        ## Build URL +        url = self.getUrl(request) +        if single is True: +            id = time.time() +            client = HTTPClient(url, callback, id) +            current_req_id = id +        else: +            client = HTTPClient(url, callback) + +        thread = threading.Thread(target=client.run) +        thread.start() +        def abort(): +            client.cancel(); +        return abort + + +    def _request2_sync( self, request) : +          ## Build URL          url = self.getUrl(request) @@ -704,13 +890,18 @@ class Pubnub(PubnubCore):          except:              return None -        if (callback): -            callback(resp_json) -        else:              return resp_json -    def _request3( self, request, callback = None ) : +    def _request2(self, request, callback=None, single=False): +        if callback is None: +            return self._request2_sync(request,single=single) +        else: +            self._request2_async(request, callback, single=single) + + + +    def _request3_sync( self, request) :          ## Build URL          url = self.getUrl(request)          ## Send Request Expecting JSONP Response @@ -718,18 +909,15 @@ class Pubnub(PubnubCore):              response = urllib.request.urlopen(url,timeout=310)              resp_json = json.loads(response.read().decode("utf-8"))          except Exception as e: -            print(e)              return None -        if (callback): -            callback(resp_json) -        else: -            return resp_json +        return resp_json -    '''         -    def _request(self, request, callback = None): -        if self.python_version == 2: -            return self._request2(request,callback) +    def _request3_async( self, request, callback, single=False ) : +        pass + +    def _request3(self, request, callback=None, single=False): +        if callback is None: +            return self._request3_sync(request,single=single)          else: -            return self._request3(request, callback) -    ''' +            self._request3_async(request, callback, single=single) diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py index c60690f..f0f9327 100644 --- a/python/unassembled/Platform.py +++ b/python/unassembled/Platform.py @@ -3,6 +3,34 @@ try:  except:      import urllib2 +import threading +import json +import time + +current_req_id = -1 + +class HTTPClient: +    def __init__(self, url, callback, id=None): +        self.url = url +        self.id = id +        self.callback = callback +        self.stop = False + +    def cancel(self): +        self.stop = True +        self.callback = None + +    def run(self): +        global current_req_id +        data = urllib2.urlopen(self.url, timeout=310).read() +        if self.stop is True: +            return +        if self.id is not None and current_req_id != self.id: +            return +        if self.callback is not None: +            self.callback(json.loads(data)) + +  class Pubnub(PubnubCore):      def __init__(          self, @@ -28,7 +56,33 @@ class Pubnub(PubnubCore):          else:              self._request = self._request3 -    def _request2( self, request, callback = None ) : +    def timeout(self, interval, func): +        def cb(): +            time.sleep(interval) +            func() +        thread = threading.Thread(target=cb) +        thread.start() + +    def _request2_async( self, request, callback, single=False ) : +        global current_req_id +        ## Build URL +        url = self.getUrl(request) +        if single is True: +            id = time.time() +            client = HTTPClient(url, callback, id) +            current_req_id = id +        else: +            client = HTTPClient(url, callback) + +        thread = threading.Thread(target=client.run) +        thread.start() +        def abort(): +            client.cancel(); +        return abort + + +    def _request2_sync( self, request) : +          ## Build URL          url = self.getUrl(request) @@ -42,13 +96,18 @@ class Pubnub(PubnubCore):          except:              return None -        if (callback): -            callback(resp_json) -        else:              return resp_json -    def _request3( self, request, callback = None ) : +    def _request2(self, request, callback=None, single=False): +        if callback is None: +            return self._request2_sync(request,single=single) +        else: +            self._request2_async(request, callback, single=single) + + + +    def _request3_sync( self, request) :          ## Build URL          url = self.getUrl(request)          ## Send Request Expecting JSONP Response @@ -56,10 +115,15 @@ class Pubnub(PubnubCore):              response = urllib.request.urlopen(url,timeout=310)              resp_json = json.loads(response.read().decode("utf-8"))          except Exception as e: -            print(e)              return None -        if (callback): -            callback(resp_json) +        return resp_json + +    def _request3_async( self, request, callback, single=False ) : +        pass + +    def _request3(self, request, callback=None, single=False): +        if callback is None: +            return self._request3_sync(request,single=single)          else: -            return resp_json +            self._request3_async(request, callback, single=single) | 
