欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

主题一: 使用Django实现JWT身份验证与登录流程

最编程 2024-07-28 10:22:58
...

目标

专题记录一种django的jwt登录认证的实现方法,实现如下功能和步骤:

  1. 自定义User类模型:模拟AbstractUser类,实现自定义User
  2. 序列化自定义User:基于REST frame实现model的序列化,提高编码效率
  3. 实现jwt加密和解密:利用JWT验证用户信息
  4. 基于视图类的登录接口实现:基于APIView实现post登录接口
  5. 系统配置将自定义User模型关联:验证配置,通用化配置
  6. API接口的登录校验

自定义User

思路:

参考AbstractUser实现自定义User类:
1.定义User对应数据的字段
2.定义objects对应的管理类UserManager
3.USERNAME_FIELD(用户校验的字段,默认为username)和REQUIRED_FIELDS(必选字段,不能与USERNAME_FIELD相同)特殊字段
4.UserManager实现(参考AbstractUser具体实现)

代码models.py
from django.db import models
from django.contrib.auth.models import AbstractUser, AbstractBaseUser, PermissionsMixin, BaseUserManager
from shortuuidfield import ShortUUIDField


class UserManager(BaseUserManager):
    use_in_migrations = True

    def _create_user(self, telephone, username, password, **extra_fields):
        if not telephone:
            raise ValueError('The given telephone must be set')
        if not username:
            raise ValueError('The given username must be set')
        if not password:
            raise ValueError('The given password must be set')

        user = self.model(telephone=telephone, username=username, **extra_fields)
        user.set_password(password)
        user.save()
        return user

    def create_user(self, telephone, username, password=None, **extra_fields):
        """
        创建普通用户
        """
        extra_fields.setdefault('is_superuser', False)
        return self._create_user(telephone, username, password, **extra_fields)

    def create_superuser(self, telephone, username, password, **extra_fields):
        """
        创建超级用户
        """
        extra_fields.setdefault('is_superuser', True)
        if extra_fields.get('is_superuser') is not True:
            raise ValueError('Superuser must have is_superuser=True.')

        return self._create_user(telephone, username, password, **extra_fields)


class CustomUser(AbstractBaseUser, PermissionsMixin):
    """
    重写django的User
    """
    uid = ShortUUIDField(primary_key=True, verbose_name="用户表主键")
    telephone = models.CharField(unique=True, max_length=11, verbose_name="手机号码")
    email = models.EmailField(unique=True, max_length=100, verbose_name='邮箱', null=True)
    username = models.CharField(max_length=100, verbose_name="用户名", unique=False)
    avatar = models.CharField(max_length=200, verbose_name='头像链接')
    date_joined = models.DateTimeField(auto_now_add=True, verbose_name='加入时间')
    is_active = models.BooleanField(default=True, verbose_name="是否可用")

    objects = UserManager()

    EMAIL_FIELD = 'email'
    # 定义登录的校验字段,默认为username
    USERNAME_FIELD = 'telephone'
    REQUIRED_FIELDS = ['username']

    def get_full_name(self):
        return self.username

    def get_short_name(self):
        return self.username

序列化User

思路

基于rest framework的能力,序列化对应的model,提高后期编码效率。 Meta类下如下字段说明:
model:关联具体的模型
fields/exclude:二选一,fields为包含,exclude为不包含

serialzies.py
from rest_framework import serializers
from django.contrib.auth import get_user_model

User = get_user_model()

class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = "__all__"

JWT加解密

思路

pip install安装jwt pip install pyjwt,安装对应的第三方依赖,实现jwt的加解密。
JWT需要实现两个功能:
1.加密,将要加密的信息和时间戳以json的格式封装,用Django的settings文件的SECRET_KEY进行jwt加密,生产加密信息
2.解密,参考rest_framework.authencation的TokenAuthentication自定义认证类,实现认证类authenticate方法,解密header下authentication携带的用户信息
3.时区时间,注意expire_time需要使用带时区的时间,即timezone

authoriztions.py
from rest_framework.authentication import TokenAuthentication, BaseAuthentication, get_authorization_header
from rest_framework import exceptions
import jwt
from django.contrib.auth import get_user_model
from django.conf import settings
from jwt.exceptions import ExpiredSignatureError
from datetime import datetime, timedelta
from django.utils import timezone
# 获取全局的user模型
MTUser = get_user_model()


def generate_jwt(user):
    """
    对user对象的id和时间戳进行jwt加密,作为认证信息
    """
    # expire_time = datetime.now() + timedelta(days=7) 没有时区信息
    expire_time = timezone.now() + timedelta(days=7)  # 有时区信息
    return jwt.encode({'userid': user.pk, 'exp': expire_time}, key=settings.SECRET_KEY).decode('utf-8')


