aboutsummaryrefslogtreecommitdiffstats
path: root/rest_framework
diff options
context:
space:
mode:
authorTom Christie2013-09-10 20:21:15 +0100
committerTom Christie2013-09-10 20:21:15 +0100
commit75fb4b02b40da04f16c6c288bbe20ea0bc0b4154 (patch)
tree97e7b26506eed113a15d00ae8e9d8f438e0bef60 /rest_framework
parentf5c34926d6a4b4b29fb083d25b99b10d7431eee4 (diff)
parent23fc9dd53fcd9cc25e2c77e5ffae395f04d4990d (diff)
downloaddjango-rest-framework-75fb4b02b40da04f16c6c288bbe20ea0bc0b4154.tar.bz2
Merge branch 'master' of git://github.com/bwreilly/django-rest-framework into bwreilly-master
Diffstat (limited to 'rest_framework')
-rw-r--r--rest_framework/compat.py6
-rw-r--r--rest_framework/filters.py18
-rw-r--r--rest_framework/permissions.py45
-rw-r--r--rest_framework/runtests/settings.py15
-rw-r--r--rest_framework/tests/test_permissions.py173
5 files changed, 214 insertions, 43 deletions
diff --git a/rest_framework/compat.py b/rest_framework/compat.py
index 6f7447ad..b9d1dae6 100644
--- a/rest_framework/compat.py
+++ b/rest_framework/compat.py
@@ -47,6 +47,12 @@ try:
except ImportError:
django_filters = None
+# guardian is optional
+try:
+ import guardian
+except ImportError:
+ guardian = None
+
# cStringIO only if it's available, otherwise StringIO
try:
diff --git a/rest_framework/filters.py b/rest_framework/filters.py
index 4079e1bd..6d46ad23 100644
--- a/rest_framework/filters.py
+++ b/rest_framework/filters.py
@@ -4,7 +4,7 @@ returned by list views.
"""
from __future__ import unicode_literals
from django.db import models
-from rest_framework.compat import django_filters, six
+from rest_framework.compat import django_filters, six, guardian
from functools import reduce
import operator
@@ -23,6 +23,22 @@ class BaseFilterBackend(object):
raise NotImplementedError(".filter_queryset() must be overridden.")
+class ObjectPermissionReaderFilter(BaseFilterBackend):
+ """
+ A filter backend that limits results to those where the requesting user
+ has read object level permissions.
+ """
+ def __init__(self):
+ assert guardian, 'Using ObjectPermissionReaderFilter, but django-guardian is not installed'
+
+ def filter_queryset(self, request, queryset, view):
+ user = request.user
+ model_cls = queryset.model
+ model_name = model_cls._meta.module_name
+ permission = 'read_' + model_name
+ return guardian.shortcuts.get_objects_for_user(user, permission, queryset)
+
+
class DjangoFilterBackend(BaseFilterBackend):
"""
A filter backend that uses django-filter.
diff --git a/rest_framework/permissions.py b/rest_framework/permissions.py
index 1036663e..70bf9c61 100644
--- a/rest_framework/permissions.py
+++ b/rest_framework/permissions.py
@@ -7,6 +7,7 @@ import warnings
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
+from django.http import Http404
from rest_framework.compat import oauth2_provider_scope, oauth2_constants
@@ -151,6 +152,50 @@ class DjangoModelPermissionsOrAnonReadOnly(DjangoModelPermissions):
authenticated_users_only = False
+class DjangoObjectLevelModelPermissions(DjangoModelPermissions):
+ """
+ The request is authenticated using `django.contrib.auth` permissions.
+ See: https://docs.djangoproject.com/en/dev/topics/auth/#permissions
+
+ It ensures that the user is authenticated, and has the appropriate
+ `add`/`change`/`delete` permissions on the object using .has_perms.
+
+ This permission can only be applied against view classes that
+ provide a `.model` or `.queryset` attribute.
+ """
+
+ actions_map = {
+ 'GET': ['read_%(model_name)s'],
+ 'OPTIONS': ['read_%(model_name)s'],
+ 'HEAD': ['read_%(model_name)s'],
+ 'POST': ['add_%(model_name)s'],
+ 'PUT': ['change_%(model_name)s'],
+ 'PATCH': ['change_%(model_name)s'],
+ 'DELETE': ['delete_%(model_name)s'],
+ }
+
+ def get_required_object_permissions(self, method, model_cls):
+ kwargs = {
+ 'model_name': model_cls._meta.module_name
+ }
+ return [perm % kwargs for perm in self.actions_map[method]]
+
+ def has_object_permission(self, request, view, obj):
+ model_cls = getattr(view, 'model', None)
+ queryset = getattr(view, 'queryset', None)
+
+ if model_cls is None and queryset is not None:
+ model_cls = queryset.model
+
+ perms = self.get_required_object_permissions(request.method, model_cls)
+ user = request.user
+
+ check = user.has_perms(perms, obj)
+ if not check:
+ raise Http404
+ return user.has_perms(perms, obj)
+
+
class TokenHasReadWriteScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
diff --git a/rest_framework/runtests/settings.py b/rest_framework/runtests/settings.py
index b3702d0b..be721658 100644
--- a/rest_framework/runtests/settings.py
+++ b/rest_framework/runtests/settings.py
@@ -123,6 +123,21 @@ else:
'provider.oauth2',
)
+# guardian is optional
+try:
+ import guardian
+except ImportError:
+ pass
+else:
+ ANONYMOUS_USER_ID = -1
+ AUTHENTICATION_BACKENDS = (
+ 'django.contrib.auth.backends.ModelBackend', # default
+ 'guardian.backends.ObjectPermissionBackend',
+ )
+ INSTALLED_APPS += (
+ 'guardian',
+ )
+
STATIC_URL = '/static/'
PASSWORD_HASHERS = (
diff --git a/rest_framework/tests/test_permissions.py b/rest_framework/tests/test_permissions.py
index e2cca380..2d866cd0 100644
--- a/rest_framework/tests/test_permissions.py
+++ b/rest_framework/tests/test_permissions.py
@@ -1,18 +1,16 @@
from __future__ import unicode_literals
-from django.contrib.auth.models import User, Permission
+from django.contrib.auth.models import User, Permission, Group
from django.db import models
from django.test import TestCase
from rest_framework import generics, status, permissions, authentication, HTTP_HEADER_ENCODING
+from rest_framework.compat import guardian
+from rest_framework.filters import ObjectPermissionReaderFilter
from rest_framework.test import APIRequestFactory
+from rest_framework.tests.models import BasicModel
import base64
factory = APIRequestFactory()
-
-class BasicModel(models.Model):
- text = models.CharField(max_length=100)
-
-
class RootView(generics.ListCreateAPIView):
model = BasicModel
authentication_classes = [authentication.BasicAuthentication]
@@ -144,45 +142,136 @@ class ModelPermissionsIntegrationTests(TestCase):
self.assertEqual(list(response.data['actions'].keys()), ['PUT'])
-class OwnerModel(models.Model):
+class BasicPermModel(models.Model):
text = models.CharField(max_length=100)
- owner = models.ForeignKey(User)
-
-class IsOwnerPermission(permissions.BasePermission):
- def has_object_permission(self, request, view, obj):
- return request.user == obj.owner
+ class Meta:
+ app_label = 'tests'
+ permissions = (
+ ('read_basicpermmodel', 'Can view basic perm model'),
+ # add, change, delete built in to django
+ )
-
-class OwnerInstanceView(generics.RetrieveUpdateDestroyAPIView):
- model = OwnerModel
+class ObjectPermissionInstanceView(generics.RetrieveUpdateDestroyAPIView):
+ model = BasicPermModel
authentication_classes = [authentication.BasicAuthentication]
- permission_classes = [IsOwnerPermission]
-
-
-owner_instance_view = OwnerInstanceView.as_view()
-
+ permission_classes = [permissions.DjangoObjectLevelModelPermissions]
-class ObjectPermissionsIntegrationTests(TestCase):
- """
- Integration tests for the object level permissions API.
- """
+object_permissions_view = ObjectPermissionInstanceView.as_view()
- def setUp(self):
- User.objects.create_user('not_owner', 'not_owner@example.com', 'password')
- user = User.objects.create_user('owner', 'owner@example.com', 'password')
-
- self.not_owner_credentials = basic_auth_header('not_owner', 'password')
- self.owner_credentials = basic_auth_header('owner', 'password')
-
- OwnerModel(text='foo', owner=user).save()
-
- def test_owner_has_delete_permissions(self):
- request = factory.delete('/1', HTTP_AUTHORIZATION=self.owner_credentials)
- response = owner_instance_view(request, pk='1')
- self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
-
- def test_non_owner_does_not_have_delete_permissions(self):
- request = factory.delete('/1', HTTP_AUTHORIZATION=self.not_owner_credentials)
- response = owner_instance_view(request, pk='1')
- self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
+class ObjectPermissionListView(generics.ListAPIView):
+ model = BasicPermModel
+ authentication_classes = [authentication.BasicAuthentication]
+ permission_classes = [permissions.DjangoObjectLevelModelPermissions]
+
+object_permissions_list_view = ObjectPermissionListView.as_view()
+
+if guardian:
+ from guardian.shortcuts import assign_perm
+
+ class ObjectPermissionsIntegrationTests(TestCase):
+ """
+ Integration tests for the object level permissions API.
+ """
+ @classmethod
+ def setUpClass(cls):
+ # create users
+ create = User.objects.create_user
+ users = {
+ 'fullaccess': create('fullaccess', 'fullaccess@example.com', 'password'),
+ 'readonly': create('readonly', 'readonly@example.com', 'password'),
+ 'writeonly': create('writeonly', 'writeonly@example.com', 'password'),
+ 'deleteonly': create('deleteonly', 'deleteonly@example.com', 'password'),
+ }
+
+ # give everyone model level permissions, as we are not testing those
+ everyone = Group.objects.create(name='everyone')
+ model_name = BasicPermModel._meta.module_name
+ app_label = BasicPermModel._meta.app_label
+ f = '{0}_{1}'.format
+ perms = {
+ 'read': f('read', model_name),
+ 'change': f('change', model_name),
+ 'delete': f('delete', model_name)
+ }
+ for perm in perms.values():
+ perm = '{0}.{1}'.format(app_label, perm)
+ assign_perm(perm, everyone)
+ everyone.user_set.add(*users.values())
+
+ cls.perms = perms
+ cls.users = users
+
+ def setUp(self):
+ perms = self.perms
+ users = self.users
+
+ # appropriate object level permissions
+ readers = Group.objects.create(name='readers')
+ writers = Group.objects.create(name='writers')
+ deleters = Group.objects.create(name='deleters')
+
+ model = BasicPermModel.objects.create(text='foo')
+
+ assign_perm(perms['read'], readers, model)
+ assign_perm(perms['change'], writers, model)
+ assign_perm(perms['delete'], deleters, model)
+
+ readers.user_set.add(users['fullaccess'], users['readonly'])
+ writers.user_set.add(users['fullaccess'], users['writeonly'])
+ deleters.user_set.add(users['fullaccess'], users['deleteonly'])
+
+ self.credentials = {}
+ for user in users.values():
+ self.credentials[user.username] = basic_auth_header(user.username, 'password')
+
+ # Delete
+ def test_can_delete_permissions(self):
+ request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['deleteonly'])
+ response = object_permissions_view(request, pk='1')
+ self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
+
+ def test_cannot_delete_permissions(self):
+ request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
+ response = object_permissions_view(request, pk='1')
+ self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+
+ # Update
+ def test_can_update_permissions(self):
+ request = factory.patch('/1', {'text': 'foobar'}, format='json',
+ HTTP_AUTHORIZATION=self.credentials['writeonly'])
+ response = object_permissions_view(request, pk='1')
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data.get('text'), 'foobar')
+
+ def test_cannot_update_permissions(self):
+ request = factory.patch('/1', {'text': 'foobar'}, format='json',
+ HTTP_AUTHORIZATION=self.credentials['deleteonly'])
+ response = object_permissions_view(request, pk='1')
+ self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+
+ # Read
+ def test_can_read_permissions(self):
+ request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
+ response = object_permissions_view(request, pk='1')
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+
+ def test_cannot_read_permissions(self):
+ request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['writeonly'])
+ response = object_permissions_view(request, pk='1')
+ self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+
+ # Read list
+ def test_can_read_list_permissions(self):
+ request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['readonly'])
+ object_permissions_list_view.cls.filter_backends = (ObjectPermissionReaderFilter,)
+ response = object_permissions_list_view(request)
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertEqual(response.data[0].get('id'), 1)
+
+ def test_cannot_read_list_permissions(self):
+ request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['writeonly'])
+ object_permissions_list_view.cls.filter_backends = (ObjectPermissionReaderFilter,)
+ response = object_permissions_list_view(request)
+ self.assertEqual(response.status_code, status.HTTP_200_OK)
+ self.assertListEqual(response.data, []) \ No newline at end of file