diff options
| author | Devendra | 2013-07-12 10:11:07 +0530 | 
|---|---|---|
| committer | Devendra | 2013-07-12 10:11:07 +0530 | 
| commit | 523802043bf1ce3616d3bac382f166e34984d5bf (patch) | |
| tree | b1d60455e214f1c7acabc65825b48e869c05c986 | |
| parent | e65cc45e333cc33d19b97b5f93e69121ca2beffa (diff) | |
| download | pubnub-python-523802043bf1ce3616d3bac382f166e34984d5bf.tar.bz2 | |
adding common test from tornado and twisted
| -rw-r--r-- | common/unit-test-async.py | 143 | ||||
| -rw-r--r-- | python-tornado/tests/unit-test.py | 226 | ||||
| -rw-r--r-- | python-twisted/tests/unit-test.py | 116 | 
3 files changed, 143 insertions, 342 deletions
| diff --git a/common/unit-test-async.py b/common/unit-test-async.py new file mode 100644 index 0000000..258bf6a --- /dev/null +++ b/common/unit-test-async.py @@ -0,0 +1,143 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud.  +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + +import sys +import time +sys.path.append('../') +sys.path.append('./') +sys.path.append('../common/') +from Pubnub import Pubnub + +publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key    = len(sys.argv) > 3 and sys.argv[3] or None  +cipher_key    = len(sys.argv) > 4 and sys.argv[4] or None +ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiat Class +## ----------------------------------------------------------------------- +pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +crazy  = 'python-async-test-channel' +expect = 0 +done = 0 +failures = 0 +passes = 0 + +def stop(): +    global done +    global count +    pubnub.stop() +    print "============================" +    print 'Total\t:\t' , failures + passes +    print 'PASS\t:\t' , passes +    print 'FAIL\t:\t', failures +    print "============================" + +## --------------------------------------------------------------------------- +## Unit Test Function +## --------------------------------------------------------------------------- +def test( trial, name ) : +    global failures +    global passes +    global done +    done += 1 +    #print trial +    if trial == False: +        print 'FAIL : ', name +        failures += 1 +    else: +        print 'PASS : ', name +        passes += 1 +    if done == expect: +        stop() + +def test_publish(): +    def publish_cb(messages): +        test(messages[0] == 1, "Publish Test") + +    pubnub.publish({ +        'channel' : crazy, +        'message' :  {'one': 'Hello World! --> ɂ顶@#$%^&*()!', 'two': 'hello2'}, +        'callback' : publish_cb +    })  + + +def test_history(): +    def history_cb(messages): +        test(len(messages) <= 1, "History Test")     +    pubnub.history({ +        'channel' : crazy, +        'limit' :  1, +        'callback' : history_cb  +    }) + + + +def test_subscribe(): +    message = "Testing Subscribe" +    def subscribe_connect_cb(): +        def publish_cb(response): +            test(response[0] == 1, 'Publish Test in subscribe Connect Callback') +        pubnub.publish({ +            'channel' : crazy, +            'message' :  message, +            'callback' : publish_cb +        }) +    def subscribe_cb(response): +        print response +        test(response == message , 'Subscribe Receive Test in subscribe Callback') +    pubnub.subscribe({ +        'channel' : crazy, +        'connect' : subscribe_connect_cb, +        'callback': subscribe_cb +    })  +     + +def test_here_now(): +    channel = crazy + '-here-now' +    message = "Testing Subscribe" +    def subscribe_connect_cb(): +        def here_now_cb(response): +            test(response["occupancy"] > 0, 'Here Now Test') +            def publish_cb(response): +                test(response[0] == 1, 'Here Now Test: Publish Test in subscribe Connect Callback') +            pubnub.publish({ +                'channel' : channel, +                'message' :  message, +                'callback' : publish_cb +            }) +        time.sleep(5) +        pubnub.here_now({ +            'channel' : channel, +            'callback' : here_now_cb +        }) + + +    def subscribe_cb(response): +        test(response == message , 'Here Now Test: Subscribe Receive Test in subscribe Callback') +    pubnub.subscribe({ +        'channel' : channel, +        'connect' : subscribe_connect_cb, +        'callback': subscribe_cb +    })  + +expect = 7 +test_publish() +test_history() +test_subscribe() +test_here_now() + + + +pubnub.start() +if failures > 0: +    raise Exception('Fail', failures) diff --git a/python-tornado/tests/unit-test.py b/python-tornado/tests/unit-test.py deleted file mode 100644 index c5940af..0000000 --- a/python-tornado/tests/unit-test.py +++ /dev/null @@ -1,226 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud.  -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## TODO Tests -## -## - wait 20 minutes, send a message, receive and success. -## -  -## -  -##  -##  - -## ----------------------------------- -## PubNub 3.1 Real-time Push Cloud API -## ----------------------------------- - -import sys -sys.path.append('../') -sys.path.append('./') -sys.path.append('../common/') -from Pubnub import Pubnub - -publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' -subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' -secret_key    = len(sys.argv) > 3 and sys.argv[3] or None  -cipher_key    = len(sys.argv) > 4 and sys.argv[4] or None -ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False - -## ----------------------------------------------------------------------- -## Command Line Options Supplied PubNub -## ----------------------------------------------------------------------- -pubnub_user_supplied_options = Pubnub( -    publish_key,   ## OPTIONAL (supply None to disable) -    subscribe_key, ## REQUIRED -    secret_key,    ## OPTIONAL (supply None to disable) -    cipher_key,    ## OPTIONAL (supply None to disable) -    ssl_on         ## OPTIONAL (supply None to disable) -) - -## ----------------------------------------------------------------------- -## High Security PubNub -## ----------------------------------------------------------------------- -pubnub_high_security = Pubnub( -    ## Publish Key -    'pub-c-a30c030e-9f9c-408d-be89-d70b336ca7a0', - -    ## Subscribe Key -    'sub-c-387c90f3-c018-11e1-98c9-a5220e0555fd', - -    ## Secret Key -    'sec-c-MTliNDE0NTAtYjY4Ni00MDRkLTllYTItNDhiZGE0N2JlYzBl', - -    ## Cipher Key -    'YWxzamRmbVjFaa05HVnGFqZHM3NXRBS73jxmhVMkjiwVVXV1d5UrXR1JLSkZFRr'+ -    'WVd4emFtUm1iR0TFpUZvbiBoYXMgYmVlbxWkhNaF3uUi8kM0YkJTEVlZYVFjBYi'+ -    'jFkWFIxSkxTa1pGUjd874hjklaTFpUwRVuIFNob3VsZCB5UwRkxUR1J6YVhlQWa'+ -    'V1ZkNGVH32mDkdho3pqtRnRVbTFpUjBaeGUgYXNrZWQtZFoKjda40ZWlyYWl1eX'+ -    'U4RkNtdmNub2l1dHE2TTA1jd84jkdJTbFJXYkZwWlZtRnKkWVrSRhhWbFpZVmFz'+ -    'c2RkZmTFpUpGa1dGSXhTa3hUYTFwR1Vpkm9yIGluZm9ybWFNfdsWQdSiiYXNWVX'+ -    'RSblJWYlRGcFVqQmFlRmRyYUU0MFpXbHlZV2wxZVhVNFJrTnR51YjJsMWRIRTJU'+ -    'W91ciBpbmZvcm1hdGliBzdWJtaXR0ZWQb3UZSBhIHJlc3BvbnNlLCB3ZWxsIHJl'+ -    'VEExWdHVybiB0am0aW9uIb24gYXMgd2UgcG9zc2libHkgY2FuLuhcFe24ldWVns'+ -    'dSaTFpU3hVUjFKNllWaFdhRmxZUWpCaQo34gcmVxdWlGFzIHNveqQl83snBfVl3', - -    ## 2048bit SSL ON - ENABLED TRUE -    True -) - -## ----------------------------------------------------------------------- -## Channel | Message Test Data (UTF-8) -## ----------------------------------------------------------------------- -crazy            = ' ~`â¦â§!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd' -many_channels    = [ str(x) + '-many_channel_test' for x in range(10) ] -runthroughs      = 0 -planned_tests    = 2 -delivery_retries = 0 -max_retries      = 10 - -## ----------------------------------------------------------------------- -## Unit Test Function -## ----------------------------------------------------------------------- -def test( trial, name ) : -    if trial : print( 'PASS: ' + name ) -    else :     print( '- FAIL - ' + name ) - -def test_pubnub(pubnub): -    global runthroughs, planned_tests, delivery_retries, max_retries - -    ## ----------------------------------------------------------------------- -    ## Many Channels -    ## ----------------------------------------------------------------------- -    def phase2(): -        status = { -            'sent'        : 0, -            'received'    : 0, -            'connections' : 0 -        } - -        def received( message, chan ): -            global runthroughs - -            test( status['received'] <= status['sent'], 'many sends' ) -            status['received'] += 1 -            pubnub.unsubscribe({ 'channel' : chan }) -            if status['received'] == len(many_channels): -                runthroughs += 1 -                if runthroughs == planned_tests: pubnub.stop() - -        def publish_complete( info, chan ): -            global delivery_retries, max_retries -            status['sent'] += 1 -            test( info, 'publish complete' ) -            test( info and len(info) > 2, 'publish response' ) -            if not info[0]: -                delivery_retries += 1 -                if max_retries > delivery_retries: sendit(chan) - -        def sendit(chan): -            tchan = chan -            pubnub.publish({ -                'channel'  : chan, -                'message'  : "Hello World", -                'callback' : (lambda msg:publish_complete( msg, tchan )) -            }) - -        def connected(chan): -            status['connections'] += 1 -            sendit(chan) - -        def delivered(info): -            if info and info[0]: status['sent'] += 1 - -        def subscribe(chan): -            pubnub.subscribe({ -                'channel'  : chan, -                'connect'  : (lambda:connected(chan+'')), -                'callback' : (lambda msg:received( msg, chan )) -            }) - -        ## Subscribe All Channels -        for chan in many_channels: subscribe(chan) -         -    ## ----------------------------------------------------------------------- -    ## Time Example -    ## ----------------------------------------------------------------------- -    def time_complete(timetoken): -        test( timetoken, 'timetoken fetch' ) -        test( isinstance( timetoken, int ), 'timetoken int type' ) - -    pubnub.time({ 'callback' : time_complete }) - -    ## ----------------------------------------------------------------------- -    ## Publish Example -    ## ----------------------------------------------------------------------- -    def publish_complete(info): -        test( info, 'publish complete' ) -        test( info and len(info) > 2, 'publish response' ) - -        pubnub.history( { -            'channel'  : crazy, -            'limit'    : 10, -            'callback' : history_complete -        }) - -    ## ----------------------------------------------------------------------- -    ## History Example -    ## ----------------------------------------------------------------------- -    def history_complete(messages): -        test( messages and len(messages) > 0, 'history' ) -        test( messages, 'history' ) - - -    pubnub.publish({ -        'channel'  : crazy, -        'message'  : "Hello World", -        'callback' : publish_complete -    }) - -    ## ----------------------------------------------------------------------- -    ## Subscribe Example -    ## ----------------------------------------------------------------------- -    def message_received(message): -        test( message, 'message received' ) -        pubnub.unsubscribe({ 'channel' : crazy }) - -        def done() : -            pubnub.unsubscribe({ 'channel' : crazy }) -            pubnub.publish({ -                'channel'  : crazy, -                'message'  : "Hello World", -                'callback' : (lambda x:x) -            }) - -        def dumpster(message) : -            test( 0, 'never see this' ) - -        pubnub.subscribe({ -            'channel'  : crazy, -            'connect'  : done, -            'callback' : dumpster -        }) - -    def connected() : -        pubnub.publish({ -            'channel' : crazy, -            'message' : { 'Info' : 'Connected!' } -        }) - -    pubnub.subscribe({ -        'channel'  : crazy, -        'connect'  : connected, -        'callback' : message_received -    }) - -    phase2() - -## ----------------------------------------------------------------------- -## Run Tests -## ----------------------------------------------------------------------- -test_pubnub(pubnub_user_supplied_options) -test_pubnub(pubnub_high_security) -pubnub_high_security.start() - diff --git a/python-twisted/tests/unit-test.py b/python-twisted/tests/unit-test.py deleted file mode 100644 index ed7fcd5..0000000 --- a/python-twisted/tests/unit-test.py +++ /dev/null @@ -1,116 +0,0 @@ -## www.pubnub.com - PubNub Real-time push service in the cloud.  -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2010 Stephen Blum -## http://www.pubnub.com/ - -## ----------------------------------- -## PubNub 3.1 Real-time Push Cloud API -## ----------------------------------- - -import sys -sys.path.append('../') -sys.path.append('./') -sys.path.append('../common/') -from Pubnub import Pubnub - -publish_key   = len(sys.argv) > 1 and sys.argv[1] or 'demo' -subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' -secret_key    = len(sys.argv) > 3 and sys.argv[3] or 'demo' -cipher_key    = len(sys.argv) > 4 and sys.argv[4] or ''      ##(Cipher key is Optional) -ssl_on        = len(sys.argv) > 5 and bool(sys.argv[5]) or False - -## ----------------------------------------------------------------------- -## Initiat Class -## ----------------------------------------------------------------------- -pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) -crazy  = 'python-twisted-test-channel' - -## --------------------------------------------------------------------------- -## Unit Test Function -## --------------------------------------------------------------------------- -def test( trial, name ) : -    if trial : -        print( 'PASS: ' + name ) -    else : -        print( 'FAIL: ' + name ) - -## ----------------------------------------------------------------------- -## Time Example -## ----------------------------------------------------------------------- -def time_complete(timestamp): -    print(timestamp) - -pubnub.time({ 'callback' : time_complete }) - -## ----------------------------------------------------------------------- -## History Example -## ----------------------------------------------------------------------- -def history_complete(messages): -    print(messages) - -pubnub.history( { -    'channel'  : crazy, -    'limit'    : 10, -    'callback' : history_complete -}) - -## ----------------------------------------------------------------------- -## Publish Example -## ----------------------------------------------------------------------- -def publish_complete(info): -    print(info) - -pubnub.publish({ -    'channel' : crazy, -    'message' :  {'one': 'Hello World! --> ɂ顶@#$%^&*()!', 'two': 'hello2'}, -    'callback' : publish_complete -}) - -## ----------------------------------------------------------------------- -## Subscribe Example -## ----------------------------------------------------------------------- -def message_received(message): -    print(message) -    print('Disconnecting...') -    pubnub.unsubscribe({ 'channel' : crazy }) - -    def done() : -        print('final connection, done :)') -        pubnub.unsubscribe({ 'channel' : crazy }) -        pubnub.stop() - -    def dumpster(message) : -        print('never see this') -        print(message) - -    print('reconnecting...') -    pubnub.subscribe({ -        'channel'  : crazy, -        'connect'  : done, -        'callback' : dumpster -    }) - - -def connected() : -    ch = crazy -    def dump_message( msg): -        print ch, msg -        pubnub.unsubscribe({'channel':ch}) -        pubnub.stop() -    pubnub.publish({ -        'channel' : ch, -        'message' : { 'Info' : 'Connected!' }, -        'callback' : dump_message -    }) - -pubnub.subscribe({ -    'channel'  : crazy, -    'connect'  : connected, -    'callback' : message_received -}) -## ----------------------------------------------------------------------- -## IO Event Loop -## ----------------------------------------------------------------------- -pubnub.start() | 
