探索在 flask+Gunicorn(gevent) 中实现高并发的解决方案
1. 协程定义
概念:协程就是协同工作的程序,不是进程也不是线程 理解成–不带返回值的函数调用。
Coroutine:协程,又称微线程,纤程。
协程的这种“挂起”和“唤醒”机制实质上是将一个过程切分成了若干个子过程,给了我们一种以扁平的方式来使用事件回调模型。优点:共享进程的上下文,一个进程可以创建百万,千万的coroutine。
python中的yield和第三方库greenlet,都可以实现协程。
greenlet 提供了在协程中直接切换控制权的方式,比生成器(yield)更加灵活、简洁。
GIL–限制了python的多线程
即时通讯服务器 + 协程方式运行,提供并发性
服务器: 多进程 多线程 协程
Flask(框架)+Gunicorn(服务器)+(协程)高并发的解决方法探究
使用Flask的做服务器框架,可以: python code.py 的方式运行,但这种方式不能用于生产环境,不稳定,比如说: 有一定概率遇到连接超时无返回的情况—flask提供的简易测试服务器
1,通过设置app.run()的参数,来达到多进程的效果。看一下app.run的具体参数:
- 注意: threaded与processes不能同时打开,如果同时设置的话,将会出现以下的错误:
2. 解决方案
2.1 方案一
- 使用gevent做协程,从而解决高并发的问题:
Flask + gevent
# 携程的第三方包-这里选择gevent, 当然你也可以选择eventlet
pip install gevent
# 具体的代码如下:
from flask import Flask
from gevent.pywsgi import WSGIServer
from gevent import monkey
# 将python标准的io方法,都替换成gevent中同名的方法,遇到io阻塞gevent自动进行协程切换
monkey.patch_all()
# 1.创建项目应用对象app
app = Flask(__name__)
# 2.初始化服务器
WSGIServer(("127.0.0.1", 5000), app).serve_forever()
# 启动服务---这样就是以协程的方式运行项目,提高并发能力
python code.py
- 通过Gunicorn(with gevent)的形式对app进行包装,从而来启动服务【推荐】
Falsk + Gunicorn + gevent
安装遵循了WSGI协议的gunicorn服务器–俗称:绿色独角兽
pip install gunicorn
查看命令行选项: 安装gunicorn成功后,通过命令行的方式可以查看gunicorn的使用信息。
$ gunicorn -h
指定进程和端口号: -w: 表示进程(worker) --bind:表示绑定ip地址和端口号(bind) —threads 多线程 -k 异步方案
# 使用gevent做异步(默认worker是同步的) 多进程+协程
gunicorn -w 8 --bind 0.0.0.0:8000 -k 'gevent' 运行文件名称:Flask程序实例名
# 使用gunicorn命令启动flask项目
# -w 8
8个进程
# --bind 0.0.0.0:8000
ip + 端口
# -k 'gevent'
协程
方案二
将运行的信息加载到配置文件中
使用gunicorn + gevent 开启高并发
新建配置py文件:gunicorn_config.py
# 多进程
import multiprocessing
"""gunicorn+gevent 的配置文件"""
# 预加载资源
preload_app = True
# 绑定 ip + 端口
bind = "0.0.0.0:5000"
# 进程数 = cup数量 * 2 + 1
workers = multiprocessing.cpu_count() * 2 + 1
# 线程数 = cup数量 * 2
threads = multiprocessing.cpu_count() * 2
# 等待队列最大长度,超过这个长度的链接将被拒绝连接
backlog = 2048
# 工作模式--协程
worker_class = "gevent"
# 最大客户客户端并发数量,对使用线程和协程的worker的工作有影响
# 服务器配置设置的值 1200:中小型项目 上万并发: 中大型
# 服务器硬件:宽带+数据库+内存
# 服务器的架构:集群 主从
worker_connections = 1200
# 进程名称
proc_name = 'gunicorn.pid'
# 进程pid记录文件
pidfile = 'app_run.log'
# 日志等级
loglevel = 'debug'
# 日志文件名
logfile = 'debug.log'
# 访问记录
accesslog = 'access.log'
# 访问记录格式
access_log_format = '%(h)s %(t)s %(U)s %(q)s'
- 执行:gunicorn -c gunicorn_config.py flask_server:app
方案三
使用 meinheld + gunicorn + flask 开启高并发神器
前提在虚拟环境中安装meinheld:
pip install meinheld
import multiprocessing
"""gunicorn+meinheld 的配置文件"""
# 预加载资源
preload_app = True
# 绑定
bind = "0.0.0.0:5000"
# 进程数: cup数量 * 2 + 1
workers = multiprocessing.cpu_count() * 2 + 1
# 线程数 cup数量 * 2
threads = multiprocessing.cpu_count() * 2
# 等待队列最大长度,超过这个长度的链接将被拒绝连接
backlog = 2048
# 工作模式
worker_class = "egg:meinheld#gunicorn_worker"
# 最大客户客户端并发数量,对使用线程和协程的worker的工作有影响
worker_connections = 1200
# 进程名称
proc_name = 'gunicorn.pid'
# 进程pid记录文件
pidfile = 'app_run.log'
# 日志等级
loglevel = 'debug'
# 日志文件名
logfile = 'debug.log'
# 访问记录
accesslog = 'access.log'
# 访问记录格式
access_log_format = '%(h)s %(t)s %(U)s %(q)s'
# 运行方式 命令行
gunicorn -c gunicorn_config.py flask_server:app
2. 历史遗留问题—GIL锁
2.1 简介
1.线程安全是在多线程的环境下,线程安全能够保证多个线程同时执行时程序依旧运行正确,而且要保证对于共享的数据,可以由多个线程存取,但是同一时刻只能有一个线程进行存取。每一个interpreter进程,只能同时仅有一个线程来执行,获得相关的锁,存取相关的资源。那么很容易就会发现,如果一个interpreter进程只能有一个线程来执行,多线程的并发则成为不可能,即使这几个线程之间不存在资源的竞争。
2.所以虽然 CPython的线程库直接封装操作系统的原生线程,但CPython进程做为一个整体同一时间只会有一个获得了GIL的线程在跑,其它的线程都处于等待状态等着 GIL的释放。所以只能使用cpu单核。这也是python多线程被人诟病的原因。
2.2 解决方案
python的高并发更加推荐多进程+协程
io多路复用
IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。
1. select(线程不安全):它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
2. poll(线程不安全):它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制
3. epoll(线程安全):epoll可以同时支持水平触发和边缘触发
Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!!!
Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!
阻塞IO:当你去读一个阻塞的文件描述符时,如果在该文件描述符上没有数据可读,那么它会一直阻塞(通俗一点就是一直卡在调用函数那里),直到有数据可读。当你去写一个阻塞的文件描述符时,如果在该文件描述符上没有空间(通常是缓冲区)可写,那么它会一直阻塞,直到有空间可写。以上的读和写我们统一指在某个文件描述符进行的操作,不单单指真正的读数据,写数据,还包括接收连接accept(),发起连接connect()等操作...
非阻塞IO:当你去读写一个非阻塞的文件描述符时,不管可不可以读写,它都会立即返回,返回成功说明读写操作完成了,返回失败会设置相应errno状态码,根据这个errno可以进一步执行其他处理。它不会像阻塞IO那样,卡在那里不动!!!
python异步实现
多进程 + 协程 + callback(io多路复用做事件驱动)
3. 协程 第三方封装库:
-
gevent = greenlet + python.monkey(底层使用 libevent 时间复杂度: O(N * logN))
-
meinheld = greenlet + picoev (时间复杂度: O(N) )
-
eventlet
picoev和libevent
meinheld和gevent都能实现异步,但是测评中meinheld比gevent的性能好很多,不过因为meinheld支持的比较少,一般都是配合gunicorn使用的。下面分析一下meinheld和gevent性能差距主要原因,分别使用的是picoev和lievent。
# libevent
主要实现:使用堆(优先队列)作为timer事件的算法(nlogn),IO和信号的实现均使用了双向队列(用链表实现)。
时间复杂度: O(N * logN)
# picoev
picoev主要优化有两点。
1. 主要是考虑是fd(file descriptors)在unix中是用比较小的正整数表示的,那么把fd的相关信息,全部存储在一个array中,这样使得查找快速,在操作socket状态时会更加的快。
2. 第二点是对于timer事件的算法优化,通过环形缓冲区(128)和bit vector实现查看部分源码可以看出,主要实现是每个时间点对应的是缓冲区的一个位置,每个缓存区使用bit vector 表示fd的数值,相当于一种hash映射所以时间复杂度为(o(n)),n为那个缓存区所存的fd数量。
时间复杂度: O(N)
性能: picoev > libevent
理解----协程&线程&进程
2.思考:协程之前切换的场景?
程序发送阻塞的时候切换
- 读磁盘
- 读写文件
- 网络io操作
- 收发http请求
推荐阅读
-
探索在 flask+Gunicorn(gevent) 中实现高并发的解决方案
-
【Netty】「萌新入门」(七)ByteBuf 的性能优化-堆内存的分配和释放都是由 Java 虚拟机自动管理的,这意味着它们可以快速地被分配和释放,但是也会产生一些开销。 直接内存需要手动分配和释放,因为它由操作系统管理,这使得分配和释放的速度更快,但是也需要更多的系统资源。 另外,直接内存可以映射到本地文件中,这对于需要频繁读写文件的应用程序非常有用。 此外,直接内存还可以避免在使用 NIO 进行网络传输时发生数据拷贝的情况。在使用传统的 I/O 时,数据必须先从文件或网络中读取到堆内存中,然后再从堆内存中复制到直接缓冲区中,最后再通过 SocketChannel 发送到网络中。而使用直接缓冲区时,数据可以直接从文件或网络中读取到直接缓冲区中,并且可以直接从直接缓冲区中发送到网络中,避免了不必要的数据拷贝和内存分配。 通过 ByteBufAllocator.DEFAULT.directBuffer 方法来创建基于直接内存的 ByteBuf: ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); 通过 ByteBufAllocator.DEFAULT.heapBuffer 方法来创建基于堆内存的 ByteBuf: ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); 注意: 直接内存是一种特殊的内存分配方式,可以通过在堆外申请内存来避免 JVM 堆内存的限制,从而提高读写性能和降低 GC 压力。但是,直接内存的创建和销毁代价昂贵,因此需要慎重使用。 此外,由于直接内存不受 JVM 垃圾回收的管理,我们需要主动释放这部分内存,否则会造成内存泄漏。通常情况下,可以使用 ByteBuffer.clear 方法来释放直接内存中的数据,或者使用 ByteBuffer.cleaner 方法来手动释放直接内存空间。 测试代码: public static void testCreateByteBuf { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16); System.out.println(buf.getClass); ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); System.out.println(heapBuf.getClass); ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); System.out.println(directBuf.getClass); } 运行结果: class io.netty.buffer.PooledUnsafeDirectByteBuf class io.netty.buffer.PooledUnsafeHeapByteBuf class io.netty.buffer.PooledUnsafeDirectByteBuf 池化技术 在 Netty 中,池化技术指的是通过对象池来重用已经创建的对象,从而避免了频繁地创建和销毁对象,这种技术可以提高系统的性能和可伸缩性。 通过设置 VM options,来决定池化功能是否开启: -Dio.netty.allocator.type={unpooled|pooled} 在 Netty 4.1 版本以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现; 这里我们使用非池化功能进行测试,依旧使用的是上面的测试代码 testCreateByteBuf,运行结果如下所示: class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf 可以看到,ByteBuf 类由 PooledUnsafeDirectByteBuf 变成了 UnpooledUnsafeDirectByteBuf; 在没有池化的情况下,每次使用都需要创建新的 ByteBuf 实例,这个操作会涉及到内存的分配和初始化,如果是直接内存则代价更为昂贵,而且频繁的内存分配也可能导致内存碎片问题,增加 GC 压力。 使用池化技术可以避免频繁内存分配带来的开销,并且重用池中的 ByteBuf 实例,减少了内存占用和内存碎片问题。另外,池化技术还可以采用类似 jemalloc 的内存分配算法,进一步提升分配效率。 在高并发环境下,池化技术的优点更加明显,因为内存的分配和释放都是比较耗时的操作,频繁的内存分配和释放会导致系统性能下降,甚至可能出现内存溢出的风险。使用池化技术可以将内存分配和释放的操作集中到预先分配的池中,从而有效地降低系统的内存开销和风险。 内存释放 当在 Netty 中使用 ByteBuf 来处理数据时,需要特别注意内存回收问题。 Netty 提供了不同类型的 ByteBuf 实现,包括堆内存(JVM 内存)实现 UnpooledHeapByteBuf 和堆外内存(直接内存)实现 UnpooledDirectByteBuf,以及池化技术实现的 PooledByteBuf 及其子类。 UnpooledHeapByteBuf:通过 Java 的垃圾回收机制来自动回收内存; UnpooledDirectByteBuf:由于 JVM 的垃圾回收机制无法管理这些内存,因此需要手动调用 release 方法来释放内存; PooledByteBuf:使用了池化机制,需要更复杂的规则来回收内存; 由于池化技术的特殊性质,释放 PooledByteBuf 对象所使用的内存并不是立即被回收的,而是被放入一个内存池中,待下次分配内存时再次使用。因此,释放 PooledByteBuf 对象的内存可能会延迟到后续的某个时间点。为了避免内存泄漏和占用过多内存,我们需要根据实际情况来设置池化技术的相关参数,以便及时回收内存; Netty 采用了引用计数法来控制 ByteBuf 对象的内存回收,在博文 「源码解析」ByteBuf 的引用计数机制 中将会通过解读源码的形式对 ByteBuf 的引用计数法进行深入理解; 每个 ByteBuf 对象被创建时,都会初始化为1,表示该对象的初始计数为1。 在使用 ByteBuf 对象过程中,如果当前 handler 已经使用完该对象,需要通过调用 release 方法将计数减1,当计数为0时,底层内存会被回收,该对象也就被销毁了。此时即使 ByteBuf 对象还在,其各个方法均无法正常使用。 但是,如果当前 handler 还需要继续使用该对象,可以通过调用 retain 方法将计数加1,这样即使其他 handler 已经调用了 release 方法,该对象的内存仍然不会被回收。这种机制可以有效地避免了内存泄漏和意外访问已经释放的内存的情况。 一般来说,应该尽可能地保证 retain 和 release 方法成对出现,以确保计数正确。