[Python-Flask]JWT权限组实现

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动

python的修饰器用来做JWT验证感觉看起来就非常优雅,这篇博客将介绍使用python修饰器为flask添加权限组验证的JWT。

首先我们需要一个token生成函数,将用户的id写入token中,在后续解析中,就可以通过存储的用户id进行身份的判定:

def generate_token(user, expiration=864000):
    # 默认十天过期
    s = Serializer(SECRET_KEY, expires_in=expiration)   # expiration是过期时间3600:一小时
    token = s.dumps({'id': user.id}).decode('utf-8')
    return token
复制代码

一、简易登录验证

def login_required(func):
    @functools.wraps(func)
    def wrapper(*args, **kwags):
        token = request.headers["Authorization"].split()[-1]
        s = Serializer(SECRET_KEY)
        try:
            user_id = s.loads(token)["id"]
            if not users.find_by_id(user_id):
                # 用户不存在,返回404
                return make_resp({"error_msg": "用户不存在"}, code=404)
        except SignatureExpired:
            # token正确但是过期了
            return make_resp({"error_msg": "token过期"}, code=50001)
        except BadSignature:
            # token错误
            return make_resp({"error_msg": "token错误"}, code=50000)
        return func(user, *args, **kwags)
    return wrapper
复制代码

编写login_required进行登录验证,若token错误或过期,则返回相应的错误码,否则返回user对象,方便后续的操作。
使用方法如下,在view函数中,需要添加一个变量,进行参数的存储:
image.png

二、更进一步,拓展权限组功能

def login_required(*auths_need):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwags):
            try:
                '''在请求头上拿到token'''
                token = request.headers["Authorization"]
            except Exception as e:
                '''没接收的到token,给前端抛出错误'''
                return serialization.make_resp("", code=400)
            token = token.split()[-1]
            s = Serializer(SECRET_KEY)
            try:
                user_id = s.loads(token)["id"]
                user = users.find_by_id(user_id)
                if not user:
                    """用户不存在"""
                    return serialization.make_resp({"error_msg": "用户不存在"}, code=404)
                if auths_need:
                    '''获取token中的权限列表如果在参数列表中则表示有权限,否则就表示没有权限'''
                    if not [auth.name for auth in user.auths if auth.name in auths_need]:
                        # 交集为空,表示权限不足
                        return serialization.make_resp({"error_msg": "权限不足"}, code=401)
            except SignatureExpired:
                '''token正确但是过期了'''
                return serialization.make_resp({"error_msg": "token过期"}, code=50001)
            except BadSignature:
                '''token错误'''
                return serialization.make_resp({"error_msg": "token错误"}, code=50000)
            return func(user, *args, **kwags)
        return wrapper
    return decorator
复制代码

与上面类似,但是可以看到多了一步鉴权的操作,通过读取用户的权限组与输入的权限列表取交集,若交集非空,则可通过验证。

其使用方法如下:

@admin.route("/students/upload", methods=['POST'])
@login_required("SuperAdmin", "Admin")
def students_upload():
复制代码

这样,权限包含SuperAdmin或Admin的用户就可以通过权限验证,实现访问了。