小知识,大挑战!本文正在参与“程序员必备小知识”创作活动
python的修饰器用来做JWT验证感觉看起来就非常优雅,这篇博客将介绍使用python修饰器为flask添加权限组验证的JWT。
首先我们需要一个token生成函数,将用户的id写入token中,在后续解析中,就可以通过存储的用户id进行身份的判定:
def generate_token(user, expiration=864000):
# 默认十天过期
s = Serializer(SECRET_KEY, expires_in=expiration) # expiration是过期时间3600:一小时
token = s.dumps({'id': user.id}).decode('utf-8')
return token
复制代码
一、简易登录验证
def login_required(func):
@functools.wraps(func)
def wrapper(*args, **kwags):
token = request.headers["Authorization"].split()[-1]
s = Serializer(SECRET_KEY)
try:
user_id = s.loads(token)["id"]
if not users.find_by_id(user_id):
# 用户不存在,返回404
return make_resp({"error_msg": "用户不存在"}, code=404)
except SignatureExpired:
# token正确但是过期了
return make_resp({"error_msg": "token过期"}, code=50001)
except BadSignature:
# token错误
return make_resp({"error_msg": "token错误"}, code=50000)
return func(user, *args, **kwags)
return wrapper
复制代码
编写login_required进行登录验证,若token错误或过期,则返回相应的错误码,否则返回user对象,方便后续的操作。
使用方法如下,在view函数中,需要添加一个变量,进行参数的存储:
二、更进一步,拓展权限组功能
def login_required(*auths_need):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwags):
try:
'''在请求头上拿到token'''
token = request.headers["Authorization"]
except Exception as e:
'''没接收的到token,给前端抛出错误'''
return serialization.make_resp("", code=400)
token = token.split()[-1]
s = Serializer(SECRET_KEY)
try:
user_id = s.loads(token)["id"]
user = users.find_by_id(user_id)
if not user:
"""用户不存在"""
return serialization.make_resp({"error_msg": "用户不存在"}, code=404)
if auths_need:
'''获取token中的权限列表如果在参数列表中则表示有权限,否则就表示没有权限'''
if not [auth.name for auth in user.auths if auth.name in auths_need]:
# 交集为空,表示权限不足
return serialization.make_resp({"error_msg": "权限不足"}, code=401)
except SignatureExpired:
'''token正确但是过期了'''
return serialization.make_resp({"error_msg": "token过期"}, code=50001)
except BadSignature:
'''token错误'''
return serialization.make_resp({"error_msg": "token错误"}, code=50000)
return func(user, *args, **kwags)
return wrapper
return decorator
复制代码
与上面类似,但是可以看到多了一步鉴权的操作,通过读取用户的权限组与输入的权限列表取交集,若交集非空,则可通过验证。
其使用方法如下:
@admin.route("/students/upload", methods=['POST'])
@login_required("SuperAdmin", "Admin")
def students_upload():
复制代码
这样,权限包含SuperAdmin或Admin的用户就可以通过权限验证,实现访问了。




近期评论