欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

如何在 Django 中使用 JWT 实现用户身份验证

最编程 2024-07-28 10:21:22
...

配置JWT认证

先通过 pip install djangorestframework 命令下载 Django REST framework 库,再通过 pip install djangorestframework-simplejwt 命令下载 Django REST framework Simple JWT 库。它们提供了 JWT 的 Django 应用。

配置与编码

settings.py 文件里加入以下内容,以支持 JWT 认证:

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework_simplejwt.authentication.JWTAuthentication'
    ],
}

在某个应用的 views.py 文件下,写一个测试用的视图。

from rest_framework.views import APIView
from rest_framework import permissions
from rest_framework.response import Response
from rest_framework_simplejwt import authentication

class AutoTestView(APIView):
    permission_classes = [permissions.IsAuthenticated]
    authentication_classes = (authentication.JWTAuthentication,)

    def get(self, request, *args, **kwargs):
        print('authenticate: ', request.successful_authenticator.authenticate(request))
        print('authenticate_header: ', request.successful_authenticator.authenticate_header(request))
        print('get_header: ', request.successful_authenticator.get_header(request))
        print('get_raw_token: ', request.successful_authenticator.get_raw_token(request.successful_authenticator.get_header(request)))
        print('get_validated_token: ', request.successful_authenticator.get_validated_token(request.successful_authenticator.get_raw_token(request.successful_authenticator.get_header(request))))
        print('get_user: ', request.successful_authenticator.get_user(request.successful_authenticator.get_validated_token(request.successful_authenticator.get_raw_token(request.successful_authenticator.get_header(request)))))
        print('www_authenticate_realm: ', request.successful_authenticator.www_authenticate_realm)
        return Response('O get K')

    def post(self, request, *args, **kwargs):
        return Response('O post K')

urls.py 文件下导入 JWT 的两个视图,以及我们的测试视图的路由:

...
from rest_framework_simplejwt.views import (TokenObtainPairView, TokenRefreshView)
from django.conf.urls import url
from foundation import views as foundation_views

urlpatterns = [
    ...
    url(r'^firmware/auth/token/obtain/$', TokenObtainPairView.as_view(), name='obtain_token'),
    url(r'^firmware/auth/token/refresh/$', TokenRefreshView.as_view(), name='refresh_token'),
    url(r'^firmware/auth/token/test/$', foundation_views.AutoTestView.as_view(), name='test_token'),
]

使用示例

获取 Token:

django_djangorestframeworksimplejwt_1.png

通过 Token 获取视图信息:

django_djangorestframeworksimplejwt_3.png

通过 refresh 刷新 Token:

django_djangorestframeworksimplejwt_2.png

自定义JWT认证

同样的,要先通过 pip install djangorestframework 命令下载 Django REST framework 库,不同的是,接下来要通过 pip install djangorestframework-jwt 命令下载 Django REST framework JWT 库。

配置认证库

settings.py 文件里加入 djangorestframeworkdjangorestframework-jwt 库的配置:

……
INSTALLED_APPS = [
    ……
    'rest_framework',
]
……
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
        # 自定义 JWT Token 认证类
        'foundation.utils.jwt_customize.TokenAuth',
    ),
}
JWT_AUTH = {
    # 用户 Token 有效期
    'JWT_EXPIRATION_DELTA': datetime.timedelta(days=1),
    # Token 前缀
    'JWT_AUTH_HEADER_PREFIX': 'JWT',
    # 自定义响应信息
    'JWT_RESPONSE_PAYLOAD_HANDLER': 'foundation.utils.jwt_customize.jwt_response_payload_handler'
}
AUTHENTICATION_BACKENDS = [
    # 自定义 JWT 的企业 AD 域认证
    'foundation.utils.jwt_customize.UsernameAdAuthBackend',
]
……
# 企业 AD 域信息
AD_DOMAIN_INFO = {
    'ACTIVATE': True,
    'AD_SERVER': ['xxx.xx.xx.xxx'],
    'AD_SERVER_PORT': xxx,
    'AD_DN': 'xxx@xxx.local',
    'AD_PASSWORD': 'xxx',
}

