diff options
| author | Devendra | 2015-06-16 10:50:42 +0530 | 
|---|---|---|
| committer | Devendra | 2015-06-16 10:50:42 +0530 | 
| commit | f9cffc2bb1acbef7595bc523095c21fef1530dc4 (patch) | |
| tree | 55d20cea2ea164dcea63f6ce1d72a9acd1b5afd3 /Pubnub.py | |
| parent | 4cc9cfe008ea55237f8343f8b132bd25a1bb2d47 (diff) | |
| download | pubnub-python-f9cffc2bb1acbef7595bc523095c21fef1530dc4.tar.bz2 | |
changed Pubnub module to pubnub
Diffstat (limited to 'Pubnub.py')
| -rw-r--r-- | Pubnub.py | 2376 | 
1 files changed, 0 insertions, 2376 deletions
| diff --git a/Pubnub.py b/Pubnub.py deleted file mode 100644 index 37530f5..0000000 --- a/Pubnub.py +++ /dev/null @@ -1,2376 +0,0 @@ - -## www.pubnub.com - PubNub Real-time push service in the cloud. -# coding=utf8 - -## PubNub Real-time Push APIs and Notifications Framework -## Copyright (c) 2014-15 Stephen Blum -## http://www.pubnub.com/ - -## ----------------------------------- -## PubNub 3.7.1 Real-time Push Cloud API -## ----------------------------------- - - -try: -    import json -except ImportError: -    import simplejson as json - -import time -import hashlib -import uuid as uuid_lib -import random -import sys -from base64 import urlsafe_b64encode -from base64 import encodestring, decodestring -import hmac -from Crypto.Cipher import AES - -try: -    from hashlib import sha256 -    digestmod = sha256 -except ImportError: -    import Crypto.Hash.SHA256 as digestmod -    sha256 = digestmod.new - - -##### vanilla python imports ##### -try: -    from urllib.parse import quote -except ImportError: -    from urllib2 import quote -try: -    import urllib.request -except ImportError: -    import urllib2 - -try: -    import requests -    from requests.adapters import HTTPAdapter -except ImportError: -    pass - -#import urllib -import socket -import threading - -try: -    import urllib3.HTTPConnection -    default_socket_options = urllib3.HTTPConnection.default_socket_options -except: -    default_socket_options = [] - -default_socket_options += [ -    # Enable TCP keepalive -    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) -] - -if sys.platform.startswith("linux"): -    default_socket_options += [ -        # Send first keepalive packet 200 seconds after last data packet -        (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 200), -        # Resend keepalive packets every second, when unanswered -        (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1), -        # Close the socket after 5 unanswered keepalive packets -        (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) -    ] -elif sys.platform.startswith("darwin"): -    # From /usr/include/netinet/tcp.h - -    # idle time used when SO_KEEPALIVE is enabled -    socket.TCP_KEEPALIVE = socket.TCP_KEEPALIVE \ -        if hasattr(socket, 'TCP_KEEPALIVE') \ -        else 0x10 - -    # interval between keepalives -    socket.TCP_KEEPINTVL = socket.TCP_KEEPINTVL \ -        if hasattr(socket, 'TCP_KEEPINTVL') \ -        else 0x101 - -    # number of keepalives before close -    socket.TCP_KEEPCNT = socket.TCP_KEEPCNT \ -        if hasattr(socket, 'TCP_KEEPCNT') \ -        else 0x102 - -    default_socket_options += [ -        # Send first keepalive packet 200 seconds after last data packet -        (socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 200), -        # Resend keepalive packets every second, when unanswered -        (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1), -        # Close the socket after 5 unanswered keepalive packets -        (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) -    ] -""" -# The Windows code is currently untested -elif sys.platform.startswith("win"): -    import struct -    from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool - -    def patch_socket_keepalive(conn): -        conn.sock.ioctl(socket.SIO_KEEPALIVE_VALS, ( -            # Enable TCP keepalive -            1, -            # Send first keepalive packet 200 seconds after last data packet -            200, -            # Resend keepalive packets every second, when unanswered -            1 -        )) - -    class PubnubHTTPConnectionPool(HTTPConnectionPool): -        def _validate_conn(self, conn): -            super(PubnubHTTPConnectionPool, self)._validate_conn(conn) - -    class PubnubHTTPSConnectionPool(HTTPSConnectionPool): -        def _validate_conn(self, conn): -            super(PubnubHTTPSConnectionPool, self)._validate_conn(conn) - -    import urllib3.poolmanager -    urllib3.poolmanager.pool_classes_by_scheme = { -        'http'  : PubnubHTTPConnectionPool, -        'https' : PubnubHTTPSConnectionPool -    } -""" - -################################## - - -##### Tornado imports and globals ##### -try: -    import tornado.httpclient -    import tornado.ioloop -    from tornado.stack_context import ExceptionStackContext -    ioloop = tornado.ioloop.IOLoop.instance() -except ImportError: -    pass - -####################################### - - -##### Twisted imports and globals ##### -try: -    from twisted.internet import reactor -    from twisted.internet.defer import Deferred -    from twisted.internet.protocol import Protocol -    from twisted.web.client import Agent, ContentDecoderAgent -    from twisted.web.client import RedirectAgent, GzipDecoder -    from twisted.web.client import HTTPConnectionPool -    from twisted.web.http_headers import Headers -    from twisted.internet.ssl import ClientContextFactory -    import twisted - - -    pnconn_pool = HTTPConnectionPool(reactor, persistent=True) -    pnconn_pool.maxPersistentPerHost = 100000 -    pnconn_pool.cachedConnectionTimeout = 15 -    pnconn_pool.retryAutomatically = True - -    class WebClientContextFactory(ClientContextFactory): -        def getContext(self, hostname, port): -            return ClientContextFactory.getContext(self) - -    class PubNubPamResponse(Protocol): -        def __init__(self, finished): -            self.finished = finished - -        def dataReceived(self, bytes): -            self.finished.callback(bytes) - -    class PubNubResponse(Protocol): -        def __init__(self, finished): -            self.finished = finished - -        def dataReceived(self, bytes): -            self.finished.callback(bytes) -except ImportError: -    pass - - -####################################### - - -def get_data_for_user(data): -    try: -        if 'message' in data and 'payload' in data: -            return {'message': data['message'], 'payload': data['payload']} -        else: -            return data -    except TypeError: -        return data - - -class PubnubCrypto2(): - -    def pad(self, msg, block_size=16): - -        padding = block_size - (len(msg) % block_size) -        return msg + chr(padding) * padding - -    def depad(self, msg): - -        return msg[0:-ord(msg[-1])] - -    def getSecret(self, key): - -        return hashlib.sha256(key).hexdigest() - -    def encrypt(self, key, msg): -        secret = self.getSecret(key) -        Initial16bytes = '0123456789012345' -        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) -        enc = encodestring(cipher.encrypt(self.pad(msg))) -        return enc - -    def decrypt(self, key, msg): - -        try: -            secret = self.getSecret(key) -            Initial16bytes = '0123456789012345' -            cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) -            plain = self.depad(cipher.decrypt(decodestring(msg))) -        except: -            return msg -        try: -            return eval(plain) -        except SyntaxError: -            return plain - -class PubnubCrypto3(): - -    def pad(self, msg, block_size=16): - -        padding = block_size - (len(msg) % block_size) -        return msg + (chr(padding) * padding).encode('utf-8') - -    def depad(self, msg): - -        return msg[0:-ord(msg[-1])] - -    def getSecret(self, key): - -        return hashlib.sha256(key.encode("utf-8")).hexdigest() - -    def encrypt(self, key, msg): - -        secret = self.getSecret(key) -        Initial16bytes = '0123456789012345' -        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) -        return encodestring( -            cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8') - -    def decrypt(self, key, msg): - -        secret = self.getSecret(key) -        Initial16bytes = '0123456789012345' -        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes) -        return (cipher.decrypt( -            decodestring(msg.encode('utf-8')))).decode('utf-8') - - -class PubnubBase(object): -    def __init__( -        self, -        publish_key, -        subscribe_key, -        secret_key=False, -        cipher_key=False, -        auth_key=None, -        ssl_on=False, -        origin='pubsub.pubnub.com', -        uuid=None -    ): -        """Pubnub Class - -        Provides methods to communicate with Pubnub cloud - -        Attributes: -            publish_key: Publish Key -            subscribe_key: Subscribe Key -            secret_key: Secret Key -            cipher_key: Cipher Key -            auth_key: Auth Key (used with Pubnub Access Manager i.e. PAM) -            ssl: SSL enabled ?  -            origin: Origin -        """ - -        self.origin = origin -        self.version = '3.7.1' -        self.limit = 1800 -        self.publish_key = publish_key -        self.subscribe_key = subscribe_key -        self.secret_key = secret_key -        self.cipher_key = cipher_key -        self.ssl = ssl_on -        self.auth_key = auth_key -        self.STATE = {} - -        if self.ssl: -            self.origin = 'https://' + self.origin -        else: -            self.origin = 'http://' + self.origin - -        self.uuid = uuid or str(uuid_lib.uuid4()) - -        if type(sys.version_info) is tuple: -            self.python_version = 2 -            self.pc = PubnubCrypto2() -        else: -            if sys.version_info.major == 2: -                self.python_version = 2 -                self.pc = PubnubCrypto2() -            else: -                self.python_version = 3 -                self.pc = PubnubCrypto3() - -        if not isinstance(self.uuid, str): -            raise AttributeError("uuid must be a string") - -    def _pam_sign(self, msg): - -        sign =  urlsafe_b64encode(hmac.new( -            self.secret_key.encode("utf-8"), -            msg.encode("utf-8"), -            sha256 -        ).digest()) -        return quote(sign, safe="") - -    def set_u(self, u=False): -        self.u = u - -    def _pam_auth(self, query, apicode=0, callback=None, error=None): - -        if 'timestamp' not in query: -            query['timestamp'] = int(time.time()) - -        ## Global Grant? -        if 'auth' in query and not query['auth']: -            del query['auth'] - -        if 'channel' in query and not query['channel']: -            del query['channel'] - -        if 'channel-group' in query and not query['channel-group']: -            del query['channel-group'] - - -        params = "&".join([ -            x + "=" + quote( -                str(query[x]), safe="" -            ) for x in sorted(query) -        ]) -        sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format( -            subkey=self.subscribe_key, -            pubkey=self.publish_key, -            apitype="audit" if (apicode) else "grant", -            params=params -        ) -        query['signature'] = self._pam_sign(sign_input) - -        return self._request({"urlcomponents": [ -            'v1', 'auth', "audit" if (apicode) else "grant", -            'sub-key', -            self.subscribe_key -        ], 'urlparams': query}, -            self._return_wrapped_callback(callback), -            self._return_wrapped_callback(error)) - -    def get_origin(self): -        return self.origin - -    def set_auth_key(self, auth_key): -        self.auth_key = auth_key - -    def get_auth_key(self): -        return self.auth_key - -    def grant(self, channel=None, channel_group=None, auth_key=False, read=False, -              write=False, manage=False, ttl=5, callback=None, error=None): -        """Method for granting permissions. - -        This function establishes subscribe and/or write permissions for -        PubNub Access Manager (PAM) by setting the read or write attribute -        to true. A grant with read or write set to false (or not included) -        will revoke any previous grants with read or write set to true. - -        Permissions can be applied to any one of three levels: -            1. Application level privileges are based on subscribe_key applying to all associated channels. -            2. Channel level privileges are based on a combination of subscribe_key and channel name. -            3. User level privileges are based on the combination of subscribe_key, channel and auth_key. - -        Args: -            channel:    (string) (optional) -                        Specifies channel name to grant permissions to. -                        If channel/channel_group is not specified, the grant applies to all -                        channels associated with the subscribe_key. If auth_key -                        is not specified, it is possible to grant permissions to -                        multiple channels simultaneously by specifying the channels -                        as a comma separated list. -            channel_group:    (string) (optional) -                        Specifies channel group name to grant permissions to. -                        If channel/channel_group is not specified, the grant applies to all -                        channels associated with the subscribe_key. If auth_key -                        is not specified, it is possible to grant permissions to -                        multiple channel groups simultaneously by specifying the channel groups -                        as a comma separated list. - -            auth_key:   (string) (optional)  -                        Specifies auth_key to grant permissions to. -                        It is possible to specify multiple auth_keys as comma -                        separated list in combination with a single channel name. -                        If auth_key is provided as the special-case value "null"  -                        (or included in a comma-separated list, eg. "null,null,abc"),  -                        a new auth_key will be generated and returned for each "null" value. - -            read:       (boolean) (default: True) -                        Read permissions are granted by setting to True. -                        Read permissions are removed by setting to False. - -            write:      (boolean) (default: True) -                        Write permissions are granted by setting to true. -                        Write permissions are removed by setting to false. -            manage:      (boolean) (default: True) -                        Manage permissions are granted by setting to true. -                        Manage permissions are removed by setting to false. - -            ttl:        (int) (default: 1440 i.e 24 hrs) -                        Time in minutes for which granted permissions are valid. -                        Max is 525600 , Min is 1. -                        Setting ttl to 0 will apply the grant indefinitely. - -            callback:   (function) (optional) -                        A callback method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado  - -            error:      (function) (optional) -                        An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Returns a dict in sync mode i.e. when callback argument is not given -            The dict returned contains values with keys 'message' and 'payload' - -            Sample Response: -            { -                "message":"Success", -                "payload":{ -                    "ttl":5, -                    "auths":{ -                        "my_ro_authkey":{"r":1,"w":0} -                    }, -                    "subscribe_key":"my_subkey", -                    "level":"user", -                    "channel":"my_channel" -                } -            } -        """ - -        return self._pam_auth({ -            'channel'   : channel, -            'channel-group'   : channel_group, -            'auth'      : auth_key, -            'r'         : read and 1 or 0, -            'w'         : write and 1 or 0, -            'm'         : manage and 1 or 0, -            'ttl'       : ttl, -            'pnsdk'     : self.pnsdk -        }, callback=callback, error=error) - -    def revoke(self, channel=None, channel_group=None, auth_key=None, ttl=1, callback=None, error=None): -        """Method for revoking permissions. - -        Args: -            channel:    (string) (optional) -                        Specifies channel name to revoke permissions to. -                        If channel/channel_group is not specified, the revoke applies to all -                        channels associated with the subscribe_key. If auth_key -                        is not specified, it is possible to grant permissions to -                        multiple channels simultaneously by specifying the channels -                        as a comma separated list. - -            channel_group:    (string) (optional) -                        Specifies channel group name to revoke permissions to. -                        If channel/channel_group is not specified, the grant applies to all -                        channels associated with the subscribe_key. If auth_key -                        is not specified, it is possible to revoke permissions to -                        multiple channel groups simultaneously by specifying the channel groups -                        as a comma separated list. - -            auth_key:   (string) (optional)  -                        Specifies auth_key to revoke permissions to. -                        It is possible to specify multiple auth_keys as comma -                        separated list in combination with a single channel name. -                        If auth_key is provided as the special-case value "null"  -                        (or included in a comma-separated list, eg. "null,null,abc"),  -                        a new auth_key will be generated and returned for each "null" value. - -            ttl:        (int) (default: 1440 i.e 24 hrs) -                        Time in minutes for which granted permissions are valid. -                        Max is 525600 , Min is 1. -                        Setting ttl to 0 will apply the grant indefinitely. - -            callback:   (function) (optional) -                        A callback method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado  - -            error:      (function) (optional) -                        An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Returns a dict in sync mode i.e. when callback argument is not given -            The dict returned contains values with keys 'message' and 'payload' - -            Sample Response: -            { -                "message":"Success", -                "payload":{ -                    "ttl":5, -                    "auths":{ -                        "my_authkey":{"r":0,"w":0} -                    }, -                    "subscribe_key":"my_subkey", -                    "level":"user", -                    "channel":"my_channel" -                } -            } - -        """ - -        return self._pam_auth({ -            'channel'   : channel, -            'channel-group' : channel_group, -            'auth'      : auth_key, -            'r'         : 0, -            'w'         : 0, -            'ttl'       : ttl, -            'pnsdk'     : self.pnsdk -        }, callback=callback, error=error) - -    def audit(self, channel=None, channel_group=None, auth_key=None, callback=None, error=None): -        """Method for fetching permissions from pubnub servers. - -        This method provides a mechanism to reveal existing PubNub Access Manager attributes -        for any combination of subscribe_key, channel and auth_key. - -        Args: -            channel:    (string) (optional) -                        Specifies channel name to return PAM  -                        attributes optionally in combination with auth_key. -                        If channel/channel_group is not specified, results for all channels -                        associated with subscribe_key are returned. -                        If auth_key is not specified, it is possible to return -                        results for a comma separated list of channels. -            channel_group:    (string) (optional) -                        Specifies channel group name to return PAM  -                        attributes optionally in combination with auth_key. -                        If channel/channel_group is not specified, results for all channels -                        associated with subscribe_key are returned. -                        If auth_key is not specified, it is possible to return -                        results for a comma separated list of channels. - -            auth_key:   (string) (optional)  -                        Specifies the auth_key to return PAM attributes for. -                        If only a single channel is specified, it is possible to return -                        results for a comma separated list of auth_keys. - -            callback:   (function) (optional)  -                        A callback method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado  - -            error:      (function) (optional) -                        An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Returns a dict in sync mode i.e. when callback argument is not given -            The dict returned contains values with keys 'message' and 'payload' - -            Sample Response -            { -                "message":"Success", -                "payload":{ -                    "channels":{ -                        "my_channel":{ -                            "auths":{"my_ro_authkey":{"r":1,"w":0}, -                            "my_rw_authkey":{"r":0,"w":1}, -                            "my_admin_authkey":{"r":1,"w":1} -                        } -                    } -                }, -            } - -        Usage: - -             pubnub.audit ('my_channel');  # Sync Mode  - -        """ - -        return self._pam_auth({ -            'channel'   : channel, -            'channel-group' : channel_group, -            'auth'      : auth_key, -            'pnsdk'     : self.pnsdk -        }, 1, callback=callback, error=error) - -    def encrypt(self, message): -        """Method for encrypting data. - -        This method takes plaintext as input and returns encrypted data. -        This need not be called directly as enncryption/decryption is -        taken care of transparently by Pubnub class if cipher key is  -        provided at time of initializing pubnub object - -        Args: -            message: Message to be encrypted. - -        Returns: -            Returns encrypted message if cipher key is set -        """ -        if self.cipher_key: -            message = json.dumps(self.pc.encrypt( -                self.cipher_key, json.dumps(message)).replace('\n', '')) -        else: -            message = json.dumps(message) - -        return message - -    def decrypt(self, message): -        """Method for decrypting data. - -        This method takes ciphertext as input and returns decrypted data. -        This need not be called directly as enncryption/decryption is -        taken care of transparently by Pubnub class if cipher key is  -        provided at time of initializing pubnub object - -        Args: -            message: Message to be decrypted. - -        Returns: -            Returns decrypted message if cipher key is set -        """ -        if self.cipher_key: -            message = self.pc.decrypt(self.cipher_key, message) - -        return message - -    def _return_wrapped_callback(self, callback=None): -        def _new_format_callback(response): -            if 'payload' in response: -                if (callback is not None): -                    callback_data = dict() -                    callback_data['payload'] = response['payload'] - -                    if 'message' in response: -                        callback_data['message'] = response['message'] -                         -                    callback(callback_data) -            else: -                if (callback is not None): -                    callback(response) -        if (callback is not None): -            return _new_format_callback -        else: -            return None - -    def leave_channel(self, channel, callback=None, error=None): -        ## Send leave -        return self._request({"urlcomponents": [ -            'v2', 'presence', -            'sub_key', -            self.subscribe_key, -            'channel', -            channel, -            'leave' -        ], 'urlparams': {'auth': self.auth_key, 'pnsdk' : self.pnsdk, "uuid": self.uuid,}}, -            callback=self._return_wrapped_callback(callback), -            error=self._return_wrapped_callback(error)) - -    def leave_group(self, channel_group, callback=None, error=None): -        ## Send leave -        return self._request({"urlcomponents": [ -            'v2', 'presence', -            'sub_key', -            self.subscribe_key, -            'channel', -            ',', -            'leave' -        ], 'urlparams': {'auth': self.auth_key, 'pnsdk' : self.pnsdk, 'channel-group' : channel_group, "uuid": self.uuid,}}, -            callback=self._return_wrapped_callback(callback), -            error=self._return_wrapped_callback(error)) - - -    def publish(self, channel, message, callback=None, error=None): -        """Publishes data on a channel. - -        The publish() method is used to send a message to all subscribers of a channel. -        To publish a message you must first specify a valid publish_key at initialization. -        A successfully published message is replicated across the PubNub Real-Time Network -        and sent simultaneously to all subscribed clients on a channel. -            Messages in transit can be secured from potential eavesdroppers with SSL/TLS by -        setting ssl to True during initialization. - -        Published messages can also be encrypted with AES-256 simply by specifying a cipher_key -        during initialization. - -        Args: -            channel:    (string) -                        Specifies channel name to publish messages to. -            message:    (string/int/double/dict/list) -                        Message to be published -            callback:   (optional) -                        A callback method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado -            error:      (optional) -                        An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado - -        Returns: -            Sync Mode  : list -            Async Mode : None - -            The function returns the following formatted response: - -                [ Number, "Status", "Time Token"] -             -            The output below demonstrates the response to a successful call: - -                [1,"Sent","13769558699541401"] - -        """ - -        message = self.encrypt(message) - -        ## Send Message -        return self._request({"urlcomponents": [ -            'publish', -            self.publish_key, -            self.subscribe_key, -            '0', -            channel, -            '0', -            message -        ], 'urlparams': {'auth': self.auth_key, 'pnsdk' : self.pnsdk}}, -            callback=self._return_wrapped_callback(callback), -            error=self._return_wrapped_callback(error)) - -    def presence(self, channel, callback, error=None, connect=None, disconnect=None, reconnect=None): -        """Subscribe to presence events on a channel. -            -           Only works in async mode - -        Args: -            channel: Channel name ( string ) on which to listen for events -            callback: A callback method should be passed as parameter. -                      If passed, the api works in async mode.  -                      Required argument when working with twisted or tornado . -            error: Optional variable. An error method can be passed as parameter. -                      If set, the api works in async mode.  - -        Returns: -            None -        """ -        return self.subscribe(channel+'-pnpres', callback=callback, error=error, connect=connect, disconnect=disconnect, reconnect=reconnect) - -    def presence_group(self, channel_group, callback, error=None, connect=None, disconnect=None, reconnect=None): -        """Subscribe to presence events on a channel group. -            -           Only works in async mode - -        Args: -            channel_group: Channel group name ( string ) -            callback: A callback method should be passed to the method. -                      If passed, the api works in async mode.  -                      Required argument when working with twisted or tornado . -            error: Optional variable. An error method can be passed as parameter. -                      If passed, the api works in async mode.  - -        Returns: -            None -        """ -        return self.subscribe_group(channel_group+'-pnpres', callback=callback, error=error, connect=connect, disconnect=disconnect, reconnect=reconnect) - -    def here_now(self, channel, uuids=True, state=False, callback=None, error=None): -        """Get here now data. - -        You can obtain information about the current state of a channel including -        a list of unique user-ids currently subscribed to the channel and the total -        occupancy count of the channel by calling the here_now() function in your  -        application. - - -        Args: -            channel:    (string) (optional) -                        Specifies the channel name to return occupancy results. -                        If channel is not provided, here_now will return data for all channels. - -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Sync  Mode: list -            Async Mode: None - -            Response Format: - -            The here_now() method returns a list of uuid s currently subscribed to the channel. - -            uuids:["String","String", ... ,"String"] - List of UUIDs currently subscribed to the channel. - -            occupancy: Number - Total current occupancy of the channel. - -            Example Response: -            { -                occupancy: 4, -                uuids: [ -                    '123123234t234f34fq3dq', -                    '143r34f34t34fq34q34q3', -                    '23f34d3f4rq34r34rq23q', -                    'w34tcw45t45tcw435tww3', -                ] -            } -        """ - -        urlcomponents = [ -            'v2', 'presence', -            'sub_key', self.subscribe_key -        ] - -        if (channel is not None and len(channel) > 0): -            urlcomponents.append('channel') -            urlcomponents.append(channel) - -        data = {'auth': self.auth_key, 'pnsdk' : self.pnsdk} - -        if state is True: -            data['state'] = '1' - -        if uuids is False: -            data['disable_uuids']  = '1' - -        ## Get Presence Here Now -        return self._request({"urlcomponents": urlcomponents, -            'urlparams': data}, -            callback=self._return_wrapped_callback(callback), -            error=self._return_wrapped_callback(error)) - - -    def history(self, channel, count=100, reverse=False, -                start=None, end=None, include_token=False, callback=None, error=None): -        """This method fetches historical messages of a channel. - -        PubNub Storage/Playback Service provides real-time access to an unlimited -        history for all messages published to PubNub. Stored messages are replicated -        across multiple availability zones in several geographical data center -        locations. Stored messages can be encrypted with AES-256 message encryption -        ensuring that they are not readable while stored on PubNub's network. - -        It is possible to control how messages are returned and in what order, -        for example you can: - -            Return messages in the order newest to oldest (default behavior). - -            Return messages in the order oldest to newest by setting reverse to true. - -            Page through results by providing a start or end time token. - -            Retrieve a "slice" of the time line by providing both a start and end time token. - -            Limit the number of messages to a specific quantity using the count parameter. - - - -        Args: -            channel:    (string) -                        Specifies channel to return history messages from - -            count:      (int) (default: 100) -                        Specifies the number of historical messages to return - -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -            error:      (optional) -                        An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Returns a list in sync mode i.e. when callback argument is not given - -            Sample Response: -                [["Pub1","Pub2","Pub3","Pub4","Pub5"],13406746729185766,13406746845892666] -        """ - -        params = dict() - -        params['count'] = count -        params['reverse'] = reverse -        params['start'] = start -        params['end'] = end -        params['auth_key'] = self.auth_key -        params['pnsdk'] = self.pnsdk -        params['include_token'] = 'true' if include_token else 'false' - -        ## Get History -        return self._request({'urlcomponents': [ -            'v2', -            'history', -            'sub-key', -            self.subscribe_key, -            'channel', -            channel, -        ], 'urlparams': params}, -            callback=self._return_wrapped_callback(callback), -            error=self._return_wrapped_callback(error)) - -    def time(self, callback=None): -        """This function will return a 17 digit precision Unix epoch. - -        Args: - -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Returns a 17 digit number in sync mode i.e. when callback argument is not given - -            Sample: -                13769501243685161 -        """ - -        time = self._request({'urlcomponents': [ -            'time', -            '0' -        ]}, callback) -        if time is not None: -            return time[0] - -    def _encode(self, request): -        return [ -            "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and -                     hex(ord(ch)).replace('0x', '%').upper() or -                     ch for ch in list(bit) -                     ]) for bit in request] - -    def getUrl(self, request): -  -        if self.u is True and "urlparams" in request: -            request['urlparams']['u'] = str(random.randint(1, 100000000000)) -        ## Build URL -        url = self.origin + '/' + "/".join([ -            "".join([' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and -                     hex(ord(ch)).replace('0x', '%').upper() or -                     ch for ch in list(bit) -                     ]) for bit in request["urlcomponents"]]) -        if ("urlparams" in request): -            url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[ -                "urlparams"].items() if y is not None and len(str(y)) > 0]) -        #print(url) -        return url - -    def _channel_registry(self, url=None, params=None, callback=None, error=None): - -        if (params is None): -            params = dict() - -        urlcomponents = ['v1', 'channel-registration', 'sub-key', self.subscribe_key ] - -        if (url is not None): -            urlcomponents += url - -        params['auth']  = self.auth_key -        params['pnsdk'] = self.pnsdk - -        ## Get History -        return self._request({'urlcomponents': urlcomponents, 'urlparams': params}, -            callback=self._return_wrapped_callback(callback), -            error=self._return_wrapped_callback(error)) - -    def _channel_group(self, channel_group=None, channels=None, cloak=None,mode='add', callback=None, error=None): -        params = dict() -        url = [] -        namespace = None - -        if (channel_group is not None and len(channel_group) > 0): -            ns_ch_a = channel_group.split(':') - -            if len(ns_ch_a) > 1: -                namespace = None if ns_ch_a[0] == '*' else ns_ch_a[0] -                channel_group = ns_ch_a[1] -            else: -                channel_group = ns_ch_a[0] - -        if (namespace is not None): -            url.append('namespace') -            url.append(self._encode(namespace)) - -        url.append('channel-group') - -        if channel_group is not None and channel_group != '*': -            url.append(channel_group) - -        if (channels is not None): -            if (type(channels) is list): -                channels = ','.join(channels) -            params[mode] = channels -            #params['cloak'] = 'true' if CLOAK is True else 'false' -        else: -            if mode == 'remove': -                url.append('remove') - -        return self._channel_registry(url=url, params=params, callback=callback, error=error) - - -    def channel_group_list_namespaces(self, callback=None, error=None): -        """Get list of namespaces. - -        You can obtain list of namespaces for the subscribe key associated with PubNub -        object using this method. - - -        Args: -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -        Returns: -            Sync  Mode: dict -            channel_group_list_namespaces method returns a dict which contains list of namespaces -            in payload field -            { -                u'status': 200, -                u'payload': { -                    u'sub_key': u'demo', -                    u'namespaces': [u'dev', u'foo'] -                },  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None (callback gets the response as parameter) - -            Response Format: - -            The callback passed to channel_group_list_namespaces gets the a dict containing list of namespaces -            under payload field - -            { -                u'payload': { -                    u'sub_key': u'demo', -                    u'namespaces': [u'dev', u'foo'] -                } -            } - -            namespaces is the list of namespaces for the given subscribe key - - -        """ - -        url = ['namespace'] -        return self._channel_registry(url=url, callback=callback, error=error) - -    def channel_group_remove_namespace(self, namespace, callback=None, error=None): -        """Remove a namespace. - -        A namespace can be deleted using this method. - - -        Args: -            namespace:  (string) namespace to be deleted -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Sync  Mode: dict -            channel_group_remove_namespace method returns a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None ( callback gets the response as parameter ) - -            Response Format: - -            The callback passed to channel_group_list_namespaces gets the a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -        """ -        url = ['namespace', self._encode(namespace), 'remove'] -        return self._channel_registry(url=url, callback=callback, error=error) - -    def channel_group_list_groups(self, namespace=None, callback=None, error=None): -        """Get list of groups. - -        Using this method, list of groups for the subscribe key associated with PubNub -        object, can be obtained. If namespace is provided, groups within the namespace -        only are listed - -        Args: -            namespace:  (string) (optional) namespace -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Sync  Mode: dict -            channel_group_list_groups method returns a dict which contains list of groups -            in payload field -            { -                u'status': 200, -                u'payload': {"namespace": "dev", "groups": ["abcd"]},  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None ( callback gets the response as parameter ) - -            Response Format: - -            The callback passed to channel_group_list_namespaces gets the a dict containing list of groups -            under payload field - -            { -                u'payload': {"namespace": "dev", "groups": ["abcd"]} -            } - - - -        """ - -        if (namespace is not None and len(namespace) > 0): -            channel_group = namespace + ':*' -        else: -            channel_group = '*:*' - -        return self._channel_group(channel_group=channel_group, callback=callback, error=error) - -    def channel_group_list_channels(self, channel_group, callback=None, error=None): -        """Get list of channels for a group. - -        Using this method, list of channels for a group, can be obtained.  - -        Args: -            channel_group: (string) (optional)  -                        Channel Group name. It can also contain namespace. -                        If namespace is also specified, then the parameter -                        will be in format namespace:channel_group - -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -        Returns: -            Sync  Mode: dict -            channel_group_list_channels method returns a dict which contains list of channels -            in payload field -            { -                u'status': 200, -                u'payload': {"channels": ["hi"], "group": "abcd"},  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None ( callback gets the response as parameter ) - -            Response Format: - -            The callback passed to channel_group_list_channels gets the a dict containing list of channels -            under payload field - -            { -                u'payload': {"channels": ["hi"], "group": "abcd"} -            } - - -        """ -        return self._channel_group(channel_group=channel_group, callback=callback, error=error) - -    def channel_group_add_channel(self, channel_group, channel, callback=None, error=None): -        """Add a channel to group. - -        A channel can be added to group using this method. - - -        Args: -            channel_group:  (string)  -                        Channel Group name. It can also contain namespace. -                        If namespace is also specified, then the parameter -                        will be in format namespace:channel_group -            channel:        (string) -                            Can be a channel name, a list of channel names, -                            or a comma separated list of channel names -            callback:       (optional) -                            A callback method should be passed to the method. -                            If set, the api works in async mode.  -                            Required argument when working with twisted or tornado. - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -        Returns: -            Sync  Mode: dict -            channel_group_add_channel method returns a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None ( callback gets the response as parameter ) - -            Response Format: - -            The callback passed to channel_group_add_channel gets the a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -        """ - -        return self._channel_group(channel_group=channel_group, channels=channel, mode='add', callback=callback, error=error) - -    def channel_group_remove_channel(self, channel_group, channel, callback=None, error=None): -        """Remove channel. - -        A channel can be removed from a group method. - - -        Args: -            channel_group:  (string) -                        Channel Group name. It can also contain namespace. -                        If namespace is also specified, then the parameter -                        will be in format namespace:channel_group -            channel:        (string) -                            Can be a channel name, a list of channel names, -                            or a comma separated list of channel names -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado . - -        Returns: -            Sync  Mode: dict -            channel_group_remove_channel method returns a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None ( callback gets the response as parameter ) - -            Response Format: - -            The callback passed to channel_group_remove_channel gets the a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -        """ - -        return self._channel_group(channel_group=channel_group, channels=channel, mode='remove', callback=callback, error=error) - -    def channel_group_remove_group(self, channel_group, callback=None, error=None): -        """Remove channel group. - -        A channel group can be removed using this method. - - -        Args: -            channel_group:  (string) -                        Channel Group name. It can also contain namespace. -                        If namespace is also specified, then the parameter -                        will be in format namespace:channel_group -            callback:   (optional) -                        A callback method should be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -            error:      (optional) -                        Optional variable. An error method can be passed to the method. -                        If set, the api works in async mode.  -                        Required argument when working with twisted or tornado. - -        Returns: -            Sync  Mode: dict -            channel_group_remove_group method returns a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -            Async Mode: None ( callback gets the response as parameter ) - -            Response Format: - -            The callback passed to channel_group_remove_group gets the a dict indicating status of the request - -            { -                u'status': 200, -                u'message': 'OK',  -                u'service': u'channel-registry', -                u'error': False -            } - -        """ - -        return self._channel_group(channel_group=channel_group, mode='remove', callback=callback, error=error) - - - -class EmptyLock(): -    def __enter__(self): -        pass - -    def __exit__(self, a, b, c): -        pass - -empty_lock = EmptyLock() - - -class PubnubCoreAsync(PubnubBase): - -    def start(self): -        pass - -    def stop(self): -        pass - -    def __init__( -        self, -        publish_key, -        subscribe_key, -        secret_key=None, -        cipher_key=None, -        auth_key=None, -        ssl_on=False, -        origin='pubsub.pubnub.com', -        uuid=None, -        _tt_lock=empty_lock, -        _channel_list_lock=empty_lock, -        _channel_group_list_lock=empty_lock -    ): - -        super(PubnubCoreAsync, self).__init__( -            publish_key=publish_key, -            subscribe_key=subscribe_key, -            secret_key=secret_key, -            cipher_key=cipher_key, -            auth_key=auth_key, -            ssl_on=ssl_on, -            origin=origin, -            uuid=uuid -        ) - -        self.subscriptions = {} -        self.subscription_groups = {} -        self.timetoken = 0 -        self.last_timetoken = 0 -        self.accept_encoding = 'gzip' -        self.SUB_RECEIVER = None -        self._connect = None -        self._tt_lock = _tt_lock -        self._channel_list_lock = _channel_list_lock -        self._channel_group_list_lock = _channel_group_list_lock -        self._connect = lambda: None -        self.u = None - -    def get_channel_list(self, channels): -        channel = '' -        first = True -        with self._channel_list_lock: -            for ch in channels: -                if not channels[ch]['subscribed']: -                    continue -                if not first: -                    channel += ',' -                else: -                    first = False -                channel += ch -        return channel - -    def get_channel_group_list(self, channel_groups): -        channel_group = '' -        first = True -        with self._channel_group_list_lock: -            for ch in channel_groups: -                if not channel_groups[ch]['subscribed']: -                    continue -                if not first: -                    channel_group += ',' -                else: -                    first = False -                channel_group += ch -        return channel_group - - -    def get_channel_array(self): -        """Get List of currently subscribed channels - -        Returns: -            Returns a list containing names of channels subscribed - -            Sample return value: -                ["a","b","c] -        """ -        channels = self.subscriptions -        channel = [] -        with self._channel_list_lock: -            for ch in channels: -                if not channels[ch]['subscribed']: -                    continue -                channel.append(ch) -        return channel - -    def get_channel_group_array(self): -        """Get List of currently subscribed channel groups - -        Returns: -            Returns a list containing names of channel groups subscribed - -            Sample return value: -                ["a","b","c] -        """ -        channel_groups = self.subscription_groups -        channel_group = [] -        with self._channel_group_list_lock: -            for ch in channel_groups: -                if not channel_groups[ch]['subscribed']: -                    continue -                channel_group.append(ch) -        return channel_group - -    def each(l, func): -        if func is None: -            return -        for i in l: -            func(i) - -    def subscribe(self, channels, callback, state=None, error=None, -                  connect=None, disconnect=None, reconnect=None, sync=False): -        """Subscribe to data on a channel. - -        This function causes the client to create an open TCP socket to the -        PubNub Real-Time Network and begin listening for messages on a specified channel. -        To subscribe to a channel the client must send the appropriate subscribe_key at -        initialization. -         -        Only works in async mode - -        Args: -            channel:    (string/list) -                        Specifies the channel to subscribe to. It is possible to specify -                        multiple channels as a comma separated list or andarray. - -            callback:   (function) -                        This callback is called on receiving a message from the channel. - -            state:      (dict) -                        State to be set. - -            error:      (function) (optional) -                        This callback is called on an error event - -            connect:    (function) (optional) -                        This callback is called on a successful connection to the PubNub cloud - -            disconnect: (function) (optional) -                        This callback is called on client disconnect from the PubNub cloud -             -            reconnect:  (function) (optional) -                        This callback is called on successfully re-connecting to the PubNub cloud -         -        Returns: -            None -        """ - -        return self._subscribe(channels=channels, callback=callback, state=state, error=error, -            connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync) - -    def subscribe_group(self, channel_groups, callback, error=None, -                  connect=None, disconnect=None, reconnect=None, sync=False): -        """Subscribe to data on a channel group. - -        This function causes the client to create an open TCP socket to the -        PubNub Real-Time Network and begin listening for messages on a specified channel. -        To subscribe to a channel group the client must send the appropriate subscribe_key at -        initialization. -         -        Only works in async mode - -        Args: -            channel_groups:    (string/list) -                        Specifies the channel groups to subscribe to. It is possible to specify -                        multiple channel groups as a comma separated list or andarray. - -            callback:   (function) -                        This callback is called on receiving a message from the channel. - -            error:      (function) (optional) -                        This callback is called on an error event - -            connect:    (function) (optional) -                        This callback is called on a successful connection to the PubNub cloud - -            disconnect: (function) (optional) -                        This callback is called on client disconnect from the PubNub cloud -             -            reconnect:  (function) (optional) -                        This callback is called on successfully re-connecting to the PubNub cloud -         -        Returns: -            None -        """ - -        return self._subscribe(channel_groups=channel_groups, callback=callback, error=error, -            connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync) - -    def _subscribe(self, channels=None, channel_groups=None, state=None, callback=None, error=None, -                  connect=None, disconnect=None, reconnect=None): - -        with self._tt_lock: -            self.last_timetoken = self.timetoken if self.timetoken != 0 \ -                else self.last_timetoken -            self.timetoken = 0 - -        def _invoke(func, msg=None, channel=None, real_channel=None): -            if func is not None: -                if msg is not None and channel is not None and real_channel is not None: -                    try: -                        func(get_data_for_user(msg), channel, real_channel) -                    except: -                        func(get_data_for_user(msg), channel) -                elif msg is not None and channel is not None: -                    func(get_data_for_user(msg), channel) -                elif msg is not None: -                    func(get_data_for_user(msg)) -                else: -                    func() - -        def _invoke_connect(): -            if self._channel_list_lock: -                with self._channel_list_lock: -                    for ch in self.subscriptions: -                        chobj = self.subscriptions[ch] -                        if chobj['connected'] is False: -                            chobj['connected'] = True -                            chobj['disconnected'] = False -                            _invoke(chobj['connect'], chobj['name']) -                        else: -                            if chobj['disconnected'] is True: -                                chobj['disconnected'] = False -                                _invoke(chobj['reconnect'], chobj['name']) - -            if self._channel_group_list_lock: -                with self._channel_group_list_lock: -                    for ch in self.subscription_groups: -                        chobj = self.subscription_groups[ch] -                        if chobj['connected'] is False: -                            chobj['connected'] = True -                            chobj['disconnected'] = False -                            _invoke(chobj['connect'], chobj['name']) -                        else: -                            if chobj['disconnected'] is True: -                                chobj['disconnected'] = False -                                _invoke(chobj['reconnect'], chobj['name']) - - -        def _invoke_disconnect(): -            if self._channel_list_lock: -                with self._channel_list_lock: -                    for ch in self.subscriptions: -                        chobj = self.subscriptions[ch] -                        if chobj['connected'] is True: -                            if chobj['disconnected'] is False: -                                chobj['disconnected'] = True -                                _invoke(chobj['disconnect'], chobj['name']) -            if self._channel_group_list_lock: -                with self._channel_group_list_lock: -                    for ch in self.subscription_groups: -                        chobj = self.subscription_groups[ch] -                        if chobj['connected'] is True: -                            if chobj['disconnected'] is False: -                                chobj['disconnected'] = True -                                _invoke(chobj['disconnect'], chobj['name']) - - -        def _invoke_error(channel_list=None, error=None): -            if channel_list is None: -                for ch in self.subscriptions: -                    chobj = self.subscriptions[ch] -                    try: -                    	_invoke(chobj['error'], error, ch) -                    except TypeError: -						_invoke(chobj['error'], error) -            else: -                for ch in channel_list: -                    chobj = self.subscriptions[ch] -                    try: -						_invoke(chobj['error'], error, ch) -                    except TypeError: -						_invoke(chobj['error'], error) - -        def _get_channel(): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                if chobj['subscribed'] is True: -                    return chobj - -        if channels is not None: -            channels = channels if isinstance( -                channels, list) else channels.split(",") -            for channel in channels: -                ## New Channel? -                if len(channel) > 0 and \ -                        (not channel in self.subscriptions or -                         self.subscriptions[channel]['subscribed'] is False): -                        with self._channel_list_lock: -                            self.subscriptions[channel] = { -                                'name': channel, -                                'first': False, -                                'connected': False, -                                'disconnected': True, -                                'subscribed': True, -                                'callback': callback, -                                'connect': connect, -                                'disconnect': disconnect, -                                'reconnect': reconnect, -                                'error': error -                            } -                        if state is not None: -                            if channel in self.STATE: -                                self.STATE[channel] = state[channel] -                            else: -                                self.STATE[channel] = state -         -        if channel_groups is not None: -            channel_groups = channel_groups if isinstance( -                channel_groups, list) else channel_groups.split(",") - -            for channel_group in channel_groups: -                ## New Channel? -                if len(channel_group) > 0 and \ -                        (not channel_group in self.subscription_groups or -                         self.subscription_groups[channel_group]['subscribed'] is False): -                        with self._channel_group_list_lock: -                            self.subscription_groups[channel_group] = { -                                'name': channel_group, -                                'first': False, -                                'connected': False, -                                'disconnected': True, -                                'subscribed': True, -                                'callback': callback, -                                'connect': connect, -                                'disconnect': disconnect, -                                'reconnect': reconnect, -                                'error': error -                            } - -        ''' -        ## return if already connected to channel -        if channel in self.subscriptions and \ -            'connected' in self.subscriptions[channel] and \ -                self.subscriptions[channel]['connected'] is True: -                    _invoke(error, "Already Connected") -                    return -        ''' -        ## SUBSCRIPTION RECURSION -        def _connect(): - -            self._reset_offline() - -            def error_callback(response): -                ## ERROR ? -                if not response or \ -                    ('message' in response and -                        response['message'] == 'Forbidden'): -                            _invoke_error(channel_list=response['payload'][ -                                'channels'], error=response['message']) -                            self.timeout(1, _connect) -                            return -                if 'message' in response: -                    _invoke_error(error=response['message']) -                else: -                    _invoke_disconnect() -                    self.timetoken = 0 -                    self.timeout(1, _connect) - -            def sub_callback(response): -                ## ERROR ? -                if not response or \ -                    ('message' in response and -                        response['message'] == 'Forbidden'): -                            _invoke_error(channel_list=response['payload'][ -                                'channels'], error=response['message']) -                            _connect() -                            return - -                _invoke_connect() - -                with self._tt_lock: -                    self.timetoken = \ -                        self.last_timetoken if self.timetoken == 0 and \ -                        self.last_timetoken != 0 else response[1] - -                    if len(response) > 3: -                        channel_list = response[2].split(',') -                        channel_list_2 = response[3].split(',') -                        response_list = response[0] -                        for ch in enumerate(channel_list): -                            if ch[1] in self.subscription_groups or ch[1] in self.subscriptions: -                                try: -                                    chobj = self.subscription_groups[ch[1]] -                                except KeyError: -                                    chobj = self.subscriptions[ch[1]] -                                _invoke(chobj['callback'], -                                        self.decrypt(response_list[ch[0]]), -                                        chobj['name'].split('-pnpres')[0], channel_list_2[ch[0]].split('-pnpres')[0])                     -                    elif len(response) > 2: -                        channel_list = response[2].split(',') -                        response_list = response[0] -                        for ch in enumerate(channel_list): -                            if ch[1] in self.subscriptions: -                                chobj = self.subscriptions[ch[1]] -                                _invoke(chobj['callback'], -                                        self.decrypt(response_list[ch[0]]), -                                        chobj['name'].split('-pnpres')[0]) -                    else: -                        response_list = response[0] -                        chobj = _get_channel() -                        for r in response_list: -                            if chobj: -                                _invoke(chobj['callback'], self.decrypt(r), -                                        chobj['name'].split('-pnpres')[0]) - -                    _connect() - -            channel_list = self.get_channel_list(self.subscriptions) -            channel_group_list = self.get_channel_group_list(self.subscription_groups) - -            if len(channel_list) <= 0 and len(channel_group_list) <= 0: -                return - -            if len(channel_list) <= 0: -                channel_list = ',' - -            data = {"uuid": self.uuid, "auth": self.auth_key, -            'pnsdk' : self.pnsdk, 'channel-group' : channel_group_list} -             - -            st = json.dumps(self.STATE) - -            if len(st) > 2: -                data['state'] = quote(st,safe="") - -            ## CONNECT TO PUBNUB SUBSCRIBE SERVERS -            #try: -            self.SUB_RECEIVER = self._request({"urlcomponents": [ -                'subscribe', -                self.subscribe_key, -                channel_list, -                '0', -                str(self.timetoken) -            ], "urlparams": data}, -                sub_callback, -                error_callback, -                single=True, timeout=320) -            ''' -            except Exception as e: -                print(e) -                self.timeout(1, _connect) -                return -            ''' - -        self._connect = _connect - -        ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        _connect() - -    def _reset_offline(self): -        if self.SUB_RECEIVER is not None: -            self.SUB_RECEIVER() -        self.SUB_RECEIVER = None - -    def CONNECT(self): -        self._reset_offline() -        self._connect() - -    def unsubscribe(self, channel): -        """Unsubscribe from channel . -           Only works in async mode - -        Args: -            channel: Channel name ( string )  -        """ -        if channel in self.subscriptions is False: -            return False - -        ## DISCONNECT -        with self._channel_list_lock: -            if channel in self.subscriptions: -                self.subscriptions[channel]['connected'] = 0 -                self.subscriptions[channel]['subscribed'] = False -                self.subscriptions[channel]['timetoken'] = 0 -                self.subscriptions[channel]['first'] = False -                self.leave_channel(channel=channel) - -            # remove channel from STATE -            self.STATE.pop(channel, None) - -        self.CONNECT() - -    def unsubscribe_group(self, channel_group): -        """Unsubscribe from channel group. -           Only works in async mode - -        Args: -            channel_group: Channel group name ( string ) -        """ -        if channel_group in self.subscription_groups is False: -            return False - -        ## DISCONNECT -        with self._channel_group_list_lock: -            if channel_group in self.subscription_groups: -                self.subscription_groups[channel_group]['connected'] = 0 -                self.subscription_groups[channel_group]['subscribed'] = False -                self.subscription_groups[channel_group]['timetoken'] = 0 -                self.subscription_groups[channel_group]['first'] = False -                self.leave_group(channel_group=channel_group) -        self.CONNECT() - - -class PubnubCore(PubnubCoreAsync): -    def __init__( -        self, -        publish_key, -        subscribe_key, -        secret_key=None, -        cipher_key=None, -        auth_key=None, -        ssl_on=False, -        origin='pubsub.pubnub.com', -        uuid=None, -        _tt_lock=None, -        _channel_list_lock=None, -        _channel_group_list_lock=None - -    ): -        super(PubnubCore, self).__init__( -            publish_key=publish_key, -            subscribe_key=subscribe_key, -            secret_key=secret_key, -            cipher_key=cipher_key, -            auth_key=auth_key, -            ssl_on=ssl_on, -            origin=origin, -            uuid=uuid, -            _tt_lock=_tt_lock, -            _channel_list_lock=_channel_list_lock, -            _channel_group_list_lock=_channel_group_list_lock -        ) - -        self.subscriptions = {} -        self.timetoken = 0 -        self.accept_encoding = 'gzip' - -class HTTPClient: -    def __init__(self, pubnub, url, urllib_func=None, -                 callback=None, error=None, id=None, timeout=5): -        self.url = url -        self.id = id -        self.callback = callback -        self.error = error -        self.stop = False -        self._urllib_func = urllib_func -        self.timeout = timeout -        self.pubnub = pubnub - -    def cancel(self): -        self.stop = True -        self.callback = None -        self.error = None - -    def run(self): - -        def _invoke(func, data): -            if func is not None: -                func(get_data_for_user(data)) - -        if self._urllib_func is None: -            return - -        resp = self._urllib_func(self.url, timeout=self.timeout) -        data = resp[0] -        code = resp[1] - -        if self.stop is True: -            return -        if self.callback is None: -            with self.pubnub.latest_sub_callback_lock: -                if self.pubnub.latest_sub_callback['id'] != self.id: -                    return -                else: -                    if self.pubnub.latest_sub_callback['callback'] is not None: -                        self.pubnub.latest_sub_callback['id'] = 0 -                        try: -                            data = json.loads(data) -                        except ValueError: -                            _invoke(self.pubnub.latest_sub_callback['error'], -                                    {'error': 'json decoding error'}) -                            return -                        if code != 200: -                            _invoke(self.pubnub.latest_sub_callback['error'], data) -                        else: -                            _invoke(self.pubnub.latest_sub_callback['callback'], data) -        else: -            try: -                data = json.loads(data) -            except ValueError: -                _invoke(self.error, {'error': 'json decoding error'}) -                return - -            if code != 200: -                _invoke(self.error, data) -            else: -                _invoke(self.callback, data) - - -def _urllib_request_2(url, timeout=5): -    try: -        resp = urllib2.urlopen(url, timeout=timeout) -    except urllib2.HTTPError as http_error: -        resp = http_error -    except urllib2.URLError as error: -        msg = {"message": str(error.reason)} -        return (json.dumps(msg), 0) - -    return (resp.read(), resp.code) - -class PubnubHTTPAdapter(HTTPAdapter): -    def init_poolmanager(self, *args, **kwargs): -        kwargs.setdefault('socket_options', default_socket_options) - -        super(PubnubHTTPAdapter, self).init_poolmanager(*args, **kwargs) - -s = requests.Session() -#s.mount('http://', PubnubHTTPAdapter(max_retries=1)) -#s.mount('https://', PubnubHTTPAdapter(max_retries=1)) -#s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) -#s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) - - -def _requests_request(url, timeout=5): -    try: -        resp = s.get(url, timeout=timeout) -    except requests.exceptions.HTTPError as http_error: -        resp = http_error -    except requests.exceptions.ConnectionError as error: -        msg = str(error) -        return (json.dumps(msg), 0) -    except requests.exceptions.Timeout as error: -        msg = str(error) -        return (json.dumps(msg), 0) -    #print (resp.text) -    #print (resp.status_code) -    return (resp.text, resp.status_code) - - -def _urllib_request_3(url, timeout=5): -    try: -        resp = urllib.request.urlopen(url, timeout=timeout) -    except (urllib.request.HTTPError, urllib.request.URLError) as http_error: -        resp = http_error -    r = resp.read().decode("utf-8") -    return (r, resp.code) - -_urllib_request = None - - -#  Pubnub - -class Pubnub(PubnubCore): -    def __init__( -        self, -        publish_key, -        subscribe_key, -        secret_key=None, -        cipher_key=None, -        auth_key=None, -        ssl_on=False, -        origin='pubsub.pubnub.com', -        uuid=None, -        pooling=True, -        daemon=False, -        pres_uuid=None, -        azure=False -    ): -        super(Pubnub, self).__init__( -            publish_key=publish_key, -            subscribe_key=subscribe_key, -            secret_key=secret_key, -            cipher_key=cipher_key, -            auth_key=auth_key, -            ssl_on=ssl_on, -            origin=origin, -            uuid=uuid or pres_uuid, -            _tt_lock=threading.RLock(), -            _channel_list_lock=threading.RLock(), -            _channel_group_list_lock=threading.RLock() -        ) -        global _urllib_request -        if self.python_version == 2: -            _urllib_request = _urllib_request_2 -        else: -            _urllib_request = _urllib_request_3 - -        if pooling is True: -            _urllib_request = _requests_request - -        self.latest_sub_callback_lock = threading.RLock() -        self.latest_sub_callback = {'id': None, 'callback': None} -        self.pnsdk = 'PubNub-Python' + '/' + self.version -        self.daemon = daemon -         -        if azure is False: -            s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) -            s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) -        else: -            s.mount('http://', PubnubHTTPAdapter(max_retries=1)) -            s.mount('https://', PubnubHTTPAdapter(max_retries=1))          - -    def timeout(self, interval, func): -        def cb(): -            time.sleep(interval) -            func() -        thread = threading.Thread(target=cb) -        thread.daemon = self.daemon -        thread.start() - -    def _request_async(self, request, callback=None, error=None, single=False, timeout=5): -        global _urllib_request -        ## Build URL -        url = self.getUrl(request) -        if single is True: -            id = time.time() -            client = HTTPClient(self, url=url, urllib_func=_urllib_request, -                                callback=None, error=None, id=id, timeout=timeout) -            with self.latest_sub_callback_lock: -                self.latest_sub_callback['id'] = id -                self.latest_sub_callback['callback'] = callback -                self.latest_sub_callback['error'] = error -        else: -            client = HTTPClient(self, url=url, urllib_func=_urllib_request, -                                callback=callback, error=error, timeout=timeout) - -        thread = threading.Thread(target=client.run) -        thread.daemon = self.daemon -        thread.start() - -        def abort(): -            client.cancel() -        return abort - -    def _request_sync(self, request, timeout=5): -        global _urllib_request -        ## Build URL -        url = self.getUrl(request) -        ## Send Request Expecting JSONP Response -        response = _urllib_request(url, timeout=timeout) -        try: -            resp_json = json.loads(response[0]) -        except ValueError: -            return [0, "JSON Error"] - -        if response[1] != 200 and 'message' in resp_json and 'payload' in resp_json: -            return {'message': resp_json['message'], -                    'payload': resp_json['payload']} - -        if response[1] == 0: -            return [0, resp_json] - -        return resp_json - -    def _request(self, request, callback=None, error=None, single=False, timeout=5): -        if callback is None: -            return get_data_for_user(self._request_sync(request, timeout=timeout)) -        else: -            self._request_async(request, callback, error, single=single, timeout=timeout) - -# Pubnub Twisted - -class PubnubTwisted(PubnubCoreAsync): - -    def start(self): -        reactor.run() - -    def stop(self): -        reactor.stop() - -    def timeout(self, delay, callback): -        reactor.callLater(delay, callback) - -    def __init__( -        self, -        publish_key, -        subscribe_key, -        secret_key=None, -        cipher_key=None, -        auth_key=None, -        ssl_on=False, -        origin='pubsub.pubnub.com' -    ): -        super(PubnubTwisted, self).__init__( -            publish_key=publish_key, -            subscribe_key=subscribe_key, -            secret_key=secret_key, -            cipher_key=cipher_key, -            auth_key=auth_key, -            ssl_on=ssl_on, -            origin=origin, -        ) -        self.headers = {} -        self.headers['User-Agent'] = ['Python-Twisted'] -        self.headers['V'] = [self.version] -        self.pnsdk = 'PubNub-Python-' + 'Twisted' + '/' + self.version - -    def _request(self, request, callback=None, error=None, single=False, timeout=5): -        global pnconn_pool - -        def _invoke(func, data): -            if func is not None: -                func(get_data_for_user(data)) - -        ## Build URL - -        url = self.getUrl(request) - -        agent = ContentDecoderAgent(RedirectAgent(Agent( -            reactor, -            contextFactory=WebClientContextFactory(), -            pool=self.ssl and None or pnconn_pool -        )), [('gzip', GzipDecoder)]) - -        try: -            request = agent.request( -                'GET', url, Headers(self.headers), None) -        except TypeError: -            request = agent.request( -                'GET', url.encode(), Headers(self.headers), None) - -        if single is True: -            id = time.time() -            self.id = id - -        def received(response): -            if not isinstance(response, twisted.web._newclient.Response): -                _invoke(error, {"message": "Not Found"}) -                return - -            finished = Deferred() -            if response.code in [401, 403]: -                response.deliverBody(PubNubPamResponse(finished)) -            else: -                response.deliverBody(PubNubResponse(finished)) - -            return finished - -        def complete(data): -            if single is True: -                if id != self.id: -                    return None -            try: -                data = json.loads(data) -            except ValueError: -                try: -                    data = json.loads(data.decode("utf-8")) -                except ValueError: -                    _invoke(error, {'error': 'json decode error'}) - -            if 'error' in data and 'status' in data and 'status' != 200: -                _invoke(error, data) -            else: -                _invoke(callback, data) - -        def abort(): -            pass - -        request.addCallback(received) -        request.addCallback(complete) - -        return abort - - -# PubnubTornado -class PubnubTornado(PubnubCoreAsync): - -    def stop(self): -        ioloop.stop() - -    def start(self): -        ioloop.start() - -    def timeout(self, delay, callback): -        ioloop.add_timeout(time.time() + float(delay), callback) - -    def __init__( -        self, -        publish_key, -        subscribe_key, -        secret_key=False, -        cipher_key=False, -        auth_key=False, -        ssl_on=False, -        origin='pubsub.pubnub.com' -    ): -        super(PubnubTornado, self).__init__( -            publish_key=publish_key, -            subscribe_key=subscribe_key, -            secret_key=secret_key, -            cipher_key=cipher_key, -            auth_key=auth_key, -            ssl_on=ssl_on, -            origin=origin, -        ) -        self.headers = {} -        self.headers['User-Agent'] = 'Python-Tornado' -        self.headers['Accept-Encoding'] = self.accept_encoding -        self.headers['V'] = self.version -        self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) -        self.id = None -        self.pnsdk = 'PubNub-Python-' + 'Tornado' + '/' + self.version - -    def _request(self, request, callback=None, error=None, -                 single=False, timeout=5, connect_timeout=5): - -        def _invoke(func, data): -            if func is not None: -                func(get_data_for_user(data)) - -        url = self.getUrl(request) -        request = tornado.httpclient.HTTPRequest( -            url, 'GET', -            self.headers, -            connect_timeout=connect_timeout, -            request_timeout=timeout) -        if single is True: -            id = time.time() -            self.id = id - -        def responseCallback(response): -            if single is True: -                if not id == self.id: -                    return None - -            body = response._get_body() - -            if body is None: -                return - -            def handle_exc(*args): -                return True -            if response.error is not None: -                with ExceptionStackContext(handle_exc): -                    if response.code in [403, 401]: -                        response.rethrow() -                    else: -                        _invoke(error, {"message": response.reason}) -                    return - -            try: -                data = json.loads(body) -            except TypeError: -                try: -                    data = json.loads(body.decode("utf-8")) -                except ValueError: -                    _invoke(error, {'error': 'json decode error'}) - -            if 'error' in data and 'status' in data and 'status' != 200: -                _invoke(error, data) -            else: -                _invoke(callback, data) - -        self.http.fetch( -            request=request, -            callback=responseCallback -        ) - -        def abort(): -            pass - -        return abort | 
