使用NetworkX实现任务调度功能
图是相对复杂的一种数据结构,在算法中算是比较重要的内容。图可以有效建模各种复杂的关系和网络,比如社交网络、交通网络、互联网等等,还有一些相关的算法:深搜、广搜、迪杰斯特拉算法、拓扑排序等。
Python中的NetworkX
库是用于创建、操作和研究复杂网络结构的库,并提供了许多的算法。我想要实现一个支持工作流调度和执行的功能,与Airflow
中的DAG
用于解析和任务调度一样,使用有向无环图(DAG)对任务节点进行编排是最合适的。以下对这个库进行探索和思考目标的实现方式。
NetworkX的基础用法
创建图
- 无向图:
G = nx.Graph()
- 有向图:
DG = nx.DiGraph()
默认创建没有节点和边的空白图
节点
添加单个节点,或从可迭代对象添加节点:
G.add_node(1)
G.add_nodes_from([3,4])
在NetworkX中,节点可以是任何 hashable 对象,例如文本字符串、图像、XML对象、另一个图形、自定义节点对象等。上述add_nodes_from
方法中如果容器产生2个元组形式,那么表示添加节点并设置节点属性(node, node_attribute_dict)
:
G.add_nodes_from([
(4, {"color": "red"}),
(5, {"color": "green"}),
])
G.nodes
返回图的所有节点视图(NodeView
),也可以用作G.nodes(data='color', default=None)
返回NodeDataView
,这和字典的get
方法很类似,作用是遍历节点时返回每个节点的data
特定属性,默认为空;此外还可以使用G.nodes.items()
遍历所有节点和属性,还有设置节点属性、提供默认值等等,非常灵活,总结如下代码:
import networkx as nx
# 自定义类
class Anode:
def __init__(self, name):
self.name = name
def __repr__(self):
return f"Anode({self.name})"
def __eq__(self, other):
return self.name == other.name
def __hash__(self):
return hash(self.name)
# 创建无向图,并添加不同类型的节点
G = nx.Graph()
G.add_nodes_from([
('a', {"type": "alpha"}),
(('b', 2), {"data": "2", "type": "tuple"}),
('c', {"data": Anode("6-node")}),
(Anode('d'), {"type": "Anode"})
])
# G.nodes直接返回节点名称
# a <class 'str'>
# ('b', 2) <class 'tuple'>
# c <class 'str'>
# Anode(d) <class '__main__.Anode'>
for nd in G.nodes:
print(nd, type(nd))
# G.nodes(data='type', default='default_type')返回(节点名称, type属性值)的元组迭代容器,设置默认值
# a alpha
# ('b', 2) tuple
# c default_type
# Anode(d) Anode
for nd, type_data in G.nodes(data='type', default='default_type'):
print(nd, type_data)
# G.nodes.items()返回所有节点名称及其属性值字典
# a {'type': 'alpha'}
# ('b', 2) {'data': '2', 'type': 'tuple'}
# c {'data': Anode(6-node)}
# Anode(d) {'type': 'Anode'}
for nd, data in G.nodes.items():
print(nd, data)
# 修改特定节点的属性值
# {'type': 'Anode'}
# {'type': 'Anode', 'data': 'd'}
print(G.nodes[Anode('d')])
G.nodes[Anode('d')]['data'] = 'd'
print(G.nodes[Anode('d')])
# G.nodes.data('type', default=None)与G.nodes(data='type', default=None)类似
# a alpha
# ('b', 2) tuple
# c None
# Anode(d) Anode
for nd_name, data_type in G.nodes.data('type', default=None):
print(nd_name, data_type)
在实验上述代码的时候,还学到了一个知识点。
图的节点需要是可哈希对象,但是如何知道自定义类是否是可哈希对象呢?判断方式就是:如果一个对象的哈希值在其生命周期内从未改变(需要实现
__hash__
方法),并且可以与其他对象进行比较(需要实现__eq__
方法),满足这两个条件那么就是可哈希的。比较相等的可哈希对象必须具有相同的哈希值。
用户自定义类的实例对象默认是可哈希的,因为这些对象除了和自己进行比较,都是不相等的,并且哈希值是从对象本身的存储地址(
id
)产生的
注意,上述将节点的名称设置为自定义类的实例,即便将类实现为可哈希对象,但也不那么合适将其设置为键或节点名称,类一般涉及到很多属性,并不都像上面仅设置name
属性作为哈希值,对象的属性很容易被意外的修改。最好还是将元组、字符串这类可哈希的基础数据类型作为节点名称,节点的属性就可以按自定义的设置进行了。
边
添加一条边或从可迭代对象中添加边。
G.add_edge('a', ('b', 2))
G.add_edges_from([
('a', 'c'),
('c', Anode('d'))
])
与上面类似,add_edges_from
可以从三元元组设置边的权重或其他属性,但貌似权重的键只能为weight
,值为数值类型。
除了上述G.nodes
、G.edges
,还有G.adj
是类似的用法。
还有一些基础的内容可以从官方文档中获取所有详细的解释,有关图(Graph)的一些方法可以参考Graph—Undirected graphs with self loops
有向无环图(DGA)
使用有向无环图对任务进行编排,每个节点表示需要执行的任务,箭头的指向表明执行的依赖关系。那么对有向无环图进行拓扑排序后,可以得到一组任务的执行顺序,所有任务的执行都满足依赖关系。
triangle_graph = nx.DiGraph([('a', 'b'), ('a', 'c'), ('c', 'd'), ('d', 'e'), ('d', 'f'), ('b', 'f')])
nx.draw_planar(
triangle_graph,
with_labels=True,
node_size=1000,
node_color="#ffff8f",
width=0.8,
font_size=14,
)
以上面这个图为例,其拓扑排序可以为a→b→c→d→e→f
,这是一个串行的顺序,在满足任务执行的依赖关系的前提下,可以考虑并行执行的方式,如b
和c
、e
和f
是可以同步执行的,这样可以大大提高执行的效率。
那么如何实现?需要设计两个类,一个TaskNode
节点类,描述节点的名称、执行函数等属性,决定了任务如何执行;另一个就是WorkFlow
图类,用于将任务节点组织为有向无环图,那么关键就在如何获取节点的执行顺序,NetworkX
已经实现了判断是否为有向无环图is_directed_acyclic_graph
、拓扑排序topological_sort
等算法,可以帮助我们在构建工作流时检查是否存在循环。图的拓扑排序是符合任务执行顺序的一种,但是为了利用多线程等并行的方式提高效率,可以每次执行时都从0入度节点开始执行,执行完成后将节点的入度减去1,循环直到所有节点执行完成,NetworkX
就实现了topological_generations
这个函数,函数按顺序返回每一组执行的节点。上述例子的顺序就是[['a'], ['b', 'c'], ['d'], ['e', 'f']]
。
整体代码参考如下:
class TaskNode:
def __init__(self, name: str, func=None, func_kwargs: dict={}):
self.name = name
self.func = func
self.func_kwargs = func_kwargs
def run(self, **kwargs):
run_kwargs = {
**self.func_kwargs,
**kwargs
}
if self.func:
return self.func(**run_kwargs)
else:
...
def run_tasks_in_parallel(tasks: list[TaskNode], num_threads=4):
# 创建一个线程池
pool = ThreadPool(num_threads)
# 使用线程池异步地执行所有任务
results = pool.map(lambda task: task.run(), tasks)
# 关闭线程池,等待所有线程完成
pool.close()
pool.join()
return results
class WorkFlow:
def __init__(self, flow_id, flow_name, nodes=None, edges=None):
self.flow_id = flow_id
self.flow_name = flow_name
self.dag = nx.DiGraph()
if nodes:
self.add_nodes(nodes)
if edges:
self.add_edges(edges)
def add_node(self, task_node: TaskNode):
if not isinstance(task_node, TaskNode):
raise TypeError(f"Param `task_node` expected a TaskNode, not {type(task_node)}.")
self.dag.add_node(task_node.name, task=task_node)
def add_edge(self, u, v):
self.dag.add_edge(u, v)
def add_nodes(self, task_nodes: list[TaskNode]):
for task_node in task_nodes:
self.add_node(task_node)
def add_edges(self, edge_list: list[tuple]):
for edge in edge_list:
self.add_edge(*edge)
def check_dag(self):
return nx.is_directed_acyclic_graph(self.dag)
def run_node(self, node_name, *args, **kwargs):
self.dag.nodes[node_name]['task'].run(*args, **kwargs)
def node_topological_sort(self):
indegree_map = {v: d for v, d in self.dag.in_degree() if d > 0}
zero_indegree = [v for v, d in self.dag.in_degree() if d == 0]
while zero_indegree:
yield zero_indegree
new_zero_indegree = []
for v in zero_indegree:
for _, child in self.dag.edges(v):
indegree_map[child] -= 1
if not indegree_map[child]:
new_zero_indegree.append(child)
zero_indegree = new_zero_indegree
def start(self, processes=None):
results = None
run_nodes_group = self.node_topological_sort()
for group in run_nodes_group:
if processes:
results = run_tasks_in_parallel([self.dag.nodes[t]['task'] for t in group], processes)
else:
for task in group:
self.dag.nodes[task]['task'].run()
return results
def visualize(self):
nx.draw_planar(
self.dag,
with_labels=True,
node_size=1000,
node_color="#ffff8f",
width=0.8,
font_size=14,
)
到这整体的功能是实现了,工作流可以按照设想的方式执行。
评论区