golang 如何优雅的关闭 channels

GO 刘宇帅 5年前 阅读量: 2845

原文地址:How To Gracefully Close Channels

几天前我写了一篇介绍 GO 语言 channel 的文章,文章在redditHN上收到很多赞。
我搜集了一些关于 Go channels 设计和规则方面的评论:

  1. 除了主动去关闭一个 channel 外,并没有一种简单并且通用的方式来检测 channel 是否关闭了。
  2. 关闭一个已经关闭的 channel 会触发 panic,所以当你往一个不知道是否已经关闭的 channel 里发送数据是非常危险的。
  3. 向已经关闭的 channel 发送数据会导致 panic,所以当你往一个不知道是否关闭的 channel 里发送数据是非常危险的。
    这些评论看着挺合理的(事实上并不是),不过 Go 里确实没有一个内置函数来检测一个 channel 是否已经关闭。
    如果你可以确保没有数据发送到 channel 那么下面有一个简单的方法来检测 channel 是否关闭(这个方法会在文章的其他使用):
package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }
    return false
}

func main() {
    c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}

如上面所说,这并不是一个通用的检测 channel 是否关闭的方法。
然而,尽管有一个简单的方法 closed(chan T) bool 检查 channel 是否关闭,但是他的使用是有限的,就像用内置的函数 len 检查当前buffer 或则 channel 包含内容的数量。原因就是在调用这个函数获得返回值之后,在这之间 channel 的状态可能已经改变了。虽然如果使用 closed(ch) 检查 ch 返回true,那么就停止发送数据到ch是没问题的,但是如果 closed 返回 false,那么如果继续往 ch 发送数据则是危险的。

channel 关闭的原则

Go channel 的一个一般的原则是 不要在接收端关闭、不要在多个并发发送数据的情况下关闭。换句话说,你只能在只有一个 goroutine 中发送数据到 channel 的情况下关闭 channel。
(以下我们会把上面的原则作为关闭 channel 的原则)
当然这不是一个关闭 channel 的普通原则。最普通的原则是 不要发送数据或则关闭一个已经关闭的 channel 如果一个 goroutine 能够保证不再会有其他的 goroutine 会关闭或则发送数据到这个 channel,那么这个 goroutine 可以安全的关闭这个 goroutine。然而一个接收数据的 goroutine 或则众多的发送数据之一的 goroutine 要想能够达到这个保证是非常难的,而且经常会让代码变得很复杂。相反的,如果坚持上面的一般原则是比较简单的。

粗暴的方式关闭 channel

如果你偏要在一个接收数据或则多个发送数据的 goroutine 中关闭一个 channel,那么你可以使用recover机制来避免程序因panic而崩溃。下面是一个例子(假定这是一个 T 类型的 channel )。

func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            justClosed = false
        }
    }()
    close(ch)
    return true
}

这种解决方案没有遵守上面的一般原则。
同样的方案来解决当发送数据到一个可能关闭的 channel 的情况:

func SafeSend(ch chan T, value T)(closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()
    ch <- value
    return false
}

以比较客气的方式关闭 channel

很多人比较喜欢用 sync.Once 来关闭 channel

type MyChannel struct {
    C chan T
    once sync.Once
}
func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.once.Do(func() {
        close(mc.C)
    })
}

当然,我们也可以使用sync.Mutex避免多次关闭 channel

type MyChannel struct {
    C chan T
    close bool
    mutex sync.Mutex
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    if !mc.closed {
        close(mc.C)
        mc.closed = true
    }
}

func (mc *MyChannel) IsClosed() bool {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}

其实我们应该理解为什么 Go 不内置支持 SafeSend and SafeClose 函数,那是因为官方是不支持在接收数据或则多个发送数据之一的 goroutine 中去关闭一个 channel 的。Go 甚至禁止关闭只读的 channel。

优雅的关闭 channel

上面SafeSend函数一个缺点就是无法在 case语句中去检查channel是否已经关闭。另外一个缺点就是很多人包括我自己会认为上面的 panic/recoversync方式不够优雅。下面介绍一些不依赖panic/recoversync包的 pure-channel 方案去应对所有的场景。
(下面的例子会使用sync.WaitGroup完成展示,但是在实践中它并不是必须的。)

在多个接收数据和一个发送数据的 goroutine 情况下,发送数据的 goroutine 通过发送 "no more sends"来通知接收者关闭 channel

当发送数据的 goroutine 不想发送数据的时候,直接发送 "no more sends" 到 channel,这是最简单的解决方案。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    const MaxRandomNumber = 100000
    const NumReceivers = 100

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    dataCh := make(chan int, 100)

    // the sender
    go func() {
        for {
            if value := rand.Intn(MaxRandomNumber); value == 0 {

            // The only sender can close the channel safely.
                close(dataCh)
                return
            } else {
                dataCh <- value
            }
        }
    }()

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()

            // Receive values until dataCh is closed and
            // the value buffer queue of dataCh is empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }
    wgReceivers.Wait()
}

