aboutsummaryrefslogtreecommitdiffstats
path: root/rest_framework/pagination.py
blob: 69d0f77d3b7ddc7d917e2b93fc9356bef39dd1c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""
Pagination serializers determine the structure of the output that should
be used for paginated responses.
"""
from __future__ import unicode_literals
from collections import namedtuple
from django.core.paginator import InvalidPage, Paginator as DjangoPaginator
from django.template import Context, loader
from django.utils import six
from django.utils.translation import ugettext as _
from rest_framework.compat import OrderedDict
from rest_framework.exceptions import NotFound
from rest_framework.response import Response
from rest_framework.settings import api_settings
from rest_framework.templatetags.rest_framework import (
    replace_query_param, remove_query_param
)


def _strict_positive_int(integer_string, cutoff=None):
    """
    Cast a string to a strictly positive integer.
    """
    ret = int(integer_string)
    if ret <= 0:
        raise ValueError()
    if cutoff:
        ret = min(ret, cutoff)
    return ret


def _get_count(queryset):
    """
    Determine an object count, supporting either querysets or regular lists.
    """
    try:
        return queryset.count()
    except AttributeError:
        return len(queryset)


def _get_displayed_page_numbers(current, final):
    """
    This utility function determines a list of page numbers to display.
    This gives us a nice contextually relevant set of page numbers.

    For example:
    current=14, final=16 -> [1, None, 13, 14, 15, 16]

    This implementation gives one page to each side of the cursor,
    for an implementation which gives two pages to each side of the cursor,
    which is a copy of how GitHub treat pagination in their issue lists, see:

    https://gist.github.com/tomchristie/321140cebb1c4a558b15
    """
    assert current >= 1
    assert final >= current

    # We always include the first two pages, last two pages, and
    # two pages either side of the current page.
    included = set((
        1,
        current - 1, current, current + 1,
        final
    ))

    # If the break would only exclude a single page number then we
    # may as well include the page number instead of the break.
    if current <= 4:
        included.add(2)
        included.add(3)
    if current >= final - 3:
        included.add(final - 1)
        included.add(final - 2)

    # Now sort the page numbers and drop anything outside the limits.
    included = [
        idx for idx in sorted(list(included))
        if idx > 0 and idx <= final
    ]

    # Finally insert any `...` breaks
    if current > 4:
        included.insert(1, None)
    if current < final - 3:
        included.insert(len(included) - 1, None)
    return included


# Lightweight record describing one entry in the HTML page-number control:
# either a link to a page, or a `...` break (`is_break=True`).
PageLink = namedtuple('PageLink', 'url number is_active is_break')


class BasePagination(object):
    """
    Interface that pagination styles in this module derive from.

    Both methods raise `NotImplementedError` here and are expected to be
    overridden by subclasses.
    """

    def paginate_queryset(self, queryset, request, view):
        """
        Return the portion of `queryset` to display for `request`.
        """
        # `NotImplemented` is a non-callable singleton intended for rich
        # comparisons; the exception type is `NotImplementedError`.
        raise NotImplementedError(
            'paginate_queryset() must be implemented.'
        )

    def get_paginated_response(self, data):
        """
        Return the response object wrapping the already-serialized `data`.
        """
        raise NotImplementedError(
            'get_paginated_response() must be implemented.'
        )


class PageNumberPagination(BasePagination):
    """
    A simple page number based style that supports page numbers as
    query parameters. For example:

    http://api.example.org/accounts/?page=4
    http://api.example.org/accounts/?page=4&page_size=100
    """
    # The default page size.
    # Defaults to `None`, meaning pagination is disabled.
    paginate_by = api_settings.PAGINATE_BY

    # Client can control the page using this query parameter.
    page_query_param = 'page'

    # Client can control the page size using this query parameter.
    # Default is 'None'. Set to eg 'page_size' to enable usage.
    paginate_by_param = api_settings.PAGINATE_BY_PARAM

    # Set to an integer to limit the maximum page size the client may request.
    # Only relevant if 'paginate_by_param' has also been set.
    max_paginate_by = api_settings.MAX_PAGINATE_BY

    # Template rendered by `to_html()`.
    template = 'rest_framework/pagination/numbers.html'

    def paginate_queryset(self, queryset, request, view):
        """
        Paginate a queryset if required, either returning a
        page object, or `None` if pagination is not configured for this view.

        Raises `NotFound` if the requested page number is invalid.
        """
        # Any of these attributes set on the view override the values
        # configured on the pagination class.
        for attr in (
            'paginate_by', 'page_query_param',
            'paginate_by_param', 'max_paginate_by'
        ):
            if hasattr(view, attr):
                setattr(self, attr, getattr(view, attr))

        page_size = self.get_page_size(request)
        if not page_size:
            return None

        paginator = DjangoPaginator(queryset, page_size)
        page_string = request.query_params.get(self.page_query_param, 1)
        try:
            page_number = paginator.validate_number(page_string)
        except InvalidPage:
            # The literal string "last" is accepted as an alias for the
            # final page.
            if page_string == 'last':
                page_number = paginator.num_pages
            else:
                msg = _(
                    'Choose a valid page number. Page numbers must be a '
                    'whole number, or must be the string "last".'
                )
                raise NotFound(msg)

        try:
            self.page = paginator.page(page_number)
        except InvalidPage as exc:
            msg = _('Invalid page "{page_number}": {message}.').format(
                page_number=page_number, message=six.text_type(exc)
            )
            raise NotFound(msg)

        # Indicate that the browsable API should display pagination controls.
        self.mark_as_used = True
        self.request = request
        return self.page

    def get_paginated_response(self, data):
        """
        Wrap `data` in a response carrying the total count and the
        next/previous page links.

        Relies on `self.page` and `self.request` having been set by
        `paginate_queryset()`.
        """
        return Response(OrderedDict([
            ('count', self.page.paginator.count),
            ('next', self.get_next_link()),
            ('previous', self.get_previous_link()),
            ('results', data)
        ]))

    def get_page_size(self, request):
        """
        Return the page size to use for `request`.

        A valid, strictly positive value of the `paginate_by_param` query
        parameter (capped at `max_paginate_by`) takes precedence; otherwise
        `paginate_by` is returned.
        """
        if self.paginate_by_param:
            try:
                return _strict_positive_int(
                    request.query_params[self.paginate_by_param],
                    cutoff=self.max_paginate_by
                )
            except (KeyError, ValueError):
                # Parameter missing or not a positive integer: use default.
                pass

        return self.paginate_by

    def get_next_link(self):
        """
        Return the URL of the next page, or `None` on the last page.
        """
        if not self.page.has_next():
            return None
        url = self.request.build_absolute_uri()
        page_number = self.page.next_page_number()
        return replace_query_param(url, self.page_query_param, page_number)

    def get_previous_link(self):
        """
        Return the URL of the previous page, or `None` on the first page.
        """
        if not self.page.has_previous():
            return None
        url = self.request.build_absolute_uri()
        page_number = self.page.previous_page_number()
        if page_number == 1:
            # Page one is addressed without the page query parameter.
            return remove_query_param(url, self.page_query_param)
        return replace_query_param(url, self.page_query_param, page_number)

    def to_html(self):
        """
        Render the page-number controls using `self.template`.

        Relies on `self.page` and `self.request` having been set by
        `paginate_queryset()`.
        """
        current = self.page.number
        final = self.page.paginator.num_pages

        page_links = []
        base_url = self.request.build_absolute_uri()
        for page_number in _get_displayed_page_numbers(current, final):
            if page_number is None:
                # `None` marks a `...` break between page numbers.
                page_link = PageLink(
                    url=None,
                    number=None,
                    is_active=False,
                    is_break=True
                )
            else:
                # Always derive each link from `base_url`, never from the
                # URL computed for a previous entry in the loop.
                if page_number == 1:
                    url = remove_query_param(base_url, self.page_query_param)
                else:
                    url = replace_query_param(
                        base_url, self.page_query_param, page_number
                    )
                page_link = PageLink(
                    url=url,
                    number=page_number,
                    is_active=(page_number == current),
                    is_break=False
                )
            page_links.append(page_link)

        template = loader.get_template(self.template)
        context = Context({
            'previous_url': self.get_previous_link(),
            'next_url': self.get_next_link(),
            'page_links': page_links
        })
        return template.render(context)


class LimitOffsetPagination(BasePagination):
    """
    A limit/offset based style. For example:

    http://api.example.org/accounts/?limit=100
    http://api.example.org/accounts/?offset=400&limit=100
    """
    # Limit used when the client does not supply a valid one.
    default_limit = api_settings.PAGINATE_BY
    # Names of the query parameters read from the request.
    limit_query_param = 'limit'
    offset_query_param = 'offset'
    # Set to an integer to cap the limit a client may request.
    max_limit = None

    def paginate_queryset(self, queryset, request, view):
        """
        Return the slice of `queryset` selected by the request's limit and
        offset, or `None` if no limit is configured or requested.
        """
        self.limit = self.get_limit(request)
        if self.limit is None:
            # No default limit and none supplied by the client: treat
            # pagination as not configured, as `PageNumberPagination` does,
            # rather than failing on `offset + None` below.
            return None

        self.offset = self.get_offset(request)
        self.count = _get_count(queryset)
        self.request = request
        return queryset[self.offset:self.offset + self.limit]

    def get_paginated_response(self, data):
        """
        Wrap `data` in a response carrying the total count and the
        next/previous links.

        Relies on the state stored by `paginate_queryset()`.
        """
        return Response(OrderedDict([
            ('count', self.count),
            ('next', self.get_next_link()),
            ('previous', self.get_previous_link()),
            ('results', data)
        ]))

    def get_limit(self, request):
        """
        Return the limit to use for `request`.

        A valid, strictly positive value of the `limit_query_param` query
        parameter (capped at `max_limit`) takes precedence; otherwise
        `default_limit` is returned.
        """
        if self.limit_query_param:
            try:
                return _strict_positive_int(
                    request.query_params[self.limit_query_param],
                    cutoff=self.max_limit
                )
            except (KeyError, ValueError):
                # Parameter missing or not a positive integer: use default.
                pass

        return self.default_limit

    def get_offset(self, request):
        """
        Return the offset requested by the client, or 0 when the parameter
        is missing or is not a strictly positive integer.
        """
        try:
            return _strict_positive_int(
                request.query_params[self.offset_query_param],
            )
        except (KeyError, ValueError):
            return 0

    def get_next_link(self, page=None):
        """
        Return the URL of the next slice, or `None` when the current slice
        already reaches the end of the results.

        `page` is unused; it is accepted only so that existing callers
        which pass it keep working.
        """
        if self.offset + self.limit >= self.count:
            return None
        url = self.request.build_absolute_uri()
        offset = self.offset + self.limit
        return replace_query_param(url, self.offset_query_param, offset)

    def get_previous_link(self, page=None):
        """
        Return the URL of the previous slice, or `None` when the current
        slice starts at the beginning of the results.

        `page` is unused; it is accepted only so that existing callers
        which pass it keep working.
        """
        if self.offset <= 0:
            return None
        url = self.request.build_absolute_uri()
        # Clamp at zero so that an offset smaller than the limit still
        # links back to the start instead of hiding the first items.
        offset = max(self.offset - self.limit, 0)
        return replace_query_param(url, self.offset_query_param, offset)