## www.pubnub.com - PubNub Real-time push service in the cloud.
# coding=utf8

## PubNub Real-time Push APIs and Notifications Framework
## Copyright (c) 2010 Stephen Blum
## http://www.pubnub.com/

## TODO Tests
##
## - wait 20 minutes, send a message, receive and success.
## -
## -
##
##

## -----------------------------------
## PubNub 3.1 Real-time Push Cloud API
## -----------------------------------

import sys
from pubnub import PubnubTwisted as Pubnub

## -----------------------------------------------------------------------
## Command Line Parsing
##
## Usage: <script> [publish_key [subscribe_key [secret_key [cipher_key
##                 [ssl_on]]]]]
## -----------------------------------------------------------------------

## Spellings of the ssl_on argument that switch SSL off.
_FALSE_WORDS = ('0', 'false', 'no', 'off', 'none')


def _cli_arg(position, default):
    """Return sys.argv[position], or default when it is missing or empty."""
    if len(sys.argv) > position and sys.argv[position]:
        return sys.argv[position]
    return default


def _cli_flag(position):
    """Interpret sys.argv[position] as a boolean switch.

    A missing or empty argument is False.  Any other value is True unless
    it is one of the spellings in _FALSE_WORDS; bool() alone would treat
    every non-empty string, including '0' and 'false', as True.
    """
    value = _cli_arg(position, None)
    if value is None:
        return False
    return value.strip().lower() not in _FALSE_WORDS


publish_key = _cli_arg(1, 'demo')
subscribe_key = _cli_arg(2, 'demo')
secret_key = _cli_arg(3, None)
cipher_key = _cli_arg(4, None)
ssl_on = _cli_flag(5)

## -----------------------------------------------------------------------
## Command Line Options Supplied PubNub
## -----------------------------------------------------------------------
pubnub_user_supplied_options = Pubnub(
    publish_key,    # OPTIONAL (supply None to disable)
    subscribe_key,  # REQUIRED
    secret_key,     # OPTIONAL (supply None to disable)
    cipher_key,     # OPTIONAL (supply None to disable)
    ssl_on          # OPTIONAL (supply None to disable)
)

## -----------------------------------------------------------------------
## High Security PubNub
## -----------------------------------------------------------------------
pubnub_high_security = Pubnub(
    ## Publish Key
    'pub-c-a30c030e-9f9c-408d-be89-d70b336ca7a0',

    ## Subscribe Key
    'sub-c-387c90f3-c018-11e1-98c9-a5220e0555fd',

    ## Secret Key
    'sec-c-MTliNDE0NTAtYjY4Ni00MDRkLTllYTItNDhiZGE0N2JlYzBl',

    ## Cipher Key
    'YWxzamRmbVjFaa05HVnGFqZHM3NXRBS73jxmhVMkjiwVVXV1d5UrXR1JLSkZFRr' +
    'WVd4emFtUm1iR0TFpUZvbiBoYXMgYmVlbxWkhNaF3uUi8kM0YkJTEVlZYVFjBYi' +
    'jFkWFIxSkxTa1pGUjd874hjklaTFpUwRVuIFNob3VsZCB5UwRkxUR1J6YVhlQWa' +
    'V1ZkNGVH32mDkdho3pqtRnRVbTFpUjBaeGUgYXNrZWQtZFoKjda40ZWlyYWl1eX' +
    'U4RkNtdmNub2l1dHE2TTA1jd84jkdJTbFJXYkZwWlZtRnKkWVrSRhhWbFpZVmFz' +
    'c2RkZmTFpUpGa1dGSXhTa3hUYTFwR1Vpkm9yIGluZm9ybWFNfdsWQdSiiYXNWVX' +
    'RSblJWYlRGcFVqQmFlRmRyYUU0MFpXbHlZV2wxZVhVNFJrTnR51YjJsMWRIRTJU' +
    'W91ciBpbmZvcm1hdGliBzdWJtaXR0ZWQb3UZSBhIHJlc3BvbnNlLCB3ZWxsIHJl' +
    'VEExWdHVybiB0am0aW9uIb24gYXMgd2UgcG9zc2libHkgY2FuLuhcFe24ldWVns' +
    'dSaTFpU3hVUjFKNllWaFdhRmxZUWpCaQo34gcmVxdWlGFzIHNveqQl83snBfVl3',

    ## 2048bit SSL ON - ENABLED TRUE
    True
)

## -----------------------------------------------------------------------
## Channel | Message Test Data (UTF-8)
## -----------------------------------------------------------------------
crazy = ' ~`â¦â§!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd'
many_channels = [str(x) + '-many_channel_test' for x in range(10)]
runthroughs = 0        # completed phase2 passes, across all clients
planned_tests = 2      # one phase2 pass per client tested below
delivery_retries = 0   # failed publishes seen so far, across all clients
max_retries = 10       # stop re-publishing once this many have failed


