from twisted.web.client import getPage from twisted.internet import reactor from twisted.internet.defer import Deferred from twisted.internet.protocol import Protocol from twisted.web.client import Agent, ContentDecoderAgent from twisted.web.client import RedirectAgent, GzipDecoder from twisted.web.client import HTTPConnectionPool from twisted.web.http_headers import Headers from twisted.internet.ssl import ClientContextFactory from twisted.internet.task import LoopingCall import twisted from hashlib import sha256 import time import json from twisted.python.compat import ( _PY3, unicode, intToBytes, networkString, nativeString) pnconn_pool = HTTPConnectionPool(reactor, persistent=True) pnconn_pool.maxPersistentPerHost = 100000 pnconn_pool.cachedConnectionTimeout = 310 pnconn_pool.retryAutomatically = True class Pubnub(PubnubCoreAsync): def start(self): reactor.run() def stop(self): reactor.stop() def timeout(self, delay, callback): reactor.callLater(delay, callback) def __init__( self, publish_key, subscribe_key, secret_key=False, cipher_key=False, auth_key=None, ssl_on=False, origin='pubsub.pubnub.com' ): super(Pubnub, self).__init__( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key, cipher_key=cipher_key, auth_key=auth_key, ssl_on=ssl_on, origin=origin, ) self.headers = {} self.headers['User-Agent'] = ['Python-Twisted'] #self.headers['Accept-Encoding'] = [self.accept_encoding] self.headers['V'] = [self.version] def _request(self, request, callback=None, error=None, single=False): global pnconn_pool def _invoke(func, data): if func is not None: func(data) ## Build URL ''' url = self.origin + '/' + "/".join([ "".join([ ' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'.find(ch) > -1 and hex(ord(ch)).replace( '0x', '%' ).upper() or ch for ch in list(bit) ]) for bit in request]) ''' url = self.getUrl(request) agent = ContentDecoderAgent(RedirectAgent(Agent( reactor, contextFactory=WebClientContextFactory(), pool=self.ssl and None or pnconn_pool )), [('gzip', GzipDecoder)]) try: request = agent.request( 'GET', url, Headers(self.headers), None) except TypeError as te: print(url.encode()) request = agent.request( 'GET', url.encode(), Headers(self.headers), None) if single is True: id = time.time() self.id = id def received(response): finished = Deferred() if response.code == 403: response.deliverBody(PubNub403Response(finished)) else: response.deliverBody(PubNubResponse(finished)) return finished def error_handler(response): finished = Deferred() if response.code == 403: response.deliverBody(PubNub403Response(finished)) else: response.deliverBody(PubNubResponse(finished)) return finished def complete(data): if single is True: if id != self.id: return None try: data = json.loads(data) except Exception as e: try: data = json.loads(data.decode("utf-8")) except: _invoke(error, {'error': 'json decode error'}) if 'error' in data and 'status' in data and 'status' != 200: _invoke(error, data) else: _invoke(callback, data) def abort(): pass request.addCallback(received) request.addCallback(complete) request.addErrback(error_handler) return abort class WebClientContextFactory(ClientContextFactory): def getContext(self, hostname, port): return ClientContextFactory.getContext(self) class PubNub403Response(Protocol): def __init__(self, finished): self.finished = finished def dataReceived(self, bytes): #print '403 resp ', bytes self.finished.callback(bytes) class PubNubResponse(Protocol): def __init__(self, finished): self.finished = finished def dataReceived(self, bytes): #print bytes self.finished.callback(bytes)