aboutsummaryrefslogtreecommitdiffstats
path: root/python/tests
diff options
context:
space:
mode:
authorDevendra2014-06-18 01:02:15 +0530
committerDevendra2014-06-18 01:02:15 +0530
commita1ee7f8bd90b463318bfa75f7ee4f58d458d2a24 (patch)
tree9e260902f960f792a93fa25e3b619c42ecb5f4d4 /python/tests
parentbfd5c64bdf7ed45f21207cb53c653e7220e39eff (diff)
parent083127cde127dd78fc88ffe2b3b82144b2c07038 (diff)
downloadpubnub-python-a1ee7f8bd90b463318bfa75f7ee4f58d458d2a24.tar.bz2
Merge branch 'master' into develop
Diffstat (limited to 'python/tests')
-rwxr-xr-xpython/tests/detailed-history-unit-test.py137
-rwxr-xr-xpython/tests/subscribe-test.py154
-rw-r--r--python/tests/test_grant.py199
-rw-r--r--python/tests/test_grant_async.py369
-rw-r--r--python/tests/test_publish_async.py228
-rwxr-xr-xpython/tests/unit-test.py66
6 files changed, 950 insertions, 203 deletions
diff --git a/python/tests/detailed-history-unit-test.py b/python/tests/detailed-history-unit-test.py
deleted file mode 100755
index 31bdef8..0000000
--- a/python/tests/detailed-history-unit-test.py
+++ /dev/null
@@ -1,137 +0,0 @@
-## www.pubnub.com - PubNub Real-time push service in the cloud.
-# coding=utf8
-
-## PubNub Real-time Push APIs and Notifications Framework
-## Copyright (c) 2010 Stephen Blum
-## http://www.pubnub.com/
-
-## -----------------------------------
-## PubNub 3.0 Real-time Push Cloud API
-## -----------------------------------
-
-import sys
-sys.path.append('.')
-sys.path.append('..')
-sys.path.append('../common')
-from Pubnub import Pubnub
-import unittest2 as unittest
-
-
-publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
-subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
-secret_key = len(sys.argv) > 3 and sys.argv[3] or None
-ssl_on = len(sys.argv) > 4 and bool(sys.argv[4]) or False
-pubnub = Pubnub(publish_key, subscribe_key, secret_key, ssl_on)
-crazy = ' ~`!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd'
-
-
-class TestDetailedHistory(unittest.TestCase):
- total_msg = 10
- channel = pubnub.time()
- starttime = None
- inputs = []
- endtime = None
- slice_a = 8
- slice_b = 2
- slice_size = slice_a - slice_b
-
- @classmethod
- def publish_msg(cls, start, end, offset):
- print 'Publishing messages'
- inputs = []
- for i in range(start + offset, end + offset):
- message = str(i) + " " + crazy
- success = pubnub.publish({
- 'channel': cls.channel,
- 'message': message,
- })
- t = pubnub.time()
- inputs.append({'timestamp': t, 'message': message})
- print 'Message # ', i, ' published'
- return inputs
-
- @classmethod
- def setUpClass(cls):
- print 'Setting up context for Detailed History tests. Please wait ...'
- cls.starttime = pubnub.time()
- cls.inputs = cls.inputs + cls.publish_msg(0, cls.total_msg / 2, 0)
- cls.midtime = pubnub.time()
- cls.inputs = cls.inputs + cls.publish_msg(
- 0, cls.total_msg / 2, cls.total_msg / 2)
- cls.endtime = pubnub.time()
- print 'Context setup for Detailed History tests. Now running tests'
-
- def test_begin_to_end_count(self):
- count = 5
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'start': self.__class__.starttime,
- 'end': self.__class__.endtime,
- 'count': count
- })[0]
- self.assertTrue(len(history) == count and history[-1].encode(
- 'utf-8') == self.__class__.inputs[count - 1]['message'])
-
- def test_end_to_begin_count(self):
- count = 5
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'start': self.__class__.endtime,
- 'end': self.__class__.starttime,
- 'count': count
- })[0]
- self.assertTrue(len(history) == count and history[-1]
- .encode('utf-8') == self.__class__.inputs[-1]['message'])
-
- def test_start_reverse_true(self):
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'start': self.__class__.midtime,
- 'reverse': True
- })[0]
- self.assertTrue(len(history) == self.__class__.total_msg / 2)
- expected_msg = self.__class__.inputs[-1]['message']
- self.assertTrue(history[-1].encode('utf-8') == expected_msg)
-
- def test_start_reverse_false(self):
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'start': self.__class__.midtime,
- })[0]
- self.assertTrue(history[0].encode('utf-8')
- == self.__class__.inputs[0]['message'])
-
- def test_end_reverse_true(self):
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'end': self.__class__.midtime,
- 'reverse': True
- })[0]
- self.assertTrue(history[0].encode('utf-8')
- == self.__class__.inputs[0]['message'])
-
- def test_end_reverse_false(self):
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'end': self.__class__.midtime,
- })[0]
- self.assertTrue(len(history) == self.__class__.total_msg / 2)
- self.assertTrue(history[-1].encode('utf-8')
- == self.__class__.inputs[-1]['message'])
-
- def test_count(self):
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'count': 5
- })[0]
- self.assertTrue(len(history) == 5)
-
- def test_count_zero(self):
- history = pubnub.detailedHistory({
- 'channel': self.__class__.channel,
- 'count': 0
- })[0]
- self.assertTrue(len(history) == 0)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py
new file mode 100755
index 0000000..a1b1826
--- /dev/null
+++ b/python/tests/subscribe-test.py
@@ -0,0 +1,154 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
+import sys
+import datetime
+from Pubnub import PubnubAsync as Pubnub
+from functools import partial
+from threading import current_thread
+import threading
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or None
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+pubnub = Pubnub(publish_key, subscribe_key, secret_key, ssl_on)
+crazy = 'hello_world'
+
+current = -1
+
+errors = 0
+received = 0
+
+## -----------------------------------------------------------------------
+## Subscribe Example
+## -----------------------------------------------------------------------
+
+
+def message_received(message):
+ print(message)
+
+
+def check_received(message):
+ global current
+ global errors
+ global received
+ print(message)
+ print(current)
+ if message <= current:
+ print('ERROR')
+ #sys.exit()
+ errors += 1
+ else:
+ received += 1
+ print('active thread count : ' + str(threading.activeCount()))
+ print('errors = ' + str(errors))
+ print(current_thread().getName() + ' , ' + 'received = ' + str(received))
+
+ if received != message:
+ print('********** MISSED **************** ' + str(message - received))
+ current = message
+
+
+def connected_test(ch):
+ print('Connected ' + ch)
+
+
+def connected(ch):
+ pass
+
+
+'''
+pubnub.subscribe({
+ 'channel' : 'abcd1',
+ 'connect' : connected,
+ 'callback' : message_received
+})
+'''
+
+
+def cb1():
+ pubnub.subscribe({
+ 'channel': 'efgh1',
+ 'connect': connected,
+ 'callback': message_received
+ })
+
+
+def cb2():
+ pubnub.subscribe({
+ 'channel': 'dsm-test',
+ 'connect': connected_test,
+ 'callback': check_received
+ })
+
+
+def cb3():
+ pubnub.unsubscribe({'channel': 'efgh1'})
+
+
+def cb4():
+ pubnub.unsubscribe({'channel': 'abcd1'})
+
+
+def subscribe(channel):
+ pubnub.subscribe({
+ 'channel': channel,
+ 'connect': connected,
+ 'callback': message_received
+ })
+
+
+pubnub.timeout(15, cb1)
+
+pubnub.timeout(30, cb2)
+
+
+pubnub.timeout(45, cb3)
+
+pubnub.timeout(60, cb4)
+
+#'''
+for x in range(1, 1000):
+ #print x
+ def y(t):
+ subscribe('channel-' + str(t))
+
+ def z(t):
+ pubnub.unsubscribe({'channel': 'channel-' + str(t)})
+
+ pubnub.timeout(x + 5, partial(y, x))
+ pubnub.timeout(x + 25, partial(z, x))
+ x += 10
+#'''
+
+'''
+for x in range(1,1000):
+ def cb(r): print r , ' : ', threading.activeCount()
+ def y(t):
+ pubnub.publish({
+ 'message' : t,
+ 'callback' : cb,
+ 'channel' : 'dsm-test'
+ })
+
+
+ pubnub.timeout(x + 1, partial(y,x))
+ x += 1
+'''
+
+
+pubnub.start()
diff --git a/python/tests/test_grant.py b/python/tests/test_grant.py
new file mode 100644
index 0000000..6826335
--- /dev/null
+++ b/python/tests/test_grant.py
@@ -0,0 +1,199 @@
+
+
+from Pubnub import Pubnub
+import time
+
+pubnub = Pubnub("demo","demo")
+pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",
+ "sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy")
+
+
+# Grant permission read true, write true, on channel ( Sync Mode )
+def test_1():
+ resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=True, ttl=1)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'auths': {'abcd': {'r': 1, 'w': 1}},
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'user', 'channel': 'abcd', 'ttl': 1
+ }
+
+
+# Grant permission read false, write false, on channel ( Sync Mode )
+def test_2():
+ resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=False, ttl=1)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'auths': {'abcd': {'r': 0, 'w': 0}},
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'user', 'channel': 'abcd', 'ttl': 1
+ }
+
+# Grant permission read True, write false, on channel ( Sync Mode )
+def test_3():
+ resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'auths': {'abcd': {'r': 1, 'w': 0}},
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'user', 'channel': 'abcd', 'ttl': 1
+ }
+
+# Grant permission read False, write True, on channel ( Sync Mode )
+def test_4():
+ resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'auths': {'abcd': {'r': 1, 'w': 0}},
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'user', 'channel': 'abcd', 'ttl': 1
+ }
+
+# Grant permission read False, write True, on channel ( Sync Mode ), TTL 10
+def test_5():
+ resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=10)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'auths': {'abcd': {'r': 1, 'w': 0}},
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'user', 'channel': 'abcd', 'ttl': 10
+ }
+
+# Grant permission read False, write True, without channel ( Sync Mode ), TTL 10
+def test_6():
+ resp = pubnub_pam.grant(auth_key="abcd", read=True, write=False, ttl=10)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'subkey' , u'r': 1, u'w': 0, 'ttl': 10
+ }
+
+
+# Grant permission read False, write False, without channel ( Sync Mode )
+def test_7():
+ resp = pubnub_pam.grant(auth_key="abcd", read=False, write=False)
+ assert resp['message'] == 'Success'
+ assert resp['payload'] == {
+ 'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ 'level': 'subkey' , u'r': 0, u'w': 0, 'ttl': 1
+ }
+
+
+# Complete flow , try publish on forbidden channel, grant permission to auth key and try again. ( Sync Mode)
+
+def test_8():
+ channel = "test_8-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ resp = pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10)
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 10}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp[0] == 1
+
+
+# Complete flow , try publish on forbidden channel, grant permission to authkey and try again. ( Sync Mode)
+# then revoke and try again
+def test_9():
+ channel = "test_9-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ resp = pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10)
+ print resp
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 10}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp[0] == 1
+ resp = pubnub_pam.revoke(channel=channel, auth_key=auth_key)
+ print resp
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 1}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+
+# Complete flow , try publish on forbidden channel, grant permission channel level for subkey and try again. ( Sync Mode)
+# then revoke and try again
+def test_10():
+ channel = "test_10-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ resp = pubnub_pam.grant(channel=channel, read=True, write=True, ttl=10)
+ print resp
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'channels': {channel: {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'channel', u'ttl': 10}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp[0] == 1
+ resp = pubnub_pam.revoke(channel=channel)
+ print resp
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'channels': {channel : {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'channel', u'ttl': 1}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+
+# Complete flow , try publish on forbidden channel, grant permission subkey level for subkey and try again. ( Sync Mode)
+# then revoke and try again
+def test_11():
+ channel = "test_11-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ resp = pubnub_pam.grant(read=True, write=True, ttl=10)
+ print resp
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'r': 1, u'w': 1,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 10}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp[0] == 1
+ resp = pubnub_pam.revoke()
+ print resp
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'r': 0, u'w': 0,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 1}
+ }
+ resp = pubnub_pam.publish(channel=channel,message=message)
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+
+
diff --git a/python/tests/test_grant_async.py b/python/tests/test_grant_async.py
new file mode 100644
index 0000000..4d38a0a
--- /dev/null
+++ b/python/tests/test_grant_async.py
@@ -0,0 +1,369 @@
+
+
+from Pubnub import Pubnub
+import time
+
+pubnub = Pubnub("demo","demo")
+pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",
+ "sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy")
+
+
+
+# Grant permission read true, write true, on channel ( Async Mode )
+def test_1():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=True, ttl=1, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+
+# Grant permission read false, write false, on channel ( Async Mode )
+def test_2():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=False, ttl=1, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+
+# Grant permission read True, write false, on channel ( Async Mode )
+def test_3():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 1, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+# Grant permission read False, write True, on channel ( Async Mode )
+def test_4():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=True, ttl=1, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+
+# Grant permission read False, write True, on channel ( Async Mode ), TTL 10
+def test_5():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=True, ttl=10, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 10}
+ }
+
+
+# Grant permission read False, write True, without channel ( Async Mode ), TTL 10
+def test_6():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(auth_key="abcd", read=False, write=True, ttl=10, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': { u'r': 0, u'w': 1,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 10}
+ }
+
+
+# Grant permission read False, write False, without channel ( Async Mode )
+def test_7():
+ resp = {'response' : None}
+ def _callback(response):
+ resp['response'] = response
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(auth_key="abcd", read=False, write=False, callback=_callback, error=_error)
+ time.sleep(2)
+ assert resp['response'] == {
+ 'message': u'Success',
+ 'payload': { u'r': 0, u'w': 0,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 1}
+ }
+
+
+# Complete flow , try publish on forbidden channel, grant permission to subkey and try again. ( Sync Mode)
+
+def test_8():
+ channel = "test_8-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+ time.sleep(2)
+
+
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 10}
+ }
+ def _err2(resp):
+ assert False
+
+
+ resp = pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10, callback=_cb2, error=_err2)
+ time.sleep(2)
+
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ time.sleep(2)
+
+
+
+
+
+# Complete flow , try publish on forbidden channel, grant permission to authkey and try again.
+# then revoke and try again
+def test_9():
+ channel = "test_9-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+ time.sleep(2)
+
+
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 10}
+ }
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10, callback=_cb2, error=_err2)
+ time.sleep(2)
+
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ time.sleep(2)
+
+ def _cb4(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 1}
+ }
+ def _err4(resp):
+ assert False
+ pubnub_pam.revoke(channel=channel, auth_key=auth_key, callback=_cb4, error=_err4)
+ time.sleep(2)
+
+ def _cb5(resp, ch=None):
+ assert False
+ def _err5(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5)
+ time.sleep(2)
+
+
+
+
+# Complete flow , try publish on forbidden channel, grant permission channel level for subkey and try again.
+# then revoke and try again
+def test_10():
+ channel = "test_10-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+ time.sleep(2)
+
+
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'channels': {channel: {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'channel', u'ttl': 10}
+ }
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(channel=channel, read=True, write=True, ttl=10, callback=_cb2, error=_err2)
+ time.sleep(2)
+
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ time.sleep(2)
+
+ def _cb4(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'channels': {channel : {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'channel', u'ttl': 1}
+ }
+ def _err4(resp):
+ assert False
+ pubnub_pam.revoke(channel=channel, callback=_cb4, error=_err4)
+ time.sleep(2)
+
+ def _cb5(resp, ch=None):
+ assert False
+ def _err5(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5)
+ time.sleep(2)
+
+
+
+# Complete flow , try publish on forbidden channel, grant permission subkey level for subkey and try again.
+# then revoke and try again
+def test_11():
+ channel = "test_11-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+ time.sleep(2)
+
+
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'r': 1, u'w': 1,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 10}
+ }
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(read=True, write=True, ttl=10, callback=_cb2, error=_err2)
+ time.sleep(2)
+
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ time.sleep(2)
+
+ def _cb4(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'r': 0, u'w': 0,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 1}
+ }
+ def _err4(resp):
+ assert False
+ pubnub_pam.revoke(callback=_cb4, error=_err4)
+ time.sleep(2)
+
+ def _cb5(resp, ch=None):
+ assert False
+ def _err5(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5)
+ time.sleep(2)
diff --git a/python/tests/test_publish_async.py b/python/tests/test_publish_async.py
new file mode 100644
index 0000000..7270727
--- /dev/null
+++ b/python/tests/test_publish_async.py
@@ -0,0 +1,228 @@
+
+
+from Pubnub import Pubnub
+import time
+
+pubnub = Pubnub("demo","demo")
+pubnub_enc = Pubnub("demo", "demo", cipher_key="enigma")
+pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",
+ "sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy")
+
+
+
+# Publish and receive string
+def test_1():
+
+ channel = "test_1-" + str(time.time())
+ message = "I am a string"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive array
+def test_2():
+
+ channel = "test_2-" + str(time.time())
+ message = [1,2]
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive json object
+def test_3():
+
+ channel = "test_2-" + str(time.time())
+ message = { "a" : "b" }
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number
+def test_4():
+
+ channel = "test_2-" + str(time.time())
+ message = 100
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number string
+def test_5():
+
+ channel = "test_5-" + str(time.time())
+ message = "100"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+
+# Publish and receive string (Encryption enabled)
+def test_6():
+
+ channel = "test_6-" + str(time.time())
+ message = "I am a string"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub_enc.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive array (Encryption enabled)
+def test_7():
+
+ channel = "test_7-" + str(time.time())
+ message = [1,2]
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive json object (Encryption enabled)
+def test_8():
+
+ channel = "test_8-" + str(time.time())
+ message = { "a" : "b" }
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub_enc.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number (Encryption enabled)
+def test_9():
+
+ channel = "test_9-" + str(time.time())
+ message = 100
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub_enc.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number string (Encryption enabled)
+def test_10():
+
+ channel = "test_10-" + str(time.time())
+ message = "100"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub_enc.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive object string (Encryption enabled)
+def test_11():
+
+ channel = "test_11-" + str(time.time())
+ message = '{"a" : "b"}'
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub_enc.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive array string (Encryption enabled)
+def test_12():
+
+ channel = "test_12-" + str(time.time())
+ message = '[1,2]'
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ pubnub_enc.publish(channel,message)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
diff --git a/python/tests/unit-test.py b/python/tests/unit-test.py
deleted file mode 100755
index 1737ace..0000000
--- a/python/tests/unit-test.py
+++ /dev/null
@@ -1,66 +0,0 @@
-## www.pubnub.com - PubNub Real-time push service in the cloud.
-# coding=utf8
-
-## PubNub Real-time Push APIs and Notifications Framework
-## Copyright (c) 2010 Stephen Blum
-## http://www.pubnub.com/
-
-## -----------------------------------
-## PubNub 3.0 Real-time Push Cloud API
-## -----------------------------------
-
-import sys
-sys.path.append('.')
-sys.path.append('..')
-sys.path.append('../common')
-from Pubnub import Pubnub
-
-publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
-subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
-secret_key = len(sys.argv) > 3 and sys.argv[3] or None
-ssl_on = len(sys.argv) > 4 and bool(sys.argv[4]) or False
-
-
-## -----------------------------------------------------------------------
-## Initiat Class
-## -----------------------------------------------------------------------
-
-pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on )
-crazy = 'demo'
-
-## ---------------------------------------------------------------------------
-## Unit Test Function
-## ---------------------------------------------------------------------------
-def test( trial, name ) :
- if trial :
- print( 'PASS: ' + name )
- else :
- print( 'FAIL: ' + name )
-
-## -----------------------------------------------------------------------
-## Publish Example
-## -----------------------------------------------------------------------
-publish_success = pubnub.publish({
- 'channel' : crazy,
- 'message' : crazy
-})
-test( publish_success[0] == 1, 'Publish First Message Success' )
-
-## -----------------------------------------------------------------------
-## History Example
-## -----------------------------------------------------------------------
-history = pubnub.history({
- 'channel' : crazy,
- 'limit' : 1
-})
-test(
- history[0].encode('utf-8') == crazy,
- 'History Message: ' + history[0]
-)
-test( len(history) == 1, 'History Message Count' )
-
-## -----------------------------------------------------------------------
-## PubNub Server Time Example
-## -----------------------------------------------------------------------
-timestamp = pubnub.time()
-test( timestamp > 0, 'PubNub Server Time: ' + str(timestamp) )