sysom/sysom_api/apps/accounts/views.py

239 lines
9.1 KiB
Python

from datetime import datetime
import logging
from rest_framework.request import Request
from rest_framework.viewsets import GenericViewSet
from rest_framework import mixins, status
from rest_framework.status import *
from rest_framework.views import APIView
from rest_framework.generics import CreateAPIView
from apps.accounts.permissions import IsAdminPermission
from apps.accounts.serializer import UserAuthSerializer
from apps.accounts.authentication import Authentication
from . import models
from . import serializer
from lib import success, other_response
logger = logging.getLogger(__name__)
class UserModelViewSet(
    GenericViewSet,
    mixins.CreateModelMixin,
    mixins.ListModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    mixins.RetrieveModelMixin
):
    """User CRUD endpoints plus handler-log and HTTP-status-code lookups.

    The CRUD handlers delegate to the DRF mixins and re-wrap the resulting
    payload with the project's ``success`` helper.
    """

    queryset = models.User.objects.all()
    serializer_class = serializer.UserListSerializer
    # permission_classes = [IsAdminPermission]

    # Maps the ``request_option`` query-string value to the value that is
    # filtered on in the HandlerLog ``request_option`` field.
    logging_options = {
        'login': 0,
        'action': 1
    }

    def get_serializer_class(self):
        """Return the list serializer for GET, the add-user serializer otherwise."""
        if self.request.method == "GET":
            return serializer.UserListSerializer
        return serializer.AddUserSerializer

    def get_permissions(self):
        """Skip permission checks on GET; use ``permission_classes`` otherwise."""
        if self.request.method == "GET":
            return []
        return [permission() for permission in self.permission_classes]

    def create(self, request, *args, **kwargs):
        """Create a user and return the mixin's payload wrapped by ``success``."""
        response = super().create(request, *args, **kwargs)
        return success(result=response.data, message="创建成功")

    def list(self, request, *args, **kwargs):
        """List users and return the mixin's payload wrapped by ``success``."""
        response = super().list(request, *args, **kwargs)
        return success(result=response.data)

    def update(self, request, *args, **kwargs):
        """Update a user and return it rendered with ``UserListSerializer``.

        Validation uses the serializer chosen by ``get_serializer_class``;
        the response is rendered with the list serializer instead.
        """
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        u_serializer = self.get_serializer(instance, data=request.data, partial=partial)
        u_serializer.is_valid(raise_exception=True)
        self.perform_update(u_serializer)
        result = serializer.UserListSerializer(instance=u_serializer.instance, many=False)
        return success(result=result.data, message="修改成功")

    def destroy(self, request, *args, **kwargs):
        """Delete a user and return an empty ``success`` payload."""
        super().destroy(request, *args, **kwargs)
        return success(result={}, message="删除成功")

    def retrieve(self, request, *args, **kwargs):
        """Fetch one user and return the mixin's payload wrapped by ``success``."""
        response = super().retrieve(request, *args, **kwargs)
        return success(result=response.data, message="获取成功")

    def get_logs(self, request):
        """Return handler logs, filtered by the query string.

        Users whose ``is_admin`` flag is truthy see every matching entry;
        any other user only sees entries attached to themselves. A request
        with no persisted user (``None`` or a user object without a primary
        key) gets an empty result instead of raising ``AttributeError``.
        """
        queryset = self._filter_log_params(request, models.HandlerLog.objects.select_related().all())
        user = getattr(request, 'user', None)
        if not getattr(user, 'is_admin', False):
            if getattr(user, 'pk', None) is None:
                # GET requests bypass permission checks (see get_permissions),
                # so there may be no real user to scope the logs to.
                queryset = queryset.none()
            else:
                queryset = queryset.filter(user=user)

        page = self.paginate_queryset(queryset)
        if page is not None:
            ser = serializer.HandlerLoggerListSerializer(page, many=True)
            return self.get_paginated_response(ser.data)
        ser = serializer.HandlerLoggerListSerializer(queryset, many=True)
        return success(result=ser.data)

    def _filter_log_params(self, request, queryset):
        """Apply the log filters found in ``request.query_params`` to ``queryset``.

        Recognised parameters: ``request_option`` (translated through
        ``logging_options``), ``request_ip``, ``request_url``,
        ``request_method``, ``response_status``, ``startTime`` and
        ``endTime``. Empty values are ignored. The time window defaults to
        2000-01-01 00:00:00 through the current local time.
        """
        params = request.query_params.dict()
        filters = {}

        request_option = params.pop('request_option', None)
        if request_option:
            option = self.logging_options.get(request_option)
            # Unknown option names are dropped rather than filtered on.
            if option is not None:
                filters['request_option'] = option

        for field in ('request_ip', 'request_url', 'request_method', 'response_status'):
            value = params.get(field)
            if value:
                filters[field] = value

        start_time = params.get('startTime', '2000-01-01 00:00:00')
        end_time = params.get('endTime', datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        return queryset.filter(created_at__range=[start_time, end_time], **filters)

    def get_response_code(self, request):
        """Return every ``HTTP*`` constant of ``rest_framework.status`` as label/value pairs."""
        # Read the status module directly instead of scanning this module's
        # globals(), so unrelated HTTP-prefixed names cannot leak into the list.
        status_map = [
            {'label': name, 'value': value}
            for name, value in vars(status).items()
            if name.startswith('HTTP')
        ]
        return success(result=status_map)
class AuthAPIView(CreateAPIView):
    """Login endpoint that returns the serialized user together with a token."""

    # No authenticators are configured for this view.
    authentication_classes = []

    def post(self, request):
        """Validate the submitted data and respond with the user and token.

        Validation is delegated to ``UserAuthSerializer``; invalid input
        raises through ``is_valid(raise_exception=True)``.
        """
        auth_serializer = UserAuthSerializer(data=request.data)
        auth_serializer.is_valid(raise_exception=True)
        user, token = auth_serializer.create_token()

        payload = serializer.UserListSerializer(instance=user, many=False).data
        payload["token"] = token
        return other_response(message="登录成功", code=200, result=payload)
class RoleModelViewSet(GenericViewSet,
                       mixins.CreateModelMixin,
                       mixins.ListModelMixin,
                       mixins.UpdateModelMixin,
                       mixins.RetrieveModelMixin):
    """Role endpoints: GET skips authentication and permission checks; other methods use the configured classes."""

    queryset = models.Role.objects.all()
    serializer_class = serializer.RoleListSerializer
    authentication_classes = [Authentication]
    permission_classes = [IsAdminPermission]

    def get_serializer_class(self):
        """Pick the list serializer for GET and the add-role serializer otherwise."""
        if self.request.method != "GET":
            return serializer.AddRoleSerializer
        return serializer.RoleListSerializer

    def get_permissions(self):
        """Return no permissions for GET, else instances of ``permission_classes``."""
        if self.request.method == "GET":
            return []
        return [permission_cls() for permission_cls in self.permission_classes]

    def get_authenticators(self):
        """Return no authenticators for GET, else instances of ``authentication_classes``."""
        if self.request.method == "GET":
            return []
        return [authenticator_cls() for authenticator_cls in self.authentication_classes]

    def retrieve(self, request, *args, **kwargs):
        """Fetch one role and wrap the mixin's payload with ``success``."""
        response = super().retrieve(request, *args, **kwargs)
        return success(result=response.data)

    def create(self, request, *args, **kwargs):
        """Create a role and return it rendered with ``RoleListSerializer`` (code 201)."""
        incoming = self.get_serializer(data=request.data)
        incoming.is_valid(raise_exception=True)
        self.perform_create(incoming)

        rendered = serializer.RoleListSerializer(instance=incoming.instance, many=False)
        return other_response(result=rendered.data, code=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """Update a role and return it rendered with ``RoleListSerializer``."""
        is_partial = kwargs.pop('partial', False)
        role = self.get_object()

        incoming = self.get_serializer(role, data=request.data, partial=is_partial)
        incoming.is_valid(raise_exception=True)
        self.perform_update(incoming)

        rendered = serializer.RoleListSerializer(instance=incoming.instance, many=False)
        return success(message="修改成功", result=rendered.data)
class PermissionViewSet(GenericViewSet,
                        mixins.CreateModelMixin,
                        mixins.ListModelMixin,
                        mixins.UpdateModelMixin,
                        ):
    """Permission endpoints: GET skips authentication and permission checks; other methods use the configured classes."""

    queryset = models.Permission.objects.all()
    serializer_class = serializer.PermissionListSerializer
    authentication_classes = [Authentication]
    permission_classes = [IsAdminPermission]

    def get_serializer_class(self):
        """Pick the list serializer for GET and the add-permission serializer otherwise."""
        if self.request.method != "GET":
            return serializer.AddPermissionSerializer
        return serializer.PermissionListSerializer

    def get_authenticators(self):
        """Return no authenticators for GET, else instances of ``authentication_classes``."""
        if self.request.method == "GET":
            return []
        return [authenticator_cls() for authenticator_cls in self.authentication_classes]

    def get_permissions(self):
        """Return no permissions for GET, else instances of ``permission_classes``."""
        if self.request.method == "GET":
            return []
        return [permission_cls() for permission_cls in self.permission_classes]

    def create(self, request, *args, **kwargs):
        """Create a permission and return it rendered with ``PermissionListSerializer``."""
        incoming = self.get_serializer(data=request.data)
        incoming.is_valid(raise_exception=True)
        self.perform_create(incoming)

        rendered = serializer.PermissionListSerializer(instance=incoming.instance, many=False)
        return success(result=rendered.data)
class ChangePasswordViewSet(APIView):
    """Change the password of the user attached to the request."""

    # Request-body keys that must all carry a non-empty value.
    required_fields = ['row_password', 'new_password', 'new_password_again']

    def post(self, request: Request):
        """Validate the submitted passwords and store the new one.

        Responds with code 400 when a required field is empty, when the two
        new passwords differ, when the current password does not verify, or
        when saving the user raises. Otherwise returns a ``success`` response.
        """
        user = request.user
        data = request.data

        # Report the first missing/empty field, in declaration order.
        for field in self.required_fields:
            if not data.get(field):
                return other_response(message=f'{field} 不能为空', code=400, result={})

        new_password = data.get('new_password')
        if new_password != data.get('new_password_again'):
            return other_response(message="两次密码不一致", code=400, result={})
        if not user.verify_password(data.get('row_password')):
            return other_response(message="原始密码错误", code=400, result={})

        try:
            user.password = user.make_password(new_password)
            user.save()
        except Exception as e:
            # logger.exception keeps the traceback that logger.error(e) dropped.
            logger.exception("Failed to store the new password")
            return other_response(message=f"数据库错误!{e}", code=400)

        # NOTE(review): this message looks like a typo of "密码修改成功"; left
        # unchanged in case clients depend on the exact text - confirm.
        return success(result={}, message="密码修成成功")