自定义认证类

如上面的路径,创建一个 jwt_customize.py 文件,并编写三个自定义的认证类与方法:

from django.contrib.auth.backends import ModelBackend
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_jwt.serializers import VerifyJSONWebTokenSerializer
from rest_framework import serializers
from x_atp_firmware.settings import JWT_AUTH
from django.contrib.auth.models import User, Group
from foundation.utils.ad_login import ldap_auth
from django.contrib.auth import get_user_model
from x_atp_firmware.settings import AD_DOMAIN_INFO

UserModel = get_user_model()

class TokenAuth:
    """
    自定义 JWT Token 认证类
    """
    @staticmethod
    def authenticate(request):
        """
        重写 authenticate
        """
        # 获取请求头的 Authorization 字段
        headers_token = request.headers.get('Authorization', None)
        # 校验 Authorization 是否符合规范
        if not headers_token:
            raise serializers.ValidationError({'Authorization': '该字段是必填项。'})
        elif headers_token.find(JWT_AUTH['JWT_AUTH_HEADER_PREFIX'] + ' ') == -1:
            raise serializers.ValidationError({'Token': '该字段不符合规范。'})
        # 提取 Authorization 中的 JWT Token 信息
        headers_token = headers_token.split(JWT_AUTH['JWT_AUTH_HEADER_PREFIX'] + ' ')[1]
        token = {'token': headers_token}
        # 调用默认的验证逻辑
        valid_data = VerifyJSONWebTokenSerializer().validate(token)
        user = valid_data['user']
        if user:
            # 返回用户名称与认证Token
            return user, valid_data['token']
        else:
            raise AuthenticationFailed('认证失败')

class UsernameAdAuthBackend(ModelBackend):
    """
    自定义企业 AD 域的登录验证
    """
    def authenticate(self, request, username=None, password=None, **kwargs):
        """
        重写 authenticate
        """
        if request and request.path_info == '/admin/login/':
            # 如果是来自Admin后台的请求,直接传递给默认认证函数
            try:
                user = User.objects.get(username=username)
                # 判断密码是否正确
                if not user.check_password(password):
                    return None
            except UserModel.DoesNotExist:
                return None
        elif not AD_DOMAIN_INFO['ACTIVATE']:
            # 如果未启用AD域登录,则跳过验证
            user = User.objects.get(username=username)
        else:
            login_res = ldap_auth(username, password)
            # 判断返回值是否正常的字典类型
            if isinstance(login_res, dict):
                # 判断是否AD域认证是否成功
                if login_res['result'] is True:
                    # 判断数据库中是否已存在用户组
                    if not Group.objects.filter(name=login_res['organization']['title']):
                        # 创建新用户组
                        group = Group.objects.create(name=login_res['organization']['title'])
                    else:
                        group = Group.objects.get(name=login_res['organization']['title'])
                    # 判断数据库中是否已存在用户信息
                    if not User.objects.filter(username=login_res['account']['s_am_account_name']):
                        # 创建新用户数据
                        user = User.objects.create_user(username=login_res['account']['s_am_account_name'],
                                                        email=login_res['account']['mail'],
                                                        last_name=login_res['account']['sn'],
                                                        first_name=login_res['account']['given_name'],
                                                        password=password)
                    else:
                        user = User.objects.get(username=username)
                    # 为登录用户设置用户组
                    user.groups.add(group)
                else:
                    raise serializers.ValidationError(detail='企业AD域异常: ' + str(login_res))
            elif not login_res:
                # 判断返回值是否为False
                raise serializers.ValidationError(detail='内部账户名称不存在')
            elif login_res == 'auth fail':
                # 判断返回值是否为`auth fail`
                raise serializers.ValidationError(detail='内部登录密码不正确')
            else:
                raise serializers.ValidationError(detail='企业AD域异常: ' + str(login_res))
        return user

