文章

py-并发处理

demao

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import json
import logging
import traceback
import os
import time
from functools import partial

from concurrent import futures
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Set, TypeVar

_T = TypeVar("_T")


class ThreadPoolExecutor(futures.ThreadPoolExecutor):
    """``concurrent.futures.ThreadPoolExecutor`` with a lazy, unordered map."""

    # Populated by the base-class constructor; re-declared here for typing only.
    _max_workers: int

    @property
    def max_workers(self) -> int:
        """Number of worker threads this pool was created with."""
        return self._max_workers

    def imap_unordered(
        self,
        fn: Callable[..., _T],
        *iterables: Iterable[Any],
        prefetch_factor: int = 5,
    ) -> Iterator[_T]:
        """Lazier version of map that does not preserve ordering of results.

        Arguments come from ``zip(*iterables)``, so iteration stops at the
        shortest iterable. Instead of creating every future up front (which
        is what ``Executor.map`` does), at most
        ``max_workers * prefetch_factor`` calls are in flight at once, and a
        new call is submitted for each one that finishes. This keeps memory
        bounded for very long inputs.

        Args:
            fn: Callable invoked as ``fn(*args)`` for each zipped tuple.
            *iterables: Argument sources, consumed lazily.
            prefetch_factor: Keyword-only multiplier for the in-flight
                window; must be >= 1. Defaults to 5, the previously
                hard-coded value.

        Yields:
            Each call's return value, in completion order.

        Raises:
            ValueError: If ``prefetch_factor`` < 1 (raised when iteration
                starts, since this is a generator).
            Exception: Whatever ``fn`` raised, re-raised by
                ``Future.result()``. In that case, and when the consumer
                stops iterating early, calls that have not started yet are
                cancelled.
        """
        if prefetch_factor < 1:
            raise ValueError(f"prefetch_factor must be >= 1, got {prefetch_factor}")

        it = zip(*iterables)

        def create_taskset(n: int) -> Set[futures.Future]:
            # Submit up to n more calls, pulling argument tuples lazily.
            return {self.submit(fn, *args) for args in islice(it, n)}

        tasks = create_taskset(self.max_workers * prefetch_factor)
        try:
            while tasks:
                done, tasks = futures.wait(tasks, return_when=futures.FIRST_COMPLETED)
                for fut in done:
                    yield fut.result()
                # Refill the window by as many slots as just freed up.
                tasks.update(create_taskset(len(done)))
        finally:
            # Mirror Executor.map: on an exception from fn or an abandoned
            # generator, don't leave queued work behind for shutdown() to
            # wait on. Already-running or finished futures ignore cancel().
            for fut in tasks:
                fut.cancel()

def your_task(num):
    """Example job for the pool: echo ``num`` to stdout and return 1.

    Returning a constant 1 lets the caller count finished jobs with ``sum()``.
    """
    finished = 1
    print(num)
    return finished


if __name__ == "__main__":
    # Push 0..29 through a 5-thread pool. your_task returns 1 per item, so
    # the sum is simply the number of completed jobs.
    with ThreadPoolExecutor(max_workers=5) as executor:
        res = sum(executor.imap_unordered(your_task, list(range(30))))

在具体模块中的应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def _update_vmsphere_host_inst(self, si, cmdb_inst, server_type, host):
        """Sync one vSphere host/VM into CMDB: create it if missing, else patch changed fields.

        Called once per host from a thread pool (update_vmsphere_host_inst binds
        si, cmdb_inst and server_type with partial and maps over the hosts), so
        this function must not mutate state shared with the other workers.

        Args:
            si: vCenter connection providing get_host_details / get_vm_details.
            cmdb_inst: CMDB model handle shared by all workers; supplies
                ``condition``, ``get()`` and ``create()``.
            server_type: "mach" (uses get_host_details) or "vms" (uses
                get_vm_details); any other value logs an error and calls exit(1).
            host: The single host/VM object to sync.
        """
        import copy

        if server_type == "mach":
            host_details = si.get_host_details(host)
        elif server_type == "vms":
            host_details = si.get_vm_details(host)
        else:
            logging.error(f"[{server_type}] is not exit, quit")
            exit(1)

        # Uniqueness key of the instance: (inst_id, server_type, host_innerip).
        host_inst_id = host_details["inst_id"]
        host_server_type = host_details["server_type"]
        host_host_innerip = host_details["host_innerip"]

        # cmdb_inst is shared by every worker thread. Assigning .condition on it
        # directly lets another worker overwrite the filter between the
        # assignment and get(), returning some other host's record. Query
        # through a per-call shallow copy instead.
        # NOTE(review): assumes condition is a plain instance attribute that
        # get() reads from self — confirm against the bk client implementation.
        query = copy.copy(cmdb_inst)
        query.condition = {
            "inst_id": {"$eq": host_inst_id},
            "server_type": {"$eq": host_server_type},
            "host_innerip": {"$eq": host_host_innerip},
        }
        cmdb_inst_info = query.get()

        # No matching record yet: create it.
        if cmdb_inst_info["count"] <= 0:
            cmdb_inst.create(host_details)
            logging.info(f"cmdb:{args.modal}:[{host_server_type}:{host_inst_id}:{host_host_innerip}]: add new inst")
            return

        # Record exists: send only fields that are new or whose value differs.
        existing = cmdb_inst_info["info"][0]
        params = {
            key: value
            for key, value in host_details.items()
            if key not in existing or value != existing[key]
        }

        if not params:
            logging.info(f"cmdb:{args.modal}:[{host_server_type}:{host_inst_id}:{host_host_innerip}]: data unchanged, skip.")
            return

        update_inst = self.bkclient.InstanceObj(args.modal, existing["bk_inst_id"])
        update_inst.update(params)
        # Log only after update() returned, so a failed update is never reported as success.
        logging.info(f"cmdb:{args.modal}:[{host_server_type}:{host_inst_id}:{host_host_innerip}]:[{params}]: update success")
            
def update_vmsphere_host_inst(self, param):
        """Sync every vSphere physical host ("mach") into the CMDB model ``args.modal``.

        Hosts are processed on a 10-thread pool; each one is handed to
        _update_vmsphere_host_inst together with the shared vCenter connection
        and CMDB model handle. ``param`` is accepted but not used.
        """
        vcenter = VCenterConnection()
        model_inst = self.bkclient.Instance(args.modal)
        all_hosts = vcenter.get_all_hosts()

        sync_one = partial(self._update_vmsphere_host_inst, vcenter, model_inst, "mach")
        with ThreadPoolExecutor(max_workers=10) as pool:
            # Drain the lazy iterator: this forces every host to be processed and
            # re-raises any exception a worker hit.
            for _ in pool.imap_unordered(sync_one, all_hosts):
                pass
            logging.info(f"modal: [{args.modal}] vmSphere [mach] update succss")


============================================================================
"""
update_vmsphere_host_inst 会通过线程池并发调用 _update_vmsphere_host_inst。

_update_vmsphere_host_inst 的最后一个参数 host 表示单台主机。先用 partial 固定前面的参数(si、cmdb_inst、server_type),
再由 imap_unordered 把主机列表中的每一项逐个作为 host 传入,每个线程只处理一台主机,从而实现并发。



"""

并发模式

1
2
3
4
5
6
7
8
9
10
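1. executor.map(标准库方法)
保证结果的顺序与输入的顺序一致。
调用时会立即把所有任务一次性提交到线程池,然后返回一个迭代器,按提交顺序逐个产出结果。
并不是等所有任务都完成才返回:迭代时只需等待"下一个"任务完成;但如果排在前面的任务较慢,即使后面的任务已经完成,也要等前面的完成后才能拿到后面的结果。


2. executor.imap_unordered(上文自定义 ThreadPoolExecutor 子类中的方法,不是标准库接口)
不保证结果的顺序。
返回一个迭代器,任务一旦完成就会立即产出结果,而不管它们是以什么顺序提交的。
任务是分批按需提交的(同时挂起的任务最多为 max_workers * 5 个),不会一次性为全部输入创建 future,因此更省内存。
适用于不需要保持结果顺序的场景,通常能更早拿到结果,因为不必等待排在前面的慢任务完成。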

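并发时同时传递多个参数(多个列表的信息)。imap_unordered 本身也接受多个可迭代对象,会按位置 zip 后依次传入,例如 executor.imap_unordered(fn, ids, infos);下面演示的是先把参数打包成元组的写法。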

1
2
3
4
5
6
7
8
9
10
# Step 1: pack the per-call arguments into (bk_inst_id, related_info) tuples.
combinations = [
    (inst['bk_inst_id'], inst[args.asstModelProps])
    for inst in cmdb_inst_info['info']
]

# Step 2: fan the tuples out to the pool. Each tuple is unpacked into
# _delete_asst_inst(bk_inst_id, related_info); the return values are discarded.
with ThreadPoolExecutor(max_workers=10) as executor:
    for _ in executor.imap_unordered(lambda pair: self._delete_asst_inst(*pair), combinations):
        pass

并发的时候将结果传递出来

1
2
3
4
5
6
7
8
9
...
...

# Fetch ECS instance info concurrently. imap_unordered yields results in
# completion order, so host_inst_info is NOT aligned with `combinations`.
with ThreadPoolExecutor(max_workers=10) as executor:
    host_inst_info = [*executor.imap_unordered(lambda packed: self._fetch_tf_inst_info(*packed), combinations)]

...
...
本文由作者按照 CC BY 4.0 进行授权