Python中 进程 线程 协程 (多任务概念/重点)
进程
首先明确一下进程和线程的概念,进程系统进行资源分配的基本单位,一台机器上可以有多个进程,每个进程执行不同的程序,每个进程相对独立,拥有自己的内存空间,隔离性和稳定性比较高,进程之间互不影响,但是资源共享相对麻烦,系统资源占用相对高,同时进程可以利用cpu多核资源,适合cpu密集型任务,比如一些统计计算任务,比如计算广告转化率,uv、pv等等,或者一些视频的压缩解码任务,进程还有一个使用场景,就是后期部署项目的时候,nginx反向代理后端服务,往往需要开启多个tornado服务来支持后台的并发,就是利用了多进程的互不干扰,就算某个进程僵死,也不会影响其他进程,进程使用的是mulitprossing库 ,往往是先声明进程实例,里面可以传入消费方法名称和不定长参数args,然后将实例放入指定进程数的容器中(list),通过循环或者列表推导式,使用start方法开启进程,join方法阻塞主进程。
线程
线程是系统进行资源调度的最小单位,它属于进程,每一个进程中都会有一个线程,由于线程操作是单进程的,所以线程之间可以共享内存变量,互相通信非常方便,它的系统开销比进程小,它是线程之间由于共享内存,会互相影响,如果一个线程僵死会影响其他线程,隔离性和稳定性不如进程,同时,线程并不安全,如果对同一个对象进行操作,需要手动加锁,另外从性能上讲,多线程会触发python的全局解释器锁,导致同一时间点只会有一个线程运行的交替运行模式,线程适用于io密集型任务,所谓io密集型任务就是大量的硬盘读写操作或者网络tcp通信的任务,一般就是爬虫和数据库操作,文件操作非常频繁的任务,比如我负责开发的审核系统,需要同时对mysql和redis有大量的读写操作,所以我使用多线程进行消费。线程使用的是Threading库 ,往往是先声明线程实例,里面可以传入消费方法名称和不定长参数args,然后将实例放入指定线程数的容器中(list),通过循环或者列表推导式,使用start方法开启线程,join方法阻塞主线程。
协程
协程是一种用户态的轻量级线程,协程的调度完全由用户控制,不像进程和线程是系统态,所以在不主动切换协程的情况下,操作全局变量的时候,可以无需加锁(这里有坑,协程库内置也是有锁的,但是看场景,如果使用场景内没有主动切换协程(await)写操作就不需要加锁,如果单协程执行过程中,主动切换了协程,写操作则需要加锁 协程是否加锁问题),只需要判断资源状态即可,效率非常高,同时协程是单线程的,即可以共享内存,又不需要系统态的线程切换,同时也不会触发gil全局解释器锁,所以它性能比线程要高。具体使用场景和线程一样,适合io密集型任务,所谓io密集型任务就是大量的硬盘读写操作或者网络tcp通信的任务,一般就是爬虫和数据库操作,文件操作非常频繁的任务,比如我负责开发的审核系统,需要同时对mysql和redis有大量的读写操作,所以我后期将多线程改造成协程进行消费。协程我使用的python原生协程库asyncio库,首先通过asyncio.ensure_future(doout(4))方法建立协程对象,然后根据当天审核员数量指定开启协程数,和多线程以及多进程的区别是,协程既可以直接传实参,也可以传不定长参数,很方便,然后通过await asyncio.gather(*tasks)方法启动协程,需要注意的是,主方法需要声明成async方法,并且通过asyncio.run(main())来启动。协程虽然是python异步编程的最佳方式,但是我认为它也有缺点,那就是异步写法导致代码可读性下降,同时对编程人员的综合素质要求高,并不是所有人都能理解协程的工作方式,以及python原生协程的异步写法。
线程
进程,是执行中的计算机程序。也就是说,每个代码在执行的时候,首先本身即是一个进程。
一个进程具有:就绪,运行,中断,僵死,结束等状态(不同操作系统不一样)。
生命周期:
用户编写代码(代码本身是以进程运行的)
启动程序,进入进程“就绪”状态
操作系统调度资源,做“程序切换”,使得进程进入“运行”状态
结束/中断
特性
每个程序,本身首先是一个进程
运行中每个进程都拥有自己的地址空间、内存、数据栈及其它资源。
操作系统本身自动管理着所有的进程(不需要用户代码干涉),并为这些进程合理分配可以执行时间。
进程可以通过派生新的进程来执行其它任务,不过每个进程还是都拥有自己的内存和数据栈等。
进程间可以通讯(发消息和数据),采用 进程间通信(IPC) 方式。
说明
多个进程可以在不同的 CPU 上运行,互不干扰
同一个CPU上,可以运行多个进程,由操作系统来自动分配时间片
由于进程间资源不能共享,需要进程间通信,来发送数据,接受消息等
多进程,也称为“并行”。
进程间通信
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
进程队列queue
不同于线程queue,进程queue的生成是用multiprocessing模块生成的。
在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间。
multiprocess.Queue 是跨进程通信队列
常用方法
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
管道pipe
管道就是管道,就像生活中的管道,两头都能进能出
默认管道是全双工的,如果创建管道的时候映射成False,左边只能用于接收,右边只能用于发送,类似于单行道
import multiprocessing
def foo(sk):
sk.send('hello world')
print(sk.recv())
if __name__ == '__main__':
conn1,conn2=multiprocessing.Pipe() #开辟两个口,都是能进能出,括号中如果False即单向通信
p=multiprocessing.Process(target=foo,args=(conn1,)) #子进程使用sock口,调用foo函数
p.start()
print(conn2.recv()) #主进程使用conn口接收
conn2.send('hi son') #主进程使用conn口发送
常用方法
conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
注意:send()和recv()方法使用pickle模块对对象进行序列化
共享数据manage
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
注:进程间通信应该尽量避免使用共享数据的方式
进程池
开多进程是为了并发,通常有几个cpu核心就开几个进程,但是进程开多了会影响效率,主要体现在切换的开销,所以引入进程池限制进程的数量。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
线程
线程,是在进程中执行的代码。
一个进程下可以运行多个线程,这些线程之间共享主进程内申请的操作系统资源。
在一个进程中启动多个线程的时候,每个线程按照顺序执行。现在的操作系统中,也支持线程抢占,也就是说其它等待运行的线程,可以通过优先级,信号等方式,将运行的线程挂起,自己先运行。
使用
用户编写包含线程的程序(每个程序本身都是一个进程)
操作系统“程序切换”进入当前进程
当前进程包含了线程,则启动线程
多个线程,则按照顺序执行,除非抢占
特性
线程,必须在一个存在的进程中启动运行
线程使用进程获得的系统资源,不会像进程那样需要申请CPU等资源
线程无法给予公平执行时间,它可以被其他线程抢占,而进程按照操作系统的设定分配执行时间
每个进程中,都可以启动很多个线程
说明
多线程,也被称为”并发“执行。
线程池
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
多线程通信
共享变量
创建全局变量,多个线程公用一个全局变量,方便简单。但是坏处就是共享变量容易出现数据竞争,不是线程安全的,解决方法就是使用互斥锁。
变量共享引申出线程同步问题
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。 使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。
队列
线程间使用队列进行通信,因为队列所有方法都是线程安全的,所以不会出现线程竞争资源的情况
Queue.Queue 是进程内非阻塞队列
进程和线程的区别
一个进程中的各个线程与主进程共享相同的资源,与进程间互相独立相比,线程之间信息共享和通信更加容易(都在进程中,并且共享内存等)。
线程一般以并发执行,正是由于这种并发和数据共享机制,使多任务间的协作成为可能。
进程一般以并行执行,这种并行能使得程序能同时在多个CPU上运行;
区别于多个线程只能在进程申请到的的“时间片”内运行(一个CPU内的进程,启动了多个线程,线程调度共享这个进程的可执行时间片),进程可以真正实现程序的“同时”运行(多个CPU同时运行)。
进程和线程的常用应用场景
一般来说,在Python中编写并发程序的经验:
计算密集型任务使用多进程
IO密集型(如:网络通讯)任务使用多线程,较少使用多进程.
这是由于 IO操作需要独占资源,比如:
网络通讯(微观上每次只有一个人说话,宏观上看起来像同时聊天)每次只能有一个人说话
文件读写同时只能有一个程序操作(如果两个程序同时给同一个文件写入 'a', 'b',那么到底写入文件的哪个呢?)
都需要控制资源每次只能有一个程序在使用,在多线程中,由主进程申请IO资源,多线程逐个执行,哪怕抢占了,也是逐个运行,感觉上“多线程”并发执行了。
如果多进程,除非一个进程结束,否则另外一个完全不能用,显然多进程就“浪费”资源了。
当然如上解释可能还不足够立即理解问题所在,让我们通过不断的实操来体验其中的“门道”。
协程
协程: 协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行.
协程由于由程序主动控制切换,没有线程切换的开销,所以执行效率极高。对于IO密集型任务非常适用,如果是cpu密集型,推荐多进程+协程的方式。
协程,又称微线程。
说明
协程的主要特色是:
协程间是协同调度的,这使得并发量数万以上的时候,协程的性能是远远高于线程。
注意这里也是“并发”,不是“并行”。
常用库:greenlet gevent
协程优点:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
协程缺点:
1.协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
python中的协程
一个协程是一个函数/子程序(可以认为函数和子程序是指一个东西)。这个函数可以暂停执行, 把执行权让给 YieldInstruction,等 YieldInstruction 执行完成后,这个函数可以继续执行。 这个函数可以多次这样的暂停与继续。
注:这里的 YieldInstruction, 我们其实也可以简单理解为函数。
协程可以在“卡住”的时候可以干其它事情。
async def long_task():
... print('long task started')
... await asyncio.sleep(1)
... print('long task finished')
...
>>> loop.create_task(long_task())
<Task pending coro=<long_task() running at <stdin>:1>>
>>> loop.create_task(job1()) >>>>> loop.create_task(job1())
<Task pending coro=<job1() running at <stdin>:1>>
>>>
>>> try:
... loop.run_forever()
... except KeyboardInterrupt:
... pass
...
long task started
job1 started...
job1 paused
hello world
job1 resumed
job1 finished
long task finished
从这段程序的输出可以看出,程序本来是在执行 long task 协程,但由于 long task 要 await sleep 1 秒,于是 long task 自动暂停了,hello_world 协程自动开始执行, hello world 执行完之后,long task 继续执行。
协程有两种定义的方法, 其中使用生成器形式定义的协程叫做 generator-based coroutine, 通过 async/await 声明的协程叫做 native coroutine,两者底层实现都是生成器。接着, 我们阐述了协程的概念,从概念和例子出发,讲了协程和生成器最主要的特征:可以暂停执行和恢复执行。
协程异常处理
使用协程的时候一定加了很多的异常,但百密而一疏,总是会有想象不到的异常发生,这个时候为了不让程序整体崩溃应该使用协程的额外异常处理方法,这个方法会去执行绑定的回调函数.
g_dict=dict{}
g = gevent.spawn(self._g_fetcher, feed_name) # 创建协程
g_dict[feed_name] = [g,False]
g.link_exception(self._link_exception_callback) # 给该协程添加出现处理不了的异常时候的回调函数
def _link_exception_callback(self, g):
# 可能遇到无法修复的错误,需要修改代码 todo 报警
# 可以在这个函数里面做一些错误异常的打印,或者文件的关闭,连接的关闭.
self.terminated_flag = True # 停止整个程序 让 supervior重启
logger.info("_link_exception_callback {0} {1}".format(g, g.exception))
self._kill_sleep_gevent() # 轮询结束休眠的协程
def _kill_sleep_gevent(self):
for i,is_sleep in g_dict.items():
if is_sleep[1] == "True":
gevent.kill(is_sleep[0])
greenlet框架实现协程(封装yield的基础库)
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库。
基于greenlet框架的高级库gevent模块
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
原生协程
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存,在调度回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合
async函数完全可以看作多个异步操作,包装成的一个 Promise 对象,而await命令就是内部then命令的语法糖
import time
def job(t):
time.sleep(t)
print('用了%s' % t)
def main():
[job(t) for t in range(1,3)]
start = time.time()
main()
print(time.time()-start)
import time
import asyncio
async def job(t): # 使用 async 关键字将一个函数定义为协程
await asyncio.sleep(t) # 等待 t 秒, 期间切换执行其他任务
print('用了%s秒' % t)
async def main(loop): # 使用 async 关键字将一个函数定义为协程
tasks = [loop.create_task(job(t)) for t in range(1,3)] # 创建任务, 不立即执行
await asyncio.wait(tasks) # 执行并等待所有任务完成
start = time.time()
loop = asyncio.get_event_loop() # 建立 loop
loop.run_until_complete(main(loop)) # 执行 loop
loop.close() # 关闭 loop
print(time.time()-start)