侧边栏壁纸
  • 累计撰写 18 篇文章
  • 累计创建 12 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

使用NetworkX构建图相关应用

whfree
2024-10-23 / 0 评论 / 2 点赞 / 7 阅读 / 28416 字

使用NetworkX实现任务调度功能

图是相对复杂的一种数据结构,在算法中算是比较重要的内容。图可以有效建模各种复杂的关系和网络,比如社交网络、交通网络、互联网等等,还有一些相关的算法:深搜、广搜、迪杰斯特拉算法、拓扑排序等。

Python中的NetworkX库是用于创建、操作和研究复杂网络结构的库,并提供了许多的算法。我想要实现一个支持工作流调度和执行的功能,与Airflow中的DAG用于解析和任务调度一样,使用有向无环图(DAG)对任务节点进行编排是最合适的。以下对这个库进行探索和思考目标的实现方式。

NetworkX的基础用法

创建图

  • 无向图:G = nx.Graph()
  • 有向图:DG = nx.DiGraph()

默认创建没有节点和边的空白图

节点

添加单个节点,或从可迭代对象添加节点:

G.add_node(1)
G.add_nodes_from([3,4])

在NetworkX中,节点可以是任何 hashable 对象,例如文本字符串、图像、XML对象、另一个图形、自定义节点对象等。上述add_nodes_from方法中如果容器产生2个元组形式,那么表示添加节点并设置节点属性(node, node_attribute_dict)

G.add_nodes_from([
    (4, {"color": "red"}),
    (5, {"color": "green"}),
])

G.nodes返回图的所有节点视图(NodeView),也可以用作G.nodes(data='color', default=None)返回NodeDataView,这和字典的get方法很类似,作用是遍历节点时返回每个节点的data特定属性,默认为空;此外还可以使用G.nodes.items()遍历所有节点和属性,还有设置节点属性、提供默认值等等,非常灵活,总结如下代码:

import networkx as nx
# 自定义类
class Anode:
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"Anode({self.name})"

    def __eq__(self, other):
        return self.name == other.name

    def __hash__(self):
        return hash(self.name)

# 创建无向图,并添加不同类型的节点
G = nx.Graph()
G.add_nodes_from([
    ('a', {"type": "alpha"}),
    (('b', 2), {"data": "2", "type": "tuple"}),
    ('c', {"data": Anode("6-node")}),
    (Anode('d'), {"type": "Anode"})
])

# G.nodes直接返回节点名称
# a <class 'str'>
# ('b', 2) <class 'tuple'>
# c <class 'str'>
# Anode(d) <class '__main__.Anode'>
for nd in G.nodes:
    print(nd, type(nd))

# G.nodes(data='type', default='default_type')返回(节点名称, type属性值)的元组迭代容器,设置默认值
# a alpha
# ('b', 2) tuple
# c default_type
# Anode(d) Anode
for nd, type_data in G.nodes(data='type', default='default_type'):
    print(nd, type_data)

# G.nodes.items()返回所有节点名称及其属性值字典
# a {'type': 'alpha'}
# ('b', 2) {'data': '2', 'type': 'tuple'}
# c {'data': Anode(6-node)}
# Anode(d) {'type': 'Anode'}
for nd, data in G.nodes.items():
    print(nd, data)

# 修改特定节点的属性值
# {'type': 'Anode'}
# {'type': 'Anode', 'data': 'd'}
print(G.nodes[Anode('d')])
G.nodes[Anode('d')]['data'] = 'd'
print(G.nodes[Anode('d')])

# G.nodes.data('type', default=None)与G.nodes(data='type', default=None)类似
# a alpha
# ('b', 2) tuple
# c None
# Anode(d) Anode
for nd_name, data_type in G.nodes.data('type', default=None):
    print(nd_name, data_type)

在实验上述代码的时候,还学到了一个知识点。

图的节点需要是可哈希对象,但是如何知道自定义类是否是可哈希对象呢?判断方式就是:如果一个对象的哈希值在其生命周期内从未改变(需要实现__hash__方法),并且可以与其他对象进行比较(需要实现__eq__方法),满足这两个条件那么就是可哈希的。比较相等的可哈希对象必须具有相同的哈希值。

用户自定义类的实例对象默认是可哈希的,因为这些对象除了和自己进行比较,都是不相等的,并且哈希值是从对象本身的存储地址(id)产生的

注意,上述将节点的名称设置为自定义类的实例,即便将类实现为可哈希对象,但也不那么合适将其设置为键或节点名称,类一般涉及到很多属性,并不都像上面仅设置name属性作为哈希值,对象的属性很容易被意外的修改。最好还是将元组、字符串这类可哈希的基础数据类型作为节点名称,节点的属性就可以按自定义的设置进行了。

添加一条边或从可迭代对象中添加边。

G.add_edge('a', ('b', 2))
G.add_edges_from([
    ('a', 'c'),
    ('c', Anode('d'))
])

与上面类似,add_edges_from可以从三元元组设置边的权重或其他属性,但貌似权重的键只能为weight,值为数值类型。

