一. IO多路复用
IO多路复用作用:检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可读/可写)二. 基于IO多路复用+socket实现并发请求(一个线程100个请求) IO多路复用 socket非阻塞 基于事件循环实现的异步非阻塞框架:aaaa.py 非阻塞:不等待 异步:执行完某个任务后自动调用我给他的函数。 Python中开源 基于事件循环实现的异步非阻塞框架 Twisted# aaaa.pyimport socketimport selectclass Req(object): def __init__(self,sk,func): self.sock = sk self.func = func def fileno(self): return self.sock.fileno()class Nb(object): def __init__(self): self.conn_list = [] self.socket_list = [] def add(self,url,func): client = socket.socket() client.setblocking(False) # 非阻塞 try: client.connect((url, 80)) except BlockingIOError as e: pass obj = Req(client,func) self.conn_list.append(obj) self.socket_list.append(obj) def run(self): while True: rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005) # wlist中表示已经连接成功的req对象 for sk in wlist: # 发生变换的req对象 sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') self.conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.sock.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) sk.func(body) sk.sock.close() self.socket_list.remove(sk) if not self.socket_list: break
IO多路复用作用
检测多个socket是否发生变化。 操作系统检测socket是否发生变化,有三种模式: select:最多1024个socket;循环去检测。 poll:不限制监听socket个数;循环去检测(水平触发)。 epoll:不限制监听socket个数;回调方式(边缘触发)。 Python模块: select.select select.epoll
提高并发方案:
- 多进程 - 多线程 - 异步非阻塞模块(Twisted) scrapy框架(单线程完成并发)
什么是异步非阻塞?
- 非阻塞,不等待。 比如创建socket对某个地址进行connect、获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作。 如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可。 - 异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知)。 比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自执行回调函数。
什么是同步阻塞?
- 阻塞:等 - 同步:按照顺序逐步执行 key_list = ['alex','db','sb'] for item in key_list: ret = requests.get('https://www.baidu.com/s?wd=%s' %item) print(ret.text)class Foo(object): def __init__(self,data,info): self.row = data self.info = info def append(self,item): self.row.append(item)v = [ Foo([11,22],'线程'), # 每个都有一个append方法 Foo([22,33],'地方'), # 每个都有一个append方法 Foo([33,44],'水电费'), # 每个都有一个append方法]for item in v: print(item.append) print(item.info) print(item.row)
三. 协程 概念: 进程,操作系统中存在; 线程,操作系统中存在; 协程,是由程序员创造出来的一个不是真实存在的东西; 协程:是微线程,对一个线程进程分片,使得线程在代码块之间进行来回切换执行,而不是在原来逐行执行。
单纯的协程无用 协程 + 遇到IO就切换 => 这样就有用了 pip3 install gevent
单线程提供并发:
- 协程+IO切换:gevent - 基于事件循环的异步非阻塞框架:Twisted
import greenletdef f1(): print(11) gr2.switch() print(22) gr2.switch()def f2(): print(33) gr1.switch() print(44)# 协程 gr1gr1 = greenlet.greenlet(f1)# 协程 gr2gr2 = greenlet.greenlet(f2)gr1.switch()
# 代码:geventfrom gevent import monkeymonkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换import requestsimport geventdef get_page1(url): ret = requests.get(url) print(url,ret.content)def get_page2(url): ret = requests.get(url) print(url,ret.content)def get_page3(url): ret = requests.get(url) print(url,ret.content)gevent.joinall([ gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1 gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 协程2 gevent.spawn(get_page3, 'https://github.com/'), # 协程3])
# 代码:twistedfrom twisted.web.client import getPage, deferfrom twisted.internet import reactordef all_done(arg): reactor.stop()def callback(contents): print(contents)deferred_list = []url_list = ['http://www.bing.com', 'http://www.baidu.com', ]for url in url_list: deferred = getPage(bytes(url, encoding='utf8')) deferred.addCallback(callback) deferred_list.append(deferred)dlist = defer.DeferredList(deferred_list)dlist.addBoth(all_done)reactor.run()
其他协程方法, 用yield只做一个生成器