Go by Example: Worker Pools

A worker pool is a pattern in concurrent programming where a fixed number of worker goroutines are created to perform tasks from a shared task queue. This helps control the number of concurrent tasks and reduces the overhead of creating and destroying worker goroutines for every task.

Here is an example implementation of a worker pool in Go:

package main

import (
	"fmt"
	"sync"
)

// Task is a job to be performed by the worker pool
type Task struct {
	ID int
}

func worker(tasks chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, more := <-tasks
		if more {
			fmt.Printf("worker %d started task %d\n", task.ID)
			// Perform task here
			fmt.Printf("worker %d finished task %d\n", task.ID)
		} else {
			fmt.Println("worker exited")
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup

	tasks := make(chan Task, 100)

	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go worker(tasks, &wg)
	}

	for j := 1; j <= 5; j++ {
		tasks <- Task{ID: j}
	}

	close(tasks)

	wg.Wait()
}

This example creates a worker pool with 3 workers and a task channel with a buffer size of 100. The workers listen on the task channel and perform the tasks as they come in. The main goroutine adds 5 tasks to the channel and closes it when done. The WaitGroup is used to wait for all worker goroutines to finish.

Here’s another example of a worker pool with a dynamic number of workers:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task is a job to be performed by the worker pool
type Task struct {
	ID     int
	Random int
}

func worker(tasks chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, more := <-tasks
		if more {
			fmt.Printf("worker %d started task %d with random value %d\n", task.ID, task.ID, task.Random)
			// Perform task here
			time.Sleep(time.Duration(task.Random) * time.Millisecond)
			fmt.Printf("worker %d finished task %d\n", task.ID, task.ID)
		} else {
			fmt.Println("worker exited")
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup

	tasks := make(chan Task)

	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go worker(tasks, &wg)
	}

	for j := 1; j <= 5; j++ {
		random := rand.Intn(1000)
		tasks <- Task{ID: j, Random: random}
		wg.Add(1)
		go worker(tasks, &wg)
	}

	close(tasks)

	wg.Wait()
}

This example creates a worker pool with an initial number of 3 workers. After adding 5 tasks to the task channel, a new worker is created for each task. The workers perform the tasks with a random sleep time. The WaitGroup is used to wait for all worker goroutines to finish.


Comments

Leave a Reply

Your email address will not be published. Required fields are marked *