sysom1/sysom_server/sysom_diagnosis/lib/authentications.py

65 lines
2.1 KiB
Python

import os
from clogger import logger
from importlib import import_module
from typing import List
from django.conf import settings
from django.utils.translation import ugettext as _
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.request import Request
from rest_framework.authentication import BaseAuthentication
from sysom_utils import SysomFramework
def get_jwt_decode_classes() -> List[type]:
    """Collect the ``JWTTokenDecode`` class of every decoder module.

    Each ``*.py`` file found in ``settings.JWT_TOKEN_DECODE_DIR`` is imported
    as ``lib.decode.<file name without extension>`` and its
    ``JWTTokenDecode`` attribute is collected. A module that cannot be
    imported, or that does not define ``JWTTokenDecode``, is skipped and the
    error is logged as a warning, so one faulty decoder does not disable the
    remaining ones.

    Returns:
        The collected ``JWTTokenDecode`` classes, in sorted module-name order.
    """
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # order in which decoders are tried stable across hosts and restarts.
    # splitext() strips only the trailing extension, whereas
    # str.replace(".py", "") would also eat a ".py" inside the file name.
    module_names = sorted(
        f'lib.decode.{os.path.splitext(file_name)[0]}'
        for file_name in os.listdir(settings.JWT_TOKEN_DECODE_DIR)
        if file_name.endswith(".py")
    )

    jwt_decode_classes = []
    for module_name in module_names:
        try:
            # Import inside the try block: a broken module is skipped just
            # like a module that lacks the JWTTokenDecode attribute.
            module = import_module(module_name)
            jwt_decode_classes.append(getattr(module, 'JWTTokenDecode'))
        except Exception as exc:
            logger.warning(exc)
    return jwt_decode_classes
def decode_token(token: str) -> dict:
    """Decode ``token`` with the first decoder that accepts it.

    Every class returned by ``get_jwt_decode_classes()`` is instantiated and
    its ``decode(token)`` method is called; the method is expected to return
    a ``(result, success)`` pair. The result of the first successful decoder
    is returned immediately.

    Raises:
        AuthenticationFailed: if no decoder succeeds (or none is available).
            The message is the concatenation of the results returned by the
            failing decoders.
    """
    failures = ""
    for decoder_cls in get_jwt_decode_classes():
        outcome, ok = decoder_cls().decode(token)
        if ok:
            return outcome
        # On failure the decoder's result is treated as its error text.
        failures += outcome
    raise AuthenticationFailed(failures)
class TokenAuthentication(BaseAuthentication):
    """Authenticate a request from the token in its ``Authorization`` header."""

    def authenticate(self, request: Request):
        """Return a ``(payload, _)`` pair describing the caller.

        The token is read from ``request.META['HTTP_AUTHORIZATION']`` and
        decoded with ``decode_token``. It must also still be present in the
        ``JWT_TOKEN`` cache; a missing entry is treated as a logged-out user.

        Requests whose ``REMOTE_HOST`` is ``localhost`` or ``127.0.0.1``
        never fail: when either check above fails they receive the fixed
        identity ``{"id": 1, "token": "local"}`` instead.

        Raises:
            AuthenticationFailed: for a non-local request whose token is no
                longer in the cache.
            Exception: whatever ``decode_token`` raised, re-raised unchanged
                for non-local requests.
        """
        # NOTE(review): `_` below is the module-level ugettext alias, so the
        # second tuple element is that function. It is presumably meant as a
        # "no auth object" placeholder; confirm before changing.
        raw_token = request.META.get('HTTP_AUTHORIZATION')
        from_localhost = request.META.get("REMOTE_HOST", "") in (
            "localhost", "127.0.0.1")

        try:
            claims = decode_token(raw_token)
        except Exception as exc:
            if not from_localhost:
                raise exc
            return {"id": 1, "token": "local"}, _

        # Check whether the user has already logged out manually.
        cached_entry = SysomFramework.gcache("JWT_TOKEN").load(raw_token)
        if cached_entry is None:
            if not from_localhost:
                raise AuthenticationFailed('用户已退出登录!')
            return {"id": 1, "token": "local"}, _

        claims['token'] = raw_token
        if 'sub' in claims:
            # Expose the "sub" claim as an integer "id" on the payload.
            claims['id'] = int(claims['sub'])
        return claims, _