Handling many tasks concurrently is key to making software perform well, and Golang ships with strong primitives for it. In this post, we’ll look at how to limit the number of tasks running at the same time in Golang. Capping concurrency balances the workload, avoids overloading the system, and keeps your application running smoothly and efficiently.
The problem
The problem is as follows: we have a list of items []A and a function f(A) that transforms an A into a B. We want to apply f to every item and collect the results as a list []B, while limiting the maximum number of goroutines running at any given moment.
Summary: []A → throttled async f(A) → []B
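Concretely, the goal is an entry point with roughly this shape, which the rest of the post builds step by step:
func AsyncMap[A, B any](fx func(A) B, s []A, processNumber int) []B
Here fx is the transformation, s is the input slice, and processNumber is the maximum number of items processed at the same time.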
Introduction
In this article, I will use a channel to throttle how many items are processed concurrently at any given time. The design involves the following actors:
- The throttle channel will act like a set of processing slots for delivery items, with each slot processing only one item at a time.
- The delivery item is the item that needs to be processed.
- The response item is the final product returned after processing.
- The collector receives and aggregates the response items from the throttle channel and returns the results.
- The carrier transports the delivery items to the channel.
Implementation
Because the items are processed asynchronously, the collector needs to know the position of each response item in the original list when it receives the response. Therefore, I will create a struct that pairs the response item with its order.
type ResponseItem[B any] struct {
	item  B   // the processed result
	order int // index of the corresponding delivery item in the original slice
}
First, I will create a slot to transform the delivery item into the response item.
package gomap
type Slot[A, B any] struct {
fx func(A) B
}
func NewSlot[A, B any](processFx func(A) B) Slot[A, B] {
return Slot[A, B]{
fx: processFx,
}
}
func (slot Slot[A, B]) ProcessDeliveryItem(item A) B {
return slot.fx(item)
}
Next, I need to create a throttle channel. It receives delivery items from the carrier and assigns each one to a free slot for processing; if no slot is available, it waits until one frees up. After processing, it sends the response item to the collector via the collect channel.
package gomap
type ThrottleChannel[A, B any] struct {
slotChannel chan any
collectChannel chan ResponseItem[B]
fx func(A) B
}
func NewThrottleChannel[A, B any](fx func(A) B, slotChannel chan any, collectChannel chan ResponseItem[B]) ThrottleChannel[A, B] {
return ThrottleChannel[A, B]{
slotChannel: slotChannel,
fx: fx,
collectChannel: collectChannel,
}
}
func (t *ThrottleChannel[A, B]) waitIdleSlot() Slot[A, B] {
	// Sending on the buffered slot channel blocks until a slot is free,
	// which is what enforces the concurrency limit.
	t.slotChannel <- struct{}{}
	return NewSlot[A, B](t.fx)
}
func (t *ThrottleChannel[A, B]) HandleDeliveryItem(item A, order int) {
slot := t.waitIdleSlot()
go func() {
respItem := slot.ProcessDeliveryItem(item)
t.collectChannel <- ResponseItem[B]{
item: respItem,
order: order,
}
t.cleanSlot()
}()
}
func (t *ThrottleChannel[A, B]) cleanSlot() {
<-t.slotChannel
}
After that, create a simple carrier to deliver items to the throttle channel.
package gomap
type Carrier[A, B any] struct {
throttle ThrottleChannel[A, B]
}
func NewCarrier[A, B any](throttle ThrottleChannel[A, B]) Carrier[A, B] {
return Carrier[A, B]{
throttle: throttle,
}
}
func (c *Carrier[A, B]) DeliveryItems(items []A) {
for i, item := range items {
c.throttle.HandleDeliveryItem(item, i)
}
}
Next, create a collector to receive the response items and restore the order of the delivery items. Since each response item is written directly into a result array at its original index, no explicit sorting is needed.
package gomap
type Collector[B any] struct {
collectChannel chan ResponseItem[B]
deliveryItemsLen int
}
func NewCollector[B any](collectChannel chan ResponseItem[B], deliveryItemsLen int) Collector[B] {
return Collector[B]{
collectChannel: collectChannel,
deliveryItemsLen: deliveryItemsLen,
}
}
func (c *Collector[B]) CollectItems() []B {
storage := make([]B, c.deliveryItemsLen)
for i := 0; i < c.deliveryItemsLen; i++ {
resp := <-c.collectChannel
order := resp.order
storage[order] = resp.item
}
return storage
}
Finally, create a function called AsyncMap as the entry point, which will be called from outside the package.
func AsyncMap[A, B any](fx func(A) B, s []A, processNumber int) []B {
	slotChannel := make(chan any, processNumber)         // buffer size = number of concurrent processing slots
	collectChannel := make(chan ResponseItem[B], len(s)) // buffered so workers never block when sending results
defer func() {
close(slotChannel)
close(collectChannel)
}()
throttle := NewThrottleChannel(fx, slotChannel, collectChannel)
carrier := NewCarrier(throttle)
collector := NewCollector(collectChannel, len(s))
go carrier.DeliveryItems(s)
return collector.CollectItems()
}
Let’s try running a test.
package main
import (
"fmt"
"github.com/test/gomap"
"time"
)
func main() {
s := []int{1, 2, 3, 4, 5}
start := time.Now()
resp := gomap.AsyncMap(func(item int) int {
fmt.Printf("item %d start at second: %f\n", item, time.Since(start).Seconds())
time.Sleep(time.Second)
return item * item
}, s, 3)
fmt.Println(resp)
}
And we get the following output:
item 3 start at second: 0.000084
item 2 start at second: 0.000115
item 1 start at second: 0.000096
item 4 start at second: 1.001128
item 5 start at second: 1.001139
[1 4 9 16 25]
With three slots, items 1–3 start immediately and items 4 and 5 only start once a slot frees up after roughly one second, which matches the timestamps above. This is a simple way to use channels to process a slice asynchronously with bounded concurrency. Alternatively, the same problem can be solved with the fan-in/fan-out pattern. In practice, we would also need to handle additional cases such as errors or premature termination of the flow (for example via context cancellation).
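As a rough illustration of the error-handling point, one common alternative (not used in the implementation above) is golang.org/x/sync/errgroup, which caps concurrency via SetLimit and returns the first error from Wait. A hedged sketch of an error-aware variant might look like this:
package gomap

import (
	"golang.org/x/sync/errgroup"
)

// AsyncMapErr is an illustrative sketch, not part of the implementation above:
// fx may fail, and g.Wait returns the first non-nil error encountered.
func AsyncMapErr[A, B any](fx func(A) (B, error), s []A, processNumber int) ([]B, error) {
	results := make([]B, len(s))
	var g errgroup.Group
	g.SetLimit(processNumber) // at most processNumber goroutines run at once
	for i, item := range s {
		i, item := i, item // capture loop variables (needed before Go 1.22)
		g.Go(func() error {
			b, err := fx(item)
			if err != nil {
				return err
			}
			results[i] = b // each goroutine writes its own index, so no data race
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}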
Conclusion
In conclusion, limiting the number of tasks running at the same time in Golang helps manage resources and keeps your application running well. Using Golang’s built-in tools, like channels and goroutines, developers can control task flow to avoid overloading the system. These methods make applications more reliable and easier to manage. For other knowledge sharing, check out the articles on zen8labs’ website.
Hoa Le, Software Engineer