欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

APScheduler 定时器使用 [简单版]:在 django 中使用 apscheduler,使用 mysql 作为存储后台

最编程 2024-05-08 12:00:20
...

一、基本环境

python版本:3.8.5

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

二、django基本设置

2.1、新增一个app

该app用来写apscheduler相关的代码

python manage.py startapp gs_scheduler

2.2、修改配置文件settings.py

#使用pymysql做客户端
import pymysql
pymysql.install_as_MySQLdb()

INSTALLED_APPS = [
    
    'rest_framework', #注册restful 应用
    'gs_scheduler', #注册新增的app
]

#配置mysql
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'admin-root'
MYSQL_NAME = 'study_websocket'

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'HOST': MYSQL_HOST,
        'PORT': MYSQL_PORT,
        'USER': MYSQL_USER,
        'PASSWORD': MYSQL_PASSWORD,
        'NAME': MYSQL_NAME,
    }
}

2.3、gs_scheduler创建urls.py

1、gs_scheduler/urls.py

from django.urls import path
from . import views
urlpatterns = [
   
]

2、根路由urls.py

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置gs_scheduler应用

3.1、配置api接口

这些接口,用来展示定时任务的运行情况

1、urls.py

from django.urls import path
from . import views
urlpatterns = [
    path('run_next/', views.JobNextRunTimeAPIView.as_view()),#定时任务下次运行
    path('run_history/',views.JobRunTimeHistory.as_view()),#定时任务运行历史
    path('run_error/',views.JobRunErrorHistory.as_view()), #定时任务最近运行错误
]

2、models.py

from django.db import models

# Create your models here.
import sqlite3
from gs_scheduler.management.commands.config import MYSQL_HOST,MYSQL_NAME,MYSQL_PORT,MYSQL_PASSWORD,MYSQL_USER,MYSQL_CHARSET
from datetime import datetime,timedelta
from django.conf import settings

import pymysql
from concurrent.futures import ThreadPoolExecutor

#时间戳转时间字符串
def timestamp_to_time_str(timestamp):
    # 使用 datetime 模块将时间戳转换为 datetime 对象
    dt = datetime.fromtimestamp(timestamp)
    # 将 datetime 对象格式化为时间字符串
    time_str = dt.strftime('%Y-%m-%d %H:%M:%S')
    return time_str