class JWTAuthentication(BaseAuthentication):
    """
    用户认证类
    """
    keyword = 'jwt'  # jwt为token的认证关键字,进行合法性校验

    def authenticate(self, request):
        """
        用户校验方法
        """
        auth = get_authorization_header(request).split()  # 读取request下的header指定authorization字段信息,存储用户认证信息

        if not auth or auth[0].lower() != self.keyword.lower().encode():
            return None
        # jwt格式校验
        if len(auth) == 1:
            msg = "不可用的JWT请求头"
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = '不可用的JWT请求头!JWT Token中间不应该有空格!'
            raise exceptions.AuthenticationFailed(msg)

        try:
            jwt_token = auth[1]
            jwt_info = jwt.decode(jwt_token, settings.SECRET_KEY)  # jwt解密,获取userid
            userid = jwt_info.get('userid')
            try:
                user = MTUser.objects.get(pk=userid)
                return user, jwt_token
            except:
                msg = "用户不存在"
                raise exceptions.AuthenticationFailed(msg)
        except ExpiredSignatureError:
            msg = "JWT Token过期"
            raise exceptions.AuthenticationFailed(msg)


登录接口实现

思路

基于rest_framework类的views.APIView实现,对于登录接口通过post接口实现。
1.定义视图类views.APIView的post方法
2.AuthTokenSerializer序列化request.data,获得认证请求的序列化对象,判断参数有效性并获得对应user的模型(配置JWTAuthentication .authoriztions为工程的用户验证类).
2.1.AuthTokenSerializer会判断请求usernamepassword`的有效性

2.2.有效的前期下,会使用系统默认/自定义authoriztions获取对应的用户model对象
3.根据user对象,jwt加密token
4.user对象序列化后返回Response

views.py
from rest_framework.authtoken.serializers import AuthTokenSerializer
from django.utils.timezone import now
from .serializers import UserSerializer
from rest_framework.response import Response
from .authoriztions import generate_jwt

class LoginView(views.APIView):
    def post(self, request):
        serialzier = AuthTokenSerializer(data=request.data)  # request对象的AuthTokenSerializer序列化
        if serialzier.is_valid():  # 序列化有效性判断
            user = serialzier.validated_data.get('user')    # 获取user对象
            user.last_login = now() # 更新登录时间
            user.save()
            # 根据用户对象生产jwt token
            token = generate_jwt(user)  # 根据user对象生产token
            userSerializer = UserSerializer(instance=user)
            return Response({'token': token, "user": userSerializer.data})  # 结果返回
        else:
            return Response(data={"message": "用户名或密码错误"})
class LoginView(views.APIView):
    def post(self, request):
        serialzier = AuthTokenSerializer(data=request.data)  # request对象的AuthTokenSerializer序列化
        if serialzier.is_valid():  # 序列化有效性判断
            user = serialzier.validated_data.get('user')    # 获取user对象
            user.last_login = now() # 更新登录时间
            user.save()
            # 根据用户对象生产jwt token
            token = generate_jwt(user)  # 根据user对象生产token
            userSerializer = UserSerializer(instance=user)
            return Response({'token': token, "user": userSerializer.data})  # 结果返回
        else:
            return Response(data={"message": "用户名或密码错误"})

工程配置

思路

工程配置分类两类,都在配置文件settings.py配置:
1.通用配置
时区配置:LANGUAGE_CODE = 'zh-Hans',TIME_ZONE = 'Asia/Shanghai'
2.数据库配置:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': '数据库名称',
        'USERNAME': '用户',
        'PASSWORD': '密码',
        'HOST': 'ip',
        'PORT': 端口,
    }
}

2.验证信息配置
1.认证用户模型

AUTH_USER_MODEL = "custom_auth.CustomUser"
#custom_auth为对应app,即在INSTALLED_APPS注册
#CustomUser,自定义User,参考AbstractUser实现自定义

登录测试

请求登录接口,username为自定义User的telephone字段,password为密码,接口返回token和user对象。


image.png

序列化的好处是不用在对字段一一赋值,框架自动返回字段


image.png

image.png

用户认证

思路

视图类设置关键属性,来确认是否需要登录校验,以及如何校验,对应的视图类属性为:

permission_classes:

验证权限,如IsAuthenticated(登录用户),IsAdminUser(管理员),AllowAny(所有用户)

authentication_classes:

验证方式,如默认方式TokenAuthentication,或者自定义方式JWTAuthentication

代码实现
class UserView(views.APIView):
    permission_classes = [IsAuthenticated]  # 权限
    authentication_classes = [JWTAuthentication]  # 用户认证类

    def get(self, request):
        """
        获取所有用户信息
        """
        user = CustomUser.objects.all()
        serializer = UserSerializer(instance=user, many=True)
        return Response(data=serializer.data)

    def put(self, request):
        """
        修改指定用户
        """
        user = request.user
        user.username = request.data.get('username')
        user.telephone = request.data.get('telephone')
        user.email = request.data.get('email')
        user.avatar = request.data.get('avatar')
        user.save()
        serializer = UserSerializer(user)
        return Response(data=serializer.data)

推荐阅读