def jwt_response_payload_handler(token, user=None, request=None):
    """
    自定义返回 Token 认证信息
    :param token: JWT 认证 Token
    :param user: 用户对象
    :param request: 请求对象
    :return: 认证信息
    """
    return {
        "token": token,
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'exp': JWT_AUTH['JWT_EXPIRATION_DELTA']
    }

还需要创建一个 ad_login.py 文件,编写企业AD域的连接认证方法:

import logging
from ldap3 import Connection, SUBTREE, ServerPool
from x_atp_firmware.settings import AD_DOMAIN_INFO

# 域控服务器ip地址
LDAP_SERVER_POOL = AD_DOMAIN_INFO['AD_SERVER']
# 端口
LDAP_SERVER_PORT = AD_DOMAIN_INFO['AD_SERVER_PORT']
# 拥有查询权限的域账号
ADMIN_DN = AD_DOMAIN_INFO['AD_DN']
# 对应的密码
ADMIN_PASSWORD = AD_DOMAIN_INFO['AD_PASSWORD']
SEARCH_BASE = 'ou=OU,dc=LEEDARSON,dc=LOCAL'

def ldap_auth(username, password):
    """
    通过 AD 域认证并获取用户资料
    :param username: 用户AD账号
    :param password: 用户AD密码
    :return: 认证信息
    """
    ldap_server_pool = ServerPool(LDAP_SERVER_POOL)
    conn = Connection(ldap_server_pool, user=ADMIN_DN, password=ADMIN_PASSWORD,
                      check_names=True, lazy=False, raise_exceptions=False)
    logging.warning('x_atp_firmware.foundation.utils.ad_login.ldap_auth (AD域连接): ' + str(conn))
    conn.open()
    conn.bind()
    res = conn.search(
        search_base=SEARCH_BASE,
        # 查询所有用户
        search_filter='(sAMAccountName={})'.format(username),
        search_scope=SUBTREE,
        # sAMAccountName=账号,cn=用户中文名,sn=姓,givenName=名,mail=邮件
        # department=部门,manager=经理, title=头衔
        attributes=['cn', 'sn', 'ou', 'givenName', 'mail', 'sAMAccountName', 'department', 'manager', 'title',
                    'directReports'],
        # 使用`ALL_ATTRIBUTES`可以获取所有属性值
        # attributes=ALL_ATTRIBUTES,
        paged_size=5
    )
    if res:
        # 开始同步
        entry = conn.response[0]
        # dn包含了ou信息dc信息等,在做域验登录时可以作为验证账号
        _dn = entry['dn']
        attr_dict = entry['attributes']
        # 使用dn检查密码
        try:
            conn2 = Connection(ldap_server_pool, user=_dn, password=password,
                               check_names=True, lazy=False, raise_exceptions=False)
            conn2.bind()
            if conn2.result['description'] == 'success':
                res = {'result': True,
                       'account': {
                           's_am_account_name': attr_dict['sAMAccountName'],
                           'cn': attr_dict['cn'],
                           'sn': attr_dict['sn'],
                           'given_name': attr_dict['givenName'],
                           'mail': attr_dict['mail'],
                       },
                       'organization': {
                           'title': attr_dict['title'],
                           'department': attr_dict['department'],
                           'manager': attr_dict['manager'],
                           'ou': attr_dict['ou'],
                       }}
                return res
            else:
                # 返回认证失败信息
                return 'auth fail'
        except Exception as exc:
            return exc
    else:
        return False

使用方式

同样,在某个应用的 views.py 文件下写一个测试代码。

from rest_framework.views import APIView

class AutoTestView(APIView):
    """
    基础-验证-Token-测试
    URL /foundation/auth/token/test/
    """
    @staticmethod
    def options(request, *args, **kwargs):
        print(request.user, request.auth)

调用示例

通过用户名称与密码登录:

django_djangorestframeworkjwt_1.png

通过用户获取的 Token 发送请求到服务器:

django_djangorestframeworkjwt_2.png

推荐阅读