class MysqlDB:
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    # 创建数据库连接池
    def __init__(self):
        self.conn = pymysql.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            db=MYSQL_NAME,
            charset=MYSQL_CHARSET,
            cursorclass=pymysql.cursors.DictCursor
        )

    # 执行数据库增删改查
    def _execute_sql(self,query):
        sql,args = query
        try:
            with self.conn.cursor() as cursor:
                # 执行sql语句
                if args:
                    if isinstance(args,(tuple,list)):
                        # args = (value1,value2)
                        cursor.execute(sql,args)
                    else:
                        # args = value1
                        cursor.execute(sql,(args,))
                else:
                    cursor.execute(sql)
                # 不同类型sql,设置不同返回值
                if sql.strip().lower()[:6] == 'select':
                    #查询语句
                    rows = cursor.fetchall() or []
                    return rows
                elif sql.strip().lower()[:4] == 'show':
                    #查看表是否存在
                    result = cursor.fetchall()
                    return result
                else:
                    #增删改语句
                    self.conn.commit()
                    return True
        except Exception as e:
            print('sql执行失败',e)
            pass

    # 线程池执行sql语句
    def start_sql_with_pool(self, sql:str,args=None):
        with ThreadPoolExecutor() as executor:
            result = executor.submit(self._execute_sql, (sql,args)).result()
            return result

    #创建记录定时任务历史表(调度器使用)
    def create_apscheduler_history(self):
        #创建表语句:表名不能是占位符
        sql = """CREATE TABLE apscheduler_history (
                id INTEGER AUTO_INCREMENT PRIMARY KEY,
                job_id VARCHAR(128),
                run_time DATETIME,
                is_error TINYINT,
                error_msg VARCHAR(256) NULL)"""
        #判断表存在不,不存在再创建
        results = self.start_sql_with_pool('SHOW TABLES')
        is_exist = False
        for dic in results:
            if dic['Tables_in_{}'.format(MYSQL_NAME,)] == 'apscheduler_history':
                is_exist = True
        #表不存在才创建
        if not is_exist:
            # print('表不存在,执行创建表')
            create = self.start_sql_with_pool(sql)
            # 创建索引
            self.start_sql_with_pool(db.conn.cursor().execute("CREATE INDEX run_time_index ON apscheduler_history (run_time)"))
        return True

    # 将任务运行的结果记录到数据库中(调度器使用)
    def insert_into_apscheduler_history(self, data_list: list):
        if len(data_list) == 4:  # 异常执行的任务
            sql = """
            INSERT INTO apscheduler_history (job_id,run_time,is_error,error_msg) VALUES (%s,%s,%s,%s)
            """
        else:  # 正常执行的任务
            sql = """
            INSERT INTO apscheduler_history (job_id,run_time,is_error) VALUES (%s,%s,%s)
            """
        #使用线程池执行插入语句
        self.start_sql_with_pool(sql,data_list)

    # 删除8小时前的历史记录(调度器使用)
    def delete_8hour_before_history(self):
        before_8hour = (datetime.now() - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
        sql = '''
        DELETE FROM apscheduler_history WHERE run_time < %s'''
        self.start_sql_with_pool(sql,(before_8hour,))

    # 查询每个任务的下次运行时间(api展示)
    def fetch_all_next_run_time(self):
        sql = 'SELECT id,next_run_time FROM apscheduler_jobs'
        # 获取查询结果
        rows = self.start_sql_with_pool(sql)
        for row in rows:
            row['next_run_time'] = timestamp_to_time_str(row['next_run_time'])
        return rows

    # 查询所有任务最近10次运行记录(api展示)
    def fetch_all_run_history(self):
        '''
        :param query:'SELECT id,next_run_time,job_state FROM apscheduler_jobs'
        :return:
        '''
        #获取所有定时任务id
        data_list = self.fetch_all_next_run_time()
        id_list = (dic.get('id') for dic in data_list)
        # 创建一个游标对象
        sql = """
           SELECT id,job_id,run_time FROM apscheduler_history WHERE is_error=0 AND job_id = %s ORDER BY run_time DESC LIMIT 10
           """
        ret_list = []
        for id in id_list:
            # 执行查询
            rows = self.start_sql_with_pool(sql,(id,)) or []
            dic = {'id': id, 'last_run_time': [], 'msg': '最近10次运行时间'}
            for row in rows:
                run_time = row.get('run_time')
                run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_time
                dic['last_run_time'].append(run_time)
            ret_list.append(dic)
        return ret_list

    # 获取任务最近运行失败情况(api展示)
    def fetch_all_error_history(self):
        data_list = self.fetch_all_next_run_time()
        id_list = (dic.get('id') for dic in data_list)
        sql = """
        SELECT run_time,error_msg FROM apscheduler_history WHERE is_error=1 AND job_id = %s ORDER BY id DESC LIMIT 5
        """
        ret_list = []
        for id in id_list:
            # 获取查询结果
            rows = self.start_sql_with_pool(sql, (id,)) or []
            dic = {'id': id, 'last_run_time': [], 'msg': '最近5次执行失败'}
            for row in rows:
                run_time = row.get('run_time')
                run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_time
                error = row.get('error_msg')
                dic['last_run_time'].append({'run_time': run_time, 'error': error})
            ret_list.append(dic)
        return ret_list

    # 关闭连接
    def close(self):
        self.conn.close()



if __name__ == '__main__':
    db = MysqlDB()
    db.create_apscheduler_history()
    # for i in range(1,10):
    #     db.insert_into_apscheduler_history(['send_to_big_data','2024-05-01 13:{}:12'.format(str(i).zfill(2)),1,'执行失败了'])
    # db.delete_8hour_before_history()
    # ret = db.fetch_all_run_history()
    # print(ret,'history')
    # ret = db.fetch_all_next_run_time()
    # print(ret,'next')
    # ret = db.fetch_all_error_history()
    # print(ret,'error')
    db.close()



3、views.py

from django.shortcuts import render

# Create your views here.
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import MysqlDB

#任务下次运行时间
class JobNextRunTimeAPIView(APIView):
    authentication_classes = []
    def get(self,request):
        db = MysqlDB()
        data = db.fetch_all_next_run_time()
        db.close()
        ret = {
            'code':200,
            'status':'success',
            'data':data,
        }

        return Response(ret)

#任务最近运行历史
class JobRunTimeHistory(APIView):
    authentication_classes = []
    def get(self,request):
        db = MysqlDB()
        data = db.fetch_all_run_history()
        db.close()
        ret = {
            'code':200,
            'status':'success',
            'data':data
        }
        return Response(ret)

#任务最近运行错误
class JobRunErrorHistory(APIView):
    authentication_classes = []
    def get(self,request):
        db = MysqlDB()
        data = db.fetch_all_error_history()
        db.close()
        ret = {
            'code': 200,
            'status': 'success',
            'data': data
        }
        return Response(ret)

3.2、在gs_scheduler创建

1、config.py 代码

该文件存放的是启动APScheduler调度器的一些配置数据

import os
from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore #mysql等做后端存储
# from django.conf import settings
from study_apscheduler import settings
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {
        'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效
        'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。
    }
#job的存储后端
JOB_STORE = {
    'default': SQLAlchemyJobStore(url=URL)
}

#监听事件对应的情况
LISTENER={
    1:'调度程序启动',
    2:'调度程序关闭',
    4:'调度程序中任务处理暂停',
    64:'将任务存储添加到调度程序中',
    8192:'任务在执行期间引发异常',
    4096:'任务执行成功',
}

2、task.py

所有的定时任务都存放在这里

import os
from datetime import datetime, timedelta, date
from gs_scheduler.models import MysqlDB
from utils.log_util import info_log
from utils.send_monitor_data import SendData

#推送到数据仓的告警信息:5分钟执行一次
send_to_big_data = SendData().send

#清除定时任务历史运行记录
def delete_apscheduler_history():
    db = MysqlDB()
    db.delete_8hour_before_history()

if __name__ == '__main__':
    print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

3、crontab.py

实例化调度器

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from django.core.management.base import BaseCommand
from apscheduler import events
from pytz import timezone
from threading import RLock
from datetime import datetime, timedelta
from gs_scheduler.models import MysqlDB
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
#日志
from utils.log_util import info_log
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE

#脚本运行:python manage.py crontab
class Command(BaseCommand):
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    #初始化调度器
    def _scheduler_obj(self):
        scheduler = BlockingScheduler()
        scheduler.configure(
            timezone = TIME_ZONE, #时区
            job_defaults=JOB_DEFAULTS, #job的默认配置
            jobstores=JOB_STORE, #job的存储后端
        )
        return scheduler

    #添加任务
    def _add_job(self,scheduler:BlockingScheduler):
        #每5分钟执行一次推送告警到大数据仓
        scheduler.add_job(
            send_to_big_data,
            trigger=IntervalTrigger(minutes=1),
            id='send_to_big_data',
            replace_existing=True,
            coalesce=True,
        )
        #每隔8个小时,清除历史的记录
        scheduler.add_job(
            delete_apscheduler_history,
            trigger=IntervalTrigger(hours=8),
            id='delete_apscheduler_history',
            replace_existing=True,
            coalesce=True,
        )

    #添加监听器
    def _listener(self,event:events):
        code = event.code
        run_time = datetime.now().strftime(self.TIME_FORMAT)
        msg = LISTENER.get(code)
        db = MysqlDB()
        if msg:
            if code == 4096:
                #成功运行
                job_id = event.job_id
                #记录到数据库中
                db.insert_into_apscheduler_history([job_id,run_time,0])
            elif code == 8192:
                #运行异常了
                job_id = event.job_id
                #记录到数据库中
                db.insert_into_apscheduler_history([job_id,run_time,1,msg])
            else:
                info_log(msg)
        db.close()
    def start(self):
        scheduler = self._scheduler_obj()
        # 创建记录运行记录
        db = MysqlDB()
        db.create_apscheduler_history()
        db.close()
        # 设置监听器
        scheduler.add_listener(self._listener)
        # 设置定时任务
        self._add_job(scheduler)
        try:
            # print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))
            scheduler.start()
        except KeyboardInterrupt:
            scheduler.shutdown()

    # python manage.py crontab运行 就是调用该方法
    def handle(self, *args, **options):
        self.start()

#伴随django,在后台运行的。 在wsgi.py 文件中,调用BackRunScheduler().start()
class BackRunScheduler(Command):
    # 初始化调度器
    def _scheduler_obj(self):
        scheduler = BackgroundScheduler()
        scheduler.configure(
            timezone=TIME_ZONE,  # 时区
            job_defaults=JOB_DEFAULTS,  # job的默认配置
            jobstores=JOB_STORE,  # job的存储后端
        )
        return scheduler

3.3、启动方式:

方式一: python manage.py  crontab    (生产环境,推荐使用此方式,单独运行)

方式二:在settings.py中,调用BackRunScheduler().start()  , 后台运行

四、测试

4.1、启动

启动django项目:python manage.py runserver 8080

启动定时器:python manage.py crontab

4.2、等待一段时间

1、查询任务下次执行时间

请求:http://127.0.0.1:8080/api/scheduler/run_next/

2、查询任务最近运行情况

请求:http://127.0.0.1:8080/api/scheduler/run_next/

3、查询任务最近异常情况

请求:http://127.0.0.1:8080/api/scheduler/run_error/

五、源代码下载

码云地址:

django应用定时器: django下使用定时器的方法https://gitee.com/liuhaizhang/django-application-timer/

目前有两套代码:一个以mysql存储任务,一个用sqlite存储任务