diff options
Diffstat (limited to 'python/tests')
| -rwxr-xr-x | python/tests/detailed-history-unit-test.py | 137 | ||||
| -rwxr-xr-x | python/tests/subscribe-test.py | 154 | ||||
| -rw-r--r-- | python/tests/test_grant.py | 199 | ||||
| -rw-r--r-- | python/tests/test_grant_async.py | 369 | ||||
| -rw-r--r-- | python/tests/test_publish_async.py | 228 | ||||
| -rwxr-xr-x | python/tests/unit-test.py | 66 | 
6 files changed, 950 insertions, 203 deletions
| diff --git a/python/tests/detailed-history-unit-test.py b/python/tests/detailed-history-unit-test.py deleted file mode 100755 index 31bdef8..0000000 --- a/python/tests/detailed-history-unit-test.py +++ /dev/null @@ -1,137 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud. -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## ----------------------------------- -## PubNub 3.0 Real-time Push Cloud API -## ----------------------------------- - -import sys -sys.path.append('.') -sys.path.append('..') -sys.path.append('../common') -from Pubnub import Pubnub -import unittest2 as unittest - - -publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' -subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' -secret_key = len(sys.argv) > 3 and sys.argv[3] or None -ssl_on = len(sys.argv) > 4 and bool(sys.argv[4]) or False -pubnub = Pubnub(publish_key, subscribe_key, secret_key, ssl_on) -crazy = ' ~`!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd' - - -class TestDetailedHistory(unittest.TestCase): -    total_msg = 10 -    channel = pubnub.time() -    starttime = None -    inputs = [] -    endtime = None -    slice_a = 8 -    slice_b = 2 -    slice_size = slice_a - slice_b - -    @classmethod -    def publish_msg(cls, start, end, offset): -        print 'Publishing messages' -        inputs = [] -        for i in range(start + offset, end + offset): -            message = str(i) + " " + crazy -            success = pubnub.publish({ -                                     'channel': cls.channel, -                                     'message': message, -                                     }) -            t = pubnub.time() -            inputs.append({'timestamp': t, 'message': message}) -            print 'Message # ', i, ' published' -        return inputs - -    @classmethod -    def setUpClass(cls): -        print 'Setting up context for Detailed History tests. Please wait ...' -        cls.starttime = pubnub.time() -        cls.inputs = cls.inputs + cls.publish_msg(0, cls.total_msg / 2, 0) -        cls.midtime = pubnub.time() -        cls.inputs = cls.inputs + cls.publish_msg( -            0, cls.total_msg / 2, cls.total_msg / 2) -        cls.endtime = pubnub.time() -        print 'Context setup for Detailed History tests. Now running tests' - -    def test_begin_to_end_count(self): -        count = 5 -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'start': self.__class__.starttime, -                                         'end': self.__class__.endtime, -                                         'count': count -                                         })[0] -        self.assertTrue(len(history) == count and history[-1].encode( -            'utf-8') == self.__class__.inputs[count - 1]['message']) - -    def test_end_to_begin_count(self): -        count = 5 -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'start': self.__class__.endtime, -                                         'end': self.__class__.starttime, -                                         'count': count -                                         })[0] -        self.assertTrue(len(history) == count and history[-1] -            .encode('utf-8') == self.__class__.inputs[-1]['message']) - -    def test_start_reverse_true(self): -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'start': self.__class__.midtime, -                                         'reverse': True -                                         })[0] -        self.assertTrue(len(history) == self.__class__.total_msg / 2) -        expected_msg = self.__class__.inputs[-1]['message'] -        self.assertTrue(history[-1].encode('utf-8') == expected_msg) - -    def test_start_reverse_false(self): -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'start': self.__class__.midtime, -                                         })[0] -        self.assertTrue(history[0].encode('utf-8') -                        == self.__class__.inputs[0]['message']) - -    def test_end_reverse_true(self): -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'end': self.__class__.midtime, -                                         'reverse': True -                                         })[0] -        self.assertTrue(history[0].encode('utf-8') -                        == self.__class__.inputs[0]['message']) - -    def test_end_reverse_false(self): -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'end': self.__class__.midtime, -                                         })[0] -        self.assertTrue(len(history) == self.__class__.total_msg / 2) -        self.assertTrue(history[-1].encode('utf-8') -                        == self.__class__.inputs[-1]['message']) - -    def test_count(self): -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'count': 5 -                                         })[0] -        self.assertTrue(len(history) == 5) - -    def test_count_zero(self): -        history = pubnub.detailedHistory({ -                                         'channel': self.__class__.channel, -                                         'count': 0 -                                         })[0] -        self.assertTrue(len(history) == 0) - -if __name__ == '__main__': -    unittest.main() diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py new file mode 100755 index 0000000..a1b1826 --- /dev/null +++ b/python/tests/subscribe-test.py @@ -0,0 +1,154 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + +import sys +import datetime +from Pubnub import PubnubAsync as Pubnub +from functools import partial +from threading import current_thread +import threading +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key = len(sys.argv) > 4 and sys.argv[4] or None +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +pubnub = Pubnub(publish_key, subscribe_key, secret_key, ssl_on) +crazy = 'hello_world' + +current = -1 + +errors = 0 +received = 0 + +## ----------------------------------------------------------------------- +## Subscribe Example +## ----------------------------------------------------------------------- + + +def message_received(message): +    print(message) + + +def check_received(message): +    global current +    global errors +    global received +    print(message) +    print(current) +    if message <= current: +        print('ERROR') +        #sys.exit() +        errors += 1 +    else: +        received += 1 +    print('active thread count : ' + str(threading.activeCount())) +    print('errors = ' + str(errors)) +    print(current_thread().getName() + ' , ' + 'received = ' + str(received)) + +    if received != message: +        print('********** MISSED **************** ' + str(message - received)) +    current = message + + +def connected_test(ch): +    print('Connected ' + ch) + + +def connected(ch): +    pass + + +''' +pubnub.subscribe({ +    'channel'  : 'abcd1', +    'connect'  : connected, +    'callback' : message_received +}) +''' + + +def cb1(): +    pubnub.subscribe({ +        'channel': 'efgh1', +        'connect': connected, +        'callback': message_received +    }) + + +def cb2(): +    pubnub.subscribe({ +        'channel': 'dsm-test', +        'connect': connected_test, +        'callback': check_received +    }) + + +def cb3(): +    pubnub.unsubscribe({'channel': 'efgh1'}) + + +def cb4(): +    pubnub.unsubscribe({'channel': 'abcd1'}) + + +def subscribe(channel): +    pubnub.subscribe({ +        'channel': channel, +        'connect': connected, +        'callback': message_received +    }) + + +pubnub.timeout(15, cb1) + +pubnub.timeout(30, cb2) + + +pubnub.timeout(45, cb3) + +pubnub.timeout(60, cb4) + +#''' +for x in range(1, 1000): +    #print x +    def y(t): +        subscribe('channel-' + str(t)) + +    def z(t): +        pubnub.unsubscribe({'channel': 'channel-' + str(t)}) + +    pubnub.timeout(x + 5, partial(y, x)) +    pubnub.timeout(x + 25, partial(z, x)) +    x += 10 +#''' + +''' +for x in range(1,1000): +    def cb(r): print r , ' : ', threading.activeCount() +    def y(t): +        pubnub.publish({ +            'message' : t, +            'callback' : cb, +            'channel' : 'dsm-test' +        }) + + +    pubnub.timeout(x + 1, partial(y,x)) +    x += 1 +''' + + +pubnub.start() diff --git a/python/tests/test_grant.py b/python/tests/test_grant.py new file mode 100644 index 0000000..6826335 --- /dev/null +++ b/python/tests/test_grant.py @@ -0,0 +1,199 @@ + + +from Pubnub import Pubnub +import time + +pubnub = Pubnub("demo","demo") +pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",  +	"sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy") + + +# Grant permission read true, write true, on channel ( Sync Mode ) +def test_1(): +	resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=True, ttl=1) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'auths': {'abcd': {'r': 1, 'w': 1}}, +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'user', 'channel': 'abcd', 'ttl': 1 +							  } +							 + +# Grant permission read false, write false, on channel ( Sync Mode ) +def test_2(): +	resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=False, ttl=1) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'auths': {'abcd': {'r': 0, 'w': 0}}, +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'user', 'channel': 'abcd', 'ttl': 1 +							  } + +# Grant permission read True, write false, on channel ( Sync Mode ) +def test_3(): +	resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'auths': {'abcd': {'r': 1, 'w': 0}}, +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'user', 'channel': 'abcd', 'ttl': 1 +							  } + +# Grant permission read False, write True, on channel ( Sync Mode ) +def test_4(): +	resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'auths': {'abcd': {'r': 1, 'w': 0}}, +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'user', 'channel': 'abcd', 'ttl': 1 +							  } + +# Grant permission read False, write True, on channel ( Sync Mode ), TTL 10 +def test_5(): +	resp = pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=10) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'auths': {'abcd': {'r': 1, 'w': 0}}, +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'user', 'channel': 'abcd', 'ttl': 10 +							  } + +# Grant permission read False, write True, without channel ( Sync Mode ), TTL 10 +def test_6(): +	resp = pubnub_pam.grant(auth_key="abcd", read=True, write=False, ttl=10) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'subkey' , u'r': 1, u'w': 0, 'ttl': 10 +							  } + + +# Grant permission read False, write False, without channel ( Sync Mode ) +def test_7(): +	resp = pubnub_pam.grant(auth_key="abcd", read=False, write=False) +	assert resp['message'] == 'Success' +	assert resp['payload'] == { +								'subscribe_key': 'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								'level': 'subkey' , u'r': 0, u'w': 0, 'ttl': 1 +							  } + + +# Complete flow , try publish on forbidden channel, grant permission to auth key and try again. ( Sync Mode) + +def test_8(): +	channel = "test_8-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} +	resp = pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10) +	assert resp == 			{ +								'message': u'Success', +								'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}}, +								u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								u'level': u'user', u'channel': channel, u'ttl': 10} +							} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp[0] == 1 + + +# Complete flow , try publish on forbidden channel, grant permission to authkey and try again. ( Sync Mode) +# then revoke and try again +def test_9(): +	channel = "test_9-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} +	resp = pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10) +	print resp +	assert resp == 			{ +								'message': u'Success', +								'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}}, +								u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								u'level': u'user', u'channel': channel, u'ttl': 10} +							} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp[0] == 1 +	resp = pubnub_pam.revoke(channel=channel, auth_key=auth_key) +	print resp +	assert resp == 			{ +								'message': u'Success', +								'payload': {u'auths': {auth_key : {u'r': 0, u'w': 0}}, +								u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								u'level': u'user', u'channel': channel, u'ttl': 1} +							} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} + +# Complete flow , try publish on forbidden channel, grant permission channel level for subkey and try again. ( Sync Mode) +# then revoke and try again +def test_10(): +	channel = "test_10-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} +	resp = pubnub_pam.grant(channel=channel, read=True, write=True, ttl=10) +	print resp +	assert resp == 			{ +									'message': u'Success', +									'payload': { u'channels': {channel: {u'r': 1, u'w': 1}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'channel', u'ttl': 10} +								} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp[0] == 1 +	resp = pubnub_pam.revoke(channel=channel) +	print resp +	assert resp == 			{ +									'message': u'Success', +									'payload': { u'channels': {channel : {u'r': 0, u'w': 0}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'channel', u'ttl': 1} +								} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} + +# Complete flow , try publish on forbidden channel, grant permission subkey level for subkey and try again. ( Sync Mode) +# then revoke and try again +def test_11(): +	channel = "test_11-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} +	resp = pubnub_pam.grant(read=True, write=True, ttl=10) +	print resp +	assert resp == 			{ +									'message': u'Success', +									'payload': { u'r': 1, u'w': 1, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'subkey', u'ttl': 10} +								} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp[0] == 1 +	resp = pubnub_pam.revoke() +	print resp +	assert resp == 			{ +									'message': u'Success', +									'payload': {u'r': 0, u'w': 0, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'subkey', u'ttl': 1} +								} +	resp = pubnub_pam.publish(channel=channel,message=message) +	assert resp['message'] == 'Forbidden' +	assert resp['payload'] == {u'channels': [channel]} + + diff --git a/python/tests/test_grant_async.py b/python/tests/test_grant_async.py new file mode 100644 index 0000000..4d38a0a --- /dev/null +++ b/python/tests/test_grant_async.py @@ -0,0 +1,369 @@ + + +from Pubnub import Pubnub +import time + +pubnub = Pubnub("demo","demo") +pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",  +	"sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy") + + + +# Grant permission read true, write true, on channel ( Async Mode ) +def test_1(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=True, ttl=1, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': {u'auths': {u'abcd': {u'r': 1, u'w': 1}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'user', u'channel': u'abcd', u'ttl': 1} +								} +							 + +# Grant permission read false, write false, on channel ( Async Mode ) +def test_2(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=False, ttl=1, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 0}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'user', u'channel': u'abcd', u'ttl': 1} +								} + + +# Grant permission read True, write false, on channel ( Async Mode ) +def test_3(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': {u'auths': {u'abcd': {u'r': 1, u'w': 0}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'user', u'channel': u'abcd', u'ttl': 1} +								} + +# Grant permission read False, write True, on channel ( Async Mode ) +def test_4(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=True, ttl=1, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 1}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'user', u'channel': u'abcd', u'ttl': 1} +								} + + +# Grant permission read False, write True, on channel ( Async Mode ), TTL 10 +def test_5(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=True, ttl=10, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 1}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'user', u'channel': u'abcd', u'ttl': 10} +								} + + +# Grant permission read False, write True, without channel ( Async Mode ), TTL 10 +def test_6(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(auth_key="abcd", read=False, write=True, ttl=10, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': { u'r': 0, u'w': 1, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'subkey', u'ttl': 10} +								} + + +# Grant permission read False, write False, without channel ( Async Mode ) +def test_7(): +	resp = {'response' : None} +	def _callback(response): +		resp['response'] = response + +	def _error(response): +		resp['response'] = response + +	pubnub_pam.grant(auth_key="abcd", read=False, write=False, callback=_callback, error=_error) +	time.sleep(2) +	assert resp['response'] == { +									'message': u'Success', +									'payload': { u'r': 0, u'w': 0, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'subkey', u'ttl': 1} +								} + + +# Complete flow , try publish on forbidden channel, grant permission to subkey and try again. ( Sync Mode) + +def test_8(): +	channel = "test_8-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) + + + +	def _cb1(resp, ch=None): +		assert False +	def _err1(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1) +	time.sleep(2) + + +	def _cb2(resp, ch=None): +		assert resp == 		{ +								'message': u'Success', +								'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}}, +								u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								u'level': u'user', u'channel': channel, u'ttl': 10} +							} +	def _err2(resp): +		assert False + + +	resp = pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10, callback=_cb2, error=_err2) +	time.sleep(2) + +	def _cb3(resp, ch=None): +		assert resp[0] == 1 +	def _err3(resp): +		assert False + +	pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3) +	time.sleep(2) + + + + + +# Complete flow , try publish on forbidden channel, grant permission to authkey and try again.  +# then revoke and try again +def test_9(): +	channel = "test_9-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) + +	def _cb1(resp, ch=None): +		assert False +	def _err1(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1) +	time.sleep(2) + + +	def _cb2(resp, ch=None): +		assert resp == 		{ +								'message': u'Success', +								'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}}, +								u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								u'level': u'user', u'channel': channel, u'ttl': 10} +							} +	def _err2(resp): +		assert False + + +	pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10, callback=_cb2, error=_err2) +	time.sleep(2) + +	def _cb3(resp, ch=None): +		assert resp[0] == 1 +	def _err3(resp): +		assert False + +	pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3) +	time.sleep(2) + +	def _cb4(resp, ch=None): +		assert resp == 		{ +								'message': u'Success', +								'payload': {u'auths': {auth_key : {u'r': 0, u'w': 0}}, +								u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +								u'level': u'user', u'channel': channel, u'ttl': 1} +							} +	def _err4(resp): +		assert False +	pubnub_pam.revoke(channel=channel, auth_key=auth_key, callback=_cb4, error=_err4) +	time.sleep(2) + +	def _cb5(resp, ch=None): +		assert False +	def _err5(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5) +	time.sleep(2) + + + + +# Complete flow , try publish on forbidden channel, grant permission channel level for subkey and try again. +# then revoke and try again +def test_10(): +	channel = "test_10-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) + +	def _cb1(resp, ch=None): +		assert False +	def _err1(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1) +	time.sleep(2) + + +	def _cb2(resp, ch=None): +		assert resp == 		{ +									'message': u'Success', +									'payload': { u'channels': {channel: {u'r': 1, u'w': 1}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'channel', u'ttl': 10} +								} +	def _err2(resp): +		assert False + + +	pubnub_pam.grant(channel=channel, read=True, write=True, ttl=10, callback=_cb2, error=_err2) +	time.sleep(2) + +	def _cb3(resp, ch=None): +		assert resp[0] == 1 +	def _err3(resp): +		assert False + +	pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3) +	time.sleep(2) + +	def _cb4(resp, ch=None): +		assert resp == 		{ +									'message': u'Success', +									'payload': { u'channels': {channel : {u'r': 0, u'w': 0}}, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'channel', u'ttl': 1} +								} +	def _err4(resp): +		assert False +	pubnub_pam.revoke(channel=channel, callback=_cb4, error=_err4) +	time.sleep(2) + +	def _cb5(resp, ch=None): +		assert False +	def _err5(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5) +	time.sleep(2) + + + +# Complete flow , try publish on forbidden channel, grant permission subkey level for subkey and try again. +# then revoke and try again +def test_11(): +	channel = "test_11-" + str(time.time()) +	message = "Hello World" +	auth_key = "auth-" + channel +	pubnub_pam.set_auth_key(auth_key) + +	def _cb1(resp, ch=None): +		assert False +	def _err1(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1) +	time.sleep(2) + + +	def _cb2(resp, ch=None): +		assert resp == 		{ +									'message': u'Success', +									'payload': { u'r': 1, u'w': 1, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'subkey', u'ttl': 10} +								} +	def _err2(resp): +		assert False + + +	pubnub_pam.grant(read=True, write=True, ttl=10, callback=_cb2, error=_err2) +	time.sleep(2) + +	def _cb3(resp, ch=None): +		assert resp[0] == 1 +	def _err3(resp): +		assert False + +	pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3) +	time.sleep(2) + +	def _cb4(resp, ch=None): +		assert resp == 		{ +									'message': u'Success', +									'payload': {u'r': 0, u'w': 0, +									u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe', +									u'level': u'subkey', u'ttl': 1} +								} +	def _err4(resp): +		assert False +	pubnub_pam.revoke(callback=_cb4, error=_err4) +	time.sleep(2) + +	def _cb5(resp, ch=None): +		assert False +	def _err5(resp): +		assert resp['message'] == 'Forbidden' +		assert resp['payload'] == {u'channels': [channel]} +	pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5) +	time.sleep(2) diff --git a/python/tests/test_publish_async.py b/python/tests/test_publish_async.py new file mode 100644 index 0000000..7270727 --- /dev/null +++ b/python/tests/test_publish_async.py @@ -0,0 +1,228 @@ + + +from Pubnub import Pubnub +import time + +pubnub = Pubnub("demo","demo") +pubnub_enc = Pubnub("demo", "demo", cipher_key="enigma") +pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",  +	"sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy") + + + +# Publish and receive string +def test_1(): + +	channel = "test_1-" + str(time.time()) +	message = "I am a string" + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub.unsubscribe(channel) + +	def _connect(resp): +		pubnub.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive array +def test_2(): + +	channel = "test_2-" + str(time.time()) +	message = [1,2] + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub.unsubscribe(channel) + +	def _connect(resp): +		pubnub.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive json object +def test_3(): + +	channel = "test_2-" + str(time.time()) +	message = { "a" : "b" } + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub.unsubscribe(channel) + +	def _connect(resp): +		pubnub.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive number +def test_4(): + +	channel = "test_2-" + str(time.time()) +	message = 100 + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub.unsubscribe(channel) + +	def _connect(resp): +		pubnub.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive number string +def test_5(): + +	channel = "test_5-" + str(time.time()) +	message = "100" + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub.unsubscribe(channel) + +	def _connect(resp): +		pubnub.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error) + + +# Publish and receive string (Encryption enabled) +def test_6(): + +	channel = "test_6-" + str(time.time()) +	message = "I am a string" + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub_enc.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive array (Encryption enabled) +def test_7(): + +	channel = "test_7-" + str(time.time()) +	message = [1,2] + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive json object (Encryption enabled) +def test_8(): + +	channel = "test_8-" + str(time.time()) +	message = { "a" : "b" } + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub_enc.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive number (Encryption enabled) +def test_9(): + +	channel = "test_9-" + str(time.time()) +	message = 100 + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub_enc.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive number string (Encryption enabled) +def test_10(): + +	channel = "test_10-" + str(time.time()) +	message = "100" + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub_enc.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive object string (Encryption enabled) +def test_11(): + +	channel = "test_11-" + str(time.time()) +	message = '{"a" : "b"}' + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub_enc.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error) + +# Publish and receive array string (Encryption enabled) +def test_12(): + +	channel = "test_12-" + str(time.time()) +	message = '[1,2]' + +	def _cb(resp, ch=None): +		assert resp == message +		pubnub_enc.unsubscribe(channel) + +	def _connect(resp): +		pubnub_enc.publish(channel,message) + +	def _error(resp): +		assert False + +	pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error) diff --git a/python/tests/unit-test.py b/python/tests/unit-test.py deleted file mode 100755 index 1737ace..0000000 --- a/python/tests/unit-test.py +++ /dev/null @@ -1,66 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud.  -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## ----------------------------------- -## PubNub 3.0 Real-time Push Cloud API -## ----------------------------------- - -import sys -sys.path.append('.') -sys.path.append('..') -sys.path.append('../common') -from Pubnub import Pubnub - -publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' -subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' -secret_key    = len(sys.argv) > 3 and sys.argv[3] or None -ssl_on        = len(sys.argv) > 4 and bool(sys.argv[4]) or False - - -## ----------------------------------------------------------------------- -## Initiat Class -## ----------------------------------------------------------------------- - -pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on ) -crazy  = 'demo' - -## --------------------------------------------------------------------------- -## Unit Test Function -## --------------------------------------------------------------------------- -def test( trial, name ) : -    if trial : -        print( 'PASS: ' + name ) -    else : -        print( 'FAIL: ' + name ) - -## ----------------------------------------------------------------------- -## Publish Example -## ----------------------------------------------------------------------- -publish_success = pubnub.publish({ -    'channel' : crazy, -    'message' : crazy -}) -test( publish_success[0] == 1, 'Publish First Message Success' ) - -## ----------------------------------------------------------------------- -## History Example -## ----------------------------------------------------------------------- -history = pubnub.history({ -    'channel' : crazy, -    'limit'   : 1 -}) -test( -    history[0].encode('utf-8') == crazy, -    'History Message: ' + history[0] -) -test( len(history) == 1, 'History Message Count' ) - -## ----------------------------------------------------------------------- -## PubNub Server Time Example -## ----------------------------------------------------------------------- -timestamp = pubnub.time() -test( timestamp > 0, 'PubNub Server Time: ' + str(timestamp) ) | 
