aboutsummaryrefslogtreecommitdiffstats
path: root/python-tornado/Pubnub.py
diff options
context:
space:
mode:
Diffstat (limited to 'python-tornado/Pubnub.py')
-rw-r--r--python-tornado/Pubnub.py271
1 files changed, 6 insertions, 265 deletions
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index b67859f..e9251f8 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -36,7 +36,7 @@ class Pubnub(PubnubCoreAsync):
def stop(self): ioloop.stop()
def start(self): ioloop.start()
- def timeout( self, callback, delay ):
+ def timeout( self, delay, callback):
ioloop.add_timeout( time.time()+float(delay), callback )
def __init__(
@@ -56,232 +56,6 @@ class Pubnub(PubnubCoreAsync):
origin,
)
-
- def subscribe( self, args ) :
- """
- #**
- #* Subscribe
- #*
- #* This is NON-BLOCKING.
- #* Listen for a message on a channel.
- #*
- #* @param array args with channel and message.
- #* @return false on fail, array on success.
- #**
-
- ## Subscribe Example
- def receive(message) :
- print(message)
- return True
-
- ## On Connect Callback
- def connected() :
- pubnub.publish({
- 'channel' : 'hello_world',
- 'message' : { 'some_var' : 'text' }
- })
-
- ## Subscribe
- pubnub.subscribe({
- 'channel' : 'hello_world',
- 'connect' : connected,
- 'callback' : receive
- })
-
- """
- ## Fail if missing channel
- if not 'channel' in args :
- print('Missing Channel.')
- return False
-
- ## Fail if missing callback
- if not 'callback' in args :
- print('Missing Callback.')
- return False
-
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
-
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
-
- ## New Channel?
- if not (channel in self.subscriptions) :
- self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : 0,
- 'timetoken' : '0'
- }
-
- ## Ensure Single Connection
- if self.subscriptions[channel]['connected'] :
- print("Already Connected")
- return False
-
- self.subscriptions[channel]['connected'] = 1
-
- ## SUBSCRIPTION RECURSION
- def substabizel():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
-
- def sub_callback(response):
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
-
- ## CONNECTED CALLBACK
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
-
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- ioloop.add_timeout(time.time()+1, substabizel)
- return errorback("Lost Network Connection")
- else:
- ioloop.add_timeout(time.time()+1, substabizel)
-
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
-
- self.subscriptions[channel]['timetoken'] = response[1]
- substabizel()
-
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- if self.cipher_key :
- if type( message ) == type(list()):
- for item in message:
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- message = out
- elif type( message ) == type(dict()):
- outdict = {}
- for k, item in message.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, item )
- outdict[k] = encryptItem
- out.append(outdict)
- message = out[0]
- else:
- message = pc.decrypt(self.cipher_key, message )
-
- callback(message)
-
- ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
- try :
- self._request( [
- 'subscribe',
- self.subscribe_key,
- channel,
- '0',
- str(self.subscriptions[channel]['timetoken'])
- ], sub_callback )
- except :
- ioloop.add_timeout(time.time()+1, substabizel)
- return
-
- ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- substabizel()
-
-
- def unsubscribe( self, args ):
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
- return False
-
- ## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
-
-
- def history( self, args ) :
- """
- #**
- #* History
- #*
- #* Load history from a channel.
- #*
- #* @param array args with 'channel' and 'limit'.
- #* @return mixed false on fail, array on success.
- #*
-
- ## History Example
- history = pubnub.history({
- 'channel' : 'hello_world',
- 'limit' : 1
- })
- print(history)
-
- """
- ## Capture User Input
- limit = args.has_key('limit') and int(args['limit']) or 10
- channel = str(args['channel'])
-
- ## Fail if bad input.
- if not channel :
- print('Missing Channel')
- return False
-
- ## Get History
- return self._request( [
- 'history',
- self.subscribe_key,
- channel,
- '0',
- str(limit)
- ], args['callback'] );
-
- def time( self, args ) :
- """
- #**
- #* Time
- #*
- #* Timestamp from PubNub Cloud.
- #*
- #* @return int timestamp.
- #*
-
- ## PubNub Server Time Example
- def time_complete(timestamp):
- print(timestamp)
-
- pubnub.time(time_complete)
-
- """
- def complete(response) :
- args['callback'](response and response[0])
-
- self._request( [
- 'time',
- '0'
- ], complete )
-
- def uuid(self) :
- """
- #**
- #* uuid
- #*
- #* Generate a UUID
- #*
- #* @return UUID.
- #*
-
- ## PubNub UUID Example
- uuid = pubnub.uuid()
- print(uuid)
- """
- return uuid.uuid1()
-
def _request( self, request, callback ) :
## Build URL
url = self.origin + '/' + "/".join([
@@ -289,43 +63,7 @@ class Pubnub(PubnubCoreAsync):
hex(ord(ch)).replace( '0x', '%' ).upper() or
ch for ch in list(bit)
]) for bit in request])
-
- requestType = request[0]
-
- def complete(response) :
- if response.error:
- return callback(None)
- obj = json.loads(response.buffer.getvalue())
- pc = PubnubCrypto()
- out = []
- if self.cipher_key :
- if requestType == "history" :
- if type(obj) == type(list()):
- for item in obj:
- if type(item) == type(list()):
- for subitem in item:
- encryptItem = pc.decrypt(self.cipher_key, subitem )
- out.append(encryptItem)
- elif type(item) == type(dict()):
- outdict = {}
- for k, subitem in item.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, subitem )
- outdict[k] = encryptItem
- out.append(outdict)
- else :
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- callback(out)
- elif type( obj ) == type(dict()):
- for k, item in obj.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- callback(out)
- else :
- callback(obj)
- else :
- callback(obj)
-
+ print url
## Send Request Expecting JSON Response
http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
request = tornado.httpclient.HTTPRequest( url, 'GET', dict({
@@ -333,10 +71,13 @@ class Pubnub(PubnubCoreAsync):
'User-Agent' : 'Python-Tornado',
'Accept-Encoding' : 'gzip'
}) )
+ def responseCallback(response):
+ print response._get_body()
+ callback(response._get_body())
http.fetch(
request,
- callback=complete,
+ callback=responseCallback,
connect_timeout=310,
request_timeout=310
)