aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDevendra2013-02-24 04:56:45 +0530
committerDevendra2013-02-24 04:56:45 +0530
commit51b1dec8b04258f7bff59fb5bb4f69cbab52260c (patch)
tree6ae0186ea3876829784f66820fae097cc5800375
parent08c75762c1fd19b90f646a04072cb9bef60f5478 (diff)
downloadpubnub-python-51b1dec8b04258f7bff59fb5bb4f69cbab52260c.tar.bz2
saving working version
-rw-r--r--PubnubCoreAsync.py33
-rw-r--r--python-tornado/Pubnub.py271
-rw-r--r--python-tornado/examples/subscribe-example.py13
-rw-r--r--python-twisted/Pubnub.py201
4 files changed, 39 insertions, 479 deletions
diff --git a/PubnubCoreAsync.py b/PubnubCoreAsync.py
index bafbca8..0ea03e9 100644
--- a/PubnubCoreAsync.py
+++ b/PubnubCoreAsync.py
@@ -39,7 +39,7 @@ class PubnubCoreAsync(object):
def start(self): pass
def stop(self): pass
- def timeout( self, callback, delay ):
+ def timeout( self, delay, callback ):
pass
def __init__(
@@ -75,6 +75,7 @@ class PubnubCoreAsync(object):
self.cipher_key = cipher_key
self.ssl = ssl_on
self.subscriptions = {}
+ self.timetoken = 0
if self.ssl :
self.origin = 'https://' + self.origin
@@ -170,6 +171,7 @@ class PubnubCoreAsync(object):
], publish_response )
+
def subscribe( self, args ) :
"""
#**
@@ -224,8 +226,7 @@ class PubnubCoreAsync(object):
if not (channel in self.subscriptions) :
self.subscriptions[channel] = {
'first' : False,
- 'connected' : 0,
- 'timetoken' : '0'
+ 'connected' : False,
}
## Ensure Single Connection
@@ -233,14 +234,15 @@ class PubnubCoreAsync(object):
return "Already Connected"
self.subscriptions[channel]['connected'] = 1
-
+ print self.subscriptions
## SUBSCRIPTION RECURSION
def substabizel():
## STOP CONNECTION?
if not self.subscriptions[channel]['connected']:
return
-
+
def sub_callback(response):
+ response = json.loads(response)
## STOP CONNECTION?
if not self.subscriptions[channel]['connected']:
return
@@ -254,15 +256,15 @@ class PubnubCoreAsync(object):
if not response:
def time_callback(_time):
if not _time:
- reactor.callLater( 1, substabizel )
+ self.timeout( 1, substabizel )
return errorback("Lost Network Connection")
else:
- reactor.callLater( 1, substabizel )
+ self.timeout( 1, substabizel )
## ENSURE CONNECTED (Call Time Function)
return self.time({ 'callback' : time_callback })
- self.subscriptions[channel]['timetoken'] = response[1]
+ self.timetoken = response[1]
substabizel()
pc = PubnubCrypto()
@@ -295,10 +297,10 @@ class PubnubCoreAsync(object):
self.subscribe_key,
channel,
'0',
- str(self.subscriptions[channel]['timetoken'])
+ str(self.timetoken)
], sub_callback )
except :
- reactor.callLater( 1, substabizel )
+ self.timeout( 1, substabizel )
return
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
@@ -476,12 +478,5 @@ class PubnubCoreAsync(object):
request.addCallback(received)
request.addBoth(complete)
-
-
-class PubNubResponse(Protocol):
- def __init__( self, finished ):
- self.finished = finished
-
- def dataReceived( self, bytes ):
- self.finished.callback(bytes)
-
+ def _request( self, request, callback, timeout=30 ) :
+ pass
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index b67859f..e9251f8 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -36,7 +36,7 @@ class Pubnub(PubnubCoreAsync):
def stop(self): ioloop.stop()
def start(self): ioloop.start()
- def timeout( self, callback, delay ):
+ def timeout( self, delay, callback):
ioloop.add_timeout( time.time()+float(delay), callback )
def __init__(
@@ -56,232 +56,6 @@ class Pubnub(PubnubCoreAsync):
origin,
)
-
- def subscribe( self, args ) :
- """
- #**
- #* Subscribe
- #*
- #* This is NON-BLOCKING.
- #* Listen for a message on a channel.
- #*
- #* @param array args with channel and message.
- #* @return false on fail, array on success.
- #**
-
- ## Subscribe Example
- def receive(message) :
- print(message)
- return True
-
- ## On Connect Callback
- def connected() :
- pubnub.publish({
- 'channel' : 'hello_world',
- 'message' : { 'some_var' : 'text' }
- })
-
- ## Subscribe
- pubnub.subscribe({
- 'channel' : 'hello_world',
- 'connect' : connected,
- 'callback' : receive
- })
-
- """
- ## Fail if missing channel
- if not 'channel' in args :
- print('Missing Channel.')
- return False
-
- ## Fail if missing callback
- if not 'callback' in args :
- print('Missing Callback.')
- return False
-
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
-
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
-
- ## New Channel?
- if not (channel in self.subscriptions) :
- self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : 0,
- 'timetoken' : '0'
- }
-
- ## Ensure Single Connection
- if self.subscriptions[channel]['connected'] :
- print("Already Connected")
- return False
-
- self.subscriptions[channel]['connected'] = 1
-
- ## SUBSCRIPTION RECURSION
- def substabizel():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
-
- def sub_callback(response):
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
-
- ## CONNECTED CALLBACK
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
-
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- ioloop.add_timeout(time.time()+1, substabizel)
- return errorback("Lost Network Connection")
- else:
- ioloop.add_timeout(time.time()+1, substabizel)
-
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
-
- self.subscriptions[channel]['timetoken'] = response[1]
- substabizel()
-
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- if self.cipher_key :
- if type( message ) == type(list()):
- for item in message:
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- message = out
- elif type( message ) == type(dict()):
- outdict = {}
- for k, item in message.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, item )
- outdict[k] = encryptItem
- out.append(outdict)
- message = out[0]
- else:
- message = pc.decrypt(self.cipher_key, message )
-
- callback(message)
-
- ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
- try :
- self._request( [
- 'subscribe',
- self.subscribe_key,
- channel,
- '0',
- str(self.subscriptions[channel]['timetoken'])
- ], sub_callback )
- except :
- ioloop.add_timeout(time.time()+1, substabizel)
- return
-
- ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- substabizel()
-
-
- def unsubscribe( self, args ):
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
- return False
-
- ## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
-
-
- def history( self, args ) :
- """
- #**
- #* History
- #*
- #* Load history from a channel.
- #*
- #* @param array args with 'channel' and 'limit'.
- #* @return mixed false on fail, array on success.
- #*
-
- ## History Example
- history = pubnub.history({
- 'channel' : 'hello_world',
- 'limit' : 1
- })
- print(history)
-
- """
- ## Capture User Input
- limit = args.has_key('limit') and int(args['limit']) or 10
- channel = str(args['channel'])
-
- ## Fail if bad input.
- if not channel :
- print('Missing Channel')
- return False
-
- ## Get History
- return self._request( [
- 'history',
- self.subscribe_key,
- channel,
- '0',
- str(limit)
- ], args['callback'] );
-
- def time( self, args ) :
- """
- #**
- #* Time
- #*
- #* Timestamp from PubNub Cloud.
- #*
- #* @return int timestamp.
- #*
-
- ## PubNub Server Time Example
- def time_complete(timestamp):
- print(timestamp)
-
- pubnub.time(time_complete)
-
- """
- def complete(response) :
- args['callback'](response and response[0])
-
- self._request( [
- 'time',
- '0'
- ], complete )
-
- def uuid(self) :
- """
- #**
- #* uuid
- #*
- #* Generate a UUID
- #*
- #* @return UUID.
- #*
-
- ## PubNub UUID Example
- uuid = pubnub.uuid()
- print(uuid)
- """
- return uuid.uuid1()
-
def _request( self, request, callback ) :
## Build URL
url = self.origin + '/' + "/".join([
@@ -289,43 +63,7 @@ class Pubnub(PubnubCoreAsync):
hex(ord(ch)).replace( '0x', '%' ).upper() or
ch for ch in list(bit)
]) for bit in request])
-
- requestType = request[0]
-
- def complete(response) :
- if response.error:
- return callback(None)
- obj = json.loads(response.buffer.getvalue())
- pc = PubnubCrypto()
- out = []
- if self.cipher_key :
- if requestType == "history" :
- if type(obj) == type(list()):
- for item in obj:
- if type(item) == type(list()):
- for subitem in item:
- encryptItem = pc.decrypt(self.cipher_key, subitem )
- out.append(encryptItem)
- elif type(item) == type(dict()):
- outdict = {}
- for k, subitem in item.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, subitem )
- outdict[k] = encryptItem
- out.append(outdict)
- else :
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- callback(out)
- elif type( obj ) == type(dict()):
- for k, item in obj.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- callback(out)
- else :
- callback(obj)
- else :
- callback(obj)
-
+ print url
## Send Request Expecting JSON Response
http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
request = tornado.httpclient.HTTPRequest( url, 'GET', dict({
@@ -333,10 +71,13 @@ class Pubnub(PubnubCoreAsync):
'User-Agent' : 'Python-Tornado',
'Accept-Encoding' : 'gzip'
}) )
+ def responseCallback(response):
+ print response._get_body()
+ callback(response._get_body())
http.fetch(
request,
- callback=complete,
+ callback=responseCallback,
connect_timeout=310,
request_timeout=310
)
diff --git a/python-tornado/examples/subscribe-example.py b/python-tornado/examples/subscribe-example.py
index c819f94..1a2e03d 100644
--- a/python-tornado/examples/subscribe-example.py
+++ b/python-tornado/examples/subscribe-example.py
@@ -12,6 +12,7 @@
import sys
import tornado
sys.path.append('../')
+sys.path.append('../..')
from Pubnub import Pubnub
publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
@@ -26,6 +27,17 @@ ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
pubnub = Pubnub( publish_key, subscribe_key, secret_key,cipher_key, ssl_on )
crazy = 'hello_world'
+def connect_cb():
+ print 'Connect'
+
+def subscribe_result(response):
+ print response
+
+pubnub.subscribe({
+ 'channel' : crazy,
+ 'callback' : subscribe_result,
+ 'connect' : connect_cb
+})
## -----------------------------------------------------------------------
## Publish Example
## -----------------------------------------------------------------------
@@ -53,7 +65,6 @@ pubnub.publish({
'message' : { 'some_key' : 'some_val' },
'callback' : publish_complete
})
-
## -----------------------------------------------------------------------
## IO Event Loop
## -----------------------------------------------------------------------
diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py
index 97895fd..f7d5c52 100644
--- a/python-twisted/Pubnub.py
+++ b/python-twisted/Pubnub.py
@@ -44,7 +44,7 @@ class Pubnub(PubnubCoreAsync):
def start(self): reactor.run()
def stop(self): reactor.stop()
- def timeout( self, callback, delay ):
+ def timeout( self, delay, callback ):
reactor.callLater( delay, callback )
def __init__(
@@ -64,193 +64,6 @@ class Pubnub(PubnubCoreAsync):
origin,
)
- def subscribe( self, args ) :
- """
- #**
- #* Subscribe
- #*
- #* This is NON-BLOCKING.
- #* Listen for a message on a channel.
- #*
- #* @param array args with channel and message.
- #* @return false on fail, array on success.
- #**
-
- ## Subscribe Example
- def receive(message) :
- print(message)
- return True
-
- ## On Connect Callback
- def connected() :
- pubnub.publish({
- 'channel' : 'hello_world',
- 'message' : { 'some_var' : 'text' }
- })
-
- ## Subscribe
- pubnub.subscribe({
- 'channel' : 'hello_world',
- 'connect' : connected,
- 'callback' : receive
- })
-
- """
- ## Fail if missing channel
- if not 'channel' in args :
- return 'Missing Channel.'
-
- ## Fail if missing callback
- if not 'callback' in args :
- return 'Missing Callback.'
-
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
-
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
-
- ## New Channel?
- if not (channel in self.subscriptions) :
- self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : 0,
- 'timetoken' : '0'
- }
-
- ## Ensure Single Connection
- if self.subscriptions[channel]['connected'] :
- return "Already Connected"
-
- self.subscriptions[channel]['connected'] = 1
-
- ## SUBSCRIPTION RECURSION
- def substabizel():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
-
- def sub_callback(response):
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
-
- ## CONNECTED CALLBACK
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
-
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- reactor.callLater( 1, substabizel )
- return errorback("Lost Network Connection")
- else:
- reactor.callLater( 1, substabizel )
-
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
-
- self.subscriptions[channel]['timetoken'] = response[1]
- substabizel()
-
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- if self.cipher_key :
- if type( message ) == type(list()):
- for item in message:
- encryptItem = pc.decrypt(self.cipher_key, item )
- out.append(encryptItem)
- message = out
- elif type( message ) == type(dict()):
- outdict = {}
- for k, item in message.iteritems():
- encryptItem = pc.decrypt(self.cipher_key, item )
- outdict[k] = encryptItem
- out.append(outdict)
- message = out[0]
- else:
- message = pc.decrypt(self.cipher_key, message )
- else :
- message
-
- callback(message)
-
- ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
- try :
- self._request( [
- 'subscribe',
- self.subscribe_key,
- channel,
- '0',
- str(self.subscriptions[channel]['timetoken'])
- ], sub_callback )
- except :
- reactor.callLater( 1, substabizel )
- return
-
- ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- substabizel()
-
-
- def unsubscribe( self, args ):
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
- return False
-
- ## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
-
- def time( self, args ) :
- """
- #**
- #* Time
- #*
- #* Timestamp from PubNub Cloud.
- #*
- #* @return int timestamp.
- #*
-
- ## PubNub Server Time Example
- def time_complete(timestamp):
- print(timestamp)
-
- pubnub.time(time_complete)
-
- """
- def complete(response) :
- if not response: return 0
- args['callback'](response[0])
-
- self._request( [
- 'time',
- '0'
- ], complete )
-
- def uuid(self) :
- """
- #**
- #* uuid
- #*
- #* Generate a UUID
- #*
- #* @return UUID.
- #*
-
- ## PubNub UUID Example
- uuid = pubnub.uuid()
- print(uuid)
- """
- return uuid.uuid1()
-
def _request( self, request, callback, timeout=30 ) :
global pnconn_pool
@@ -278,10 +91,10 @@ class Pubnub(PubnubCoreAsync):
gp.addErrback(callback)
-class PubNubResponse(Protocol):
- def __init__( self, finished ):
- self.finished = finished
-
- def dataReceived( self, bytes ):
- self.finished.callback(bytes)
+#class PubNubResponse(Protocol):
+# def __init__( self, finished ):
+# self.finished = finished
+#
+# def dataReceived( self, bytes ):
+# self.finished.callback(bytes)