aboutsummaryrefslogtreecommitdiffstats
path: root/python-twisted/tests/unit-tests.py
blob: f143a3ac6818056996640cecc0e3a132e561e0ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import sys
import time

sys.path.append('../../common')
sys.path.append('../common')
sys.path.append('..')
sys.path.append('.')

from PubnubUnitTest import Suite
from Pubnub import Pubnub

pubnub = Pubnub("demo","demo")

tests_count = 1 + 2 + 1
test_suite = Suite(pubnub,tests_count)

tests = []



def test_publish():
	channel =  "hello" + str(time.time())
	name = "Publish Test"
	def success(r):
		test_suite.test(r[0] == 1, name)

	def fail(e):
		test_suite.test(False, msg , e)


	pubnub.publish({
		'channel' : 'hello',
		'message' : 'hi',
		'callback' : success,
		'error' : fail
	})
tests.append(test_publish)

#"""
def test_subscribe_publish():
	channel = "hello" + str(time.time())
	name = "Subscribe Publish Test"
	publish_msg = "This is Pubnub Python-Twisted"
	def connect():
		#print 'connect'
		def success(r):
			test_suite.test(r[0] == 1, name, "publish success")

		def fail(e):
			test_suite.test(False, name , "Publish Failed", e)

		pubnub.publish({
			'channel' : channel,
			'message' : publish_msg,
			'callback' : success,
			'error' : fail
		})

	def callback(r):
		test_suite.test(r == publish_msg, name, "message received")

	pubnub.subscribe({
		'channel' : channel,
		'callback' : callback,
		'connect' : connect
	})
tests.append(test_subscribe_publish)
#"""

def test_here_now():
	channel = "hello12" #+ str(time.time())
	name = "Here Now Test"

	def connect():
		print 'connect'
		def call_here_now():
			print 'call_here_now'
			def success(r):
				test_suite.test(r['occupancy'] == 1, name, "Here Now success")

			def fail(e):
				test_suite.test(False, name , "Here Now Failed", e)

			pubnub.here_now({
				'channel' : channel,
				'callback' : success,
				'error' : fail
			})
		pubnub.timeout(5, call_here_now)

	def callback(r):
		pass
	print 'Subscribe'
	pubnub.subscribe({
		'channel' : channel,
		'callback' : callback,
		'connect' : connect
	})
tests.append(test_here_now)




for t in tests: t()

pubnub.start()