# Pubnub client tests: publish, subscribe + publish, and here_now.
import sys
import time
sys.path.append('../../common')
sys.path.append('../common')
sys.path.append('..')
sys.path.append('.')
from PubnubUnitTest import Suite
from Pubnub import Pubnub
# Single client instance shared by every test in this file.
pubnub = Pubnub("demo", "demo")

# Number of test_suite.test() results the tests below report on the success
# path: 1 (publish) + 2 (subscribe/publish) + 1 (here_now).
tests_count = 1 + 2 + 1
test_suite = Suite(pubnub, tests_count)

# Each test function appends itself here; the list is run at the end of the file.
tests = []
def test_publish():
	"""Publish one message and report the outcome to ``test_suite``.

	Exactly one result is recorded: a pass/fail based on the first element
	of the publish response, or a failure if the error callback fires.
	"""
	name = "Publish Test"
	def success(r):
		# The test passes when the first element of the response equals 1.
		test_suite.test(r[0] == 1, name)
	def fail(e):
		# Record the failure under this test's name, passing the error along.
		test_suite.test(False, name, e)
	pubnub.publish({
		'channel' : 'hello',
		'message' : 'hi',
		'callback' : success,
		'error' : fail
	})
tests.append(test_publish)
#"""
def test_subscribe_publish():
	"""Subscribe to a per-run channel, publish to it, and check the message.

	Two results are recorded with ``test_suite``: one for the publish
	response and one comparing the received message with the one sent.
	"""
	test_name = "Subscribe Publish Test"
	expected_message = "This is Pubnub Python-Twisted"
	# The timestamp suffix gives every run its own channel name.
	target_channel = "hello" + str(time.time())

	def on_publish_ok(response):
		test_suite.test(response[0] == 1, test_name, "publish success")

	def on_publish_error(error):
		test_suite.test(False, test_name, "Publish Failed", error)

	def on_connect():
		# Registered as the subscribe 'connect' handler: sends the test
		# message to the channel being subscribed to.
		pubnub.publish({
			'channel' : target_channel,
			'message' : expected_message,
			'callback' : on_publish_ok,
			'error' : on_publish_error
		})

	def on_message(received):
		test_suite.test(received == expected_message, test_name, "message received")

	pubnub.subscribe({
		'channel' : target_channel,
		'callback' : on_message,
		'connect' : on_connect
	})
tests.append(test_subscribe_publish)
#"""
def test_here_now():
	"""Subscribe to a fixed channel, then check its ``here_now`` occupancy.

	One result is recorded with ``test_suite``: a pass when the here_now
	response's 'occupancy' equals 1, or a failure if the error callback
	fires.

	Progress messages use the single-argument ``print(...)`` form, which
	prints the same text on Python 2 and also parses on Python 3.
	"""
	channel = "hello12"  # fixed name: no per-run timestamp suffix here
	name = "Here Now Test"
	def connect():
		print('connect')
		def call_here_now():
			print('call_here_now')
			def success(r):
				test_suite.test(r['occupancy'] == 1, name, "Here Now success")
			def fail(e):
				test_suite.test(False, name , "Here Now Failed", e)
			pubnub.here_now({
				'channel' : channel,
				'callback' : success,
				'error' : fail
			})
		# Defer the query through pubnub.timeout instead of calling it
		# immediately. NOTE(review): presumably a 5-second delay so the
		# subscription is counted before here_now runs -- confirm units.
		pubnub.timeout(5, call_here_now)
	def callback(r):
		# Messages on the channel are irrelevant to this test.
		pass
	print('Subscribe')
	pubnub.subscribe({
		'channel' : channel,
		'callback' : callback,
		'connect' : connect
	})
tests.append(test_here_now)
# Invoke every registered test in registration order; each one only
# schedules its client calls and callbacks.
for t in tests:
	t()

# NOTE(review): presumably starts the client's event loop so the callbacks
# registered above can fire -- confirm against the Pubnub class.
pubnub.start()