sysom1/sysom_server/sysom_api/apps/accounts/views.py

393 lines
15 KiB
Python

from datetime import datetime
from clogger import logger
from typing import Union
from django.conf import settings
from rest_framework.request import Request
from rest_framework.viewsets import GenericViewSet
from rest_framework import mixins, status
from rest_framework.status import *
from rest_framework.views import APIView
from rest_framework.generics import CreateAPIView
from apps.accounts.permissions import IsAdminPermission
from apps.accounts.serializer import UserAuthSerializer
from apps.accounts.authentication import Authentication
from django_filters.rest_framework import DjangoFilterBackend
from . import models
from . import serializer
from lib.response import success, other_response
from lib.exception import APIException
from sysom_utils import SysomFramework
class UserModelViewSet(
GenericViewSet,
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.UpdateModelMixin,
mixins.DestroyModelMixin,
mixins.RetrieveModelMixin
):
queryset = models.User.objects.all()
serializer_class = serializer.UserListSerializer
# permission_classes = [IsAdminPermission]
filter_backends = [DjangoFilterBackend]
filterset_fields = ['username']
logging_options = {
'login': 0,
'action': 1
}
def get_serializer_class(self):
method = self.request.method
if method == 'GET':
return serializer.UserListSerializer
elif method == 'PUT':
return serializer.UpdateUserSerializer
else:
return serializer.AddUserSerializer
def get_permissions(self):
if self.request.method == "GET":
return []
else:
return [permission() for permission in self.permission_classes]
def get_user_info(self, request):
"""获取用户信息"""
serializer = self.get_serializer(request.user)
return success(result=serializer.data)
def create(self, request, *args, **kwargs):
response = super().create(request, *args, **kwargs)
return success(result=response.data, message="创建成功")
def list(self, request, *args, **kwargs):
# data = super().list(request, *args, **kwargs)
return super().list(request, *args, **kwargs)
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
u_serializer = self.get_serializer(instance, data=request.data, partial=partial)
u_serializer.is_valid(raise_exception=True)
self.perform_update(u_serializer)
result = serializer.UserListSerializer(instance=u_serializer.instance, many=False)
return success(result=result.data, message="修改成功")
def destroy(self, request, *args, **kwargs):
super().destroy(request, *args, **kwargs)
return success(result={}, message="删除成功")
def retrieve(self, request, *args, **kwargs):
result = super().retrieve(request, *args, **kwargs)
return success(result=result.data, message="获取成功")
def get_logs(self, request):
queryset = self._filter_log_params(request, models.HandlerLog.objects.select_related().all())
user = getattr(request, 'user', None)
if not user.is_admin:
queryset = queryset.filter(user=user)
page = self.paginate_queryset(queryset)
if page is not None:
ser = serializer.HandlerLoggerListSerializer(page, many=True)
return self.get_paginated_response(ser.data)
ser = serializer.HandlerLoggerListSerializer(queryset, many=True)
return success(result=ser.data)
def _filter_log_params(self, request, queryset):
kwargs = dict()
params = request.query_params.dict()
request_option = params.pop('request_option', None)
if request_option:
option = getattr(models.HandlerOptionEnum, request_option.upper()).value
if option is not None:
kwargs['request_option'] = option
request_ip = params.get('request_ip', "localhost")
request_url = params.get('request_url', "")
request_method: str = params.get('request_method', "")
response_status = params.get('response_status', "")
start_time = params.get('startTime', '2000-01-01 00:00:00')
end_time = params.get('endTime', datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if request_ip:
kwargs['request_ip'] = request_ip
if request_url:
kwargs['request_url'] = request_url
if request_method:
kwargs['request_method'] = request_method
if response_status:
kwargs['response_status'] = response_status
queryset = queryset.filter(created_at__range=[start_time, end_time], **kwargs)
return queryset
def get_response_code(self, request):
status_map = [{'label': k, 'value': v}for k, v in globals().items() if k.startswith('HTTP')]
return success(result=status_map)
class AuthAPIView(CreateAPIView):
authentication_classes = []
def post(self, request):
ser = UserAuthSerializer(data=request.data)
ser.is_valid(raise_exception=True)
u, t = ser.create_token()
u_ser = serializer.UserListSerializer(instance=u, many=False)
result = u_ser.data
result.update({"token": t})
return other_response(message="登录成功", code=200, result=result)
def _add_log_info(action: str):
'''
@Description: 记录登录或注销日志 (装饰器)
@param {*} action 处理业务的方法 login / logout
'''
def wrapper(func):
def inner(view, request, *args, **kwargs):
res = func(view, request, *args, **kwargs)
option: models.HandlerOptionEnum = getattr(models.HandlerOptionEnum, action.upper())
kwargs = dict()
kwargs['request_ip'] = request.META.get('REMOTE_ADDR', None)
kwargs['request_url'] = request.path
kwargs['request_method'] = request.method
kwargs['response_status'] = 200
kwargs['handler_view'] = action
kwargs['request_option'] = option.value
kwargs['request_browser_agent'] = request.headers.get('User-Agent', '')
kwargs['user'] = models.User.objects.get(pk=1)
try:
models.HandlerLog.objects.create(**kwargs)
except Exception as e:
logger.error(e)
return res
return inner
return wrapper
class AccountAuthView(GenericViewSet):
queryset = models.User.objects.filter(deleted_at=None)
authentication_classes = [Authentication]
@_add_log_info(action='login')
def login(self, request):
"""用户登录,获取接口令牌
token存放在redis中, 如果用户已登录,
就返回token中的用户token, 若token过
期后这重新登录产生新token设置默认过期
值, 并重新存入redis.
"""
ser = UserAuthSerializer(data=request.data)
ser.is_valid(raise_exception=True)
u, t = ser.create_token()
# 检查用户是否允许登录
if not u.allow_login:
return other_response(message='请联系管理员, 开启登录!', code=400)
u_ser = serializer.UserListSerializer(instance=u, many=False)
result = u_ser.data
result.update({"token": t})
SysomFramework.gcache("JWT_TOKEN").store(t, u.pk, expire=settings.JWT_TOKEN_EXPIRE)
return other_response(message="登录成功", code=200, result=result)
@_add_log_info(action='logout')
def logout(self, request: Request):
"""用户登出 清除缓存中的用户token信息"""
SysomFramework.gcache("JWT_TOKEN").delete(request.META.get('HTTP_AUTHORIZATION'))
return success(message='退出成功', result={})
def get_authenticators(self):
if self.request.method == "POST":
return []
else:
return super().get_authenticators()
class RoleModelViewSet(GenericViewSet,
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.UpdateModelMixin,
mixins.RetrieveModelMixin):
queryset = models.Role.objects.all()
serializer_class = serializer.RoleListSerializer
authentication_classes = [Authentication]
permission_classes = [IsAdminPermission]
def get_serializer_class(self):
if self.request.method == "GET":
return serializer.RoleListSerializer
else:
return serializer.AddRoleSerializer
def get_permissions(self):
if self.request.method == "GET":
return []
else:
return [permission() for permission in self.permission_classes]
def get_authenticators(self):
if self.request.method == "GET":
return []
else:
return [auth() for auth in self.authentication_classes]
def retrieve(self, request, *args, **kwargs):
result = super().retrieve(request, *args, **kwargs)
return success(result=result.data)
def create(self, request, *args, **kwargs):
create_serializer = self.get_serializer(data=request.data)
create_serializer.is_valid(raise_exception=True)
self.perform_create(create_serializer)
ser = serializer.RoleListSerializer(instance=create_serializer.instance, many=False)
return other_response(result=ser.data, code=status.HTTP_201_CREATED)
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
u_serializer = self.get_serializer(instance, data=request.data, partial=partial)
u_serializer.is_valid(raise_exception=True)
self.perform_update(u_serializer)
result = serializer.RoleListSerializer(instance=u_serializer.instance, many=False)
return success(message="修改成功", result=result.data)
class PermissionViewSet(GenericViewSet,
mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.UpdateModelMixin,
):
queryset = models.Permission.objects.all()
serializer_class = serializer.PermissionListSerializer
authentication_classes = [Authentication]
permission_classes = [IsAdminPermission]
def get_serializer_class(self):
if self.request.method == "GET":
return serializer.PermissionListSerializer
else:
return serializer.AddPermissionSerializer
def get_authenticators(self):
if self.request.method == "GET":
return []
else:
return [auth() for auth in self.authentication_classes]
def get_permissions(self):
if self.request.method == "GET":
return []
else:
return [permission() for permission in self.permission_classes]
def create(self, request, *args, **kwargs):
create_serializer = self.get_serializer(data=request.data)
create_serializer.is_valid(raise_exception=True)
self.perform_create(create_serializer)
ser = serializer.PermissionListSerializer(instance=create_serializer.instance, many=False)
return success(result=ser.data)
class PasswordViewSet(GenericViewSet):
"""Change User Password"""
authentication_classes = [Authentication]
required_fields = ['username', 'row_password', 'new_password', 'new_password_again']
default_password = '123456'
def get_authenticators(self):
"""
对修改密码接口不做用户认证
"""
if 'change_password' in self.request.path:
return []
return super().get_authenticators()
@classmethod
def _verfiy_user(cls, username: str, password: str) -> Union[None, models.User]:
"""
验证用户
@args username<str>: 用户名称
@args password<str>: 用户登录的明文密码
"""
try:
user: models.User = models.User.objects.get(username=username)
except models.User.DoesNotExist:
return None
else:
return user if user.verify_password(plain_password=password) else None
def change_password(self, request: Request):
"""
用户修改密码
"""
data = request.data
for f in filter(lambda x: not x[1], [(field, data.get(field, None)) for field in self.required_fields]):
return other_response(message=f'{f[0]} 不能为空', code=400, result={})
if data['new_password'] != data['new_password_again']:
return other_response(message="两次密码不一致", code=400, result={}, success=False)
user = self._verfiy_user(data['username'], data['row_password'])
if user is None:
return other_response(message='请检查原账号信息', code=400, result={}, success=False)
try:
user.password = user.make_password(data.get('new_password'))
user.save()
except Exception as e:
logger.error(e)
return other_response(message=f"数据库错误!{e}", code=400, success=False)
return success(result={}, message="密码修成成功")
def reset_password(self, request: Request):
user = getattr(request, 'user')
if not self._has_admin_permissions(user):
raise APIException(message='not permissions')
b, m = self._check_fields(request.data, ['pk'])
if not b:
return other_response(result={}, message=m, success=False, code=400)
self._reset_password(**request.data)
return success(result={}, message='password reset success')
def _check_fields(self, data: dict, fields: list=[]):
"""检查请求参数"""
res, message = True, ""
for atx in filter(lambda item: not item[1],\
[(field, data.get(field, None)) for field in fields]):
res, message = False, f'field: {atx[0]} required!'
return res, message
def _has_admin_permissions(self, user: models.User):
"""检查用户是否具有admin权限"""
return user.is_admin
def _reset_password(self, pk: int):
"""
重置用户密码, 默认密码(123456)
Args:
pk: 用户的唯一ID
"""
try:
user = models.User.objects.get(pk=pk)
except models.User.DoesNotExist:
raise APIException(message=f'user not exist: {pk}')
try:
user.password = user.make_password(self.default_password)
user.save()
except Exception as e:
logger.exception(e)
raise APIException(message='action reset password fialed!')