2.2.加强篇
其实以前的 linux中是没有线程这个概念的, windows程序员经常使用线程,这一看~方便啊,然后可能是当时程序员偷懒了,就把进程模块改了改(这就是为什么之前说linux下的多进程编程其实没有win下那么“重量级”),弄了个精简版进程==> 线程(内核是分不出 进程和线程的,反正 pcb个数都是一样)
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享(全局变量和堆 ==> 线程间共享。进程的栈 ==> 线程平分而独占)
还记得通过 current_thread()获取的线程信息吗?难道线程也没个id啥的?一起看看:(通过 ps-Lfpid来查看LWP)

回顾:进程共享的内容:(回顾:http://www.cnblogs.com/dotnetcrazy/p/9363810.html)
代码(.text)文件描述符(fd)内存映射(mmap)2.2.1.线程同步~互斥锁Lock
线程之间共享数据的确方便,但是也容易出现数据混乱的现象,来看个例子:
立即学习“Python免费学习笔记(深入)”;
代码语言:JavaScript代码运行次数:0运行复制
from multiprocessing.dummy import Threadingnum = 0 # def global numdef test(i): print(f"子进程:{i}") global num for i in range(100000): num += 1def main(): p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num) # 应该是500000,发生了数据混乱,结果少了很多if __name__ == '__main__': main()
输出:(应该是 500000,发生了数据混乱,只剩下 358615)
代码语言:javascript代码运行次数:0运行复制
子进程:0子进程:1子进程:2子进程:3子进程:4452238
Lock案例
共享资源+CPU调度==>数据混乱==解决==>线程同步 这时候 Lock就该上场了
互斥锁是实现线程同步最简单的一种方式,读写都加锁(读写都会串行)
先看看上面例子怎么解决调:
代码语言:javascript代码运行次数:0运行复制
from multiprocessing.dummy import threading, Locknum = 0 # def global numdef test(i, lock): print(f"子进程:{i}") global num for i in range(100000): with lock: num += 1def main(): lock = Lock() p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num)if __name__ == '__main__': main()
输出: time python31.thread.2.py
代码语言:javascript代码运行次数:0运行复制
子进程:0子进程:1子进程:2子进程:3子进程:4500000real 0m2.846suser 0m1.897ssys 0m3.159s
优化下
lock设置为全局或者局部,性能几乎一样。循环换成map后性能有所提升(测试案例在Code中)
代码语言:javascript代码运行次数:0运行复制
from multiprocessing.dummy import Pool as ThreadPool, Locknum = 0 # def global numlock = Lock()def test(i): print(f"子进程:{i}") global num global lock for i in range(100000): with lock: num += 1def main(): p = ThreadPool() p.map_async(test, list(range(5))) p.close() p.join() print(num)if __name__ == '__main__': main()
输出:
time Python31.thread.2.py
代码语言:javascript代码运行次数:0运行复制
子进程:0子进程:1子进程:3子进程:2子进程:4500000real 0m2.468suser 0m1.667ssys 0m2.644s
本来多线程访问共享资源的时候可以并行,加锁后就部分串行了(没获取到的线程就阻塞等了)
【项目中可以多次加锁,每次加锁只对修改部分加(尽量少的代码) 】(以后会说协程和Actor模型)
补充:以前都是这么写的,现在支持 with托管了(有时候还会用到,所以了解下):【net是直接 lock大括号包起来】
代码语言:javascript代码运行次数:0运行复制
#### 以前写法:lock.acquire() # 获取锁try: num += 1finally: lock.release() # 释放锁#### 等价简写with lock: num += 1
扩展知识:(GIL在扩展篇会详说)
GIL的作用:多线程情况下必须存在资源的竞争,GIL是为了保证在解释器级别的线程唯一使用共享资源(cpu)。同步锁的作用:为了保证解释器级别下的自己编写的程序唯一使用共享资源产生了同步锁
2.2.2.线程同步~递归锁RLock
看个场景:小明欠小张2000,欠小周5000,现在需要同时转账给他们:(规定:几次转账加几次锁)

小明啥也没管,直接撸起袖子就写Code了:(错误Code示意)
代码语言:javascript代码运行次数:0运行复制
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明转账2000给小张 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明转账5000给小周 xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
小明写完代码就出去了,这可把小周和小张等急了,打了N个电话来催,小明心想啥情况?
一看代码楞住了,改了改代码,轻轻松松把钱转出去了:
代码语言:javascript代码运行次数:0运行复制
from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000# 小明转账2000给小张def a_to_b(lock): global xiaoming global xiaozhang with lock: xiaoming -= 2000 xiaozhang += 2000# 小明转账5000给小周def a_to_c(lock): global xiaoming global xiaozhou with lock: xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(a_to_b, args=(lock, )) p.apply_async(a_to_c, args=(lock, )) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
输出:
代码语言:javascript代码运行次数:0运行复制
[还钱前]小明8000,小张3000,小周5000[还钱后]小明1000,小张5000,小周10000
就这么算了吗?不不不,不符合小明性格,于是小明研究了下,发现~还有个递归锁 RLock呢,正好解决他的问题:
代码语言:javascript代码运行次数:0运行复制
from multiprocessing.dummy import Pool as ThreadPool, RLock # 就把这边换了下xiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明转账2000给小张 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明转账5000给小周 xiaoming -= 5000 xiaozhou += 5000def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") lock = RLock() # 就把这边换了下 p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__': main()
RLock内部维护着一个 Lock和一个counter变量, counter记录了acquire的次数,从而使得资源可以被多次 require。直到一个线程所有的 acquire都被release,其他的线程才能获得资源
2.2.3.死锁引入1.多次获取导致死锁
小明想到了之前说的(互斥锁 Lock读写都加锁)就把代码拆分研究了下:
代码语言:javascript代码运行次数:0运行复制
print("[开始]小明转账2000给小张")lock.acquire() # 获取锁xiaoming -= 2000xiaozhang += 2000print("[开始]小明转账5000给小周")lock.acquire() # 获取锁(互斥锁第二次加锁)xiaoming -= 5000xiaozhou += 5000lock.release() # 释放锁print("[结束]小明转账5000给小周")lock.release() # 释放锁print("[开始]小明转账2000给小张")
输出发现:(第二次加锁的时候,变成阻塞等了【死锁】)
代码语言:javascript代码运行次数:0运行复制
[还钱前]小明8000,小张3000,小周5000[开始]小明转账2000给小张[开始]小明转账5000给小周
这种方式,Python提供的RLock就可以解决了
2.常见的死锁
看个场景:小明和小张需要流水帐,经常互刷~ 小明给小张转账1000,小张给小明转账1000
一般来说,有几个共享资源就加几把锁(小张、小明就是两个共享资源,所以需要两把 Lock)
先描述下然后再看代码:
正常流程 小明给小张转1000:小明自己先加个锁==>小明-1000==>获取小张的锁==>小张+1000==>转账完毕
死锁情况 小明给小张转1000:小明自己先加个锁==>小明-1000==>准备获取小张的锁。可是这时候小张准备转账给小明,已经把自己的锁获取了,在等小明的锁(两个人相互等,于是就一直死锁了)
代码模拟一下过程:
代码语言:javascript代码运行次数:0运行复制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的锁z_lock = Lock() # 小张的锁# 小明转账1000给小张def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock with m_lock: xiaoming -= 1000 sleep(0.01) with z_lock: xiaozhang += 1000# 小张转账1000给小明def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__': main()
输出:(卡在这边了)
代码语言:javascript代码运行次数:0运行复制
[转账前]小明5000,小张8000
项目中像这类的情况,一般都是这几种解决方法:(还有其他解决方案,后面会继续说)
按指定顺序去访问共享资源在访问其他锁的时候,先把自己锁解了trylock的重试机制得不到全部锁就先放弃已经获取的资源
比如上面的情况,我们如果规定,不管是谁先转账,先从小明开始,然后再小张,那么就没问题了。或者谁钱多就谁(权重高的优先)
代码语言:javascript代码运行次数:0运行复制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的锁z_lock = Lock() # 小张的锁# 小明转账1000给小张def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock # 以上次代码为例,这边只修改了这块 with z_lock: # 小张权重高,大家都先获取小张的锁 xiaozhang += 1000 sleep(0.01) with m_lock: xiaoming -= 1000# 小张转账1000给小明def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000def main(): print(f"[转账前]小明{xiaoming},小张{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[转账后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__': main()
输出:
代码语言:javascript代码运行次数:0运行复制
[转账前]小明5000,小张8000[转账后]小明5000,小张8000
2.2.4.线程同步~条件变量Condition
条件变量一般都不是锁,只能能阻塞线程,从而减少不必要的竞争,Python内置了 RLock(不指定就是RLock)
看看源码:
代码语言:javascript代码运行次数:0运行复制
class Condition: """ 实现条件变量的类。 条件变量允许一个或多个线程等到另一个线程通知它们为止 如果给出了lock参数而不是None,那必须是Lock或RLock对象作底层锁。 否则,一个新的RLock对象被创建并用作底层锁。 """ def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock # 设置lock的acquire()和release()方法 self.acquire = lock.acquire self.release = lock.release
再看看可不可以进行with托管:(支持)
代码语言:javascript代码运行次数:0运行复制
def __enter__(self): return self._lock.__enter__()def __exit__(self, *args): return self._lock.__exit__(*args)
看个生产消费者的简单例子:(生产完就通知消费者)
代码语言:javascript代码运行次数:0运行复制
from multiprocessing.dummy import Pool as ThreadPool, Conditions_list = []con = Condition()def Shop(i): global con global s_list # 加锁保护共享资源 for x in range(5): with con: s_list.append(x) print(f"[生产者{i}]生产商品{x}") con.notify_all() # 通知消费者有货了def User(i): global con global s_list while True: with con: if s_list: print(f"列表商品:{s_list}") name = s_list.pop() # 消费商品 print(f"[消费者{i}]消费商品{name}") print(f"列表剩余:{s_list}") else: con.wait()def main(): p = ThreadPool() # 两个生产者 p.map_async(Shop, range(2)) # 五个消费者 p.map_async(User, range(5)) p.close() p.join()if __name__ == '__main__': main()
输出:(list之类的虽然可以不加global标示,但是为了后期维护方便,建议加上)
代码语言:javascript代码运行次数:0运行复制
[生产者0]生产商品0[生产者0]生产商品1列表商品:[0, 1][消费者0]消费商品1列表剩余:[0]列表商品:[0][消费者0]消费商品0列表剩余:[][生产者0]生产商品2列表商品:[2][消费者1]消费商品2列表剩余:[][生产者0]生产商品3[生产者1]生产商品0[生产者0]生产商品4列表商品:[3, 0, 4][消费者1]消费商品4列表剩余:[3, 0][生产者1]生产商品1[生产者1]生产商品2[生产者1]生产商品3[生产者1]生产商品4列表商品:[3, 0, 1, 2, 3, 4][消费者2]消费商品4列表剩余:[3, 0, 1, 2, 3]列表商品:[3, 0, 1, 2, 3][消费者0]消费商品3列表剩余:[3, 0, 1, 2]列表商品:[3, 0, 1, 2][消费者1]消费商品2列表剩余:[3, 0, 1]列表商品:[3, 0, 1][消费者3]消费商品1列表剩余:[3, 0]列表商品:[3, 0][消费者3]消费商品0列表剩余:[3]列表商品:[3][消费者3]消费商品3列表剩余:[]
通知方法:
notify() :发出资源可用的信号,唤醒任意一条因 wait()阻塞的进程notifyAll() :发出资源可用信号,唤醒所有因wait()阻塞的进程
2.2.5.线程同步~信号量Semaphore(互斥锁的高级版)
记得当时在分析 multiprocessing.Queue源码的时候,有提到过(点我回顾)
同进程的一样, semaphore管理一个内置的计数器,每当调用 acquire()时内置函数 -1,每当调用 release()时内置函数 +1
通俗讲就是:在互斥锁的基础上封装了下,实现一定程度的并行
举个例子,以前使用互斥锁的时候:(厕所就一个坑位,必须等里面的人出来才能让另一个人上厕所)

使用信号量之后:厕所坑位增加到5个(自己指定),这样可以5个人一起上厕所了==>实现了一定程度的并发
举个例子:(Python在语法这点特别爽,不用你记太多异同,功能差不多基本上代码也就差不多)
代码语言:javascript代码运行次数:0运行复制
from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Semaphoresem = Semaphore(5) # 限制最大连接数为5def goto_wc(i): global sem with sem: print(f"[线程{i}]上厕所") sleep(0.1)def main(): p = ThreadPool() p.map_async(goto_wc, range(50)) p.close() p.join()if __name__ == '__main__': main()
输出:

可能看了上节回顾的会疑惑:源码里面明明是 BoundedSemaphore,搞啥呢?
其实 BoundedSemaphore就比 Semaphore多了个在调用 release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常