PHP编程在线学习平台, 提供PHP教程、PHP入门教程、PHP视频教程及源码下载

网站首页 > 文章精选 正文

如何正确地使用Go Channel

xinche 2025-01-11 18:09:11 文章精选 4 ℃ 0 评论

Go简洁的结构和强大的原生库使我们能够轻松的上手,在实现相同的功能时,它比Java和Python更高效,尤其是它的并发编程,尤其是channel这个数据特性,你可以认为channel是一个消息队列用于在goroutine直接传输数据并支持数据同步,但是你看完这篇文章你应该有新的认识。

什么是Channel?

Channel是类似切片slice和map,Channel的本质就是Channel == Semaphore + Buffer,Semaphore是核心概其次是两个主要特性: 交付保证和状态。

传输保证为 goroutine 扫清了道路,使其不必关注信号是否可以通过信道传输,或者相关信道是否可以接收信号,而只关注逻辑实现。在下面的代码示例中,ch1和 ch2的交付保证决定了 goroutine 的成功。

go func() {
   for {
      i,ok := <-ch1 // blocked if ch1 is empty
      if ok {
         ch2 <- i*i
      } else {
         break
      }
   }
}()

一个通道Channel是否可以发送或接收数据是由它的状态决定的。

  • Nil,未初始化的通道,var ch chan int,不接收或发送任何信息,操作会被阻塞,关闭时会panic。
  • Open,已初始化的信道,ch = make (chan int)。它允许接收和发送任何信号。
  • Close,关闭通道,close(ch) ,仅接收信号,发送时panic。

Buffer 将通道从信号量更改为先入先出队列,这很好地解释了“通道”的名称但是缓冲区也会危及传递保证并导致 bug。如果通道关闭时缓冲区仍然有多个项目,则这些信号将无法传递。

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 3)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
			time.Sleep(1 * time.Second)
		}
	}()

	time.Sleep(2 * time.Second)
	if v, ok := <-ch; ok {
		fmt.Printf("time: %d \n", v)
	}
	close(ch)
}

// output time:0

Channel简化并发

并发编程基于多线程的思维;线程独立运行而数据共享和同步,数据处理总是一个难点,但是合理使用Channel可以简化并发。操作符是箭头 <-(箭头的指向就是数据的流向),chan按读写方向可以分为双向Channel和单向Channel(只读Channel和只写Channel),根据是否缓冲可以分为带缓冲的Channel和无缓冲Channel。

  • 定义Channel

ChannelType = ("chan" | "chan" "<-"| "<-" "chan") ElementType

recvq根据是否允许内部和sendq队列存储相应的等待者,通道可以分为发送和接收、只读和只发送goroutines。一般来说,只读和只发送通道不直接使用,而是作为函数参数或返回,限制了通道在函数中的作用。例如

func send(ch chan<- int) { 
   for i := 0; i < 5; i++ { 
      ch <- i 
   } 
} 

func recv(ch <-chan int) { 
   for i := range ch { 
      fmt.Println(i) 
   } 
} 

func rc(int n) <- chan int { 
   ch := make(chan int) 
   go func(){ 
      // 做一些工作
      ch <- v 
   } 
   return ch 
}

如果更改send(ch chan<- int)send(ch <- chan int),则代码将无法编译

  • 使用Channel

使用通道的方法太多了,如果你想记住所有的模式它的效率很低。但关键是要牢记使用通道的核心:即 goroutine 之间的信号传输

  • 至少包含一个goroutine
  • 必须有发送者和接受者
  • 发送者和接受者都可以是多协程
  • 双方都可能发生阻塞
  • 缓冲区可以存储多余的信号量

根据发送和结束模式,将通道分为3组:同步阻塞,非阻塞和其他。

  • 同步阻塞

Channel和<-操作符一起发送和接受,一个解一个的信息。即使存在缓冲区,在某一些情况下发送和接受度可以被阻塞,尽管缓冲区可以在一定程度上增强并发性,所以要一起使用goroutine和for循环。

package main

import (
    "fmt"
    "time"
)

func worker(done chan bool) {
    fmt.Print("working...")
    time.Sleep(time.Second)
    fmt.Println("done")

    done <- true
}

func main() {

    done := make(chan bool, 1)
    go worker(done)

    <-done
}

发送者和接收者可以是一对一、一对多、多对一、多对多,而这里的“一”可以是一个或多个goroutine,甚至可以是多个发送和接收包含在for循环中。我们根据我们对并发的要求来选择模式,包括数据量和并发延迟

  • 一对一和无缓冲的通道。阻塞是不可避免的(可以是不可见的),但信号可以 100% 传递。
  • 一对多。使用buffer的时候类似于worker pool,只有在工作量大,buffer被占满的情况下才能阻塞发送者(生产者)。当没有缓冲区时,最大并发是工人(goroutines)的数量。
  • 多对一。虽然可以保证执行顺序,但发送者很有可能被阻止。
  • 多对多。它是最高效的设计,具有最快的处理速度和最低的阻塞,但也消耗最多的内存。但是,仍然需要估计发送方和接收方的数量才能达到最高性能,否则要么发送被阻塞,要么接收“饿死”。


  • 非阻塞