在一个接收数据和多个发送数据 goroutine 的情况下,接收数据的 goroutine 可以通过额外的信号通道发送“please stop sending more"来停止channel

这种方案比上面的要复杂,我不能让接收数据的 goroutine去关闭 channel,因为这是违反上面的一般原则的。但是我们可以让接收数据的 goroutine 通过其他的信号 channel 通知发送数据的 goroutine 停止发送数据。

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel。
        // Its sender is the receiver of channel datach.
        // Its receivers are the senders of channel datach.
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                第一个select是为了尽早的退出这个 goroutine,事实上它并不是必须的,可以忽略它。
                select {
                    case <- stopCh:
                        return
                    default:
                }

                select {
                    case <- stopCh:
                        return
                    case dataCh <- rand.Intn(MaxRandomNumber):
                }
            }
        }()
    }

    // the receiver
    go func() {
        defer wgReceivers.Done()
        for value := rand dataCh {
            if value == MaxRandomNumber - 1 {
                close(stopCh)
                return
            }
            log.Prinlnt(value)
        }
    }()

    // ...
    wgReceivers.Wait()
}

在该例子中发送数据到 stopCh 的 groutine 就是接收 dataCh 的 goroutine。关闭这个 stopCh 的 groutine 就是发送数据到 stopCh 的 goroutine,这符合上面说的 channel 关闭的原则。

在这个例子中,dataCh 一直没有被关闭,因为 channel 没有必要必须关闭。如果没有 goroutine 再由某一个 channel 的引用那么它最后会被垃圾回收,不过这个 channel 是否关闭。所以关闭 channel 的优雅方式就是比去主动关闭它。

多个接收和多个发送数据的 goroutine 的情况下,他们任何一个 goroutine 发送 “let's end the game" 到一个额外的信号 channel

这是一种更复杂的情况,这种情况下我们不能让任何一个发送数据或接收数据的 goroutine 去关闭数据 channel。并且我们也不能让任何一个接收数据的 goroutine 在不管其他的接收数据和发送数据的 goroutine 的情况下去发送关闭信息到额外的 channel来停止程序。无论怎么做都会打破 channel 关闭原则。然而,我们可以找一个中立角度关闭这个额外的信号 channel 。下面的例子是一个通知中立角色去关闭额外信号 channel 的方式:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)

    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
         // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its reveivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // The channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its reveiver is the moderator goroutine shown below.
    var stoppedBy string

    // 中立角色
    go func() {
        stoppedBy = <- toStop
        close(stopCh)
    }

    // 发送数据的 channel
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    select {
                    case: toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // 这个 select 是尽可能的提前退出 goroutine。包括一个 case 和一个 default 块,
                // 这个是用来优化go编译的?
                select {
                case <- stopCh:
                    return
                default:
                }

                // 尽管 stopCh 已经关闭了,但是也许会有一些 goroutine 有可能还没退出,
                // 并且 dataCh 也没有阻塞,这既是为什么需要上面的 select。

               select {
                   case <- stopCh:
                    return
                   case dataCh <- value:
               }
            }
        }(strconv.Itoa(i))
    }

    for i:= 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()

            for {
                // 
                select {
                    case <- stopCh:
                        return
                    default:
                }

                select {
                    case <- stopCh:
                        return
                    case value := <- dataCh:
                        if value == MaxRandomNumber - 1 {
                            select {
                                case toStop <- "receiver#" + id:
                                default:
                            }
                            return
                        }
                        log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在这个例子里依然坚持了上面的 关闭 channel 的原则。

请注意上面的toStop channel 的大小是1,这是为了变避免在中间角色的 goroutine 还没准备好的情况下第一条通知消息被忽略掉。

我们也可以把 toStop 的 容量设置为 发送数据和接收数据 channel 数量的总和,那么我们就没必要再使用 select 去通知中间角色 goroutine 了。

...
toStop := make(chan string, NumReceiverss + NumSenders)
...

        value := rand.Intn(MaxRandomNumber)
        if value == 0 {
            toStop <- "sender#" + id
            return
        }
...
            if value == MaxRandomNumber - 1 {
                toStop <- "receiver#" + id
                return
            }
...

其它方案

有个多的解决方案都是依赖上面三个方案之一的变体,例如其中一个方案是基于上面最复杂的那个方案并要求接收数据的 goroutine 读取数据 channel 里所有的数据。这种实现起来也比较简单,本文就不再讨论了。

虽然上面的三种方案不能覆盖所有的情况,他们都是比较基础的情况。实践中的情况都可以归类为3种方案其中之一。

结论

没有那种情况会让你必须破坏 channel 关闭原则才能解决。如果你遇到了这种情况,那么你必须重新设计和重写你的代码。

使用 Go channel 编程是一种艺术~

提示

功能待开通!


暂无评论~