diff options
-rw-r--r-- | common/unit-test-async.py | 143 | ||||
-rw-r--r-- | python-tornado/tests/unit-test.py | 226 | ||||
-rw-r--r-- | python-twisted/tests/unit-test.py | 116 |
3 files changed, 143 insertions, 342 deletions
diff --git a/common/unit-test-async.py b/common/unit-test-async.py new file mode 100644 index 0000000..258bf6a --- /dev/null +++ b/common/unit-test-async.py @@ -0,0 +1,143 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + +import sys +import time +sys.path.append('../') +sys.path.append('./') +sys.path.append('../common/') +from Pubnub import Pubnub + +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or None +cipher_key = len(sys.argv) > 4 and sys.argv[4] or None +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiat Class +## ----------------------------------------------------------------------- +pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +crazy = 'python-async-test-channel' +expect = 0 +done = 0 +failures = 0 +passes = 0 + +def stop(): + global done + global count + pubnub.stop() + print "============================" + print 'Total\t:\t' , failures + passes + print 'PASS\t:\t' , passes + print 'FAIL\t:\t', failures + print "============================" + +## --------------------------------------------------------------------------- +## Unit Test Function +## --------------------------------------------------------------------------- +def test( trial, name ) : + global failures + global passes + global done + done += 1 + #print trial + if trial == False: + print 'FAIL : ', name + failures += 1 + else: + print 'PASS : ', name + passes += 1 + if done == expect: + stop() + +def test_publish(): + def publish_cb(messages): + test(messages[0] == 1, "Publish Test") + + pubnub.publish({ + 'channel' : crazy, + 'message' : {'one': 'Hello World! --> ɂ顶@#$%^&*()!', 'two': 'hello2'}, + 'callback' : publish_cb + }) + + +def test_history(): + def history_cb(messages): + test(len(messages) <= 1, "History Test") + pubnub.history({ + 'channel' : crazy, + 'limit' : 1, + 'callback' : history_cb + }) + + + +def test_subscribe(): + message = "Testing Subscribe" + def subscribe_connect_cb(): + def publish_cb(response): + test(response[0] == 1, 'Publish Test in subscribe Connect Callback') + pubnub.publish({ + 'channel' : crazy, + 'message' : message, + 'callback' : publish_cb + }) + def subscribe_cb(response): + print response + test(response == message , 'Subscribe Receive Test in subscribe Callback') + pubnub.subscribe({ + 'channel' : crazy, + 'connect' : subscribe_connect_cb, + 'callback': subscribe_cb + }) + + +def test_here_now(): + channel = crazy + '-here-now' + message = "Testing Subscribe" + def subscribe_connect_cb(): + def here_now_cb(response): + test(response["occupancy"] > 0, 'Here Now Test') + def publish_cb(response): + test(response[0] == 1, 'Here Now Test: Publish Test in subscribe Connect Callback') + pubnub.publish({ + 'channel' : channel, + 'message' : message, + 'callback' : publish_cb + }) + time.sleep(5) + pubnub.here_now({ + 'channel' : channel, + 'callback' : here_now_cb + }) + + + def subscribe_cb(response): + test(response == message , 'Here Now Test: Subscribe Receive Test in subscribe Callback') + pubnub.subscribe({ + 'channel' : channel, + 'connect' : subscribe_connect_cb, + 'callback': subscribe_cb + }) + +expect = 7 +test_publish() +test_history() +test_subscribe() +test_here_now() + + + +pubnub.start() +if failures > 0: + raise Exception('Fail', failures) diff --git a/python-tornado/tests/unit-test.py b/python-tornado/tests/unit-test.py deleted file mode 100644 index c5940af..0000000 --- a/python-tornado/tests/unit-test.py +++ /dev/null @@ -1,226 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud. -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## TODO Tests -## -## - wait 20 minutes, send a message, receive and success. -## - -## - -## -## - -## ----------------------------------- -## PubNub 3.1 Real-time Push Cloud API -## ----------------------------------- - -import sys -sys.path.append('../') -sys.path.append('./') -sys.path.append('../common/') -from Pubnub import Pubnub - -publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' -subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' -secret_key = len(sys.argv) > 3 and sys.argv[3] or None -cipher_key = len(sys.argv) > 4 and sys.argv[4] or None -ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False - -## ----------------------------------------------------------------------- -## Command Line Options Supplied PubNub -## ----------------------------------------------------------------------- -pubnub_user_supplied_options = Pubnub( - publish_key, ## OPTIONAL (supply None to disable) - subscribe_key, ## REQUIRED - secret_key, ## OPTIONAL (supply None to disable) - cipher_key, ## OPTIONAL (supply None to disable) - ssl_on ## OPTIONAL (supply None to disable) -) - -## ----------------------------------------------------------------------- -## High Security PubNub -## ----------------------------------------------------------------------- -pubnub_high_security = Pubnub( - ## Publish Key - 'pub-c-a30c030e-9f9c-408d-be89-d70b336ca7a0', - - ## Subscribe Key - 'sub-c-387c90f3-c018-11e1-98c9-a5220e0555fd', - - ## Secret Key - 'sec-c-MTliNDE0NTAtYjY4Ni00MDRkLTllYTItNDhiZGE0N2JlYzBl', - - ## Cipher Key - 'YWxzamRmbVjFaa05HVnGFqZHM3NXRBS73jxmhVMkjiwVVXV1d5UrXR1JLSkZFRr'+ - 'WVd4emFtUm1iR0TFpUZvbiBoYXMgYmVlbxWkhNaF3uUi8kM0YkJTEVlZYVFjBYi'+ - 'jFkWFIxSkxTa1pGUjd874hjklaTFpUwRVuIFNob3VsZCB5UwRkxUR1J6YVhlQWa'+ - 'V1ZkNGVH32mDkdho3pqtRnRVbTFpUjBaeGUgYXNrZWQtZFoKjda40ZWlyYWl1eX'+ - 'U4RkNtdmNub2l1dHE2TTA1jd84jkdJTbFJXYkZwWlZtRnKkWVrSRhhWbFpZVmFz'+ - 'c2RkZmTFpUpGa1dGSXhTa3hUYTFwR1Vpkm9yIGluZm9ybWFNfdsWQdSiiYXNWVX'+ - 'RSblJWYlRGcFVqQmFlRmRyYUU0MFpXbHlZV2wxZVhVNFJrTnR51YjJsMWRIRTJU'+ - 'W91ciBpbmZvcm1hdGliBzdWJtaXR0ZWQb3UZSBhIHJlc3BvbnNlLCB3ZWxsIHJl'+ - 'VEExWdHVybiB0am0aW9uIb24gYXMgd2UgcG9zc2libHkgY2FuLuhcFe24ldWVns'+ - 'dSaTFpU3hVUjFKNllWaFdhRmxZUWpCaQo34gcmVxdWlGFzIHNveqQl83snBfVl3', - - ## 2048bit SSL ON - ENABLED TRUE - True -) - -## ----------------------------------------------------------------------- -## Channel | Message Test Data (UTF-8) -## ----------------------------------------------------------------------- -crazy = ' ~`â¦â§!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd' -many_channels = [ str(x) + '-many_channel_test' for x in range(10) ] -runthroughs = 0 -planned_tests = 2 -delivery_retries = 0 -max_retries = 10 - -## ----------------------------------------------------------------------- -## Unit Test Function -## ----------------------------------------------------------------------- -def test( trial, name ) : - if trial : print( 'PASS: ' + name ) - else : print( '- FAIL - ' + name ) - -def test_pubnub(pubnub): - global runthroughs, planned_tests, delivery_retries, max_retries - - ## ----------------------------------------------------------------------- - ## Many Channels - ## ----------------------------------------------------------------------- - def phase2(): - status = { - 'sent' : 0, - 'received' : 0, - 'connections' : 0 - } - - def received( message, chan ): - global runthroughs - - test( status['received'] <= status['sent'], 'many sends' ) - status['received'] += 1 - pubnub.unsubscribe({ 'channel' : chan }) - if status['received'] == len(many_channels): - runthroughs += 1 - if runthroughs == planned_tests: pubnub.stop() - - def publish_complete( info, chan ): - global delivery_retries, max_retries - status['sent'] += 1 - test( info, 'publish complete' ) - test( info and len(info) > 2, 'publish response' ) - if not info[0]: - delivery_retries += 1 - if max_retries > delivery_retries: sendit(chan) - - def sendit(chan): - tchan = chan - pubnub.publish({ - 'channel' : chan, - 'message' : "Hello World", - 'callback' : (lambda msg:publish_complete( msg, tchan )) - }) - - def connected(chan): - status['connections'] += 1 - sendit(chan) - - def delivered(info): - if info and info[0]: status['sent'] += 1 - - def subscribe(chan): - pubnub.subscribe({ - 'channel' : chan, - 'connect' : (lambda:connected(chan+'')), - 'callback' : (lambda msg:received( msg, chan )) - }) - - ## Subscribe All Channels - for chan in many_channels: subscribe(chan) - - ## ----------------------------------------------------------------------- - ## Time Example - ## ----------------------------------------------------------------------- - def time_complete(timetoken): - test( timetoken, 'timetoken fetch' ) - test( isinstance( timetoken, int ), 'timetoken int type' ) - - pubnub.time({ 'callback' : time_complete }) - - ## ----------------------------------------------------------------------- - ## Publish Example - ## ----------------------------------------------------------------------- - def publish_complete(info): - test( info, 'publish complete' ) - test( info and len(info) > 2, 'publish response' ) - - pubnub.history( { - 'channel' : crazy, - 'limit' : 10, - 'callback' : history_complete - }) - - ## ----------------------------------------------------------------------- - ## History Example - ## ----------------------------------------------------------------------- - def history_complete(messages): - test( messages and len(messages) > 0, 'history' ) - test( messages, 'history' ) - - - pubnub.publish({ - 'channel' : crazy, - 'message' : "Hello World", - 'callback' : publish_complete - }) - - ## ----------------------------------------------------------------------- - ## Subscribe Example - ## ----------------------------------------------------------------------- - def message_received(message): - test( message, 'message received' ) - pubnub.unsubscribe({ 'channel' : crazy }) - - def done() : - pubnub.unsubscribe({ 'channel' : crazy }) - pubnub.publish({ - 'channel' : crazy, - 'message' : "Hello World", - 'callback' : (lambda x:x) - }) - - def dumpster(message) : - test( 0, 'never see this' ) - - pubnub.subscribe({ - 'channel' : crazy, - 'connect' : done, - 'callback' : dumpster - }) - - def connected() : - pubnub.publish({ - 'channel' : crazy, - 'message' : { 'Info' : 'Connected!' } - }) - - pubnub.subscribe({ - 'channel' : crazy, - 'connect' : connected, - 'callback' : message_received - }) - - phase2() - -## ----------------------------------------------------------------------- -## Run Tests -## ----------------------------------------------------------------------- -test_pubnub(pubnub_user_supplied_options) -test_pubnub(pubnub_high_security) -pubnub_high_security.start() - diff --git a/python-twisted/tests/unit-test.py b/python-twisted/tests/unit-test.py deleted file mode 100644 index ed7fcd5..0000000 --- a/python-twisted/tests/unit-test.py +++ /dev/null @@ -1,116 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud. -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## ----------------------------------- -## PubNub 3.1 Real-time Push Cloud API -## ----------------------------------- - -import sys -sys.path.append('../') -sys.path.append('./') -sys.path.append('../common/') -from Pubnub import Pubnub - -publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' -subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' -secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' -cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional) -ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False - -## ----------------------------------------------------------------------- -## Initiat Class -## ----------------------------------------------------------------------- -pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) -crazy = 'python-twisted-test-channel' - -## --------------------------------------------------------------------------- -## Unit Test Function -## --------------------------------------------------------------------------- -def test( trial, name ) : - if trial : - print( 'PASS: ' + name ) - else : - print( 'FAIL: ' + name ) - -## ----------------------------------------------------------------------- -## Time Example -## ----------------------------------------------------------------------- -def time_complete(timestamp): - print(timestamp) - -pubnub.time({ 'callback' : time_complete }) - -## ----------------------------------------------------------------------- -## History Example -## ----------------------------------------------------------------------- -def history_complete(messages): - print(messages) - -pubnub.history( { - 'channel' : crazy, - 'limit' : 10, - 'callback' : history_complete -}) - -## ----------------------------------------------------------------------- -## Publish Example -## ----------------------------------------------------------------------- -def publish_complete(info): - print(info) - -pubnub.publish({ - 'channel' : crazy, - 'message' : {'one': 'Hello World! --> ɂ顶@#$%^&*()!', 'two': 'hello2'}, - 'callback' : publish_complete -}) - -## ----------------------------------------------------------------------- -## Subscribe Example -## ----------------------------------------------------------------------- -def message_received(message): - print(message) - print('Disconnecting...') - pubnub.unsubscribe({ 'channel' : crazy }) - - def done() : - print('final connection, done :)') - pubnub.unsubscribe({ 'channel' : crazy }) - pubnub.stop() - - def dumpster(message) : - print('never see this') - print(message) - - print('reconnecting...') - pubnub.subscribe({ - 'channel' : crazy, - 'connect' : done, - 'callback' : dumpster - }) - - -def connected() : - ch = crazy - def dump_message( msg): - print ch, msg - pubnub.unsubscribe({'channel':ch}) - pubnub.stop() - pubnub.publish({ - 'channel' : ch, - 'message' : { 'Info' : 'Connected!' }, - 'callback' : dump_message - }) - -pubnub.subscribe({ - 'channel' : crazy, - 'connect' : connected, - 'callback' : message_received -}) -## ----------------------------------------------------------------------- -## IO Event Loop -## ----------------------------------------------------------------------- -pubnub.start() |