欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Linux - 线程池(附日志说明) - 示例

最编程 2024-07-19 18:04:45
...

Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include<iostream>
#include<string>
#include<pthread.h>
#include<functional>
#include<unistd.h>

using namespace std;



namespace ThreadMdule
{
    /// Callable run by a Thread; it receives the thread's name by value.
    using func_t = std::function<void(std::string)>;

    /// Small wrapper around one POSIX thread that runs a named callable.
    ///
    /// start() hands `this` to the new thread, so the object must stay at the
    /// same address (no copy/move/destruction) for as long as the thread runs.
    class Thread
    {
    public:
        /// Invokes the stored callable, passing this thread's name.
        void Excute()
        {
            _func(_threadname);
        }

        /// @param func callable executed by the thread once start() succeeds
        /// @param name label passed to `func` and returned by name()
        Thread(func_t func, const std::string &name="none-name")
            // Initializers are listed in member declaration order, which is
            // the order the compiler actually runs them in.
            : _tid(), _threadname(name), _func(func), _stop(true)
        {}

        /// pthread entry point: `args` is the Thread object passed by start().
        static void* threadroutine(void* args)
        {
            Thread* self=static_cast<Thread*>(args);
            self->Excute();
            return nullptr;
        }

        /// Creates the underlying pthread.
        /// @return true when pthread_create succeeded, false otherwise
        bool start()
        {
            const int n=pthread_create(&_tid,nullptr,threadroutine,this);
            if(n!=0)
            {
                return false;
            }
            _stop = false;
            return true;
        }

        /// Detaches the thread if it was started and has not been
        /// joined/detached/stopped yet; otherwise does nothing.
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
                // A detached thread must not be joined or detached again.
                _stop = true;
            }
        }

        /// Waits for the thread if it was started and has not been
        /// joined/detached/stopped yet; otherwise does nothing.
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid,nullptr);
                // The id is no longer valid after a join, so later
                // Join()/Detach() calls become no-ops.
                _stop = true;
            }
        }

        /// @return the name given at construction
        std::string name() const
        {
            return _threadname;
        }

        /// Marks the thread as stopped; subsequent Join()/Detach() do nothing.
        /// It does not interrupt the running callable.
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;          // valid only after a successful start()
        std::string _threadname; // label handed to the callable
        func_t _func;            // work executed on the thread
        bool _stop;              // true when there is nothing to join/detach
    };
}

#endif

ThreadPool.hpp

#pragma once

#include<pthread.h>

#include<iostream>
#include<queue>
#include<utility>
#include<vector>

#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace ThreadMdule;
using namespace std;
const static int gdefaultthreadnum=3;//Default number of threads in the thread pool

template <class T>
class ThreadPool
{
public:
    ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
        LOG(INFO,"ThreadPool COnstruct.");
    }
   
    //各个线程独立的任务函数
    void HandlerTask(string name)
    {
        LOG(INFO,"%s is running...",name.c_str());
        while(true)
        {
            LockQueue();//开启保护
            //等到有任务时才退出循环执行下列语句
            while(_task_queue.empty()&&_isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            //当任务队列空并且线程池停止时线程退出
            if(_task_queue.empty()&&!_isrunning)
            {
                UnlockQueue();
                cout<<name<<" quit "<<endl;
                sleep(1);
                break;
            }
            //1.任务队列不为空&&线程池开启
            //2.任务队列不为空&&线程池关闭,直到任务队列为空
            //所以,只要有任务,就要处理任务
            T t=_task_queue.front();//取出对应任务
            _task_queue.pop();
            UnlockQueue();
            LOG(DEBUG,"%s get a task",name.c_str());
            //处理任务
            t();
            LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());
        }
    }

     //线程池中线程的构建
    void InitThreadPool()
    {
        for(int i=0;i<_threadnum;i++)
        {
            string name="thread-"+to_string(i+1);
            _threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
            LOG(INFO,"init thread %s done",name.c_str());
        }
        _isrunning=true;
    }
    //线程池的启动
    void Start()
    {
        for(auto& thread:_threads)
        {
            thread.start();
        }
    }
    //线程池停止
    void Stop()
    {
        LockQueue();
        _isrunning=false;
        ThreadWakeupAll();
        UnlockQueue();
    }
    void Wait()
    {
        for(auto& thread:_threads)
        {
            thread.Join();
            LOG(INFO,"%s is quit...",thread.name().c_str());
        }
    }
    //将任务入队列
    bool Enqueue(const T& t)
    {
        bool ret=false;
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(t);
            //如果有空闲的线程,那么唤醒线程让其执行任务
            if(_waitnum>0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG,"enqueue task success");
            ret=true;
        }
        UnlockQueue();
        return ret;
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }


    int _threadnum;//线程数
    vector<Thread> _threads;//存储线程的vector
    queue<T> _task_queue;//输入的任务队列
    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _cond;//条件变量

    int _waitnum;//空闲的线程数
    bool _isrunning;//表示线程池是否启动

};

Task.hpp

#include<iostream>
#include<string>
#include<functional>
using namespace std;

