aboutsummaryrefslogtreecommitdiffstats
path: root/python/examples
diff options
context:
space:
mode:
Diffstat (limited to 'python/examples')
-rwxr-xr-xpython/examples/detailed-history-unit-test.py134
-rw-r--r--python/examples/here-now-example.py33
-rwxr-xr-xpython/examples/history-example.py12
-rwxr-xr-xpython/examples/publish-example.py14
-rwxr-xr-xpython/examples/subscribe-example.py64
5 files changed, 257 insertions, 0 deletions
diff --git a/python/examples/detailed-history-unit-test.py b/python/examples/detailed-history-unit-test.py
new file mode 100755
index 0000000..2169e52
--- /dev/null
+++ b/python/examples/detailed-history-unit-test.py
@@ -0,0 +1,134 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.0 Real-time Push Cloud API
+## -----------------------------------
+
+from Pubnub import Pubnub
+import unittest2 as unittest
+import sys
+
+
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or None
+ssl_on = len(sys.argv) > 4 and bool(sys.argv[4]) or False
+pubnub = Pubnub(publish_key, subscribe_key, secret_key, ssl_on)
+crazy = ' ~`!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd'
+
+
+class TestDetailedHistory(unittest.TestCase):
+ total_msg = 10
+ channel = pubnub.time()
+ starttime = None
+ inputs = []
+ endtime = None
+ slice_a = 8
+ slice_b = 2
+ slice_size = slice_a - slice_b
+
+ @classmethod
+ def publish_msg(cls, start, end, offset):
+ print 'Publishing messages'
+ inputs = []
+ for i in range(start + offset, end + offset):
+ message = str(i) + " " + crazy
+ success = pubnub.publish({
+ 'channel': cls.channel,
+ 'message': message,
+ })
+ t = pubnub.time()
+ inputs.append({'timestamp': t, 'message': message})
+ print 'Message # ', i, ' published'
+ return inputs
+
+ @classmethod
+ def setUpClass(cls):
+ print 'Setting up context for Detailed History tests. Please wait ...'
+ cls.starttime = pubnub.time()
+ cls.inputs = cls.inputs + cls.publish_msg(0, cls.total_msg / 2, 0)
+ cls.midtime = pubnub.time()
+ cls.inputs = cls.inputs + cls.publish_msg(
+ 0, cls.total_msg / 2, cls.total_msg / 2)
+ cls.endtime = pubnub.time()
+ print 'Context setup for Detailed History tests. Now running tests'
+
+ def test_begin_to_end_count(self):
+ count = 5
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'start': self.__class__.starttime,
+ 'end': self.__class__.endtime,
+ 'count': count
+ })[0]
+ self.assertTrue(len(history) == count and history[-1].encode(
+ 'utf-8') == self.__class__.inputs[count - 1]['message'])
+
+ def test_end_to_begin_count(self):
+ count = 5
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'start': self.__class__.endtime,
+ 'end': self.__class__.starttime,
+ 'count': count
+ })[0]
+ self.assertTrue(len(history) == count and history[-1]
+ .encode('utf-8') == self.__class__.inputs[-1]['message'])
+
+ def test_start_reverse_true(self):
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'start': self.__class__.midtime,
+ 'reverse': True
+ })[0]
+ self.assertTrue(len(history) == self.__class__.total_msg / 2)
+ expected_msg = self.__class__.inputs[-1]['message']
+ self.assertTrue(history[-1].encode('utf-8') == expected_msg)
+
+ def test_start_reverse_false(self):
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'start': self.__class__.midtime,
+ })[0]
+ self.assertTrue(history[0].encode('utf-8')
+ == self.__class__.inputs[0]['message'])
+
+ def test_end_reverse_true(self):
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'end': self.__class__.midtime,
+ 'reverse': True
+ })[0]
+ self.assertTrue(history[0].encode('utf-8')
+ == self.__class__.inputs[0]['message'])
+
+ def test_end_reverse_false(self):
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'end': self.__class__.midtime,
+ })[0]
+ self.assertTrue(len(history) == self.__class__.total_msg / 2)
+ self.assertTrue(history[-1].encode('utf-8')
+ == self.__class__.inputs[-1]['message'])
+
+ def test_count(self):
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'count': 5
+ })[0]
+ self.assertTrue(len(history) == 5)
+
+ def test_count_zero(self):
+ history = pubnub.detailedHistory({
+ 'channel': self.__class__.channel,
+ 'count': 0
+ })[0]
+ self.assertTrue(len(history) == 0)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/python/examples/here-now-example.py b/python/examples/here-now-example.py
new file mode 100644
index 0000000..261500a
--- /dev/null
+++ b/python/examples/here-now-example.py
@@ -0,0 +1,33 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
+import sys
+from twisted.internet import reactor
+from Pubnub import Pubnub
+
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or ''
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+pubnub = Pubnub( publish_key=publish_key, subscribe_key=subscribe_key,
+ secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on)
+crazy = 'hello_world'
+
+
+print pubnub.here_now( {
+ 'channel' : crazy
+})
+
diff --git a/python/examples/history-example.py b/python/examples/history-example.py
new file mode 100755
index 0000000..cedf69e
--- /dev/null
+++ b/python/examples/history-example.py
@@ -0,0 +1,12 @@
+from Pubnub import Pubnub
+
+## Initiat Class
+pubnub = Pubnub( 'demo', 'demo', None, False )
+
+## History Example
+history = pubnub.history({
+ 'channel' : 'hello_world',
+ 'limit' : 1
+})
+print(history)
+
diff --git a/python/examples/publish-example.py b/python/examples/publish-example.py
new file mode 100755
index 0000000..c97034b
--- /dev/null
+++ b/python/examples/publish-example.py
@@ -0,0 +1,14 @@
+from Pubnub import Pubnub
+
+## Initiate Class
+pubnub = Pubnub( publish_key='demo', subscribe_key='demo', ssl_on=False )
+
+## Publish Example
+info = pubnub.publish({
+ 'channel' : 'hello_world',
+ 'message' : {
+ 'some_text' : 'Hello my World'
+ }
+})
+print(info)
+
diff --git a/python/examples/subscribe-example.py b/python/examples/subscribe-example.py
new file mode 100755
index 0000000..e458e2b
--- /dev/null
+++ b/python/examples/subscribe-example.py
@@ -0,0 +1,64 @@
+import sys
+import threading
+import time
+import random
+import string
+from Pubnub import Pubnub
+
+## Initiate Class
+pubnub = Pubnub( 'demo', 'demo', None, False )
+
+print("My UUID is: "+pubnub.uuid)
+
+channel = ''.join(random.choice(string.ascii_letters + string.digits) for x in range(20))
+
+## Subscribe Example
+def receive(message) :
+ print(message)
+ return False
+
+def pres_event(message):
+ print(message)
+ return False
+
+def subscribe():
+ print("Listening for messages on '%s' channel..." % channel)
+ pubnub.subscribe({
+ 'channel' : channel,
+ 'callback' : receive
+ })
+
+def presence():
+ print("Listening for presence events on '%s' channel..." % channel)
+ pubnub.presence({
+ 'channel' : channel,
+ 'callback' : pres_event
+ })
+
+def publish():
+ print("Publishing a test message on '%s' channel..." % channel)
+ pubnub.publish({
+ 'channel' : channel,
+ 'message' : { 'text':'foo bar' }
+ })
+
+pres_thread = threading.Thread(target=presence)
+pres_thread.daemon=True
+pres_thread.start()
+
+sub_thread = threading.Thread(target=subscribe)
+sub_thread.daemon=True
+sub_thread.start()
+
+time.sleep(3)
+
+publish()
+
+
+print("waiting for subscribes and presence")
+pres_thread.join()
+
+print pubnub.here_now({'channel':channel})
+
+sub_thread.join()
+