# 多核并行等待

#! coding=utf-8
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor ,wait
print(cpu_count())
def play(pari):
    time.sleep(2)
    print(pari)
    return {'url':'url'}
def doneFunc(obj):
'''
回调函数
'''
    res=obj.result();
    # 获取返回的内容
    print(res)
    print('done')
import time
if __name__ == '__main__':
    start = time.time()
    # result = ProcessPoolExecutor().map(play, range(1,100))
    futures = {}
    with ProcessPoolExecutor() as p:
        for i in range(30):
            job=p.submit(play,i).add_done_callback(doneFunc)
            futures[job]=i
    # 等待执行
    # wait(futures)
    p.shutdown(True)
    end = time.time()

    print("耗时:", end - start)
    print(123)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 多cpu并行处理

#! coding=utf-8
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
print(cpu_count())
def play(pari):
    time.sleep(2)
    print(pari)
import time
if __name__ == '__main__':
    start = time.time()
    result = ProcessPoolExecutor().map(play, range(1,1000))
    end = time.time()

    print("耗时:", end - start)
    print(123)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 异步模块

# asyncio

提示

Asyncio 和其他 Python 程序一样,是单线程的,它只有一个主线程,但可以进行多个不同的任务 loop.run_in_executor(executor, get_url, url):线程池调用

import asyncio
import time


# 定义第1个协程,协程就是将要具体完成的任务,该任务耗时3秒,完成后显示任务完成
async def to_do_something(i):
    print('第{}个任务:任务启动...'.format(i))
    # 遇到耗时的操作,await就会使任务挂起,继续去完成下一个任务
    await asyncio.sleep(i)
    print('第{}个任务:任务完成!'.format(i))


# 定义第2个协程,用于通知任务进行状态
async def mission_running():
    print('任务正在执行...')


start = time.time()
# 创建一个循环
loop = asyncio.get_event_loop()
# 创建一个任务盒子tasks,包含了3个需要完成的任务
tasks = [asyncio.ensure_future(to_do_something(1)),
         asyncio.ensure_future(to_do_something(2)),
         asyncio.ensure_future(mission_running())]
# tasks接入loop中开始运行
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(end - start)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 配合线程池使用

# 使用多线程:在协程中集成阻塞io
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse


def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # 阻塞不会消耗cpu

    # 不停的询问连接是否建立好, 需要while循环不停的去检查状态
    # 做计算任务或者再次发起其他的连接请求

    client.send(
        "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # 返回 task
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{}".format(time.time()-start_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53