diff options
| author | Tom Christie | 2013-03-12 20:30:14 +0000 | 
|---|---|---|
| committer | Tom Christie | 2013-03-12 20:30:14 +0000 | 
| commit | b6b686d285e376dbf4f2d2f15bd0e3ef0f1c3a37 (patch) | |
| tree | 1ef5df29e29dfd333240212f955e5b52b69b7a91 /rest_framework | |
| parent | 2f1951910f264852b530c94c3a9946afe10eedd2 (diff) | |
| parent | 043d748b539a6f5b4cfdf6de650b072541f1c6da (diff) | |
| download | django-rest-framework-b6b686d285e376dbf4f2d2f15bd0e3ef0f1c3a37.tar.bz2 | |
Merge branch 'master' into basic-nested-serialization
Diffstat (limited to 'rest_framework')
| -rw-r--r-- | rest_framework/authentication.py | 233 | ||||
| -rw-r--r-- | rest_framework/compat.py | 31 | ||||
| -rw-r--r-- | rest_framework/permissions.py | 25 | ||||
| -rw-r--r-- | rest_framework/runtests/settings.py | 23 | ||||
| -rw-r--r-- | rest_framework/tests/authentication.py | 363 | 
5 files changed, 644 insertions, 31 deletions
diff --git a/rest_framework/authentication.py b/rest_framework/authentication.py index 14b2136b..b4b73699 100644 --- a/rest_framework/authentication.py +++ b/rest_framework/authentication.py @@ -3,13 +3,28 @@ Provides a set of pluggable authentication policies.  """  from __future__ import unicode_literals  from django.contrib.auth import authenticate -from django.utils.encoding import DjangoUnicodeDecodeError +from django.core.exceptions import ImproperlyConfigured  from rest_framework import exceptions, HTTP_HEADER_ENCODING  from rest_framework.compat import CsrfViewMiddleware +from rest_framework.compat import oauth, oauth_provider, oauth_provider_store +from rest_framework.compat import oauth2_provider, oauth2_provider_forms, oauth2_provider_backends  from rest_framework.authtoken.models import Token  import base64 +def get_authorization_header(request): +    """ +    Return request's 'Authorization:' header, as a bytestring. + +    Hide some test client ickyness where the header can be unicode. +    """ +    auth = request.META.get('HTTP_AUTHORIZATION', b'') +    if type(auth) == type(''): +        # Work around django test client oddness +        auth = auth.encode(HTTP_HEADER_ENCODING) +    return auth + +  class BaseAuthentication(object):      """      All authentication classes should extend BaseAuthentication. @@ -41,28 +56,25 @@ class BasicAuthentication(BaseAuthentication):          Returns a `User` if a correct username and password have been supplied          using HTTP Basic authentication.  Otherwise returns `None`.          """ -        auth = request.META.get('HTTP_AUTHORIZATION', b'') -        if type(auth) == type(''): -            # Work around django test client oddness -            auth = auth.encode(HTTP_HEADER_ENCODING) -        auth = auth.split() +        auth = get_authorization_header(request).split()          if not auth or auth[0].lower() != b'basic':              return None -        if len(auth) != 2: -            raise exceptions.AuthenticationFailed('Invalid basic header') +        if len(auth) == 1: +            msg = 'Invalid basic header. No credentials provided.' +            raise exceptions.AuthenticationFailed(msg) +        elif len(auth) > 2: +            msg = 'Invalid basic header. Credentials string should not contain spaces.' +            raise exceptions.AuthenticationFailed(msg)          try:              auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')          except (TypeError, UnicodeDecodeError): -            raise exceptions.AuthenticationFailed('Invalid basic header') - -        try: -            userid, password = auth_parts[0], auth_parts[2] -        except DjangoUnicodeDecodeError: -            raise exceptions.AuthenticationFailed('Invalid basic header') +            msg = 'Invalid basic header. Credentials not correctly base64 encoded' +            raise exceptions.AuthenticationFailed(msg) +        userid, password = auth_parts[0], auth_parts[2]          return self.authenticate_credentials(userid, password)      def authenticate_credentials(self, userid, password): @@ -70,9 +82,9 @@ class BasicAuthentication(BaseAuthentication):          Authenticate the userid and password against username and password.          """          user = authenticate(username=userid, password=password) -        if user is not None and user.is_active: -            return (user, None) -        raise exceptions.AuthenticationFailed('Invalid username/password') +        if user is None or not user.is_active: +            raise exceptions.AuthenticationFailed('Invalid username/password') +        return (user, None)      def authenticate_header(self, request):          return 'Basic realm="%s"' % self.www_authenticate_realm @@ -131,13 +143,17 @@ class TokenAuthentication(BaseAuthentication):      """      def authenticate(self, request): -        auth = request.META.get('HTTP_AUTHORIZATION', '').split() +        auth = get_authorization_header(request).split() -        if not auth or auth[0].lower() != "token": +        if not auth or auth[0].lower() != b'token':              return None -        if len(auth) != 2: -            raise exceptions.AuthenticationFailed('Invalid token header') +        if len(auth) == 1: +            msg = 'Invalid token header. No credentials provided.' +            raise exceptions.AuthenticationFailed(msg) +        elif len(auth) > 2: +            msg = 'Invalid token header. Token string should not contain spaces.' +            raise exceptions.AuthenticationFailed(msg)          return self.authenticate_credentials(auth[1]) @@ -147,12 +163,179 @@ class TokenAuthentication(BaseAuthentication):          except self.model.DoesNotExist:              raise exceptions.AuthenticationFailed('Invalid token') -        if token.user.is_active: -            return (token.user, token) -        raise exceptions.AuthenticationFailed('User inactive or deleted') +        if not token.user.is_active: +            raise exceptions.AuthenticationFailed('User inactive or deleted') + +        return (token.user, token)      def authenticate_header(self, request):          return 'Token' -# TODO: OAuthAuthentication +class OAuthAuthentication(BaseAuthentication): +    """ +    OAuth 1.0a authentication backend using `django-oauth-plus` and `oauth2`. + +    Note: The `oauth2` package actually provides oauth1.0a support.  Urg. +          We import it from the `compat` module as `oauth`. +    """ +    www_authenticate_realm = 'api' + +    def __init__(self, *args, **kwargs): +        super(OAuthAuthentication, self).__init__(*args, **kwargs) + +        if oauth is None: +            raise ImproperlyConfigured( +                "The 'oauth2' package could not be imported." +                "It is required for use with the 'OAuthAuthentication' class.") + +        if oauth_provider is None: +            raise ImproperlyConfigured( +                "The 'django-oauth-plus' package could not be imported." +                "It is required for use with the 'OAuthAuthentication' class.") + +    def authenticate(self, request): +        """ +        Returns two-tuple of (user, token) if authentication succeeds, +        or None otherwise. +        """ +        try: +            oauth_request = oauth_provider.utils.get_oauth_request(request) +        except oauth.Error as err: +            raise exceptions.AuthenticationFailed(err.message) + +        oauth_params = oauth_provider.consts.OAUTH_PARAMETERS_NAMES + +        found = any(param for param in oauth_params if param in oauth_request) +        missing = list(param for param in oauth_params if param not in oauth_request) + +        if not found: +            # OAuth authentication was not attempted. +            return None + +        if missing: +            # OAuth was attempted but missing parameters. +            msg = 'Missing parameters: %s' % (', '.join(missing)) +            raise exceptions.AuthenticationFailed(msg) + +        if not self.check_nonce(request, oauth_request): +            msg = 'Nonce check failed' +            raise exceptions.AuthenticationFailed(msg) + +        try: +            consumer_key = oauth_request.get_parameter('oauth_consumer_key') +            consumer = oauth_provider_store.get_consumer(request, oauth_request, consumer_key) +        except oauth_provider_store.InvalidConsumerError as err: +            raise exceptions.AuthenticationFailed(err) + +        if consumer.status != oauth_provider.consts.ACCEPTED: +            msg = 'Invalid consumer key status: %s' % consumer.get_status_display() +            raise exceptions.AuthenticationFailed(msg) + +        try: +            token_param = oauth_request.get_parameter('oauth_token') +            token = oauth_provider_store.get_access_token(request, oauth_request, consumer, token_param) +        except oauth_provider_store.InvalidTokenError: +            msg = 'Invalid access token: %s' % oauth_request.get_parameter('oauth_token') +            raise exceptions.AuthenticationFailed(msg) + +        try: +            self.validate_token(request, consumer, token) +        except oauth.Error as err: +            raise exceptions.AuthenticationFailed(err.message) + +        user = token.user + +        if not user.is_active: +            msg = 'User inactive or deleted: %s' % user.username +            raise exceptions.AuthenticationFailed(msg) + +        return (token.user, token) + +    def authenticate_header(self, request): +        """ +        If permission is denied, return a '401 Unauthorized' response, +        with an appropraite 'WWW-Authenticate' header. +        """ +        return 'OAuth realm="%s"' % self.www_authenticate_realm + +    def validate_token(self, request, consumer, token): +        """ +        Check the token and raise an `oauth.Error` exception if invalid. +        """ +        oauth_server, oauth_request = oauth_provider.utils.initialize_server_request(request) +        oauth_server.verify_request(oauth_request, consumer, token) + +    def check_nonce(self, request, oauth_request): +        """ +        Checks nonce of request, and return True if valid. +        """ +        return oauth_provider_store.check_nonce(request, oauth_request, oauth_request['oauth_nonce']) + + +class OAuth2Authentication(BaseAuthentication): +    """ +    OAuth 2 authentication backend using `django-oauth2-provider` +    """ +    www_authenticate_realm = 'api' + +    def __init__(self, *args, **kwargs): +        super(OAuth2Authentication, self).__init__(*args, **kwargs) + +        if oauth2_provider is None: +            raise ImproperlyConfigured( +                "The 'django-oauth2-provider' package could not be imported. " +                "It is required for use with the 'OAuth2Authentication' class.") + +    def authenticate(self, request): +        """ +        Returns two-tuple of (user, token) if authentication succeeds, +        or None otherwise. +        """ + +        auth = get_authorization_header(request).split() + +        if not auth or auth[0].lower() != b'bearer': +            return None + +        if len(auth) == 1: +            msg = 'Invalid bearer header. No credentials provided.' +            raise exceptions.AuthenticationFailed(msg) +        elif len(auth) > 2: +            msg = 'Invalid bearer header. Token string should not contain spaces.' +            raise exceptions.AuthenticationFailed(msg) + +        return self.authenticate_credentials(request, auth[1]) + +    def authenticate_credentials(self, request, access_token): +        """ +        Authenticate the request, given the access token. +        """ + +        # Authenticate the client +        oauth2_client_form = oauth2_provider_forms.ClientAuthForm(request.REQUEST) +        if not oauth2_client_form.is_valid(): +            raise exceptions.AuthenticationFailed('Client could not be validated') +        client = oauth2_client_form.cleaned_data.get('client') + +        # Retrieve the `OAuth2AccessToken` instance from the access_token +        auth_backend = oauth2_provider_backends.AccessTokenBackend() +        token = auth_backend.authenticate(access_token, client) +        if token is None: +            raise exceptions.AuthenticationFailed('Invalid token') + +        user = token.user + +        if not user.is_active: +            msg = 'User inactive or deleted: %s' % user.username +            raise exceptions.AuthenticationFailed(msg) + +        return (token.user, token) + +    def authenticate_header(self, request): +        """ +        Bearer is the only finalized type currently + +        Check details on the `OAuth2Authentication.authenticate` method +        """ +        return 'Bearer realm="%s"' % self.www_authenticate_realm diff --git a/rest_framework/compat.py b/rest_framework/compat.py index 07fdddce..7b2ef738 100644 --- a/rest_framework/compat.py +++ b/rest_framework/compat.py @@ -426,3 +426,34 @@ try:      import defusedxml.ElementTree as etree  except ImportError:      etree = None + +# OAuth is optional +try: +    # Note: The `oauth2` package actually provides oauth1.0a support.  Urg. +    import oauth2 as oauth +except ImportError: +    oauth = None + +# OAuth is optional +try: +    import oauth_provider +    from oauth_provider.store import store as oauth_provider_store +except ImportError: +    oauth_provider = None +    oauth_provider_store = None + +# OAuth 2 support is optional +try: +    import provider.oauth2 as oauth2_provider +    from provider.oauth2 import backends as oauth2_provider_backends +    from provider.oauth2 import models as oauth2_provider_models +    from provider.oauth2 import forms as oauth2_provider_forms +    from provider import scope as oauth2_provider_scope +    from provider import constants as oauth2_constants +except ImportError: +    oauth2_provider = None +    oauth2_provider_backends = None +    oauth2_provider_models = None +    oauth2_provider_forms = None +    oauth2_provider_scope = None +    oauth2_constants = None diff --git a/rest_framework/permissions.py b/rest_framework/permissions.py index f18fb53e..ae895f39 100644 --- a/rest_framework/permissions.py +++ b/rest_framework/permissions.py @@ -7,6 +7,8 @@ import warnings  SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS'] +from rest_framework.compat import oauth2_provider_scope, oauth2_constants +  class BasePermission(object):      """ @@ -132,3 +134,26 @@ class DjangoModelPermissions(BasePermission):              request.user.has_perms(perms)):              return True          return False + + +class TokenHasReadWriteScope(BasePermission): +    """ +    The request is authenticated as a user and the token used has the right scope +    """ + +    def has_permission(self, request, view): +        token = request.auth +        read_only = request.method in SAFE_METHODS + +        if not token: +            return False + +        if hasattr(token, 'resource'):  # OAuth 1 +            return read_only or not request.auth.resource.is_readonly +        elif hasattr(token, 'scope'):  # OAuth 2 +            required = oauth2_constants.READ if read_only else oauth2_constants.WRITE +            return oauth2_provider_scope.check(required, request.auth.scope) + +        assert False, ('TokenHasReadWriteScope requires either the' +        '`OAuthAuthentication` or `OAuth2Authentication` authentication ' +        'class to be used.') diff --git a/rest_framework/runtests/settings.py b/rest_framework/runtests/settings.py index 03bfc216..9b519f27 100644 --- a/rest_framework/runtests/settings.py +++ b/rest_framework/runtests/settings.py @@ -97,9 +97,30 @@ INSTALLED_APPS = (      # 'django.contrib.admindocs',      'rest_framework',      'rest_framework.authtoken', -    'rest_framework.tests' +    'rest_framework.tests',  ) +# OAuth is optional and won't work if there is no oauth_provider & oauth2 +try: +    import oauth_provider +    import oauth2 +except ImportError: +    pass +else: +    INSTALLED_APPS += ( +        'oauth_provider', +    ) + +try: +    import provider +except ImportError: +    pass +else: +    INSTALLED_APPS += ( +        'provider', +        'provider.oauth2', +    ) +  STATIC_URL = '/static/'  PASSWORD_HASHERS = ( diff --git a/rest_framework/tests/authentication.py b/rest_framework/tests/authentication.py index 7b754af5..b663ca48 100644 --- a/rest_framework/tests/authentication.py +++ b/rest_framework/tests/authentication.py @@ -2,23 +2,29 @@ from __future__ import unicode_literals  from django.contrib.auth.models import User  from django.http import HttpResponse  from django.test import Client, TestCase +from django.utils import unittest  from rest_framework import HTTP_HEADER_ENCODING  from rest_framework import exceptions  from rest_framework import permissions  from rest_framework import status -from rest_framework.authtoken.models import Token  from rest_framework.authentication import (      BaseAuthentication,      TokenAuthentication,      BasicAuthentication, -    SessionAuthentication +    SessionAuthentication, +    OAuthAuthentication, +    OAuth2Authentication  ) -from rest_framework.compat import patterns +from rest_framework.authtoken.models import Token +from rest_framework.compat import patterns, url, include +from rest_framework.compat import oauth2_provider, oauth2_provider_models, oauth2_provider_scope +from rest_framework.compat import oauth, oauth_provider  from rest_framework.tests.utils import RequestFactory  from rest_framework.views import APIView  import json  import base64 - +import time +import datetime  factory = RequestFactory() @@ -41,8 +47,19 @@ urlpatterns = patterns('',      (r'^basic/$', MockView.as_view(authentication_classes=[BasicAuthentication])),      (r'^token/$', MockView.as_view(authentication_classes=[TokenAuthentication])),      (r'^auth-token/$', 'rest_framework.authtoken.views.obtain_auth_token'), +    (r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication])), +    (r'^oauth-with-scope/$', MockView.as_view(authentication_classes=[OAuthAuthentication],  +        permission_classes=[permissions.TokenHasReadWriteScope]))  ) +if oauth2_provider is not None: +    urlpatterns += patterns('', +        url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')), +        url(r'^oauth2-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication])), +        url(r'^oauth2-with-scope-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication],  +            permission_classes=[permissions.TokenHasReadWriteScope])), +    ) +  class BasicAuthTests(TestCase):      """Basic authentication""" @@ -146,7 +163,7 @@ class TokenAuthTests(TestCase):      def test_post_form_passing_token_auth(self):          """Ensure POSTing json over token auth with correct credentials passes and does not require CSRF""" -        auth = "Token " + self.key +        auth = 'Token ' + self.key          response = self.csrf_client.post('/token/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)          self.assertEqual(response.status_code, status.HTTP_200_OK) @@ -222,3 +239,339 @@ class IncorrectCredentialsTests(TestCase):          response = view(request)          self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)          self.assertEqual(response.data, {'detail': 'Bad credentials'}) + + +class OAuthTests(TestCase): +    """OAuth 1.0a authentication""" +    urls = 'rest_framework.tests.authentication' + +    def setUp(self): +        # these imports are here because oauth is optional and hiding them in try..except block or compat +        # could obscure problems if something breaks +        from oauth_provider.models import Consumer, Resource +        from oauth_provider.models import Token as OAuthToken +        from oauth_provider import consts + +        self.consts = consts + +        self.csrf_client = Client(enforce_csrf_checks=True) +        self.username = 'john' +        self.email = 'lennon@thebeatles.com' +        self.password = 'password' +        self.user = User.objects.create_user(self.username, self.email, self.password) + +        self.CONSUMER_KEY = 'consumer_key' +        self.CONSUMER_SECRET = 'consumer_secret' +        self.TOKEN_KEY = "token_key" +        self.TOKEN_SECRET = "token_secret" + +        self.consumer = Consumer.objects.create(key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET, +            name='example', user=self.user, status=self.consts.ACCEPTED) + +        self.resource = Resource.objects.create(name="resource name", url="api/") +        self.token = OAuthToken.objects.create(user=self.user, consumer=self.consumer, resource=self.resource, +            token_type=OAuthToken.ACCESS, key=self.TOKEN_KEY, secret=self.TOKEN_SECRET, is_approved=True +        ) + +    def _create_authorization_header(self): +        params = { +            'oauth_version': "1.0", +            'oauth_nonce': oauth.generate_nonce(), +            'oauth_timestamp': int(time.time()), +            'oauth_token': self.token.key, +            'oauth_consumer_key': self.consumer.key +        } + +        req = oauth.Request(method="GET", url="http://example.com", parameters=params) + +        signature_method = oauth.SignatureMethod_PLAINTEXT() +        req.sign_request(signature_method, self.consumer, self.token) + +        return req.to_header()["Authorization"] + +    def _create_authorization_url_parameters(self): +        params = { +            'oauth_version': "1.0", +            'oauth_nonce': oauth.generate_nonce(), +            'oauth_timestamp': int(time.time()), +            'oauth_token': self.token.key, +            'oauth_consumer_key': self.consumer.key +        } + +        req = oauth.Request(method="GET", url="http://example.com", parameters=params) + +        signature_method = oauth.SignatureMethod_PLAINTEXT() +        req.sign_request(signature_method, self.consumer, self.token) +        return dict(req) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_passing_oauth(self): +        """Ensure POSTing form over OAuth with correct credentials passes and does not require CSRF""" +        auth = self._create_authorization_header() +        response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_repeated_nonce_failing_oauth(self): +        """Ensure POSTing form over OAuth with repeated auth (same nonces and timestamp) credentials fails""" +        auth = self._create_authorization_header() +        response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200) + +        # simulate reply attack auth header containes already used (nonce, timestamp) pair +        response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_token_removed_failing_oauth(self): +        """Ensure POSTing when there is no OAuth access token in db fails""" +        self.token.delete() +        auth = self._create_authorization_header() +        response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_consumer_status_not_accepted_failing_oauth(self): +        """Ensure POSTing when consumer status is anything other than ACCEPTED fails""" +        for consumer_status in (self.consts.CANCELED, self.consts.PENDING, self.consts.REJECTED): +            self.consumer.status = consumer_status +            self.consumer.save() + +            auth = self._create_authorization_header() +            response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth) +            self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_with_request_token_failing_oauth(self): +        """Ensure POSTing with unauthorized request token instead of access token fails""" +        self.token.token_type = self.token.REQUEST +        self.token.save() + +        auth = self._create_authorization_header() +        response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_with_urlencoded_parameters(self): +        """Ensure POSTing with x-www-form-urlencoded auth parameters passes""" +        params = self._create_authorization_url_parameters() +        response = self.csrf_client.post('/oauth/', params) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_get_form_with_url_parameters(self): +        """Ensure GETing with auth in url parameters passes""" +        params = self._create_authorization_url_parameters() +        response = self.csrf_client.get('/oauth/', params) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_hmac_sha1_signature_passes(self): +        """Ensure POSTing using HMAC_SHA1 signature method passes""" +        params = { +            'oauth_version': "1.0", +            'oauth_nonce': oauth.generate_nonce(), +            'oauth_timestamp': int(time.time()), +            'oauth_token': self.token.key, +            'oauth_consumer_key': self.consumer.key +        } + +        req = oauth.Request(method="POST", url="http://testserver/oauth/", parameters=params) + +        signature_method = oauth.SignatureMethod_HMAC_SHA1() +        req.sign_request(signature_method, self.consumer, self.token) +        auth = req.to_header()["Authorization"] + +        response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_get_form_with_readonly_resource_passing_auth(self): +        """Ensure POSTing with a readonly resource instead of a write scope fails""" +        read_only_access_token = self.token +        read_only_access_token.resource.is_readonly = True +        read_only_access_token.resource.save() +        params = self._create_authorization_url_parameters() +        response = self.csrf_client.get('/oauth-with-scope/', params) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_with_readonly_resource_failing_auth(self): +        """Ensure POSTing with a readonly resource instead of a write scope fails""" +        read_only_access_token = self.token +        read_only_access_token.resource.is_readonly = True +        read_only_access_token.resource.save() +        params = self._create_authorization_url_parameters() +        response = self.csrf_client.post('/oauth-with-scope/', params) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed') +    @unittest.skipUnless(oauth, 'oauth2 not installed') +    def test_post_form_with_write_resource_passing_auth(self): +        """Ensure POSTing with a write resource succeed""" +        read_write_access_token = self.token +        read_write_access_token.resource.is_readonly = False +        read_write_access_token.resource.save() +        params = self._create_authorization_url_parameters() +        response = self.csrf_client.post('/oauth-with-scope/', params) +        self.assertEqual(response.status_code, 200) + + +class OAuth2Tests(TestCase): +    """OAuth 2.0 authentication""" +    urls = 'rest_framework.tests.authentication' + +    def setUp(self): +        self.csrf_client = Client(enforce_csrf_checks=True) +        self.username = 'john' +        self.email = 'lennon@thebeatles.com' +        self.password = 'password' +        self.user = User.objects.create_user(self.username, self.email, self.password) + +        self.CLIENT_ID = 'client_key' +        self.CLIENT_SECRET = 'client_secret' +        self.ACCESS_TOKEN = "access_token" +        self.REFRESH_TOKEN = "refresh_token" + +        self.oauth2_client = oauth2_provider_models.Client.objects.create( +                client_id=self.CLIENT_ID, +                client_secret=self.CLIENT_SECRET, +                redirect_uri='', +                client_type=0, +                name='example', +                user=None, +            ) + +        self.access_token = oauth2_provider_models.AccessToken.objects.create( +                token=self.ACCESS_TOKEN, +                client=self.oauth2_client, +                user=self.user, +            ) +        self.refresh_token = oauth2_provider_models.RefreshToken.objects.create( +                user=self.user, +                access_token=self.access_token, +                client=self.oauth2_client +            ) + +    def _create_authorization_header(self, token=None): +        return "Bearer {0}".format(token or self.access_token.token) + +    def _client_credentials_params(self): +        return {'client_id': self.CLIENT_ID, 'client_secret': self.CLIENT_SECRET} + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_get_form_with_wrong_authorization_header_token_type_failing(self): +        """Ensure that a wrong token type lead to the correct HTTP error status code""" +        auth = "Wrong token-type-obsviously" +        response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) +        params = self._client_credentials_params() +        response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_get_form_with_wrong_authorization_header_token_format_failing(self): +        """Ensure that a wrong token format lead to the correct HTTP error status code""" +        auth = "Bearer wrong token format" +        response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) +        params = self._client_credentials_params() +        response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_get_form_with_wrong_authorization_header_token_failing(self): +        """Ensure that a wrong token lead to the correct HTTP error status code""" +        auth = "Bearer wrong-token" +        response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) +        params = self._client_credentials_params() +        response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_get_form_with_wrong_client_data_failing_auth(self): +        """Ensure GETing form over OAuth with incorrect client credentials fails""" +        auth = self._create_authorization_header() +        params = self._client_credentials_params() +        params['client_id'] += 'a' +        response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 401) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_get_form_passing_auth(self): +        """Ensure GETing form over OAuth with correct client credentials succeed""" +        auth = self._create_authorization_header() +        params = self._client_credentials_params() +        response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_post_form_passing_auth(self): +        """Ensure POSTing form over OAuth with correct credentials passes and does not require CSRF""" +        auth = self._create_authorization_header() +        params = self._client_credentials_params() +        response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_post_form_token_removed_failing_auth(self): +        """Ensure POSTing when there is no OAuth access token in db fails""" +        self.access_token.delete() +        auth = self._create_authorization_header() +        params = self._client_credentials_params() +        response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_post_form_with_refresh_token_failing_auth(self): +        """Ensure POSTing with refresh token instead of access token fails""" +        auth = self._create_authorization_header(token=self.refresh_token.token) +        params = self._client_credentials_params() +        response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_post_form_with_expired_access_token_failing_auth(self): +        """Ensure POSTing with expired access token fails with an 'Invalid token' error""" +        self.access_token.expires = datetime.datetime.now() - datetime.timedelta(seconds=10)  # 10 seconds late +        self.access_token.save() +        auth = self._create_authorization_header() +        params = self._client_credentials_params() +        response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) +        self.assertIn('Invalid token', response.content) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_post_form_with_invalid_scope_failing_auth(self): +        """Ensure POSTing with a readonly scope instead of a write scope fails""" +        read_only_access_token = self.access_token +        read_only_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['read'] +        read_only_access_token.save() +        auth = self._create_authorization_header(token=read_only_access_token.token) +        params = self._client_credentials_params() +        response = self.csrf_client.get('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200) +        response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + +    @unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed') +    def test_post_form_with_valid_scope_passing_auth(self): +        """Ensure POSTing with a write scope succeed""" +        read_write_access_token = self.access_token +        read_write_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['write'] +        read_write_access_token.save() +        auth = self._create_authorization_header(token=read_write_access_token.token) +        params = self._client_credentials_params() +        response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth) +        self.assertEqual(response.status_code, 200)  | 