## -----------------------------------------------------------------------
## Unit Test Function
## -----------------------------------------------------------------------
def test(trial, name):
    """Print a PASS line when trial is truthy, a FAIL line otherwise."""
    if trial:
        print('PASS: ' + name)
    else:
        print('- FAIL - ' + name)


def test_pubnub(pubnub):
    """Queue the time, publish, history and subscribe checks on one client.

    Every check reports through test().  Nothing here starts the client;
    the caller does that at the bottom of the file.  The many-channel
    phase calls pubnub.stop() once `planned_tests` clients have received a
    message on every channel in `many_channels`.
    """

    ## -------------------------------------------------------------------
    ## Many Channels
    ## -------------------------------------------------------------------
    def phase2():
        """Subscribe to every channel in many_channels and publish to each."""
        status = {
            'sent': 0,
            'received': 0,
            'connections': 0
        }

        def received(message, chan):
            global runthroughs

            test(status['received'] <= status['sent'], 'many sends')

            status['received'] += 1
            pubnub.unsubscribe({'channel': chan})

            # One message per channel completes this client's pass.
            if status['received'] == len(many_channels):
                runthroughs += 1
                if runthroughs == planned_tests:
                    pubnub.stop()

        def publish_complete(info, chan):
            global delivery_retries

            status['sent'] += 1
            test(info, 'publish complete')
            test(info and len(info) > 2, 'publish response')

            # A missing or empty response counts as a failed delivery as
            # well; indexing it unguarded would raise inside the callback.
            if not (info and info[0]):
                delivery_retries += 1
                if max_retries > delivery_retries:
                    sendit(chan)

        def sendit(chan):
            pubnub.publish({
                'channel': chan,
                'message': "Hello World",
                'callback': (lambda msg: publish_complete(msg, chan))
            })

        def connected(chan):
            status['connections'] += 1
            sendit(chan)

        # NOTE(review): not referenced anywhere in this file; kept as is.
        def delivered(info):
            if info and info[0]:
                status['sent'] += 1

        def subscribe(chan):
            # A function scope per channel keeps each lambda bound to its
            # own channel name rather than the loop variable.
            pubnub.subscribe({
                'channel': chan,
                'connect': (lambda: connected(chan)),
                'callback': (lambda msg: received(msg, chan))
            })

        ## Subscribe All Channels
        for chan in many_channels:
            subscribe(chan)

    ## -------------------------------------------------------------------
    ## Time Example
    ## -------------------------------------------------------------------
    def time_complete(timetoken):
        test(timetoken, 'timetoken fetch')
        test(isinstance(timetoken, int), 'timetoken int type')

    pubnub.time({'callback': time_complete})

    ## -------------------------------------------------------------------
    ## Publish Example
    ## -------------------------------------------------------------------
    def publish_complete(info):
        test(info, 'publish complete')
        test(info and len(info) > 2, 'publish response')

        # history_complete is defined below; it is only looked up when
        # this callback actually runs.
        pubnub.history({
            'channel': crazy,
            'limit': 10,
            'callback': history_complete
        })

    ## -------------------------------------------------------------------
    ## History Example
    ## -------------------------------------------------------------------
    def history_complete(messages):
        test(messages and len(messages) > 0, 'history')
        test(messages, 'history')

    pubnub.publish({
        'channel': crazy,
        'message': "Hello World",
        'callback': publish_complete
    })

    ## -------------------------------------------------------------------
    ## Subscribe Example
    ## -------------------------------------------------------------------
    def message_received(message):
        test(message, 'message received')
        pubnub.unsubscribe({'channel': crazy})

        def done():
            # Unsubscribe before publishing so that dumpster should never
            # be handed the message.
            pubnub.unsubscribe({'channel': crazy})
            pubnub.publish({
                'channel': crazy,
                'message': "Hello World",
                'callback': (lambda x: x)
            })

        def dumpster(message):
            test(0, 'never see this')

        pubnub.subscribe({
            'channel': crazy,
            'connect': done,
            'callback': dumpster
        })

    def connected():
        pubnub.publish({
            'channel': crazy,
            'message': {'Info': 'Connected!'}
        })

    pubnub.subscribe({
        'channel': crazy,
        'connect': connected,
        'callback': message_received
    })

    phase2()


## -----------------------------------------------------------------------
## Run Tests
## -----------------------------------------------------------------------
test_pubnub(pubnub_user_supplied_options)
test_pubnub(pubnub_high_security)

# NOTE(review): only this client is started explicitly; presumably both
# clients share one event loop -- confirm against PubnubTwisted.
pubnub_high_security.start()