# 多核并行等待
#! coding=utf-8
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor ,wait
print(cpu_count())
def play(pari):
time.sleep(2)
print(pari)
return {'url':'url'}
def doneFunc(obj):
'''
回调函数
'''
res=obj.result();
# 获取返回的内容
print(res)
print('done')
import time
if __name__ == '__main__':
start = time.time()
# result = ProcessPoolExecutor().map(play, range(1,100))
futures = {}
with ProcessPoolExecutor() as p:
for i in range(30):
job=p.submit(play,i).add_done_callback(doneFunc)
futures[job]=i
# 等待执行
# wait(futures)
p.shutdown(True)
end = time.time()
print("耗时:", end - start)
print(123)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 多cpu并行处理
#! coding=utf-8
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
print(cpu_count())
def play(pari):
time.sleep(2)
print(pari)
import time
if __name__ == '__main__':
start = time.time()
result = ProcessPoolExecutor().map(play, range(1,1000))
end = time.time()
print("耗时:", end - start)
print(123)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 异步模块
# asyncio
提示
Asyncio 和其他 Python 程序一样,是单线程的,它只有一个主线程,但可以进行多个不同的任务 loop.run_in_executor(executor, get_url, url):线程池调用
import asyncio
import time
# 定义第1个协程,协程就是将要具体完成的任务,该任务耗时3秒,完成后显示任务完成
async def to_do_something(i):
print('第{}个任务:任务启动...'.format(i))
# 遇到耗时的操作,await就会使任务挂起,继续去完成下一个任务
await asyncio.sleep(i)
print('第{}个任务:任务完成!'.format(i))
# 定义第2个协程,用于通知任务进行状态
async def mission_running():
print('任务正在执行...')
start = time.time()
# 创建一个循环
loop = asyncio.get_event_loop()
# 创建一个任务盒子tasks,包含了3个需要完成的任务
tasks = [asyncio.ensure_future(to_do_something(1)),
asyncio.ensure_future(to_do_something(2)),
asyncio.ensure_future(mission_running())]
# tasks接入loop中开始运行
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(end - start)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 配合线程池使用
# 使用多线程:在协程中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
def get_url(url):
# 通过socket请求html
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
# 建立socket连接
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client.setblocking(False)
client.connect((host, 80)) # 阻塞不会消耗cpu
# 不停的询问连接是否建立好, 需要while循环不停的去检查状态
# 做计算任务或者再次发起其他的连接请求
client.send(
"GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
data = b""
while True:
d = client.recv(1024)
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()
if __name__ == "__main__":
import time
start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
# 返回 task
task = loop.run_in_executor(executor, get_url, url)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print("last time:{}".format(time.time()-start_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53