diff options
| author | Devendra | 2013-02-24 04:56:45 +0530 | 
|---|---|---|
| committer | Devendra | 2013-02-24 04:56:45 +0530 | 
| commit | 51b1dec8b04258f7bff59fb5bb4f69cbab52260c (patch) | |
| tree | 6ae0186ea3876829784f66820fae097cc5800375 | |
| parent | 08c75762c1fd19b90f646a04072cb9bef60f5478 (diff) | |
| download | pubnub-python-51b1dec8b04258f7bff59fb5bb4f69cbab52260c.tar.bz2 | |
saving working version
| -rw-r--r-- | PubnubCoreAsync.py | 33 | ||||
| -rw-r--r-- | python-tornado/Pubnub.py | 271 | ||||
| -rw-r--r-- | python-tornado/examples/subscribe-example.py | 13 | ||||
| -rw-r--r-- | python-twisted/Pubnub.py | 201 | 
4 files changed, 39 insertions, 479 deletions
| diff --git a/PubnubCoreAsync.py b/PubnubCoreAsync.py index bafbca8..0ea03e9 100644 --- a/PubnubCoreAsync.py +++ b/PubnubCoreAsync.py @@ -39,7 +39,7 @@ class PubnubCoreAsync(object):      def start(self): pass       def stop(self):  pass -    def timeout( self, callback, delay ): +    def timeout( self, delay, callback ):          pass      def __init__( @@ -75,6 +75,7 @@ class PubnubCoreAsync(object):          self.cipher_key    = cipher_key          self.ssl           = ssl_on          self.subscriptions = {} +        self.timetoken     = 0          if self.ssl :              self.origin = 'https://' + self.origin @@ -170,6 +171,7 @@ class PubnubCoreAsync(object):          ], publish_response ) +      def subscribe( self, args ) :          """          #** @@ -224,8 +226,7 @@ class PubnubCoreAsync(object):          if not (channel in self.subscriptions) :              self.subscriptions[channel] = {                  'first'     : False, -                'connected' : 0, -                'timetoken' : '0' +                'connected' : False,              }          ## Ensure Single Connection @@ -233,14 +234,15 @@ class PubnubCoreAsync(object):              return "Already Connected"          self.subscriptions[channel]['connected'] = 1 - +        print self.subscriptions          ## SUBSCRIPTION RECURSION           def substabizel():              ## STOP CONNECTION?              if not self.subscriptions[channel]['connected']:                  return - +                        def sub_callback(response): +                response = json.loads(response)                  ## STOP CONNECTION?                  if not self.subscriptions[channel]['connected']:                      return @@ -254,15 +256,15 @@ class PubnubCoreAsync(object):                  if not response:                      def time_callback(_time):                          if not _time: -                            reactor.callLater( 1, substabizel ) +                            self.timeout( 1, substabizel )                              return errorback("Lost Network Connection")                          else: -                            reactor.callLater( 1, substabizel ) +                            self.timeout( 1, substabizel )                      ## ENSURE CONNECTED (Call Time Function)                      return self.time({ 'callback' : time_callback }) -                self.subscriptions[channel]['timetoken'] = response[1] +                self.timetoken = response[1]                  substabizel()                  pc = PubnubCrypto() @@ -295,10 +297,10 @@ class PubnubCoreAsync(object):                      self.subscribe_key,                      channel,                      '0', -                    str(self.subscriptions[channel]['timetoken']) +                    str(self.timetoken)                  ], sub_callback )              except : -                reactor.callLater( 1, substabizel ) +                self.timeout( 1, substabizel )                  return          ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) @@ -476,12 +478,5 @@ class PubnubCoreAsync(object):          request.addCallback(received)          request.addBoth(complete) - - -class PubNubResponse(Protocol): -    def __init__( self, finished ): -        self.finished = finished - -    def dataReceived( self, bytes ): -            self.finished.callback(bytes) - +    def _request( self, request, callback, timeout=30 ) : +        pass diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index b67859f..e9251f8 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -36,7 +36,7 @@ class Pubnub(PubnubCoreAsync):      def stop(self): ioloop.stop()      def start(self): ioloop.start() -    def timeout( self, callback, delay ): +    def timeout( self, delay, callback):          ioloop.add_timeout( time.time()+float(delay), callback )      def __init__( @@ -56,232 +56,6 @@ class Pubnub(PubnubCoreAsync):              origin,          )         - -    def subscribe( self, args ) : -        """ -        #** -        #* Subscribe -        #* -        #* This is NON-BLOCKING. -        #* Listen for a message on a channel. -        #* -        #* @param array args with channel and message. -        #* @return false on fail, array on success. -        #** - -        ## Subscribe Example -        def receive(message) : -            print(message) -            return True - -        ## On Connect Callback -        def connected() : -            pubnub.publish({ -                'channel' : 'hello_world', -                'message' : { 'some_var' : 'text' } -            }) - -        ## Subscribe -        pubnub.subscribe({ -            'channel'  : 'hello_world', -            'connect'  : connected, -            'callback' : receive -        }) - -        """ -        ## Fail if missing channel -        if not 'channel' in args : -            print('Missing Channel.') -            return False - -        ## Fail if missing callback -        if not 'callback' in args : -            print('Missing Callback.') -            return False - -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        connectcb = args['connect'] - -        if 'errorback' in args: -            errorback = args['errorback'] -        else: -            errorback = lambda x: x - -        ## New Channel? -        if not (channel in self.subscriptions) : -            self.subscriptions[channel] = { -                'first'     : False, -                'connected' : 0, -                'timetoken' : '0' -            } - -        ## Ensure Single Connection -        if self.subscriptions[channel]['connected'] : -            print("Already Connected") -            return False - -        self.subscriptions[channel]['connected'] = 1 - -        ## SUBSCRIPTION RECURSION  -        def substabizel(): -            ## STOP CONNECTION? -            if not self.subscriptions[channel]['connected']: -                return - -            def sub_callback(response): -                ## STOP CONNECTION? -                if not self.subscriptions[channel]['connected']: -                    return - -                ## CONNECTED CALLBACK -                if not self.subscriptions[channel]['first'] : -                    self.subscriptions[channel]['first'] = True -                    connectcb() - -                ## PROBLEM? -                if not response: -                    def time_callback(_time): -                        if not _time: -                            ioloop.add_timeout(time.time()+1, substabizel) -                            return errorback("Lost Network Connection") -                        else: -                            ioloop.add_timeout(time.time()+1, substabizel) - -                    ## ENSURE CONNECTED (Call Time Function) -                    return self.time({ 'callback' : time_callback }) - -                self.subscriptions[channel]['timetoken'] = response[1] -                substabizel() - -                pc = PubnubCrypto() -                out = [] -                for message in response[0]: -                     if self.cipher_key : -                          if type( message ) == type(list()): -                              for item in message: -                                  encryptItem = pc.decrypt(self.cipher_key, item ) -                                  out.append(encryptItem) -                              message = out -                          elif type( message ) == type(dict()): -                              outdict = {} -                              for k, item in message.iteritems(): -                                  encryptItem = pc.decrypt(self.cipher_key, item ) -                                  outdict[k] = encryptItem -                                  out.append(outdict) -                              message = out[0] -                          else: -                              message = pc.decrypt(self.cipher_key, message ) -                           -                     callback(message) - -            ## CONNECT TO PUBNUB SUBSCRIBE SERVERS -            try : -                self._request( [ -                    'subscribe', -                    self.subscribe_key, -                    channel, -                    '0', -                    str(self.subscriptions[channel]['timetoken']) -                ], sub_callback ) -            except : -                ioloop.add_timeout(time.time()+1, substabizel) -                return - -        ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        substabizel() - - -    def unsubscribe( self, args ): -        channel = str(args['channel']) -        if not (channel in self.subscriptions): -            return False - -        ## DISCONNECT -        self.subscriptions[channel]['connected'] = 0 -        self.subscriptions[channel]['timetoken'] = 0 -        self.subscriptions[channel]['first']     = False - - -    def history( self, args ) : -        """ -        #** -        #* History -        #* -        #* Load history from a channel. -        #* -        #* @param array args with 'channel' and 'limit'. -        #* @return mixed false on fail, array on success. -        #* - -        ## History Example -        history = pubnub.history({ -            'channel' : 'hello_world', -            'limit'   : 1 -        }) -        print(history) - -        """ -        ## Capture User Input -        limit   = args.has_key('limit') and int(args['limit']) or 10 -        channel = str(args['channel']) - -        ## Fail if bad input. -        if not channel : -            print('Missing Channel') -            return False - -        ## Get History -        return self._request( [ -            'history', -            self.subscribe_key, -            channel, -            '0', -            str(limit) -        ], args['callback'] ); - -    def time( self, args ) : -        """ -        #** -        #* Time -        #* -        #* Timestamp from PubNub Cloud. -        #* -        #* @return int timestamp. -        #* - -        ## PubNub Server Time Example -        def time_complete(timestamp): -            print(timestamp) - -        pubnub.time(time_complete) - -        """ -        def complete(response) : -            args['callback'](response and response[0]) - -        self._request( [ -            'time', -            '0' -        ], complete ) -         -    def uuid(self) : -        """ -        #** -        #* uuid -        #* -        #* Generate a UUID -        #* -        #* @return  UUID. -        #* - -        ## PubNub UUID Example -        uuid = pubnub.uuid() -        print(uuid) -        """ -        return uuid.uuid1() -      def _request( self, request, callback ) :          ## Build URL          url = self.origin + '/' + "/".join([ @@ -289,43 +63,7 @@ class Pubnub(PubnubCoreAsync):                  hex(ord(ch)).replace( '0x', '%' ).upper() or                  ch for ch in list(bit)              ]) for bit in request]) - -        requestType = request[0] - -        def complete(response) : -            if response.error: -                return callback(None) -            obj = json.loads(response.buffer.getvalue()) -            pc = PubnubCrypto() -            out = [] -            if self.cipher_key : -                if requestType == "history" : -                    if type(obj) == type(list()): -                        for item in obj: -                            if type(item) == type(list()): -                                for subitem in item: -                                    encryptItem = pc.decrypt(self.cipher_key, subitem ) -                                    out.append(encryptItem) -                            elif type(item) == type(dict()): -                                outdict = {} -                                for k, subitem in item.iteritems(): -                                    encryptItem = pc.decrypt(self.cipher_key, subitem ) -                                    outdict[k] = encryptItem -                                    out.append(outdict) -                            else :          -                                encryptItem = pc.decrypt(self.cipher_key, item ) -                                out.append(encryptItem) -                        callback(out) -                    elif type( obj ) == type(dict()): -                        for k, item in obj.iteritems(): -                            encryptItem = pc.decrypt(self.cipher_key, item ) -                            out.append(encryptItem) -                        callback(out)     -                else : -                    callback(obj) -            else :         -                callback(obj)         - +        print url          ## Send Request Expecting JSON Response          http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)          request = tornado.httpclient.HTTPRequest( url, 'GET', dict({ @@ -333,10 +71,13 @@ class Pubnub(PubnubCoreAsync):              'User-Agent' : 'Python-Tornado',              'Accept-Encoding' : 'gzip'          }) )  +        def responseCallback(response): +            print response._get_body() +            callback(response._get_body())          http.fetch(              request, -            callback=complete, +            callback=responseCallback,              connect_timeout=310,              request_timeout=310          ) diff --git a/python-tornado/examples/subscribe-example.py b/python-tornado/examples/subscribe-example.py index c819f94..1a2e03d 100644 --- a/python-tornado/examples/subscribe-example.py +++ b/python-tornado/examples/subscribe-example.py @@ -12,6 +12,7 @@  import sys  import tornado  sys.path.append('../') +sys.path.append('../..')  from Pubnub import Pubnub  publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' @@ -26,6 +27,17 @@ ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False  pubnub = Pubnub( publish_key, subscribe_key, secret_key,cipher_key, ssl_on )  crazy  = 'hello_world' +def connect_cb(): +    print 'Connect' + +def subscribe_result(response): +    print response + +pubnub.subscribe({ +    'channel' : crazy, +    'callback' : subscribe_result, +    'connect' : connect_cb  +})  ## -----------------------------------------------------------------------  ## Publish Example  ## ----------------------------------------------------------------------- @@ -53,7 +65,6 @@ pubnub.publish({      'message' : { 'some_key' : 'some_val' },      'callback' : publish_complete  }) -  ## -----------------------------------------------------------------------  ## IO Event Loop  ## ----------------------------------------------------------------------- diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py index 97895fd..f7d5c52 100644 --- a/python-twisted/Pubnub.py +++ b/python-twisted/Pubnub.py @@ -44,7 +44,7 @@ class Pubnub(PubnubCoreAsync):      def start(self): reactor.run()      def stop(self):  reactor.stop() -    def timeout( self, callback, delay ): +    def timeout( self, delay, callback ):          reactor.callLater( delay, callback )      def __init__( @@ -64,193 +64,6 @@ class Pubnub(PubnubCoreAsync):              origin,          )         -    def subscribe( self, args ) : -        """ -        #** -        #* Subscribe -        #* -        #* This is NON-BLOCKING. -        #* Listen for a message on a channel. -        #* -        #* @param array args with channel and message. -        #* @return false on fail, array on success. -        #** - -        ## Subscribe Example -        def receive(message) : -            print(message) -            return True - -        ## On Connect Callback -        def connected() : -            pubnub.publish({ -                'channel' : 'hello_world', -                'message' : { 'some_var' : 'text' } -            }) - -        ## Subscribe -        pubnub.subscribe({ -            'channel'  : 'hello_world', -            'connect'  : connected, -            'callback' : receive -        }) - -        """ -        ## Fail if missing channel -        if not 'channel' in args : -            return 'Missing Channel.' - -        ## Fail if missing callback -        if not 'callback' in args : -            return 'Missing Callback.' - -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        connectcb = args['connect'] - -        if 'errorback' in args: -            errorback = args['errorback'] -        else: -            errorback = lambda x: x - -        ## New Channel? -        if not (channel in self.subscriptions) : -            self.subscriptions[channel] = { -                'first'     : False, -                'connected' : 0, -                'timetoken' : '0' -            } - -        ## Ensure Single Connection -        if self.subscriptions[channel]['connected'] : -            return "Already Connected" - -        self.subscriptions[channel]['connected'] = 1 - -        ## SUBSCRIPTION RECURSION  -        def substabizel(): -            ## STOP CONNECTION? -            if not self.subscriptions[channel]['connected']: -                return - -            def sub_callback(response): -                ## STOP CONNECTION? -                if not self.subscriptions[channel]['connected']: -                    return - -                ## CONNECTED CALLBACK -                if not self.subscriptions[channel]['first'] : -                    self.subscriptions[channel]['first'] = True -                    connectcb() - -                ## PROBLEM? -                if not response: -                    def time_callback(_time): -                        if not _time: -                            reactor.callLater( 1, substabizel ) -                            return errorback("Lost Network Connection") -                        else: -                            reactor.callLater( 1, substabizel ) - -                    ## ENSURE CONNECTED (Call Time Function) -                    return self.time({ 'callback' : time_callback }) - -                self.subscriptions[channel]['timetoken'] = response[1] -                substabizel() - -                pc = PubnubCrypto() -                out = [] -                for message in response[0]: -                     if self.cipher_key : -                          if type( message ) == type(list()): -                              for item in message: -                                  encryptItem = pc.decrypt(self.cipher_key, item ) -                                  out.append(encryptItem) -                              message = out -                          elif type( message ) == type(dict()): -                              outdict = {} -                              for k, item in message.iteritems(): -                                  encryptItem = pc.decrypt(self.cipher_key, item ) -                                  outdict[k] = encryptItem -                                  out.append(outdict) -                              message = out[0] -                          else: -                              message = pc.decrypt(self.cipher_key, message ) -                     else : -                          message - -                     callback(message) - -            ## CONNECT TO PUBNUB SUBSCRIBE SERVERS -            try : -                self._request( [ -                    'subscribe', -                    self.subscribe_key, -                    channel, -                    '0', -                    str(self.subscriptions[channel]['timetoken']) -                ], sub_callback ) -            except : -                reactor.callLater( 1, substabizel ) -                return - -        ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        substabizel() - - -    def unsubscribe( self, args ): -        channel = str(args['channel']) -        if not (channel in self.subscriptions): -            return False - -        ## DISCONNECT -        self.subscriptions[channel]['connected'] = 0 -        self.subscriptions[channel]['timetoken'] = 0 -        self.subscriptions[channel]['first']     = False - -    def time( self, args ) : -        """ -        #** -        #* Time -        #* -        #* Timestamp from PubNub Cloud. -        #* -        #* @return int timestamp. -        #* - -        ## PubNub Server Time Example -        def time_complete(timestamp): -            print(timestamp) - -        pubnub.time(time_complete) - -        """ -        def complete(response) : -            if not response: return 0 -            args['callback'](response[0]) - -        self._request( [ -            'time', -            '0' -        ], complete ) - -    def uuid(self) : -        """ -        #** -        #* uuid -        #* -        #* Generate a UUID -        #* -        #* @return  UUID. -        #* - -        ## PubNub UUID Example -        uuid = pubnub.uuid() -        print(uuid) -        """ -        return uuid.uuid1() -      def _request( self, request, callback, timeout=30 ) :          global pnconn_pool @@ -278,10 +91,10 @@ class Pubnub(PubnubCoreAsync):          gp.addErrback(callback) -class PubNubResponse(Protocol): -    def __init__( self, finished ): -        self.finished = finished - -    def dataReceived( self, bytes ): -            self.finished.callback(bytes) +#class PubNubResponse(Protocol): +#    def __init__( self, finished ): +#        self.finished = finished +# +#    def dataReceived( self, bytes ): +#            self.finished.callback(bytes) | 
