引子

拓扑排序是以前学图论的时候学到的,但是当时太年轻,不了解这个算法的应用。现在终于知道,它在一些任务依赖方法应用广泛,比如编译系统,各个模块之间有依赖,形成一幅有向无环图,这个时候,使用拓扑排序,就可以找到编译的顺序,并且能发现哪些模块是可以并行编译的。

我希望这个算法能够应用在工作中,想起来我们系统中很多风控规则,其实每个规则就是一个任务,而目前只有几个任务是有依赖关系的,既然风控没有让我们定义依赖关系,我就自己定义一个关系吧:


rules = [
    ('userLoanLimit', 'platformLoanLimit'),
    ('platformLoanLimit', 'PreRepayment'),
    ('platformLoanLimit', 'userAge'),
    ('userLoanLimit', 'userAge'),
    ('userAge', 'userName'),
    ('userAge', 'userEducation'),
    ('userAge', 'baiqishi'),
    ('userName', 'PreRepayment'),
    ('userName', 'RepaymentTimes'),
    ('userEducation', 'PreRepayment'),
    ('PreRepayment', 'RepaymentTimes'),
    ('RepaymentTimes', 'baiqishi'),
    ('PreRepayment', 'baiqishi'),
    ('baiqishi', 'tianji')
]

rules数组里面每个元素是一个元组,元组(‘userAge’, ‘userName’)表示任务userName依赖于任务userAge, 下面画个图来看一下:

import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
%matplotlib notebook

rules = [
    ('userLoanLimit', 'platformLoanLimit'),
    ('platformLoanLimit', 'PreRepayment'),
    ('platformLoanLimit', 'userAge'),
    ('userLoanLimit', 'userAge'),
    ('userAge', 'userName'),
    ('userAge', 'userEducation'),
    ('userAge', 'baiqishi'),
    ('userName', 'PreRepayment'),
    ('userName', 'RepaymentTimes'),
    ('userEducation', 'PreRepayment'),
    ('PreRepayment', 'RepaymentTimes'),
    ('RepaymentTimes', 'baiqishi'),
    ('PreRepayment', 'baiqishi'),
    ('baiqishi', 'tianji')
]
G = nx.DiGraph()
G.add_edges_from(rules)

pos = nx.shell_layout(G)
nx.draw_networkx(G, pos, node_size=500, alpha=0.6, arrowsize=10, arrowstyle='->')
<IPython.core.display.Javascript object>

这个图看起来有点眼花缭乱,很难人工区分出来应该先跑哪些规则,再跟哪些规则,这个时候就要用上拓扑排序了。

拓扑排序的实现

拓扑排序目前有2种著名的实现方法,一种是教科书上使用的Kanh算法,另外一种是直接深入优先搜索算法。

Kanh算法

Kanh算法的思想大概是这样子的:

  • 维护一个队列,将所有入度为0的节点依次排队
  • 对于队列中的每个节点,输出,然后删掉它与其他节点的边,即它指向的节点入度都要减1,这个时候如果有入度为0的节点,也放进队列
  • 不断循环上面一个步骤,直到所有节点都处理完

首先,入度为0的节点表示它没有依赖任何节点,所以拓扑排序之后的结果里面,应该首先选取入度为0的节点,这个很好理解。 接着,为什么要删掉该节点指向其他节点的边呢?其实就是,处理完这个节点之后,删掉这些边,剩下的子图,就是同一个子问题,不断解决子问题,最后就解决了整个问题。

来看代码的实现:

from collections import deque

class Graph:
    def __init__(self, rules):
        # 邻接表
        self.adjacent = {}
        # 入度
        self.in_degree = {}
        
        for rule in rules:
            if rule[0] not in self.adjacent:
                self.adjacent[rule[0]] = []
            self.adjacent[rule[0]].append(rule[1])
            
            # 计算各个节点的入度
            if rule[0] not in self.in_degree:
                self.in_degree[rule[0]] = 0
            if rule[1] not in self.in_degree:
                self.in_degree[rule[1]] = 0
            self.in_degree[rule[1]] += 1

    def topological_sort_kanh(self):
        result = []
        queue = deque([node for node in self.in_degree if self.in_degree[node] == 0])
        while len(queue) > 0:
            node = queue.popleft()
            result.append(node)
            if node not in self.adjacent:
                continue
            for neighbor in self.adjacent[node]:
                self.in_degree[neighbor] -= 1
                if self.in_degree[neighbor] == 0:
                    queue.append(neighbor)
        return result

graph = Graph(rules)
result = graph.topological_sort_kanh()
print(result)
['userLoanLimit', 'platformLoanLimit', 'userAge', 'userName', 'userEducation', 'PreRepayment', 'RepaymentTimes', 'baiqishi', 'tianji']

最后拓扑排序的结果如上,对了一下原图,是对的,我们就可以根据这个来从头到尾跑规则了,即使未来增加再多风控规则,依赖关系再复杂,使用这个算法,都不是问题!

深度优先搜索实现

这种方法的原理是:拓扑排序的结果其实就是图的逆后序遍历的结果。

我们知道,后序遍历是遍历了节点之后才访问的,所以每一次遍历,遍历到最后一个节点一定是出度为0的节点,出于后序遍历里面遍历与访问是反过来的,那么最先记下来的就是出度为0的节点,最后记下来的就是入度为0的节点。我们把这个访问顺序倒过来,其实就是入度0的排在最前面,然后当这个节点不存在,再排入度为0的节点,这就是拓扑排序的顺序了,跟Kanh算法本质上是一样的。

下面看代码:

from collections import deque

class DiGraph:
    def __init__(self, rules):
        # 邻接表
        self.adjacent = {}
        # 是否访问过
        self.marked = {}
        
        for rule in rules:
            self.marked[rule[0]] = False
            self.marked[rule[1]] = False
            if rule[0] not in self.adjacent:
                self.adjacent[rule[0]] = []
            self.adjacent[rule[0]].append(rule[1])
    def topological_sort_dfs(self):
        result = deque([])
        for node in self.adjacent:
            if not self.marked[node]:
                self.dfs(node, result)
        return result
    
    def dfs(self, node, result):
        self.marked[node] = True
        if node in self.adjacent:
            for neighbor in self.adjacent[node]:
                if not self.marked[neighbor]:
                    self.dfs(neighbor, result)
        result.appendleft(node)

graph = DiGraph(rules)
result = graph.topological_sort_dfs()
print(result)
deque(['userLoanLimit', 'platformLoanLimit', 'userAge', 'userEducation', 'userName', 'PreRepayment', 'RepaymentTimes', 'baiqishi', 'tianji'])

上面的结果跟Kanh算法几乎一样,除了userName和userEducation位置稍有不同,但是这个不影响,因为它们2个没有谁先谁后的规定。

平时在应用该算法时,我还是会选择Kanh算法,因为Kanh算法比较直观。

参考链接