<-运算符和 for range都用于一个通道。Go 提供了选择操作符select以方便通道操作。

  • 支持同时接收多个信道
  • 支持同时发送和接收
  • 支持增加超时,以防止信道等待时间过长
  • 支持在信道满时丢弃信号。
package main

import "time"
import "fmt"

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    // 各个通道将在若干时间后接收一个值,这个用来模拟例如
    // 并行的 Go 协程中阻塞的 RPC 操作
    go func() {
        time.Sleep(time.Second * 1)
        c1 <- "1"
    }()
    go func() {
        time.Sleep(time.Second * 2)
        c2 <- "2"
    }()

    // 我们使用 `select` 关键字来同时等待这两个值,并打
    // 印各自接收到的值。
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}
  • 其它

Channel可以与其它Go语言特性结合使用,以更好的设计并发代码。

  • 验证通道关闭

通过 if v,ok: = <-ch; ok {} ,您可以确定通道是否已关闭,以便接收方可以优雅地结束

  • 为了保证通道“结束”

使用了时间睡眠和等待的主要方法是以 Goroutine 结束这显然不符合生产标准。如果把 waitGroup 结合起来,频道和 goroutine (一种经典的“生产者-消费者”模式)可以完美地结束。

package main

import (
	"sync"
	"time"
)
import "fmt"

func main() {
	res := make(chan chan int, 10)

	var wg sync.WaitGroup

	wg.Add(10) // 10 jobs

	go func() {
		defer close(res)
		for i := 0; i < 10; i++ {
			o := work(&wg, i) // do work
			time.Sleep(time.Second)
			res <- o // pass result
		}
	}()

	go func() {
		for o := range res {
			fmt.Println(<-o)
		}
	}()
	wg.Wait()
}

func work(wg *sync.WaitGroup, a int) chan int {
	out := make(chan int)
	go func() {
		// do some work
		out <- a
		wg.Done()
	}()
	return out
}
  • 取消通道

在生产中我们不限制渠道,而是限制整个业务的执行时间。这就是上下文所扮演的角色: 我们使用上下文来链接整个逻辑的每个方面,设置超时,并确定在上下文结束时结束通道块。

package main

import (
	"context"
	"time"
)
import "fmt"

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	result := make(chan int, 1) // 我们将缓冲区设置为1,以避免在 goroutine 发送超时和主结束时出现内存泄漏。
	asyncDoStuffWithTimeout(ctx, result)
	fmt.Printf("restult get: %v", <-result)
}

func asyncDoStuffWithTimeout(ctx context.Context, result chan int) {
	go func() {
		select {
		case <-ctx.Done():
			fmt.Printf("ctx is done, %v", ctx.Err())
			result <- 0
			return
		case <-time.After(2 * time.Second):
			fmt.Println("set result")
			result <- 10
		}
	}()
}

什么时候使用Channel

很难说什么时候使用通道,但主要取决于经验,尤其是并发开发的经验。可以在 Kubernetes 代码库中找到许多用例,但是在这里我只能列出最常见的场景。

  • 触发信号,包括异步结束和启动
  • 在异步中传输数据,异步进程对非紧急进程逐个进行处理
  • 对于关键步骤,需要进行阻塞和等待
  • 进程池。worker要么被任务唤醒,要么被长时间阻塞,直到任务到来


信号通道

使用信道作为结束某些任务的信号。{ name } chan struct {}用于定义不带任何数据的通道,因为 struct {}占用最低内存。用 <-{ name }通知结束和关闭通道。它在 kuberentes/controller-run 管理器中使用了很多次。

在控制器管理器中,还有两个信号通道,分别是选定的 chan 结构({})和 Internal alProcessStop chan 结构({})。

异步处理

作为一个数据通道,通道还可以传输数据并促进异步处理,例如 ControlerManager 中的错误处理。所有错误都将被放入 errChan chan 错误中。

只有当通道结束时,才会排出此错误通道并异步处理它。


异步处理错误在 Go 代码中非常常见,您还可以在与 waitGroup 结合使用的实物代码中找到类似的处理。

worker模式

在并发worker模式中,通道触发任务执行,worker在没有任务的情况下被阻塞,避免了 goroutine 创建和销毁的开销增加。看看kubernetes的观察者是怎么工作的。

  • 在接收到停止信号之前,将所有事件转发到信道。
  • 启动 for 循环 select 以等待任务

异步任务执行

异步执行任务并等待完成,这种模式通常应用于代码的关键步骤,比如在 kublet 中下载docker image。

这里的 pullImage 是一个异步执行的阻塞作业,由 puller.go 中的 goroutine 执行,并将结果发送到通道。

结论

通道简化了并发性并提供了很大的灵活性,但只有在我们知道如何以及何时使用它的时候。因此,理解 channel = Semaphore + Buffer 的核心非常重要,并指出何时需要信号量或缓冲区,在哪些场景中,发送和接收被阻塞。为了提高编码效率,我们应该将其集成到并发思想中,使其更好地为我们的实践服务。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

请填写验证码
最近发表
标签列表
最新留言