/// Sample task for the thread pool: adds two integers when invoked.
class Task
{
public:
    /// Creates the task 0+0; every field is zero-initialized so the string
    /// helpers never read indeterminate values.
    Task() = default;

    /// @param a left operand
    /// @param b right operand
    Task(int a,int b): _a(a),_b(b),_result(0)
    {}

    /// Computes a+b and stores it as the result.
    void Excute()
    {
        _result=_a+_b;
    }

    /// @return "a+b=result"; the result is 0 until the task has been executed
    std::string ResultToString() const
    {
        return std::to_string(_a) + "+"+std::to_string(_b)+"="+std::to_string(_result);
    }

    /// @return "a+b= ?", the description of the task before it runs
    std::string DebugToString() const
    {
       return std::to_string(_a) + "+" + std::to_string(_b) + "= ?";
    }

    /// Makes the task callable: same as Excute().
    void operator()()
    {
        Excute();
    }
private:
    int _a = 0;
    int _b = 0;
    int _result = 0;
};

main.cc

int main()
{
    // Seed rand() from the time, the process id and the thread id.
    srand(time(nullptr)^getpid()^pthread_self());
    //EnableScreen();
    EnableFile();

    // Build a pool of five workers and start them.
    unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));
    tp->InitThreadPool();
    tp->Start();

    // Produce ten addition tasks, roughly one per second.
    for(int remaining=10;remaining!=0;--remaining)
    {
        sleep(1);
        const int a=rand()%10+1;
        usleep(1024);
        const int b=rand()%20+1;

        Task t(a,b);
        LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());
        tp->Enqueue(t);
    }

    // Let the workers drain the queue, then wait for them to exit.
    tp->Stop();
    tp->Wait();
    return 0;
}

Log.hpp

#pragma once

#include<iostream>
#include<fstream>
#include<ctime>
#include<cstdarg>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<cstdio>
#include"LockGuard.hpp"

using namespace std;


//Log destination switch passed to LogMessage by the LOG macro: false (the default) prints to the screen, true appends to the log file
//NOTE(review): this is a definition in a header; including this header from more than one translation unit would define gIsSave more than once - confirm it is only included from one
bool gIsSave=false;
//Name of the file that receives log messages when saving is enabled
const string logname="log.txt";
//1. Log messages carry a severity level; values increase from DEBUG to FATAL
enum Level
{
    DEBUG   = 0,
    INFO    = 1,
    WARNING = 2,
    ERROR   = 3,
    FATAL   = 4
};


//Appends `messages` to the file `filename`, creating the file if needed.
//Does nothing when the file cannot be opened.
//Declared inline because it is defined in a header: without it, including
//the header from several translation units breaks the link.
inline void SaveFile(const std::string& filename,const std::string& messages)
{
    std::ofstream out(filename,std::ios::app);
    if(!out.is_open())
    {
        return;
    }
    out<<messages;
    //The stream flushes and closes itself when it goes out of scope
}
//等级转化为字符串
string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unkonwn";
    }
}

//Returns the current local time as "year-month-day hour:minute:second"
//(fields are not zero-padded), or "None" if the conversion fails.
//Uses localtime_r with a caller-owned struct tm: localtime() returns shared
//static storage, and this function is called from several threads at once.
//Declared inline because it is defined in a header.
inline std::string GetTimeString()
{
    const time_t curr_time=time(nullptr);//current timestamp
    struct tm format_time;//broken-down local time, private to this call
    if(localtime_r(&curr_time,&format_time)==nullptr)
        return "None";
    char time_buffer[128];
    snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
             format_time.tm_year + 1900,
             format_time.tm_mon + 1,
             format_time.tm_mday,
             format_time.tm_hour,
             format_time.tm_min,
             format_time.tm_sec);
    return time_buffer;
}

pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//获取日志信息
void LogMessage(string filename,int line,bool issave,int level,char* format,...)
{
    string levelstr=LevelToString(level);
    string timestr=GetTimeString();
    pid_t selfid=getpid();

    char buffer[1024];
    va_list arg;
    va_start(arg,format);
    vsnprintf(buffer,sizeof(buffer),format,arg);
    va_end(arg);

    string message= "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";


    LockGuard lockguard(&lock);
    if(!issave)
    {
        cout<<message;
    }
    else
    {
        SaveFile(logname,message);
    }
                                                        
}

 //Logs a printf-style message at severity `lvl`, tagging it with the calling
 //file and line and sending it to the destination selected by gIsSave
 #define LOG(lvl,fmt,...)                                                     \
    do                                                                        \
    {                                                                         \
        LogMessage(__FILE__,__LINE__,gIsSave,lvl,fmt,##__VA_ARGS__);          \
    } while (0)


//Sends subsequent LOG output to the log file by setting gIsSave to true.
//Each continuation backslash must be the last character on its line.
#define EnableFile()         \
    do                       \
    {                        \
        gIsSave=true;        \
    } while (0)

//Sends subsequent LOG output to the screen by setting gIsSave to false.
//Each continuation backslash must be the last character on its line.
#define EnableScreen()         \
    do                         \
    {                          \
        gIsSave=false;         \
    } while (0)