Diffstat (limited to 'debug_toolbar/utils/sqlparse/lexer.py')
| -rw-r--r-- | debug_toolbar/utils/sqlparse/lexer.py | 315 | 
1 files changed, 315 insertions, 0 deletions
diff --git a/debug_toolbar/utils/sqlparse/lexer.py b/debug_toolbar/utils/sqlparse/lexer.py
new file mode 100644
index 0000000..727a4ff
--- /dev/null
+++ b/debug_toolbar/utils/sqlparse/lexer.py
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2008 Andi Albrecht, albrecht.andi@gmail.com
+#
+# This module is part of python-sqlparse and is released under
+# the BSD License: http://www.opensource.org/licenses/bsd-license.php.
+
+"""SQL Lexer"""
+
+# This code is based on the SqlLexer in pygments.
+# http://pygments.org/
+# It's separated from the rest of pygments to increase performance
+# and to allow some customizations.
+
+import re
+
+from debug_toolbar.utils.sqlparse.keywords import KEYWORDS, KEYWORDS_COMMON
+from debug_toolbar.utils.sqlparse.tokens import *
+from debug_toolbar.utils.sqlparse.tokens import _TokenType
+
+
+class include(str):
+    pass
+
+class combined(tuple):
+    """Indicates a state combined from multiple states."""
+
+    def __new__(cls, *args):
+        return tuple.__new__(cls, args)
+
+    def __init__(self, *args):
+        # tuple.__init__ doesn't do anything
+        pass
+
+def is_keyword(value):
+    test = value.upper()
+    return KEYWORDS_COMMON.get(test, KEYWORDS.get(test, Name)), value
+
+
+def apply_filters(stream, filters, lexer=None):
+    """
+    Use this method to apply an iterable of filters to
+    a stream. If lexer is given it's forwarded to the
+    filter, otherwise the filter receives `None`.
+    """
+    def _apply(filter_, stream):
+        for token in filter_.filter(lexer, stream):
+            yield token
+    for filter_ in filters:
+        stream = _apply(filter_, stream)
+    return stream
+
+
+class LexerMeta(type):
+    """
+    Metaclass for Lexer, creates the self._tokens attribute from
+    self.tokens on the first instantiation.
+    """
+
+    def _process_state(cls, unprocessed, processed, state):
+        assert type(state) is str, "wrong state name %r" % state
+        assert state[0] != '#', "invalid state name %r" % state
+        if state in processed:
+            return processed[state]
+        tokens = processed[state] = []
+        rflags = cls.flags
+        for tdef in unprocessed[state]:
+            if isinstance(tdef, include):
+                # it's a state reference
+                assert tdef != state, "circular state reference %r" % state
+                tokens.extend(cls._process_state(unprocessed, processed, str(tdef)))
+                continue
+
+            assert type(tdef) is tuple, "wrong rule def %r" % tdef
+
+            try:
+                rex = re.compile(tdef[0], rflags).match
+            except Exception, err:
+                raise ValueError("uncompilable regex %r in state %r of %r: %s" %
+                                 (tdef[0], state, cls, err))
+
+            assert type(tdef[1]) is _TokenType or callable(tdef[1]), \
+                   'token type must be simple type or callable, not %r' % (tdef[1],)
+
+            if len(tdef) == 2:
+                new_state = None
+            else:
+                tdef2 = tdef[2]
+                if isinstance(tdef2, str):
+                    # an existing state
+                    if tdef2 == '#pop':
+                        new_state = -1
+                    elif tdef2 in unprocessed:
+                        new_state = (tdef2,)
+                    elif tdef2 == '#push':
+                        new_state = tdef2
+                    elif tdef2[:5] == '#pop:':
+                        new_state = -int(tdef2[5:])
+                    else:
+                        assert False, 'unknown new state %r' % tdef2
+                elif isinstance(tdef2, combined):
+                    # combine a new state from existing ones
+                    new_state = '_tmp_%d' % cls._tmpname
+                    cls._tmpname += 1
+                    itokens = []
+                    for istate in tdef2:
+                        assert istate != state, 'circular state ref %r' % istate
+                        itokens.extend(cls._process_state(unprocessed,
+                                                          processed, istate))
+                    processed[new_state] = itokens
+                    new_state = (new_state,)
+                elif isinstance(tdef2, tuple):
+                    # push more than one state
+                    for state in tdef2:
+                        assert (state in unprocessed or
+                                state in ('#pop', '#push')), \
+                               'unknown new state ' + state
+                    new_state = tdef2
+                else:
+                    assert False, 'unknown new state def %r' % tdef2
+            tokens.append((rex, tdef[1], new_state))
+        return tokens
+
+    def process_tokendef(cls):
+        cls._all_tokens = {}
+        cls._tmpname = 0
+        processed = cls._all_tokens[cls.__name__] = {}
+        #tokendefs = tokendefs or cls.tokens[name]
+        for state in cls.tokens.keys():
+            cls._process_state(cls.tokens, processed, state)
+        return processed
+
+    def __call__(cls, *args, **kwds):
+        if not hasattr(cls, '_tokens'):
+            cls._all_tokens = {}
+            cls._tmpname = 0
+            if hasattr(cls, 'token_variants') and cls.token_variants:
+                # don't process yet
+                pass
+            else:
+                cls._tokens = cls.process_tokendef()
+
+        return type.__call__(cls, *args, **kwds)
+
+
+
+
+class Lexer:
+
+    __metaclass__ = LexerMeta
+
+    encoding = 'utf-8'
+    stripall = False
+    stripnl = False
+    tabsize = 0
+    flags = re.IGNORECASE
+
+    tokens = {
+        'root': [
+            (r'--.*?(\r|\n|\r\n)', Comment.Single),
+            (r'(\r|\n|\r\n)', Newline),
+            (r'\s+', Whitespace),
+            (r'/\*', Comment.Multiline, 'multiline-comments'),
+            (r':=', Assignment),
+            (r'::', Punctuation),
+            (r'[*]', Wildcard),
+            (r"`(``|[^`])*`", Name),
+            (r"´(´´|[^´])*´", Name),
+            (r'@[a-zA-Z_][a-zA-Z0-9_]+', Name),
+            (r'[+/<>=~!@#%^&|`?^-]', Operator),
+            (r'[0-9]+', Number.Integer),
+            # TODO: Backslash escapes?
+            (r"'(''|[^'])*'", String.Single),
+            (r'"(""|[^"])*"', String.Symbol), # not a real string literal in ANSI SQL
+            (r'(LEFT |RIGHT )?(INNER |OUTER )?JOIN', Keyword),
+            (r'END( IF| LOOP)?', Keyword),
+            (r'CREATE( OR REPLACE)?', Keyword.DDL),
+            (r'[a-zA-Z_][a-zA-Z0-9_]*', is_keyword),
+            (r'\$([a-zA-Z_][a-zA-Z0-9_]*)?\$', Name.Builtin),
+            (r'[;:()\[\],\.]', Punctuation),
+        ],
+        'multiline-comments': [
+            (r'/\*', Comment.Multiline, 'multiline-comments'),
+            (r'\*/', Comment.Multiline, '#pop'),
+            (r'[^/\*]+', Comment.Multiline),
+            (r'[/*]', Comment.Multiline)
+        ]
+    }
+
+    def __init__(self):
+        self.filters = []
+
+    def add_filter(self, filter_, **options):
+        from sqlparse.filters import Filter
+        if not isinstance(filter_, Filter):
+            filter_ = filter_(**options)
+        self.filters.append(filter_)
+
+    def get_tokens(self, text, unfiltered=False):
+        """
+        Return an iterable of (tokentype, value) pairs generated from
+        `text`. If `unfiltered` is set to `True`, the filtering mechanism
+        is bypassed even if filters are defined.
+
+        Also preprocess the text, i.e. expand tabs and strip it if
+        wanted and applies registered filters.
+        """
+        if not isinstance(text, unicode):
+            if self.encoding == 'guess':
+                try:
+                    text = text.decode('utf-8')
+                    if text.startswith(u'\ufeff'):
+                        text = text[len(u'\ufeff'):]
+                except UnicodeDecodeError:
+                    text = text.decode('latin1')
+            elif self.encoding == 'chardet':
+                try:
+                    import chardet
+                except ImportError:
+                    raise ImportError('To enable chardet encoding guessing, '
+                                      'please install the chardet library '
+                                      'from http://chardet.feedparser.org/')
+                enc = chardet.detect(text)
+                text = text.decode(enc['encoding'])
+            else:
+                text = text.decode(self.encoding)
+        if self.stripall:
+            text = text.strip()
+        elif self.stripnl:
+            text = text.strip('\n')
+        if self.tabsize > 0:
+            text = text.expandtabs(self.tabsize)
+#        if not text.endswith('\n'):
+#            text += '\n'
+
+        def streamer():
+            for i, t, v in self.get_tokens_unprocessed(text):
+                yield t, v
+        stream = streamer()
+        if not unfiltered:
+            stream = apply_filters(stream, self.filters, self)
+        return stream
+
+
+    def get_tokens_unprocessed(self, text, stack=('root',)):
+        """
+        Split ``text`` into (tokentype, text) pairs.
+
+        ``stack`` is the inital stack (default: ``['root']``)
+        """
+        pos = 0
+        tokendefs = self._tokens
+        statestack = list(stack)
+        statetokens = tokendefs[statestack[-1]]
+        known_names = {}
+        while 1:
+            for rexmatch, action, new_state in statetokens:
+                m = rexmatch(text, pos)
+                if m:
+                    # print rex.pattern
+                    value = m.group()
+                    if value in known_names:
+                        yield pos, known_names[value], value
+                    elif type(action) is _TokenType:
+                        yield pos, action, value
+                    elif hasattr(action, '__call__'):
+                        ttype, value = action(value)
+                        known_names[value] = ttype
+                        yield pos, ttype, value
+                    else:
+                        for item in action(self, m):
+                            yield item
+                    pos = m.end()
+                    if new_state is not None:
+                        # state transition
+                        if isinstance(new_state, tuple):
+                            for state in new_state:
+                                if state == '#pop':
+                                    statestack.pop()
+                                elif state == '#push':
+                                    statestack.append(statestack[-1])
+                                else:
+                                    statestack.append(state)
+                        elif isinstance(new_state, int):
+                            # pop
+                            del statestack[new_state:]
+                        elif new_state == '#push':
+                            statestack.append(statestack[-1])
+                        else:
+                            assert False, "wrong state def: %r" % new_state
+                        statetokens = tokendefs[statestack[-1]]
+                    break
+            else:
+                try:
+                    if text[pos] == '\n':
+                        # at EOL, reset state to "root"
+                        pos += 1
+                        statestack = ['root']
+                        statetokens = tokendefs['root']
+                        yield pos, Text, u'\n'
+                        continue
+                    yield pos, Error, text[pos]
+                    pos += 1
+                except IndexError:
+                    break
+
+
+def tokenize(sql):
+    """Tokenize sql.
+
+    Tokenize *sql* using the :class:`Lexer` and return a 2-tuple stream
+    of ``(token type, value)`` items.
+    """
+    lexer = Lexer()
+    return lexer.get_tokens(sql)
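
For context, the module's public entry point is tokenize(), which instantiates Lexer and streams (token type, value) pairs produced by get_tokens(). Below is a minimal usage sketch, not part of the commit: it assumes a Python 2 runtime (the code relies on unicode and __metaclass__), that the vendored package path debug_toolbar.utils.sqlparse is importable, and the SQL string is illustrative only.

# Minimal usage sketch (assumptions: Python 2, vendored package path
# debug_toolbar.utils.sqlparse importable; the SQL string is made up).
from debug_toolbar.utils.sqlparse.lexer import tokenize

sql = u"SELECT name FROM users WHERE id = 1 -- lookup"

# tokenize() builds a Lexer and returns a generator of (token type, value)
# pairs; identifiers are classified via is_keyword(), while comments,
# whitespace, numbers and strings get their own token types.
for ttype, value in tokenize(sql):
    print ttype, repr(value)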
