DAG与拓扑排序

—— by golang

DAG有向无环图Directed Acyclic Graph)的简称;有向是排序的前提,无环则保证了路径收敛;实际应用中通常用DAG来表明一系列事件之间的依赖关系;

有向图表示

首先我们需要定义一个数据结构能表示DAG;数据结构中可以直观地表示有向,而无环无法直接体现在结构中;

注:为简单起见,我们假设DAG的起始节点只有一个(单根有向图);即使存在多个起始节点,也可以通过添加一个上游虚拟节点来单根化

package algo

type Node struct {
    Name   string
    Childs map[string]*Node
}

func (n *Node) AddChild(child *Node) {
    if n.Childs == nil {
        n.Childs = make(map[string]*Node)
    }
    n.Childs[child.Name] = child
}

如上,这样的一个节点中,我们可以直观地看到它的下游节点(出度),但不知道上游节点(入度);而在拓扑排序中,我们需要知道每个节点的入度和出度;故,我们再给出一个追溯上游节点的方法:

注:返回类型map[string]map[string]struct{}中第一层key为子节点Name,第二层key为父节点Name;与map[string][]string等价,第二层也声明为map只是为了方便查找;

package algo

func (n *Node) ParentsMap(dupCache map[string]struct{}) map[string]map[string]struct{} {

    if dupCache == nil {
        dupCache = map[string]struct{}{}
    }

    if _, ok := dupCache[n.Name]; ok {
        return nil
    }
    dupCache[n.Name] = struct{}{}

    psM := map[string]map[string]struct{}{
        n.Name: map[string]struct{}{},
    }

    for chName, chNode := range n.Childs {

        ps, ok := psM[chName]
        if !ok {
            ps = map[string]struct{}{}
        }
        ps[n.Name] = struct{}{}
        psM[chName] = ps

        chpsM := chNode.ParentsMap(dupCache)
        for ch, chps := range chpsM {
            ps, ok := psM[ch]
            if !ok {
                ps = map[string]struct{}{}
            }
            for p := range chps {
                ps[p] = struct{}{}
            }
            psM[ch] = ps
        }
    }

    return psM
}

拓扑排序

拓扑排序的原理,先找出所有入度为零的节点作为开始,然后合并单出单入(单链)的节点,将这些节点并入同一层(level),并从图中去除这些节点;然后对剩下的图重复如上操作;直到无法生成level为止,如果此时图中依然存在节点,则说明图中有环;否则排序完毕;

注:返回类型[][][]string中,最里层表示单链节点集合,中间层表示同一level中的单链集合,最外层表示level集合

package algo

func (n *Node) Sort() ([][][]string, error) {

    psm := n.ParentsMap(nil)
    origPsm := map[string]map[string]struct{}{} // deepCopy
    for n, ps := range psm {
        origPs := map[string]struct{}{}
        for p := range ps {
            origPs[p] = struct{}{}
        }
        origPsm[n] = origPs
    }

    sorted := [][][]string{}
    curNodes := map[string]*Node{
        n.Name: n,
    }

    for len(curNodes) > 0 {
        level := [][]string{}
        nextNodes := map[string]*Node{}
        for _, curN := range curNodes {
            group := []string{curN.Name}
            for {
                nextNode, ok := curN.chainChild(origPsm)
                if ok {
                    ps := psm[nextNode.Name]
                    delete(ps, curN.Name)
                    psm[nextNode.Name] = ps

                    curN = nextNode
                    group = append(group, curN.Name)
                    continue
                }
                break
            }
            level = append(level, group)
            for nextName, nextNode := range curN.Childs {
                ps := psm[nextName]
                delete(ps, curN.Name)
                psm[nextName] = ps
                if len(ps) == 0 {
                    nextNodes[nextName] = nextNode
                }
            }
        }
        sorted = append(sorted, level)
        curNodes = nextNodes
    }

    for _, ps := range psm {
        if len(ps) > 0 {
            return nil, errors.New("cycle found")
        }
    }

    return sorted, nil
}

// find single chain Child
func (n *Node) chainChild(parentsMap map[string]map[string]struct{}) (*Node, bool) {

    if len(n.Childs) != 1 {
        return nil, false
    }

    for chName, chNode := range n.Childs {
        if len(parentsMap[chName]) != 1 {
            return nil, false
        }

        return chNode, true
    }
    return nil, false
}

最后更新于

这有帮助吗?