8. 线程与进程详解:什么是线程,什么是进程?它们之间的区别在哪里?还有Python的GIL全局解释器锁是什么?
什么是线程(thread)?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
一个线程是一个执行上下文,它是一个CPU需要执行一系列指令的所有信息。 假设你正在读一本书,你现在想休息一下,但是你希望能够回来,从你停止的地方继续阅读。实现这一目标的一种方法是记下页码、行号和字号。所以你读一本书的执行上下文是这三个数字。 如果你有一个室友,而且她使用的是同样的技术,她可以在你不用的时候拿着书,然后从她停止的地方继续阅读。然后你可以把它拿回来,从你所在的地方重新开始。 线程的工作方式相同。一个CPU给你的错觉是它同时在做多个计算。它通过在每个计算上花费一点时间。它可以这样做,因为它对每个计算都有一个执行上下文。就像你可以和你的朋友分享一本书一样,许多任务可以共享一个CPU。 在技术层面上,执行上下文(因此是一个线程)由CPU寄存器的值组成。 最后:线程与进程不同。线程是执行的上下文,而进程是与计算相关的一堆资源。一个进程可以有一个或多个线程。 说明:与进程相关的资源包括内存页(进程中的所有线程都具有相同的内存视图)、文件描述符(例如,打开的套接字)和安全凭据(例如启动进程的用户的ID)。
什么是进程(process)?
程序的执行实例称为进程。 每个进程提供执行程序所需的资源。进程具有虚拟地址空间、可执行代码、对系统对象的打开句柄、安全上下文、惟一进程标识符、环境变量、优先级、最小和最大工作集大小,以及至少一个执行线程。每个进程都是从一个线程开始的,通常称为主线程,但是可以从它的任何线程创建额外的线程。
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
线程和进程的区别?
- 线程共享创建它的进程的地址空间;进程有自己的地址空间。
- 线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。
- 线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与同胞进程通信。
- 新线程很容易创建;新进程需要父进程的重复。
- 线程可以对相同进程的线程进行相当大的控制;进程只能对子进程进行控制。
- 对主线程的更改(取消、优先级更改等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。
Python GIL(Global Interpreter Lock)全局解释器锁
在CPython中,全局解释器锁(或GIL)是一个互斥锁,可以防止多个本机线程同时执行Python字节码。这一锁定主要是因为CPython的内存管理不是线程安全的。(然而,由于GIL的存在,其他特征已经发展到依赖于它所赋予的保证。)
这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
Python threading(线程)模块
直接调用:
1 import threading
2 import time
3
4 def func(n): #定义每个线程要运行的函数
5
6 print("running on number:%s" % n)
7
8 time.sleep(3)
9
10 if __name__ == '__main__':
11
12 t1 = threading.Thread(target=func,args=(1,)) #生成一个线程实例
13 t2 = threading.Thread(target=func,args=(2,)) #生成另一个线程实例
14
15 t1.start() #启动线程
16 t2.start() #启动另一个线程
17
18 print(t1.getName()) #获取线程名
19 print(t2.getName())
继承式调用:
1 import threading
2 import time
3
4
5 class MyThread(threading.Thread):
6 def __init__(self,n):
7 threading.Thread.__init__(self)
8 self.num = n
9
10 def run(self):#定义每个线程要运行的函数
11
12 print("running on number:%s" %self.n)
13
14 time.sleep(2)
15
16 if __name__ == '__main__':
17
18 t1 = MyThread(1)
19 t2 = MyThread(2)
20 t1.start()
21 t2.start()
有些线程执行后台任务,比如发送keepalive数据包,或者执行周期性的垃圾收集,等等。这些只在主程序运行时有用,并且可以在其他非守护进程退出时将它们删除。 如果没有守护线程,您必须跟踪它们,并告诉它们退出,然后程序才能完全退出。通过将它们设置为守护线程,您可以让它们运行并忘记它们,当程序退出时,任何守护线程都会自动被杀死。
1 def run(n):
2 print('[%s] running on\n' % n)
3 time.sleep(2)
4 print('--done--')
5
6
7 def main():
8 for i in range(5):
9 t = threading.Thread(target=run, args=[i, ])
10 t.start()
11 t.join(1)
12 print('starting thread', t.getName())
13
14
15 m = threading.Thread(target=main, args=[])
16 # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
17 m.setDaemon(True)
18 m.start()
19 m.join(timeout=2)
20 print("---main thread done----")
start 线程准备就绪,等待CPU调度
setName 为线程设置名称
getName 获取线程名称
setDaemon 设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
join 逐个执行每个线程,执行完毕后继续往下执行
run 线程被cpu调度后自动执行线程对象的run方法
注意:守护程序线程突然停在关闭。它们的资源(如打开文件、数据库事务等)可能无法正常释放。如果您希望您的线程优雅地停止,那么让它们成为非daemonic,并使用一个适当的信号机制,例如事件。
线程锁(Lock、RLock(递归锁))
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,就会得到错误的数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。
*注:我用的Python3,不知为什么,结果总是正确的,网上搜了搜,说可能是自动加了锁:以后可能会用Python2版本,也标注一下
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 import time
23 import threading
24
25 def addNum():
26 global num #在每个线程中都获取这个全局变量
27 print('--get num:',num )
28 time.sleep(1)
29 num +=1 #对此公共变量进行-1操作
30
31 num = 0 #设定一个共享变量
32 thread_list = []
33 for i in range(100):
34 t = threading.Thread(target=addNum)
35 t.start()
36 thread_list.append(t)
37
38 for t in thread_list: #等待所有线程执行完毕
39 t.join()
40
41
42 print('final num:', num )
不加锁的听说Python2每次运行结果不都是100.
1 import time
2 import threading
3
4 def addNum():
5 global num #在每个线程中都获取这个全局变量
6 print('--get num:',num )
7 time.sleep(1)
8 lock.acquire() #修改数据前加锁
9 num +=1 #对此公共变量进行-1操作
10 lock.release() #修改后释放
11
12 num = 0 #设定一个共享变量
13 thread_list = []
14 lock = threading.Lock() #生成全局锁
15 for i in range(100):
16 t = threading.Thread(target=addNum)
17 t.start()
18 thread_list.append(t)
19
20 for t in thread_list: #等待所有线程执行完毕
21 t.join()
22
23 print('final num:', num )
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系 。
既然用户程序已经自己有锁了,那为什么C python还需要GIL呢?加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。
死锁:指多个循环等待他方占有得的资源而无线期僵持下去的局面
死锁
class MyThread(threading.Thread):
def doA(self):
lA.acquire()
# lC.acquire()
print(self.name,time.ctime())
time.sleep(2)
lB.acquire()
# lC.acquire()
print(self.name, time.ctime())
lB.release()
lA.release()
#lC.release()
# lC.release()
def doB(self):
lB.acquire()
#lC.acquire()
print(self.name, time.ctime())
time.sleep(2)
lA.acquire()
# lC.acquire()
print(self.name, time.ctime())
lA.release()
lB.release()
#lC.release()
# lC.release()
def run(self):
self.doA()
self.doB()
if __name__ == '__main__':
lA = threading.Lock()
lB= threading.Lock()
# lC = threading.RLock()
thread_list = []
for i in range(5):
thread_list.append(MyThread())
for i in thread_list:
i.start()
for i in thread_list:
i.join()
可以用RLock代替Lock
信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据
1 import threading,time
2
3 def run(n):
4 semaphore.acquire()
5 time.sleep(1)
6 print("run the thread: %s" %n)
7 semaphore.release()
8
9 if __name__ == '__main__':
10
11 num= 0
12 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
13 for i in range(20):
14 t = threading.Thread(target=run,args=(i,))
15 t.start()
Event(事件)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
1 import threading
2
3 def do(event):
4 print('start')
5 event.wait()
6 print('excute')
7
8 event_obj = threading.Event()
9 for i in range(10):
10 t = threading.Thread(target=do,args=(event_obj,))
11 t.start()
12 #event_obj.clear()
13 inp =input('输入:')
14 if inp =='true':
15 event_obj.set()
通过Event来实现两个或多个线程间的交互:
1 import time
2 import random
3 import threading
4
5 def light():
6 if not event.is_set():
7 event.set() #wait就不阻塞
8 count = 0
9 while True:
10 if count < 10:
11 print('\033[42;1m--green light on--\033[0m')
12 elif count < 13:
13 print('\033[43;1m--yellow light on--\033[0m')
14 elif count < 20:
15 if event.is_set():
16 event.clear()
17 print('\033[41;1m--red light on --\033[0m')
18 else:
19 count = 0
20 event.set()
21 time.sleep(1)
22 count +=1
23 def car(n):
24 while 1:
25 time.sleep(random.randrange(10))
26 if event.is_set():
27 print('car %s is running..' % n)
28 else:
29 print('car %s is wait')
30
31 if __name__ == '__main__':
32 event = threading.Event() #设置事件
33 lamp = threading.Thread(target=light) #创建一个灯颜色的线程
34 lamp.start()
35 for i in range(3):
36 t = threading.Thread(target=car,args=(i,))
37 t.start()
timer
这个类表示在经过一定时间之后才运行的操作。 计时器的启动方式与线程一样,通过调用它们的start()方法。通过调用thecancel()方法,计时器可以被停止(在它的动作开始之前)。在执行其操作之前,计时器将等待的间隔可能与用户指定的间隔不完全相同。
1 from threading import Timer
2
3
4 def hello():
5 print("hello, world")
6
7 t = Timer(1, hello)
8 t.start() # after 1 seconds, "hello, world" will be printed
条件(Condition)
使得线程等待,只有满足某条件时,才释放n个线程:
1 import threading
2
3 def run(n):
4 con.acquire()
5 con.wait()
6 print("run the thread: %s" %n)
7 con.release()
8
9 if __name__ == '__main__':
10
11 con = threading.Condition()
12 for i in range(10):
13 t = threading.Thread(target=run, args=(i,))
14 t.start()
15
16 while True:
17 inp = input('>>>')
18 if inp == 'q':
19 break
20 con.acquire()
21 con.notify(int(inp))
22 con.release()
1 def condition_func():
2
3 ret = False
4 inp = input('>>>')
5 if inp == '1':
6 ret = True
7
8 return ret
9
10
11 def run(n):
12 con.acquire()
13 con.wait_for(condition_func) #true就执行下面,不执行
14 print("run the thread: %s" %n)
15 con.release()
16
17 if __name__ == '__main__':
18
19 con = threading.Condition()
20 for i in range(10):
21 t = threading.Thread(target=run, args=(i,))
22 t.start()
queue
当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。
class queue.Queue
(maxsize=0) #先入先出class queue.LifoQueue
(maxsize=0) #last in fisrt out class queue.PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
优先队列的构造函数。maxsize是一个整数,它设置可以在队列中放置的条目数的上界限制。一旦达到这个大小,插入将阻塞,直到使用队列项为止。如果maxsize小于或等于0,队列大小是无限的。
最低值的条目首先被检索(最低的值条目是排序后返回的条目)[0]。条目的典型模式是表单中的元组:(priority_number, data)。
exception queue.Empty
当非阻塞get(或get_nowait())被调用在一个空的队列对象上时,异常会发生。
exception queue.Full
非阻塞put()(或put_nowait())被调用在一个已满的队列对象时引发异常。
Queue.qsize() Queue.empty() #return True if empty Queue.full() # return True if full Queue.put(item, block=True, timeout=None)
将项目放入队列中。如果可选的args block为真,超时为None(默认),则在必要时阻塞,直到空闲插槽可用为止。如果超时是一个正数,它会在大多数超时秒内阻塞,如果在那个时间内没有空闲插槽,就会引发完全的异常。否则(block为false),如果立即可用空闲插槽,则在队列上放置一个项目,否则将引发完全异常(在这种情况下,超时将被忽略)
Queue.put_nowait(item) Equivalent to put(item, False).
Queue.get(block=True, timeout=None)
从队列中移除并返回一个项。如果可选的args block为真,timeout为None(默认),则在必要时阻塞,直到项目可用为止。如果超时是一个正数,它会在大多数超时秒内阻塞,如果在那个时间内没有可用的项,则会引发空的异常。否则(块为false),如果立即可用,则返回一个项,否则将引发空异常(在这种情况下,超时将被忽略)。
Queue.task_done()
指示以前的队列任务已完成。用于队列消费线程。对于用于获取任务的每个get(),对task_done()的后续调用告诉队列,任务的处理是完整的。
如果一个join()当前正在阻塞,那么当所有的项目都被处理后,它将恢复(这意味着对已放入队列中的每一个项目都将收到一个task_done()调用)。
如果调用的次数多于队列中的项,则会产生一个ValueError错误。
Queue.join() block直到队列被消费完毕
常用的:
- 初始化: class Queue.Queue(maxsize) FIFO 先进先出
- 包中的常用方法:
- Queue.qsize() 返回队列的大小
- Queue.empty() 如果队列为空,返回True,反之False
- Queue.full() 如果队列满了,返回True,反之False
- Queue.full 与 maxsize 大小对应
- Queue.get([block[, timeout]])获取队列,timeout等待时间
- 创建一个“队列”对象
- import Queue
- myqueue = Queue.Queue(maxsize = 10)
- 将一个值放入队列中
- myqueue.put(10)
- 将一个值从队列中取出
- myqueue.get()
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
1 import time,random
2 import queue,threading
3 q = queue.Queue()
4 def Producer(name):
5 count = 0
6 while count <20:
7 time.sleep(random.randrange(3))
8 q.put(count)
9 print('Producer %s has produced %s baozi..' %(name, count))
10 count +=1
11 def Consumer(name):
12 count = 0
13 while count <20:
14 time.sleep(random.randrange(4))
15 if not q.empty():
16 data = q.get()
17 print(data)
18 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
19 else:
20 print("-----no baozi anymore----")
21 count +=1
22 p1 = threading.Thread(target=Producer, args=('A',))
23 c1 = threading.Thread(target=Consumer, args=('B',))
24 p1.start()
25 c1.start()
python multiprocessing
multiprocessing是一个包,它支持使用与线程模块类似的API来生成进程。multiprocessing包提供本地和远程并发性,通过使用子进程代替线程,有效地绕过全局解释器锁。由于这个原因,multiprocessing模块允许程序员在给定的机器上充分利用多个处理器。它在Unix和Windows上运行。
1 from multiprocessing import Process
2 import time
3 def f(name):
4 time.sleep(2)
5 print('hello', name)
6
7 if __name__ == '__main__':
8 p = Process(target=f, args=('bob',))
9 p.start()
10 p.join()
注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销.
进程的通信:
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
Queues
使用方法跟threading里的queue差不多
1 from multiprocessing import Process, Queue
2
3 def func(q):
4 q.put([1,2,3])
5
6 if __name__ == '__main__':
7 q = Queue()
8 p = Process(target=func, args=(q,))
9 p.start()
10 print(q.get()) # prints "[1,2,3]"
11 p.join()
Pipes
函数pipe()的作用是:返回由管道连接的一对连接对象,默认情况下是双向的。例如:
1 from multiprocessing import Process, Pipe
2
3 def f(conn):
4 conn.send([42, None, 'hello'])
5 conn.close()
6
7 if __name__ == '__main__':
8 parent_conn, child_conn = Pipe()
9 p = Process(target=f, args=(child_conn,))
10 p.start()
11 print(parent_conn.recv()) # prints "[42, None, 'hello']"
12 p.join()
由pipe()返回的两个连接对象表示管道的两端。每个连接对象都有send()和recv()方法。注意,如果两个进程(或线程)试图同时读取或写入管道的同一端口,那么管道中的数据可能会被损坏。当然,在同时使用不同端口的过程中也不会有腐败的风险。
Managers
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support:
types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
1 from multiprocessing import Process, Manager
2
3 def f(d, l):
4 d[1] = '1'
5 d['2'] = 2
6 d[0.25] = None
7 l.append(1)
8 print(l)
9
10 if __name__ == '__main__':
11 with Manager() as manager:
12 d = manager.dict()
13
14 l = manager.list(range(5))
15 p_list = []
16 for i in range(10):
17 p = Process(target=f, args=(d, l))
18 p.start()
19 p_list.append(p)
20 for res in p_list:
21 res.join()
22
23 print(d)
24 print(l)
进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
- apply
- apply_async
1 from multiprocessing import Process,Pool
2 import time
3
4 def Foo(i):
5 time.sleep(2)
6 return i+100
7
8 def Bar(arg):
9 print('-->exec done:',arg)
10
11 pool = Pool(5)
12
13 for i in range(10):
14 pool.apply_async(func=Foo, args=(i,),callback=Bar)
15 #pool.apply(func=Foo, args=(i,))
16
17 print('end')
18 pool.close()
19 pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
上一篇: 什么是进程?线程?协程?
下一篇: 透彻解析硬件层面的线程概念是什么