目录
- 正文
- 处理CVS文件记录
- 🚚 获取测试数据
- 线程池耗时差异
正文
Worker Pools
线程池是一种并发模式。该模式中维护了固定数量的多个工作器,这些工作器等待着管理者分配可并发执行的任务。该模式避免了短时间任务创建和销毁线程的代价。

在 golang
中,我们使用 goroutine
和 channel
来构建这种模式。工作器 worker
由一个 goroutine
定义,该 goroutine
通过 channel
获取数据。
处理CVS文件记录
接下来让我们通过一个例子,来进一步理解该模式。假设您需要处理来自 CVS
文件的记录数据,我们需要将该文件中的经纬度保存到数据库中。代码如下。
package main
import (
"encoding/csv"
"fmt"
"os"
"time"
)
type city struct {
name string
location string
}
func createCity(record city) {
time.Sleep(10 * time.Millisecond)
}
func main() {
startTime := time.Now()
csvFile, err := os.Open("cities.csv")
if err != nil {
fmt.Println(err)
}
fmt.Println("Successfully Opened CSV file")
defer csvFile.Close()
csvLines, err := csv.NewReader(csvFile).ReadAll()
if err != nil {
fmt.Println(err)
}
counter := 0
for _, line := range csvLines {
counter++
createCity(city{
name: line[0],
location: line[1],
})
}
fmt.Println("records saved:", counter)
fmt.Println("total time:", time.Since(startTime))
}
🚚 获取测试数据
cities.csv

输出:

正如我们所看到的,保存 CSV
中所有记录需要 55 秒,这是很长的时间,可能会导致很多性能问题。用户如果想要上传 CSV
文件,那体验感一定很差。
如何解决这个问题?那我们就使用线程池的方法试试看。
线程池耗时差异
在如下示例中,我们将解决相同的需求,但通过线程池,耗时方面,我们能够看到巨大的差异。来吧!
代码如下
package main
import (
"encoding/csv"
"fmt"
"os"
"time"
)
type city struct {
name string
location string
}
func createCity(record city) {
time.Sleep(10 * time.Millisecond)
}
func readData(cityChn chan []city) {
var cities []city
csvFile, err := os.Open("cities.csv")
if err != nil {
fmt.Println(err)
}
fmt.Println("Successfully Opened CSV file")
defer csvFile.Close()
csvLines, err := csv.NewReader(csvFile).ReadAll()
if err != nil {
fmt.Println(err)
}
for _, line := range csvLines {
cities = append(cities, city{
name: line[0],
location: line[1],
})
}
cityChn <- cities
}
func worker(cityChn chan city) {
for val := range cityChn {
createCity(val)
}
}
func main() {
startTime := time.Now()
cities := make(chan []city)
go readData(cities)
const workers = 5
jobs := make(chan city, 1000)
for w := 1; w <= workers; w++ {
go worker(jobs)
}
counter := 0
for _, val := range <-cities {
counter++
jobs <- val
}
fmt.Println("records saved:", counter)
fmt.Println("total time:", time.Since(startTime))
}


输出:

你看到很大的不同了吗?现在同样的过程只需要 8 秒。正如您所见,当我们需要处理大量数据时,线程池非常有用。
使用线程池,我们必须定义一个函数,在示例中该函数为 worker
,该函数用于定义工作进程,您可以看到它接收一个 Channel
通道来处理数据。 另外,我们必须在数据传递到通道之前启动 goroutines
协程,当 Channel
通道获取到值时,goroutines
工作者开始处理它们。
🎉 现在您知道如何实现线程池了!