除了上述G.nodesG.edges,还有G.adj是类似的用法。

还有一些基础的内容可以从官方文档中获取所有详细的解释,有关图(Graph)的一些方法可以参考Graph—Undirected graphs with self loops

有向无环图(DGA)

使用有向无环图对任务进行编排,每个节点表示需要执行的任务,箭头的指向表明执行的依赖关系。那么对有向无环图进行拓扑排序后,可以得到一组任务的执行顺序,所有任务的执行都满足依赖关系。

triangle_graph = nx.DiGraph([('a', 'b'), ('a', 'c'), ('c', 'd'), ('d', 'e'), ('d', 'f'), ('b', 'f')])
nx.draw_planar(
    triangle_graph,
    with_labels=True,
    node_size=1000,
    node_color="#ffff8f",
    width=0.8,
    font_size=14,
)

img

以上面这个图为例,其拓扑排序可以为a→b→c→d→e→f,这是一个串行的顺序,在满足任务执行的依赖关系的前提下,可以考虑并行执行的方式,如bcef是可以同步执行的,这样可以大大提高执行的效率。

那么如何实现?需要设计两个类,一个TaskNode节点类,描述节点的名称、执行函数等属性,决定了任务如何执行;另一个就是WorkFlow图类,用于将任务节点组织为有向无环图,那么关键就在如何获取节点的执行顺序,NetworkX已经实现了判断是否为有向无环图is_directed_acyclic_graph、拓扑排序topological_sort等算法,可以帮助我们在构建工作流时检查是否存在循环。图的拓扑排序是符合任务执行顺序的一种,但是为了利用多线程等并行的方式提高效率,可以每次执行时都从0入度节点开始执行,执行完成后将节点的入度减去1,循环直到所有节点执行完成,NetworkX就实现了topological_generations这个函数,函数按顺序返回每一组执行的节点。上述例子的顺序就是[['a'], ['b', 'c'], ['d'], ['e', 'f']]

整体代码参考如下:

class TaskNode:
    def __init__(self, name: str, func=None, func_kwargs: dict={}):
        self.name = name
        self.func = func
        self.func_kwargs = func_kwargs

    def run(self, **kwargs):
        run_kwargs = {
            **self.func_kwargs,
            **kwargs
        }
        if self.func:
            return self.func(**run_kwargs)
        else:
            ...


def run_tasks_in_parallel(tasks: list[TaskNode], num_threads=4):
    # 创建一个线程池
    pool = ThreadPool(num_threads)

    # 使用线程池异步地执行所有任务
    results = pool.map(lambda task: task.run(), tasks)
    # 关闭线程池,等待所有线程完成
    pool.close()
    pool.join()

    return results


class WorkFlow:
    def __init__(self, flow_id, flow_name, nodes=None, edges=None):
        self.flow_id = flow_id
        self.flow_name = flow_name
        self.dag = nx.DiGraph()
        if nodes:
            self.add_nodes(nodes)
        if edges:
            self.add_edges(edges)

    def add_node(self, task_node: TaskNode):
        if not isinstance(task_node, TaskNode):
            raise TypeError(f"Param `task_node` expected a TaskNode, not {type(task_node)}.")

        self.dag.add_node(task_node.name, task=task_node)

    def add_edge(self, u, v):
        self.dag.add_edge(u, v)

    def add_nodes(self, task_nodes: list[TaskNode]):
        for task_node in task_nodes:
            self.add_node(task_node)

    def add_edges(self, edge_list: list[tuple]):
        for edge in edge_list:
            self.add_edge(*edge)

    def check_dag(self):
        return nx.is_directed_acyclic_graph(self.dag)

    def run_node(self, node_name, *args, **kwargs):
        self.dag.nodes[node_name]['task'].run(*args, **kwargs)

    def node_topological_sort(self):
        indegree_map = {v: d for v, d in self.dag.in_degree() if d > 0}
        zero_indegree = [v for v, d in self.dag.in_degree() if d == 0]
        while zero_indegree:
            yield zero_indegree
            new_zero_indegree = []
            for v in zero_indegree:
                for _, child in self.dag.edges(v):
                    indegree_map[child] -= 1
                    if not indegree_map[child]:
                        new_zero_indegree.append(child)
            zero_indegree = new_zero_indegree

    def start(self, processes=None):
        results = None
        run_nodes_group = self.node_topological_sort()
        for group in run_nodes_group:
            if processes:
                results = run_tasks_in_parallel([self.dag.nodes[t]['task'] for t in group], processes)
            else:
                for task in group:
                    self.dag.nodes[task]['task'].run()
        return results

    def visualize(self):
        nx.draw_planar(
            self.dag,
            with_labels=True,
            node_size=1000,
            node_color="#ffff8f",
            width=0.8,
            font_size=14,
        )

到这整体的功能是实现了,工作流可以按照设想的方式执行。

参考

Directed Acyclic Graphs & Topological Sort

2

评论区