aboutsummaryrefslogtreecommitdiffstats
path: root/pubnub.py
diff options
context:
space:
mode:
Diffstat (limited to 'pubnub.py')
-rw-r--r--pubnub.py2396
1 files changed, 2396 insertions, 0 deletions
diff --git a/pubnub.py b/pubnub.py
new file mode 100644
index 0000000..fcbf2dc
--- /dev/null
+++ b/pubnub.py
@@ -0,0 +1,2396 @@
+
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2014-15 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.7.2 Real-time Push Cloud API
+## -----------------------------------
+
+
+try:
+ import json
+except ImportError:
+ import simplejson as json
+
+import time
+import hashlib
+import uuid as uuid_lib
+import random
+import sys
+from base64 import urlsafe_b64encode
+from base64 import encodestring, decodestring
+import hmac
+from Crypto.Cipher import AES
+
+try:
+ from hashlib import sha256
+ digestmod = sha256
+except ImportError:
+ import Crypto.Hash.SHA256 as digestmod
+ sha256 = digestmod.new
+
+
+##### vanilla python imports #####
+try:
+ from urllib.parse import quote
+except ImportError:
+ from urllib2 import quote
+try:
+ import urllib.request
+except ImportError:
+ import urllib2
+
+try:
+ import requests
+ from requests.adapters import HTTPAdapter
+except ImportError:
+ pass
+
+#import urllib
+import socket
+import threading
+
+try:
+ import urllib3.HTTPConnection
+ default_socket_options = urllib3.HTTPConnection.default_socket_options
+except:
+ default_socket_options = []
+
+default_socket_options += [
+ # Enable TCP keepalive
+ (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+]
+
+if sys.platform.startswith("linux"):
+ default_socket_options += [
+ # Send first keepalive packet 200 seconds after last data packet
+ (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 200),
+ # Resend keepalive packets every second, when unanswered
+ (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1),
+ # Close the socket after 5 unanswered keepalive packets
+ (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
+ ]
+elif sys.platform.startswith("darwin"):
+ # From /usr/include/netinet/tcp.h
+
+ # idle time used when SO_KEEPALIVE is enabled
+ socket.TCP_KEEPALIVE = socket.TCP_KEEPALIVE \
+ if hasattr(socket, 'TCP_KEEPALIVE') \
+ else 0x10
+
+ # interval between keepalives
+ socket.TCP_KEEPINTVL = socket.TCP_KEEPINTVL \
+ if hasattr(socket, 'TCP_KEEPINTVL') \
+ else 0x101
+
+ # number of keepalives before close
+ socket.TCP_KEEPCNT = socket.TCP_KEEPCNT \
+ if hasattr(socket, 'TCP_KEEPCNT') \
+ else 0x102
+
+ default_socket_options += [
+ # Send first keepalive packet 200 seconds after last data packet
+ (socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 200),
+ # Resend keepalive packets every second, when unanswered
+ (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1),
+ # Close the socket after 5 unanswered keepalive packets
+ (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
+ ]
+"""
+# The Windows code is currently untested
+elif sys.platform.startswith("win"):
+ import struct
+ from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
+
+ def patch_socket_keepalive(conn):
+ conn.sock.ioctl(socket.SIO_KEEPALIVE_VALS, (
+ # Enable TCP keepalive
+ 1,
+ # Send first keepalive packet 200 seconds after last data packet
+ 200,
+ # Resend keepalive packets every second, when unanswered
+ 1
+ ))
+
+ class PubnubHTTPConnectionPool(HTTPConnectionPool):
+ def _validate_conn(self, conn):
+ super(PubnubHTTPConnectionPool, self)._validate_conn(conn)
+
+ class PubnubHTTPSConnectionPool(HTTPSConnectionPool):
+ def _validate_conn(self, conn):
+ super(PubnubHTTPSConnectionPool, self)._validate_conn(conn)
+
+ import urllib3.poolmanager
+ urllib3.poolmanager.pool_classes_by_scheme = {
+ 'http' : PubnubHTTPConnectionPool,
+ 'https' : PubnubHTTPSConnectionPool
+ }
+"""
+
+##################################
+
+
+##### Tornado imports and globals #####
+try:
+ import tornado.httpclient
+ import tornado.ioloop
+ from tornado.stack_context import ExceptionStackContext
+ ioloop = tornado.ioloop.IOLoop.instance()
+except ImportError:
+ pass
+
+#######################################
+
+
+##### Twisted imports and globals #####
+try:
+ from twisted.internet import reactor
+ from twisted.internet.defer import Deferred
+ from twisted.internet.protocol import Protocol
+ from twisted.web.client import Agent, ContentDecoderAgent
+ from twisted.web.client import RedirectAgent, GzipDecoder
+ from twisted.web.client import HTTPConnectionPool
+ from twisted.web.http_headers import Headers
+ from twisted.internet.ssl import ClientContextFactory
+ import twisted
+
+
+ pnconn_pool = HTTPConnectionPool(reactor, persistent=True)
+ pnconn_pool.maxPersistentPerHost = 100000
+ pnconn_pool.cachedConnectionTimeout = 15
+ pnconn_pool.retryAutomatically = True
+
+ class WebClientContextFactory(ClientContextFactory):
+ def getContext(self, hostname, port):
+ return ClientContextFactory.getContext(self)
+
+ class PubNubPamResponse(Protocol):
+ def __init__(self, finished):
+ self.finished = finished
+
+ def dataReceived(self, bytes):
+ self.finished.callback(bytes)
+
+ class PubNubResponse(Protocol):
+ def __init__(self, finished):
+ self.finished = finished
+
+ def dataReceived(self, bytes):
+ self.finished.callback(bytes)
+except ImportError:
+ pass
+
+
+#######################################
+
+
+def get_data_for_user(data):
+ try:
+ if 'message' in data and 'payload' in data:
+ return {'message': data['message'], 'payload': data['payload']}
+ else:
+ return data
+ except TypeError:
+ return data
+
+
+class PubnubCrypto2():
+
+ def pad(self, msg, block_size=16):
+
+ padding = block_size - (len(msg) % block_size)
+ return msg + chr(padding) * padding
+
+ def depad(self, msg):
+
+ return msg[0:-ord(msg[-1])]
+
+ def getSecret(self, key):
+
+ return hashlib.sha256(key).hexdigest()
+
+ def encrypt(self, key, msg):
+ secret = self.getSecret(key)
+ Initial16bytes = '0123456789012345'
+ cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
+ enc = encodestring(cipher.encrypt(self.pad(msg)))
+ return enc
+
+ def decrypt(self, key, msg):
+
+ try:
+ secret = self.getSecret(key)
+ Initial16bytes = '0123456789012345'
+ cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
+ plain = self.depad(cipher.decrypt(decodestring(msg)))
+ except:
+ return msg
+ try:
+ return eval(plain)
+ except SyntaxError:
+ return plain
+
+class PubnubCrypto3():
+
+ def pad(self, msg, block_size=16):
+
+ padding = block_size - (len(msg) % block_size)
+ return msg + (chr(padding) * padding).encode('utf-8')
+
+ def depad(self, msg):
+
+ return msg[0:-ord(msg[-1])]
+
+ def getSecret(self, key):
+
+ return hashlib.sha256(key.encode("utf-8")).hexdigest()
+
+ def encrypt(self, key, msg):
+
+ secret = self.getSecret(key)
+ Initial16bytes = '0123456789012345'
+ cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
+ return encodestring(
+ cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')
+
+ def decrypt(self, key, msg):
+
+ secret = self.getSecret(key)
+ Initial16bytes = '0123456789012345'
+ cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
+ return (cipher.decrypt(
+ decodestring(msg.encode('utf-8')))).decode('utf-8')
+
+
+class PubnubBase(object):
+ def __init__(
+ self,
+ publish_key,
+ subscribe_key,
+ secret_key=False,
+ cipher_key=False,
+ auth_key=None,
+ ssl_on=False,
+ origin='pubsub.pubnub.com',
+ uuid=None
+ ):
+ """Pubnub Class
+
+ Provides methods to communicate with Pubnub cloud
+
+ Attributes:
+ publish_key: Publish Key
+ subscribe_key: Subscribe Key
+ secret_key: Secret Key
+ cipher_key: Cipher Key
+ auth_key: Auth Key (used with Pubnub Access Manager i.e. PAM)
+ ssl: SSL enabled ?
+ origin: Origin
+ """
+
+ self.origin = origin
+ self.version = '3.7.2'
+ self.limit = 1800
+ self.publish_key = publish_key
+ self.subscribe_key = subscribe_key
+ self.secret_key = secret_key
+ self.cipher_key = cipher_key
+ self.ssl = ssl_on
+ self.auth_key = auth_key
+ self.STATE = {}
+
+ if self.ssl:
+ self.origin = 'https://' + self.origin
+ else:
+ self.origin = 'http://' + self.origin
+
+ self.uuid = uuid or str(uuid_lib.uuid4())
+
+ if type(sys.version_info) is tuple:
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
+ else:
+ if sys.version_info.major == 2:
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
+ else:
+ self.python_version = 3
+ self.pc = PubnubCrypto3()
+
+ if not isinstance(self.uuid, str):
+ raise AttributeError("uuid must be a string")
+
+ def _pam_sign(self, msg):
+
+ sign = urlsafe_b64encode(hmac.new(
+ self.secret_key.encode("utf-8"),
+ msg.encode("utf-8"),
+ sha256
+ ).digest())
+ return quote(sign, safe="")
+
+ def set_u(self, u=False):
+ self.u = u
+
+ def _pam_auth(self, query, apicode=0, callback=None, error=None):
+
+ if 'timestamp' not in query:
+ query['timestamp'] = int(time.time())
+
+ ## Global Grant?
+ if 'auth' in query and not query['auth']:
+ del query['auth']
+
+ if 'channel' in query and not query['channel']:
+ del query['channel']
+
+ if 'channel-group' in query and not query['channel-group']:
+ del query['channel-group']
+
+
+ params = "&".join([
+ x + "=" + quote(
+ str(query[x]), safe=""
+ ) for x in sorted(query)
+ ])
+ sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format(
+ subkey=self.subscribe_key,
+ pubkey=self.publish_key,
+ apitype="audit" if (apicode) else "grant",
+ params=params
+ )
+ query['signature'] = self._pam_sign(sign_input)
+
+ return self._request({"urlcomponents": [
+ 'v1', 'auth', "audit" if (apicode) else "grant",
+ 'sub-key',
+ self.subscribe_key
+ ], 'urlparams': query},
+ self._return_wrapped_callback(callback),
+ self._return_wrapped_callback(error))
+
+ def get_origin(self):
+ return self.origin
+
+ def set_auth_key(self, auth_key):
+ self.auth_key = auth_key
+
+ def get_auth_key(self):
+ return self.auth_key
+
+ def grant(self, channel=None, channel_group=None, auth_key=False, read=False,
+ write=False, manage=False, ttl=5, callback=None, error=None):
+ """Method for granting permissions.
+
+ This function establishes subscribe and/or write permissions for
+ PubNub Access Manager (PAM) by setting the read or write attribute
+ to true. A grant with read or write set to false (or not included)
+ will revoke any previous grants with read or write set to true.
+
+ Permissions can be applied to any one of three levels:
+ 1. Application level privileges are based on subscribe_key applying to all associated channels.
+ 2. Channel level privileges are based on a combination of subscribe_key and channel name.
+ 3. User level privileges are based on the combination of subscribe_key, channel and auth_key.
+
+ Args:
+ channel: (string) (optional)
+ Specifies channel name to grant permissions to.
+ If channel/channel_group is not specified, the grant applies to all
+ channels associated with the subscribe_key. If auth_key
+ is not specified, it is possible to grant permissions to
+ multiple channels simultaneously by specifying the channels
+ as a comma separated list.
+ channel_group: (string) (optional)
+ Specifies channel group name to grant permissions to.
+ If channel/channel_group is not specified, the grant applies to all
+ channels associated with the subscribe_key. If auth_key
+ is not specified, it is possible to grant permissions to
+ multiple channel groups simultaneously by specifying the channel groups
+ as a comma separated list.
+
+ auth_key: (string) (optional)
+ Specifies auth_key to grant permissions to.
+ It is possible to specify multiple auth_keys as comma
+ separated list in combination with a single channel name.
+ If auth_key is provided as the special-case value "null"
+ (or included in a comma-separated list, eg. "null,null,abc"),
+ a new auth_key will be generated and returned for each "null" value.
+
+ read: (boolean) (default: True)
+ Read permissions are granted by setting to True.
+ Read permissions are removed by setting to False.
+
+ write: (boolean) (default: True)
+ Write permissions are granted by setting to true.
+ Write permissions are removed by setting to false.
+ manage: (boolean) (default: True)
+ Manage permissions are granted by setting to true.
+ Manage permissions are removed by setting to false.
+
+ ttl: (int) (default: 1440 i.e 24 hrs)
+ Time in minutes for which granted permissions are valid.
+ Max is 525600 , Min is 1.
+ Setting ttl to 0 will apply the grant indefinitely.
+
+ callback: (function) (optional)
+ A callback method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado
+
+ error: (function) (optional)
+ An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Returns a dict in sync mode i.e. when callback argument is not given
+ The dict returned contains values with keys 'message' and 'payload'
+
+ Sample Response:
+ {
+ "message":"Success",
+ "payload":{
+ "ttl":5,
+ "auths":{
+ "my_ro_authkey":{"r":1,"w":0}
+ },
+ "subscribe_key":"my_subkey",
+ "level":"user",
+ "channel":"my_channel"
+ }
+ }
+ """
+
+ return self._pam_auth({
+ 'channel' : channel,
+ 'channel-group' : channel_group,
+ 'auth' : auth_key,
+ 'r' : read and 1 or 0,
+ 'w' : write and 1 or 0,
+ 'm' : manage and 1 or 0,
+ 'ttl' : ttl,
+ 'pnsdk' : self.pnsdk
+ }, callback=callback, error=error)
+
+ def revoke(self, channel=None, channel_group=None, auth_key=None, ttl=1, callback=None, error=None):
+ """Method for revoking permissions.
+
+ Args:
+ channel: (string) (optional)
+ Specifies channel name to revoke permissions to.
+ If channel/channel_group is not specified, the revoke applies to all
+ channels associated with the subscribe_key. If auth_key
+ is not specified, it is possible to grant permissions to
+ multiple channels simultaneously by specifying the channels
+ as a comma separated list.
+
+ channel_group: (string) (optional)
+ Specifies channel group name to revoke permissions to.
+ If channel/channel_group is not specified, the grant applies to all
+ channels associated with the subscribe_key. If auth_key
+ is not specified, it is possible to revoke permissions to
+ multiple channel groups simultaneously by specifying the channel groups
+ as a comma separated list.
+
+ auth_key: (string) (optional)
+ Specifies auth_key to revoke permissions to.
+ It is possible to specify multiple auth_keys as comma
+ separated list in combination with a single channel name.
+ If auth_key is provided as the special-case value "null"
+ (or included in a comma-separated list, eg. "null,null,abc"),
+ a new auth_key will be generated and returned for each "null" value.
+
+ ttl: (int) (default: 1440 i.e 24 hrs)
+ Time in minutes for which granted permissions are valid.
+ Max is 525600 , Min is 1.
+ Setting ttl to 0 will apply the grant indefinitely.
+
+ callback: (function) (optional)
+ A callback method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado
+
+ error: (function) (optional)
+ An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Returns a dict in sync mode i.e. when callback argument is not given
+ The dict returned contains values with keys 'message' and 'payload'
+
+ Sample Response:
+ {
+ "message":"Success",
+ "payload":{
+ "ttl":5,
+ "auths":{
+ "my_authkey":{"r":0,"w":0}
+ },
+ "subscribe_key":"my_subkey",
+ "level":"user",
+ "channel":"my_channel"
+ }
+ }
+
+ """
+
+ return self._pam_auth({
+ 'channel' : channel,
+ 'channel-group' : channel_group,
+ 'auth' : auth_key,
+ 'r' : 0,
+ 'w' : 0,
+ 'ttl' : ttl,
+ 'pnsdk' : self.pnsdk
+ }, callback=callback, error=error)
+
+ def audit(self, channel=None, channel_group=None, auth_key=None, callback=None, error=None):
+ """Method for fetching permissions from pubnub servers.
+
+ This method provides a mechanism to reveal existing PubNub Access Manager attributes
+ for any combination of subscribe_key, channel and auth_key.
+
+ Args:
+ channel: (string) (optional)
+ Specifies channel name to return PAM
+ attributes optionally in combination with auth_key.
+ If channel/channel_group is not specified, results for all channels
+ associated with subscribe_key are returned.
+ If auth_key is not specified, it is possible to return
+ results for a comma separated list of channels.
+ channel_group: (string) (optional)
+ Specifies channel group name to return PAM
+ attributes optionally in combination with auth_key.
+ If channel/channel_group is not specified, results for all channels
+ associated with subscribe_key are returned.
+ If auth_key is not specified, it is possible to return
+ results for a comma separated list of channels.
+
+ auth_key: (string) (optional)
+ Specifies the auth_key to return PAM attributes for.
+ If only a single channel is specified, it is possible to return
+ results for a comma separated list of auth_keys.
+
+ callback: (function) (optional)
+ A callback method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado
+
+ error: (function) (optional)
+ An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Returns a dict in sync mode i.e. when callback argument is not given
+ The dict returned contains values with keys 'message' and 'payload'
+
+ Sample Response
+ {
+ "message":"Success",
+ "payload":{
+ "channels":{
+ "my_channel":{
+ "auths":{"my_ro_authkey":{"r":1,"w":0},
+ "my_rw_authkey":{"r":0,"w":1},
+ "my_admin_authkey":{"r":1,"w":1}
+ }
+ }
+ },
+ }
+
+ Usage:
+
+ pubnub.audit ('my_channel'); # Sync Mode
+
+ """
+
+ return self._pam_auth({
+ 'channel' : channel,
+ 'channel-group' : channel_group,
+ 'auth' : auth_key,
+ 'pnsdk' : self.pnsdk
+ }, 1, callback=callback, error=error)
+
+ def encrypt(self, message):
+ """Method for encrypting data.
+
+ This method takes plaintext as input and returns encrypted data.
+ This need not be called directly as enncryption/decryption is
+ taken care of transparently by Pubnub class if cipher key is
+ provided at time of initializing pubnub object
+
+ Args:
+ message: Message to be encrypted.
+
+ Returns:
+ Returns encrypted message if cipher key is set
+ """
+ if self.cipher_key:
+ message = json.dumps(self.pc.encrypt(
+ self.cipher_key, json.dumps(message)).replace('\n', ''))
+ else:
+ message = json.dumps(message)
+
+ return message
+
+ def decrypt(self, message):
+ """Method for decrypting data.
+
+ This method takes ciphertext as input and returns decrypted data.
+ This need not be called directly as enncryption/decryption is
+ taken care of transparently by Pubnub class if cipher key is
+ provided at time of initializing pubnub object
+
+ Args:
+ message: Message to be decrypted.
+
+ Returns:
+ Returns decrypted message if cipher key is set
+ """
+ if self.cipher_key:
+ message = self.pc.decrypt(self.cipher_key, message)
+
+ return message
+
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback is not None):
+ callback_data = dict()
+ callback_data['payload'] = response['payload']
+
+ if 'message' in response:
+ callback_data['message'] = response['message']
+
+ if (callback is not None):
+ callback(callback_data)
+ else:
+ if (callback is not None):
+ callback(response)
+ if (callback is not None):
+ return _new_format_callback
+ else:
+ return None
+
+ def leave_channel(self, channel, callback=None, error=None):
+ ## Send leave
+ return self._request({"urlcomponents": [
+ 'v2', 'presence',
+ 'sub_key',
+ self.subscribe_key,
+ 'channel',
+ channel,
+ 'leave'
+ ], 'urlparams': {'auth': self.auth_key, 'pnsdk' : self.pnsdk, "uuid": self.uuid,}},
+ callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def leave_group(self, channel_group, callback=None, error=None):
+ ## Send leave
+ return self._request({"urlcomponents": [
+ 'v2', 'presence',
+ 'sub_key',
+ self.subscribe_key,
+ 'channel',
+ ',',
+ 'leave'
+ ], 'urlparams': {'auth': self.auth_key, 'pnsdk' : self.pnsdk, 'channel-group' : channel_group, "uuid": self.uuid,}},
+ callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+
+ def publish(self, channel, message, callback=None, error=None):
+ """Publishes data on a channel.
+
+ The publish() method is used to send a message to all subscribers of a channel.
+ To publish a message you must first specify a valid publish_key at initialization.
+ A successfully published message is replicated across the PubNub Real-Time Network
+ and sent simultaneously to all subscribed clients on a channel.
+ Messages in transit can be secured from potential eavesdroppers with SSL/TLS by
+ setting ssl to True during initialization.
+
+ Published messages can also be encrypted with AES-256 simply by specifying a cipher_key
+ during initialization.
+
+ Args:
+ channel: (string)
+ Specifies channel name to publish messages to.
+ message: (string/int/double/dict/list)
+ Message to be published
+ callback: (optional)
+ A callback method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado
+ error: (optional)
+ An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado
+
+ Returns:
+ Sync Mode : list
+ Async Mode : None
+
+ The function returns the following formatted response:
+
+ [ Number, "Status", "Time Token"]
+
+ The output below demonstrates the response to a successful call:
+
+ [1,"Sent","13769558699541401"]
+
+ """
+
+ message = self.encrypt(message)
+
+ ## Send Message
+ return self._request({"urlcomponents": [
+ 'publish',
+ self.publish_key,
+ self.subscribe_key,
+ '0',
+ channel,
+ '0',
+ message
+ ], 'urlparams': {'auth': self.auth_key, 'pnsdk' : self.pnsdk}},
+ callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def presence(self, channel, callback, error=None, connect=None, disconnect=None, reconnect=None):
+ """Subscribe to presence events on a channel.
+
+ Only works in async mode
+
+ Args:
+ channel: Channel name ( string ) on which to listen for events
+ callback: A callback method should be passed as parameter.
+ If passed, the api works in async mode.
+ Required argument when working with twisted or tornado .
+ error: Optional variable. An error method can be passed as parameter.
+ If set, the api works in async mode.
+
+ Returns:
+ None
+ """
+ return self.subscribe(channel+'-pnpres', callback=callback, error=error, connect=connect, disconnect=disconnect, reconnect=reconnect)
+
+ def presence_group(self, channel_group, callback, error=None, connect=None, disconnect=None, reconnect=None):
+ """Subscribe to presence events on a channel group.
+
+ Only works in async mode
+
+ Args:
+ channel_group: Channel group name ( string )
+ callback: A callback method should be passed to the method.
+ If passed, the api works in async mode.
+ Required argument when working with twisted or tornado .
+ error: Optional variable. An error method can be passed as parameter.
+ If passed, the api works in async mode.
+
+ Returns:
+ None
+ """
+ return self.subscribe_group(channel_group+'-pnpres', callback=callback, error=error, connect=connect, disconnect=disconnect, reconnect=reconnect)
+
+ def here_now(self, channel, uuids=True, state=False, callback=None, error=None):
+ """Get here now data.
+
+ You can obtain information about the current state of a channel including
+ a list of unique user-ids currently subscribed to the channel and the total
+ occupancy count of the channel by calling the here_now() function in your
+ application.
+
+
+ Args:
+ channel: (string) (optional)
+ Specifies the channel name to return occupancy results.
+ If channel is not provided, here_now will return data for all channels.
+
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Sync Mode: list
+ Async Mode: None
+
+ Response Format:
+
+ The here_now() method returns a list of uuid s currently subscribed to the channel.
+
+ uuids:["String","String", ... ,"String"] - List of UUIDs currently subscribed to the channel.
+
+ occupancy: Number - Total current occupancy of the channel.
+
+ Example Response:
+ {
+ occupancy: 4,
+ uuids: [
+ '123123234t234f34fq3dq',
+ '143r34f34t34fq34q34q3',
+ '23f34d3f4rq34r34rq23q',
+ 'w34tcw45t45tcw435tww3',
+ ]
+ }
+ """
+
+ urlcomponents = [
+ 'v2', 'presence',
+ 'sub_key', self.subscribe_key
+ ]
+
+ if (channel is not None and len(channel) > 0):
+ urlcomponents.append('channel')
+ urlcomponents.append(channel)
+
+ data = {'auth': self.auth_key, 'pnsdk' : self.pnsdk}
+
+ if state is True:
+ data['state'] = '1'
+
+ if uuids is False:
+ data['disable_uuids'] = '1'
+
+ ## Get Presence Here Now
+ return self._request({"urlcomponents": urlcomponents,
+ 'urlparams': data},
+ callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+
+ def history(self, channel, count=100, reverse=False,
+ start=None, end=None, include_token=False, callback=None, error=None):
+ """This method fetches historical messages of a channel.
+
+ PubNub Storage/Playback Service provides real-time access to an unlimited
+ history for all messages published to PubNub. Stored messages are replicated
+ across multiple availability zones in several geographical data center
+ locations. Stored messages can be encrypted with AES-256 message encryption
+ ensuring that they are not readable while stored on PubNub's network.
+
+ It is possible to control how messages are returned and in what order,
+ for example you can:
+
+ Return messages in the order newest to oldest (default behavior).
+
+ Return messages in the order oldest to newest by setting reverse to true.
+
+ Page through results by providing a start or end time token.
+
+ Retrieve a "slice" of the time line by providing both a start and end time token.
+
+ Limit the number of messages to a specific quantity using the count parameter.
+
+
+
+ Args:
+ channel: (string)
+ Specifies channel to return history messages from
+
+ count: (int) (default: 100)
+ Specifies the number of historical messages to return
+
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ error: (optional)
+ An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Returns a list in sync mode i.e. when callback argument is not given
+
+ Sample Response:
+ [["Pub1","Pub2","Pub3","Pub4","Pub5"],13406746729185766,13406746845892666]
+ """
+
+ def _get_decrypted_history(resp):
+ try:
+ if resp is not None and isinstance(resp, (list)) and resp[1] is not None and self.cipher_key:
+ msgs = resp[0]
+ for i in range(0,len(msgs)):
+ msgs[i] = self.decrypt(msgs[i])
+ except KeyError:
+ pass
+ return resp
+
+ def _history_callback(resp):
+ if callback is not None:
+ callback(_get_decrypted_history(resp))
+
+ if callback is None:
+ history_cb = None
+ else:
+ history_cb = _history_callback
+
+ params = dict()
+
+ params['count'] = count
+ params['reverse'] = reverse
+ params['start'] = start
+ params['end'] = end
+ params['auth'] = self.auth_key
+ params['pnsdk'] = self.pnsdk
+ params['include_token'] = 'true' if include_token else 'false'
+
+ ## Get History
+ return _get_decrypted_history(self._request({'urlcomponents': [
+ 'v2',
+ 'history',
+ 'sub-key',
+ self.subscribe_key,
+ 'channel',
+ channel,
+ ], 'urlparams': params},
+ callback=self._return_wrapped_callback(history_cb),
+ error=self._return_wrapped_callback(error)))
+
+ def time(self, callback=None):
+ """This function will return a 17 digit precision Unix epoch.
+
+ Args:
+
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Returns a 17 digit number in sync mode i.e. when callback argument is not given
+
+ Sample:
+ 13769501243685161
+ """
+
+ time = self._request({'urlcomponents': [
+ 'time',
+ '0'
+ ]}, callback)
+ if time is not None:
+ return time[0]
+
+ def _encode(self, request):
+ return [
+ "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and
+ hex(ord(ch)).replace('0x', '%').upper() or
+ ch for ch in list(bit)
+ ]) for bit in request]
+
+ def getUrl(self, request):
+
+ if self.u is True and "urlparams" in request:
+ request['urlparams']['u'] = str(random.randint(1, 100000000000))
+ ## Build URL
+ url = self.origin + '/' + "/".join([
+ "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and
+ hex(ord(ch)).replace('0x', '%').upper() or
+ ch for ch in list(bit)
+ ]) for bit in request["urlcomponents"]])
+ if ("urlparams" in request):
+ url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[
+ "urlparams"].items() if y is not None and len(str(y)) > 0])
+ #print(url)
+ return url
+
+ def _channel_registry(self, url=None, params=None, callback=None, error=None):
+
+ if (params is None):
+ params = dict()
+
+ urlcomponents = ['v1', 'channel-registration', 'sub-key', self.subscribe_key ]
+
+ if (url is not None):
+ urlcomponents += url
+
+ params['auth'] = self.auth_key
+ params['pnsdk'] = self.pnsdk
+
+ ## Get History
+ return self._request({'urlcomponents': urlcomponents, 'urlparams': params},
+ callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def _channel_group(self, channel_group=None, channels=None, cloak=None,mode='add', callback=None, error=None):
+ params = dict()
+ url = []
+ namespace = None
+
+ if (channel_group is not None and len(channel_group) > 0):
+ ns_ch_a = channel_group.split(':')
+
+ if len(ns_ch_a) > 1:
+ namespace = None if ns_ch_a[0] == '*' else ns_ch_a[0]
+ channel_group = ns_ch_a[1]
+ else:
+ channel_group = ns_ch_a[0]
+
+ if (namespace is not None):
+ url.append('namespace')
+ url.append(self._encode(namespace))
+
+ url.append('channel-group')
+
+ if channel_group is not None and channel_group != '*':
+ url.append(channel_group)
+
+ if (channels is not None):
+ if (type(channels) is list):
+ channels = ','.join(channels)
+ params[mode] = channels
+ #params['cloak'] = 'true' if CLOAK is True else 'false'
+ else:
+ if mode == 'remove':
+ url.append('remove')
+
+ return self._channel_registry(url=url, params=params, callback=callback, error=error)
+
+
+ def channel_group_list_namespaces(self, callback=None, error=None):
+ """Get list of namespaces.
+
+ You can obtain list of namespaces for the subscribe key associated with PubNub
+ object using this method.
+
+
+ Args:
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ Returns:
+ Sync Mode: dict
+ channel_group_list_namespaces method returns a dict which contains list of namespaces
+ in payload field
+ {
+ u'status': 200,
+ u'payload': {
+ u'sub_key': u'demo',
+ u'namespaces': [u'dev', u'foo']
+ },
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None (callback gets the response as parameter)
+
+ Response Format:
+
+ The callback passed to channel_group_list_namespaces gets the a dict containing list of namespaces
+ under payload field
+
+ {
+ u'payload': {
+ u'sub_key': u'demo',
+ u'namespaces': [u'dev', u'foo']
+ }
+ }
+
+ namespaces is the list of namespaces for the given subscribe key
+
+
+ """
+
+ url = ['namespace']
+ return self._channel_registry(url=url, callback=callback, error=error)
+
+ def channel_group_remove_namespace(self, namespace, callback=None, error=None):
+ """Remove a namespace.
+
+ A namespace can be deleted using this method.
+
+
+ Args:
+ namespace: (string) namespace to be deleted
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Sync Mode: dict
+ channel_group_remove_namespace method returns a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None ( callback gets the response as parameter )
+
+ Response Format:
+
+ The callback passed to channel_group_list_namespaces gets the a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ """
+ url = ['namespace', self._encode(namespace), 'remove']
+ return self._channel_registry(url=url, callback=callback, error=error)
+
+ def channel_group_list_groups(self, namespace=None, callback=None, error=None):
+ """Get list of groups.
+
+ Using this method, list of groups for the subscribe key associated with PubNub
+ object, can be obtained. If namespace is provided, groups within the namespace
+ only are listed
+
+ Args:
+ namespace: (string) (optional) namespace
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Sync Mode: dict
+ channel_group_list_groups method returns a dict which contains list of groups
+ in payload field
+ {
+ u'status': 200,
+ u'payload': {"namespace": "dev", "groups": ["abcd"]},
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None ( callback gets the response as parameter )
+
+ Response Format:
+
+ The callback passed to channel_group_list_namespaces gets the a dict containing list of groups
+ under payload field
+
+ {
+ u'payload': {"namespace": "dev", "groups": ["abcd"]}
+ }
+
+
+
+ """
+
+ if (namespace is not None and len(namespace) > 0):
+ channel_group = namespace + ':*'
+ else:
+ channel_group = '*:*'
+
+ return self._channel_group(channel_group=channel_group, callback=callback, error=error)
+
+ def channel_group_list_channels(self, channel_group, callback=None, error=None):
+ """Get list of channels for a group.
+
+ Using this method, list of channels for a group, can be obtained.
+
+ Args:
+ channel_group: (string) (optional)
+ Channel Group name. It can also contain namespace.
+ If namespace is also specified, then the parameter
+ will be in format namespace:channel_group
+
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ Returns:
+ Sync Mode: dict
+ channel_group_list_channels method returns a dict which contains list of channels
+ in payload field
+ {
+ u'status': 200,
+ u'payload': {"channels": ["hi"], "group": "abcd"},
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None ( callback gets the response as parameter )
+
+ Response Format:
+
+ The callback passed to channel_group_list_channels gets the a dict containing list of channels
+ under payload field
+
+ {
+ u'payload': {"channels": ["hi"], "group": "abcd"}
+ }
+
+
+ """
+ return self._channel_group(channel_group=channel_group, callback=callback, error=error)
+
+ def channel_group_add_channel(self, channel_group, channel, callback=None, error=None):
+ """Add a channel to group.
+
+ A channel can be added to group using this method.
+
+
+ Args:
+ channel_group: (string)
+ Channel Group name. It can also contain namespace.
+ If namespace is also specified, then the parameter
+ will be in format namespace:channel_group
+ channel: (string)
+ Can be a channel name, a list of channel names,
+ or a comma separated list of channel names
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ Returns:
+ Sync Mode: dict
+ channel_group_add_channel method returns a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None ( callback gets the response as parameter )
+
+ Response Format:
+
+ The callback passed to channel_group_add_channel gets the a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ """
+
+ return self._channel_group(channel_group=channel_group, channels=channel, mode='add', callback=callback, error=error)
+
+ def channel_group_remove_channel(self, channel_group, channel, callback=None, error=None):
+ """Remove channel.
+
+ A channel can be removed from a group method.
+
+
+ Args:
+ channel_group: (string)
+ Channel Group name. It can also contain namespace.
+ If namespace is also specified, then the parameter
+ will be in format namespace:channel_group
+ channel: (string)
+ Can be a channel name, a list of channel names,
+ or a comma separated list of channel names
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado .
+
+ Returns:
+ Sync Mode: dict
+ channel_group_remove_channel method returns a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None ( callback gets the response as parameter )
+
+ Response Format:
+
+ The callback passed to channel_group_remove_channel gets the a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ """
+
+ return self._channel_group(channel_group=channel_group, channels=channel, mode='remove', callback=callback, error=error)
+
+ def channel_group_remove_group(self, channel_group, callback=None, error=None):
+ """Remove channel group.
+
+ A channel group can be removed using this method.
+
+
+ Args:
+ channel_group: (string)
+ Channel Group name. It can also contain namespace.
+ If namespace is also specified, then the parameter
+ will be in format namespace:channel_group
+ callback: (optional)
+ A callback method should be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ error: (optional)
+ Optional variable. An error method can be passed to the method.
+ If set, the api works in async mode.
+ Required argument when working with twisted or tornado.
+
+ Returns:
+ Sync Mode: dict
+ channel_group_remove_group method returns a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ Async Mode: None ( callback gets the response as parameter )
+
+ Response Format:
+
+ The callback passed to channel_group_remove_group gets the a dict indicating status of the request
+
+ {
+ u'status': 200,
+ u'message': 'OK',
+ u'service': u'channel-registry',
+ u'error': False
+ }
+
+ """
+
+ return self._channel_group(channel_group=channel_group, mode='remove', callback=callback, error=error)
+
+
+
+class EmptyLock():
+ def __enter__(self):
+ pass
+
+ def __exit__(self, a, b, c):
+ pass
+
+empty_lock = EmptyLock()
+
+
+class PubnubCoreAsync(PubnubBase):
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def __init__(
+ self,
+ publish_key,
+ subscribe_key,
+ secret_key=None,
+ cipher_key=None,
+ auth_key=None,
+ ssl_on=False,
+ origin='pubsub.pubnub.com',
+ uuid=None,
+ _tt_lock=empty_lock,
+ _channel_list_lock=empty_lock,
+ _channel_group_list_lock=empty_lock
+ ):
+
+ super(PubnubCoreAsync, self).__init__(
+ publish_key=publish_key,
+ subscribe_key=subscribe_key,
+ secret_key=secret_key,
+ cipher_key=cipher_key,
+ auth_key=auth_key,
+ ssl_on=ssl_on,
+ origin=origin,
+ uuid=uuid
+ )
+
+ self.subscriptions = {}
+ self.subscription_groups = {}
+ self.timetoken = 0
+ self.last_timetoken = 0
+ self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ self._tt_lock = _tt_lock
+ self._channel_list_lock = _channel_list_lock
+ self._channel_group_list_lock = _channel_group_list_lock
+ self._connect = lambda: None
+ self.u = None
+
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
+
+ def get_channel_group_list(self, channel_groups):
+ channel_group = ''
+ first = True
+ with self._channel_group_list_lock:
+ for ch in channel_groups:
+ if not channel_groups[ch]['subscribed']:
+ continue
+ if not first:
+ channel_group += ','
+ else:
+ first = False
+ channel_group += ch
+ return channel_group
+
+
+ def get_channel_array(self):
+ """Get List of currently subscribed channels
+
+ Returns:
+ Returns a list containing names of channels subscribed
+
+ Sample return value:
+ ["a","b","c]
+ """
+ channels = self.subscriptions
+ channel = []
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ channel.append(ch)
+ return channel
+
+ def get_channel_group_array(self):
+ """Get List of currently subscribed channel groups
+
+ Returns:
+ Returns a list containing names of channel groups subscribed
+
+ Sample return value:
+ ["a","b","c]
+ """
+ channel_groups = self.subscription_groups
+ channel_group = []
+ with self._channel_group_list_lock:
+ for ch in channel_groups:
+ if not channel_groups[ch]['subscribed']:
+ continue
+ channel_group.append(ch)
+ return channel_group
+
+ def each(l, func):
+ if func is None:
+ return
+ for i in l:
+ func(i)
+
+ def subscribe(self, channels, callback, state=None, error=None,
+ connect=None, disconnect=None, reconnect=None, sync=False):
+ """Subscribe to data on a channel.
+
+ This function causes the client to create an open TCP socket to the
+ PubNub Real-Time Network and begin listening for messages on a specified channel.
+ To subscribe to a channel the client must send the appropriate subscribe_key at
+ initialization.
+
+ Only works in async mode
+
+ Args:
+ channel: (string/list)
+ Specifies the channel to subscribe to. It is possible to specify
+ multiple channels as a comma separated list or andarray.
+
+ callback: (function)
+ This callback is called on receiving a message from the channel.
+
+ state: (dict)
+ State to be set.
+
+ error: (function) (optional)
+ This callback is called on an error event
+
+ connect: (function) (optional)
+ This callback is called on a successful connection to the PubNub cloud
+
+ disconnect: (function) (optional)
+ This callback is called on client disconnect from the PubNub cloud
+
+ reconnect: (function) (optional)
+ This callback is called on successfully re-connecting to the PubNub cloud
+
+ Returns:
+ None
+ """
+
+ return self._subscribe(channels=channels, callback=callback, state=state, error=error,
+ connect=connect, disconnect=disconnect, reconnect=reconnect)
+
+ def subscribe_group(self, channel_groups, callback, error=None,
+ connect=None, disconnect=None, reconnect=None, sync=False):
+ """Subscribe to data on a channel group.
+
+ This function causes the client to create an open TCP socket to the
+ PubNub Real-Time Network and begin listening for messages on a specified channel.
+ To subscribe to a channel group the client must send the appropriate subscribe_key at
+ initialization.
+
+ Only works in async mode
+
+ Args:
+ channel_groups: (string/list)
+ Specifies the channel groups to subscribe to. It is possible to specify
+ multiple channel groups as a comma separated list or andarray.
+
+ callback: (function)
+ This callback is called on receiving a message from the channel.
+
+ error: (function) (optional)
+ This callback is called on an error event
+
+ connect: (function) (optional)
+ This callback is called on a successful connection to the PubNub cloud
+
+ disconnect: (function) (optional)
+ This callback is called on client disconnect from the PubNub cloud
+
+ reconnect: (function) (optional)
+ This callback is called on successfully re-connecting to the PubNub cloud
+
+ Returns:
+ None
+ """
+
+ return self._subscribe(channel_groups=channel_groups, callback=callback, error=error,
+ connect=connect, disconnect=disconnect, reconnect=reconnect)
+
+ def _subscribe(self, channels=None, channel_groups=None, state=None, callback=None, error=None,
+ connect=None, disconnect=None, reconnect=None):
+
+ with self._tt_lock:
+ self.last_timetoken = self.timetoken if self.timetoken != 0 \
+ else self.last_timetoken
+ self.timetoken = 0
+
+ def _invoke(func, msg=None, channel=None, real_channel=None):
+ if func is not None:
+ if msg is not None and channel is not None and real_channel is not None:
+ try:
+ func(get_data_for_user(msg), channel, real_channel)
+ except:
+ func(get_data_for_user(msg), channel)
+ elif msg is not None and channel is not None:
+ func(get_data_for_user(msg), channel)
+ elif msg is not None:
+ func(get_data_for_user(msg))
+ else:
+ func()
+
+ def _invoke_connect():
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ chobj['disconnected'] = False
+ _invoke(chobj['connect'], chobj['name'])
+ else:
+ if chobj['disconnected'] is True:
+ chobj['disconnected'] = False
+ _invoke(chobj['reconnect'], chobj['name'])
+
+ if self._channel_group_list_lock:
+ with self._channel_group_list_lock:
+ for ch in self.subscription_groups:
+ chobj = self.subscription_groups[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ chobj['disconnected'] = False
+ _invoke(chobj['connect'], chobj['name'])
+ else:
+ if chobj['disconnected'] is True:
+ chobj['disconnected'] = False
+ _invoke(chobj['reconnect'], chobj['name'])
+
+
+ def _invoke_disconnect():
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is True:
+ if chobj['disconnected'] is False:
+ chobj['disconnected'] = True
+ _invoke(chobj['disconnect'], chobj['name'])
+ if self._channel_group_list_lock:
+ with self._channel_group_list_lock:
+ for ch in self.subscription_groups:
+ chobj = self.subscription_groups[ch]
+ if chobj['connected'] is True:
+ if chobj['disconnected'] is False:
+ chobj['disconnected'] = True
+ _invoke(chobj['disconnect'], chobj['name'])
+
+
+ def _invoke_error(channel_list=None, error=None):
+ if channel_list is None:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ try:
+ _invoke(chobj['error'], error, ch)
+ except TypeError:
+ _invoke(chobj['error'], error)
+ else:
+ for ch in channel_list:
+ chobj = self.subscriptions[ch]
+ try:
+ _invoke(chobj['error'], error, ch)
+ except TypeError:
+ _invoke(chobj['error'], error)
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
+
+ if channels is not None:
+ channels = channels if isinstance(
+ channels, list) else channels.split(",")
+ for channel in channels:
+ ## New Channel?
+ if len(channel) > 0 and \
+ (not channel in self.subscriptions or
+ self.subscriptions[channel]['subscribed'] is False):
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name': channel,
+ 'first': False,
+ 'connected': False,
+ 'disconnected': True,
+ 'subscribed': True,
+ 'callback': callback,
+ 'connect': connect,
+ 'disconnect': disconnect,
+ 'reconnect': reconnect,
+ 'error': error
+ }
+ if state is not None:
+ if channel in self.STATE:
+ self.STATE[channel] = state[channel]
+ else:
+ self.STATE[channel] = state
+
+ if channel_groups is not None:
+ channel_groups = channel_groups if isinstance(
+ channel_groups, list) else channel_groups.split(",")
+
+ for channel_group in channel_groups:
+ ## New Channel?
+ if len(channel_group) > 0 and \
+ (not channel_group in self.subscription_groups or
+ self.subscription_groups[channel_group]['subscribed'] is False):
+ with self._channel_group_list_lock:
+ self.subscription_groups[channel_group] = {
+ 'name': channel_group,
+ 'first': False,
+ 'connected': False,
+ 'disconnected': True,
+ 'subscribed': True,
+ 'callback': callback,
+ 'connect': connect,
+ 'disconnect': disconnect,
+ 'reconnect': reconnect,
+ 'error': error
+ }
+
+ '''
+ ## return if already connected to channel
+ if channel in self.subscriptions and \
+ 'connected' in self.subscriptions[channel] and \
+ self.subscriptions[channel]['connected'] is True:
+ _invoke(error, "Already Connected")
+ return
+ '''
+ ## SUBSCRIPTION RECURSION
+ def _connect():
+
+ self._reset_offline()
+
+ def error_callback(response):
+ ## ERROR ?
+ if not response or \
+ ('message' in response and
+ response['message'] == 'Forbidden'):
+ _invoke_error(channel_list=response['payload'][
+ 'channels'], error=response['message'])
+ self.timeout(1, _connect)
+ return
+ if 'message' in response:
+ _invoke_error(error=response['message'])
+ else:
+ _invoke_disconnect()
+ self.timetoken = 0
+ self.timeout(1, _connect)
+
+ def sub_callback(response):
+ ## ERROR ?
+ if not response or \
+ ('message' in response and
+ response['message'] == 'Forbidden'):
+ _invoke_error(channel_list=response['payload'][
+ 'channels'], error=response['message'])
+ _connect()
+ return
+
+ _invoke_connect()
+
+ with self._tt_lock:
+ self.timetoken = \
+ self.last_timetoken if self.timetoken == 0 and \
+ self.last_timetoken != 0 else response[1]
+
+ if len(response) > 3:
+ channel_list = response[2].split(',')
+ channel_list_2 = response[3].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscription_groups or ch[1] in self.subscriptions:
+ try:
+ chobj = self.subscription_groups[ch[1]]
+ except KeyError:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],
+ self.decrypt(response_list[ch[0]]),
+ chobj['name'].split('-pnpres')[0], channel_list_2[ch[0]].split('-pnpres')[0])
+ elif len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],
+ self.decrypt(response_list[ch[0]]),
+ chobj['name'].split('-pnpres')[0])
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r),
+ chobj['name'].split('-pnpres')[0])
+
+ _connect()
+
+ channel_list = self.get_channel_list(self.subscriptions)
+ channel_group_list = self.get_channel_group_list(self.subscription_groups)
+
+ if len(channel_list) <= 0 and len(channel_group_list) <= 0:
+ return
+
+ if len(channel_list) <= 0:
+ channel_list = ','
+
+ data = {"uuid": self.uuid, "auth": self.auth_key,
+ 'pnsdk' : self.pnsdk, 'channel-group' : channel_group_list}
+
+
+ st = json.dumps(self.STATE)
+
+ if len(st) > 2:
+ data['state'] = quote(st,safe="")
+
+ ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
+ #try:
+ self.SUB_RECEIVER = self._request({"urlcomponents": [
+ 'subscribe',
+ self.subscribe_key,
+ channel_list,
+ '0',
+ str(self.timetoken)
+ ], "urlparams": data},
+ sub_callback,
+ error_callback,
+ single=True, timeout=320)
+ '''
+ except Exception as e:
+ print(e)
+ self.timeout(1, _connect)
+ return
+ '''
+
+ self._connect = _connect
+
+ ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
+ _connect()
+
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
+
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+ def unsubscribe(self, channel):
+ """Unsubscribe from channel .
+ Only works in async mode
+
+ Args:
+ channel: Channel name ( string )
+ """
+ if channel in self.subscriptions is False:
+ return False
+
+ ## DISCONNECT
+ with self._channel_list_lock:
+ if channel in self.subscriptions:
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
+ self.leave_channel(channel=channel)
+
+ # remove channel from STATE
+ self.STATE.pop(channel, None)
+
+ self.CONNECT()
+
+ def unsubscribe_group(self, channel_group):
+ """Unsubscribe from channel group.
+ Only works in async mode
+
+ Args:
+ channel_group: Channel group name ( string )
+ """
+ if channel_group in self.subscription_groups is False:
+ return False
+
+ ## DISCONNECT
+ with self._channel_group_list_lock:
+ if channel_group in self.subscription_groups:
+ self.subscription_groups[channel_group]['connected'] = 0
+ self.subscription_groups[channel_group]['subscribed'] = False
+ self.subscription_groups[channel_group]['timetoken'] = 0
+ self.subscription_groups[channel_group]['first'] = False
+ self.leave_group(channel_group=channel_group)
+ self.CONNECT()
+
+
+class PubnubCore(PubnubCoreAsync):
+ def __init__(
+ self,
+ publish_key,
+ subscribe_key,
+ secret_key=None,
+ cipher_key=None,
+ auth_key=None,
+ ssl_on=False,
+ origin='pubsub.pubnub.com',
+ uuid=None,
+ _tt_lock=None,
+ _channel_list_lock=None,
+ _channel_group_list_lock=None
+
+ ):
+ super(PubnubCore, self).__init__(
+ publish_key=publish_key,
+ subscribe_key=subscribe_key,
+ secret_key=secret_key,
+ cipher_key=cipher_key,
+ auth_key=auth_key,
+ ssl_on=ssl_on,
+ origin=origin,
+ uuid=uuid,
+ _tt_lock=_tt_lock,
+ _channel_list_lock=_channel_list_lock,
+ _channel_group_list_lock=_channel_group_list_lock
+ )
+
+ self.subscriptions = {}
+ self.timetoken = 0
+ self.accept_encoding = 'gzip'
+
+class HTTPClient:
+ def __init__(self, pubnub, url, urllib_func=None,
+ callback=None, error=None, id=None, timeout=5):
+ self.url = url
+ self.id = id
+ self.callback = callback
+ self.error = error
+ self.stop = False
+ self._urllib_func = urllib_func
+ self.timeout = timeout
+ self.pubnub = pubnub
+
+ def cancel(self):
+ self.stop = True
+ self.callback = None
+ self.error = None
+
+ def run(self):
+
+ def _invoke(func, data):
+ if func is not None:
+ func(get_data_for_user(data))
+
+ if self._urllib_func is None:
+ return
+
+ resp = self._urllib_func(self.url, timeout=self.timeout)
+ data = resp[0]
+ code = resp[1]
+
+ if self.stop is True:
+ return
+ if self.callback is None:
+ with self.pubnub.latest_sub_callback_lock:
+ if self.pubnub.latest_sub_callback['id'] != self.id:
+ return
+ else:
+ if self.pubnub.latest_sub_callback['callback'] is not None:
+ self.pubnub.latest_sub_callback['id'] = 0
+ try:
+ data = json.loads(data)
+ except ValueError:
+ _invoke(self.pubnub.latest_sub_callback['error'],
+ {'error': 'json decoding error'})
+ return
+ if code != 200:
+ _invoke(self.pubnub.latest_sub_callback['error'], data)
+ else:
+ _invoke(self.pubnub.latest_sub_callback['callback'], data)
+ else:
+ try:
+ data = json.loads(data)
+ except ValueError:
+ _invoke(self.error, {'error': 'json decoding error'})
+ return
+
+ if code != 200:
+ _invoke(self.error, data)
+ else:
+ _invoke(self.callback, data)
+
+
+def _urllib_request_2(url, timeout=5):
+ try:
+ resp = urllib2.urlopen(url, timeout=timeout)
+ except urllib2.HTTPError as http_error:
+ resp = http_error
+ except urllib2.URLError as error:
+ msg = {"message": str(error.reason)}
+ return (json.dumps(msg), 0)
+
+ return (resp.read(), resp.code)
+
+class PubnubHTTPAdapter(HTTPAdapter):
+ def init_poolmanager(self, *args, **kwargs):
+ kwargs.setdefault('socket_options', default_socket_options)
+
+ super(PubnubHTTPAdapter, self).init_poolmanager(*args, **kwargs)
+
+s = requests.Session()
+#s.mount('http://', PubnubHTTPAdapter(max_retries=1))
+#s.mount('https://', PubnubHTTPAdapter(max_retries=1))
+#s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
+#s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
+
+
+def _requests_request(url, timeout=5):
+ try:
+ resp = s.get(url, timeout=timeout)
+ except requests.exceptions.HTTPError as http_error:
+ resp = http_error
+ except requests.exceptions.ConnectionError as error:
+ msg = str(error)
+ return (json.dumps(msg), 0)
+ except requests.exceptions.Timeout as error:
+ msg = str(error)
+ return (json.dumps(msg), 0)
+ #print (resp.text)
+ #print (resp.status_code)
+ return (resp.text, resp.status_code)
+
+
+def _urllib_request_3(url, timeout=5):
+ try:
+ resp = urllib.request.urlopen(url, timeout=timeout)
+ except (urllib.request.HTTPError, urllib.request.URLError) as http_error:
+ resp = http_error
+ r = resp.read().decode("utf-8")
+ return (r, resp.code)
+
+_urllib_request = None
+
+
+# Pubnub
+
+class Pubnub(PubnubCore):
+ def __init__(
+ self,
+ publish_key,
+ subscribe_key,
+ secret_key=None,
+ cipher_key=None,
+ auth_key=None,
+ ssl_on=False,
+ origin='pubsub.pubnub.com',
+ uuid=None,
+ pooling=True,
+ daemon=False,
+ pres_uuid=None,
+ azure=False
+ ):
+ super(Pubnub, self).__init__(
+ publish_key=publish_key,
+ subscribe_key=subscribe_key,
+ secret_key=secret_key,
+ cipher_key=cipher_key,
+ auth_key=auth_key,
+ ssl_on=ssl_on,
+ origin=origin,
+ uuid=uuid or pres_uuid,
+ _tt_lock=threading.RLock(),
+ _channel_list_lock=threading.RLock(),
+ _channel_group_list_lock=threading.RLock()
+ )
+ global _urllib_request
+ if self.python_version == 2:
+ _urllib_request = _urllib_request_2
+ else:
+ _urllib_request = _urllib_request_3
+
+ if pooling is True:
+ _urllib_request = _requests_request
+
+ self.latest_sub_callback_lock = threading.RLock()
+ self.latest_sub_callback = {'id': None, 'callback': None}
+ self.pnsdk = 'PubNub-Python' + '/' + self.version
+ self.daemon = daemon
+
+ if azure is False:
+ s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
+ s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
+ else:
+ s.mount('http://', PubnubHTTPAdapter(max_retries=1))
+ s.mount('https://', PubnubHTTPAdapter(max_retries=1))
+
+ def timeout(self, interval, func):
+ def cb():
+ time.sleep(interval)
+ func()
+ thread = threading.Thread(target=cb)
+ thread.daemon = self.daemon
+ thread.start()
+
+ def _request_async(self, request, callback=None, error=None, single=False, timeout=5):
+ global _urllib_request
+ ## Build URL
+ url = self.getUrl(request)
+ if single is True:
+ id = time.time()
+ client = HTTPClient(self, url=url, urllib_func=_urllib_request,
+ callback=None, error=None, id=id, timeout=timeout)
+ with self.latest_sub_callback_lock:
+ self.latest_sub_callback['id'] = id
+ self.latest_sub_callback['callback'] = callback
+ self.latest_sub_callback['error'] = error
+ else:
+ client = HTTPClient(self, url=url, urllib_func=_urllib_request,
+ callback=callback, error=error, timeout=timeout)
+
+ thread = threading.Thread(target=client.run)
+ thread.daemon = self.daemon
+ thread.start()
+
+ def abort():
+ client.cancel()
+ return abort
+
+ def _request_sync(self, request, timeout=5):
+ global _urllib_request
+ ## Build URL
+ url = self.getUrl(request)
+ ## Send Request Expecting JSONP Response
+ response = _urllib_request(url, timeout=timeout)
+ try:
+ resp_json = json.loads(response[0])
+ except ValueError:
+ return [0, "JSON Error"]
+
+ if response[1] != 200 and 'message' in resp_json and 'payload' in resp_json:
+ return {'message': resp_json['message'],
+ 'payload': resp_json['payload']}
+
+ if response[1] == 0:
+ return [0, resp_json]
+
+ return resp_json
+
+ def _request(self, request, callback=None, error=None, single=False, timeout=5):
+ if callback is None:
+ return get_data_for_user(self._request_sync(request, timeout=timeout))
+ else:
+ return self._request_async(request, callback, error, single=single, timeout=timeout)
+
+# Pubnub Twisted
+
+class PubnubTwisted(PubnubCoreAsync):
+
+ def start(self):
+ reactor.run()
+
+ def stop(self):
+ reactor.stop()
+
+ def timeout(self, delay, callback):
+ reactor.callLater(delay, callback)
+
+ def __init__(
+ self,
+ publish_key,
+ subscribe_key,
+ secret_key=None,
+ cipher_key=None,
+ auth_key=None,
+ ssl_on=False,
+ origin='pubsub.pubnub.com'
+ ):
+ super(PubnubTwisted, self).__init__(
+ publish_key=publish_key,
+ subscribe_key=subscribe_key,
+ secret_key=secret_key,
+ cipher_key=cipher_key,
+ auth_key=auth_key,
+ ssl_on=ssl_on,
+ origin=origin,
+ )
+ self.headers = {}
+ self.headers['User-Agent'] = ['Python-Twisted']
+ self.headers['V'] = [self.version]
+ self.pnsdk = 'PubNub-Python-' + 'Twisted' + '/' + self.version
+
+ def _request(self, request, callback=None, error=None, single=False, timeout=5):
+ global pnconn_pool
+
+ def _invoke(func, data):
+ if func is not None:
+ func(get_data_for_user(data))
+
+ ## Build URL
+
+ url = self.getUrl(request)
+
+ agent = ContentDecoderAgent(RedirectAgent(Agent(
+ reactor,
+ contextFactory=WebClientContextFactory(),
+ pool=self.ssl and None or pnconn_pool
+ )), [('gzip', GzipDecoder)])
+
+ try:
+ request = agent.request(
+ 'GET', url, Headers(self.headers), None)
+ except TypeError:
+ request = agent.request(
+ 'GET', url.encode(), Headers(self.headers), None)
+
+ if single is True:
+ id = time.time()
+ self.id = id
+
+ def received(response):
+ if not isinstance(response, twisted.web._newclient.Response):
+ _invoke(error, {"message": "Not Found"})
+ return
+
+ finished = Deferred()
+ if response.code in [401, 403]:
+ response.deliverBody(PubNubPamResponse(finished))
+ else:
+ response.deliverBody(PubNubResponse(finished))
+
+ return finished
+
+ def complete(data):
+ if single is True:
+ if id != self.id:
+ return None
+ try:
+ data = json.loads(data)
+ except ValueError:
+ try:
+ data = json.loads(data.decode("utf-8"))
+ except ValueError:
+ _invoke(error, {'error': 'json decode error'})
+
+ if 'error' in data and 'status' in data and 'status' != 200:
+ _invoke(error, data)
+ else:
+ _invoke(callback, data)
+
+ def abort():
+ pass
+
+ request.addCallback(received)
+ request.addCallback(complete)
+
+ return abort
+
+
+# PubnubTornado
+class PubnubTornado(PubnubCoreAsync):
+
+ def stop(self):
+ ioloop.stop()
+
+ def start(self):
+ ioloop.start()
+
+ def timeout(self, delay, callback):
+ ioloop.add_timeout(time.time() + float(delay), callback)
+
+ def __init__(
+ self,
+ publish_key,
+ subscribe_key,
+ secret_key=False,
+ cipher_key=False,
+ auth_key=False,
+ ssl_on=False,
+ origin='pubsub.pubnub.com'
+ ):
+ super(PubnubTornado, self).__init__(
+ publish_key=publish_key,
+ subscribe_key=subscribe_key,
+ secret_key=secret_key,
+ cipher_key=cipher_key,
+ auth_key=auth_key,
+ ssl_on=ssl_on,
+ origin=origin,
+ )
+ self.headers = {}
+ self.headers['User-Agent'] = 'Python-Tornado'
+ self.headers['Accept-Encoding'] = self.accept_encoding
+ self.headers['V'] = self.version
+ self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
+ self.id = None
+ self.pnsdk = 'PubNub-Python-' + 'Tornado' + '/' + self.version
+
+ def _request(self, request, callback=None, error=None,
+ single=False, timeout=5, connect_timeout=5):
+
+ def _invoke(func, data):
+ if func is not None:
+ func(get_data_for_user(data))
+
+ url = self.getUrl(request)
+ request = tornado.httpclient.HTTPRequest(
+ url, 'GET',
+ self.headers,
+ connect_timeout=connect_timeout,
+ request_timeout=timeout)
+ if single is True:
+ id = time.time()
+ self.id = id
+
+ def responseCallback(response):
+ if single is True:
+ if not id == self.id:
+ return None
+
+ body = response._get_body()
+
+ if body is None:
+ return
+
+ def handle_exc(*args):
+ return True
+ if response.error is not None:
+ with ExceptionStackContext(handle_exc):
+ if response.code in [403, 401]:
+ response.rethrow()
+ else:
+ _invoke(error, {"message": response.reason})
+ return
+
+ try:
+ data = json.loads(body)
+ except TypeError:
+ try:
+ data = json.loads(body.decode("utf-8"))
+ except ValueError:
+ _invoke(error, {'error': 'json decode error'})
+
+ if 'error' in data and 'status' in data and 'status' != 200:
+ _invoke(error, data)
+ else:
+ _invoke(callback, data)
+
+ self.http.fetch(
+ request=request,
+ callback=responseCallback
+ )
+
+ def abort():
+ pass
+
+ return abort