In the previous post, we controlled concurrent item processing in Golang by using a throttle worker object. Today, we will solve the same problem using the fan-in, fan-out pattern. I will go through, step by step, how Golang helps us solve the problem and makes the processing run more smoothly.
I. The problem
Let me restate the problem. We have a list of items []A and a function f(A) that transforms an A into a B, and we want to produce the list of results []B. At the same time, we want to limit the maximum number of goroutines running at any given moment.
[]A → throttle async f(A) B → []B
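For reference, here is a minimal sketch of the same transformation without any concurrency. The name SequentialMap is hypothetical and is not part of the gomap package; it only shows the input and output shape that the concurrent version has to reproduce, including the order of the results.

```go
package main

import "fmt"

// SequentialMap is a hypothetical baseline, not part of the gomap package:
// it applies f to every element one at a time and keeps the input order.
func SequentialMap[A, B any](f func(A) B, items []A) []B {
	out := make([]B, len(items))
	for i, item := range items {
		out[i] = f(item)
	}
	return out
}

func main() {
	squares := SequentialMap(func(n int) int { return n * n }, []int{1, 2, 3})
	fmt.Println(squares) // prints [1 4 9]
}
```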
II. Introduction
We will still retain the carrier and collector objects to start and finish the process. Because we are using the fan-in, fan-out pattern in this example, channels are used extensively as the inputs and outputs of the functions. As a result, the processing resembles a pipeline, which makes it smoother and easier to understand than the previous method.
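To make the pipeline idea concrete, here is a minimal sketch of two stages connected by channels, with no fan-out yet. The functions generate and double are hypothetical names used only for illustration: each stage takes a channel in, returns a channel out, and closes its output when its input is exhausted.

```go
package main

import "fmt"

// generate is a hypothetical source stage: it emits each value on a channel
// and closes the channel when the input is exhausted.
func generate(values []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// double is a hypothetical processing stage: it reads from in, writes the
// doubled value to its own output channel, and closes it when in is drained.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

func main() {
	// Stages compose like function calls because each returns a channel.
	for v := range double(generate([]int{1, 2, 3})) {
		fmt.Println(v) // prints 2, 4 and 6 on separate lines
	}
}
```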
III. Implementation
This time, we need to use two item models for the request and response.
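package gomap

// RequestItem pairs an input item with its position in the original slice.
type RequestItem[A any] struct {
	item  A
	order int
}

// ResponseItem pairs a result with the position of the input that produced it.
type ResponseItem[B any] struct {
	item  B
	order int
}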
Firstly, we need the carrier to create a channel that stores the items to be processed. This channel will later serve as the input for the fan-out process.
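package gomap

// Carrier turns a slice of items into a channel of request items.
type Carrier[A any] struct {
}

func NewCarrier[A any]() Carrier[A] {
	return Carrier[A]{}
}

// DeliveryItems sends every item, tagged with its index, into a buffered
// channel and closes the channel once all items have been sent.
func (c *Carrier[A]) DeliveryItems(items []A) chan RequestItem[A] {
	resp := make(chan RequestItem[A], len(items))
	go func() {
		defer close(resp)
		for i, item := range items {
			requestItem := RequestItem[A]{
				item:  item,
				order: i,
			}
			resp <- requestItem
		}
	}()
	return resp
}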
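The carrier closes its channel as soon as every item has been sent, possibly before any worker has read anything. The sketch below, which uses a hypothetical drain helper, shows why this is safe: values that are already in the buffer of a closed channel can still be received.

```go
package main

import "fmt"

// drain is a hypothetical helper that reads a channel until it is closed
// and returns everything it received.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3) // buffer sized to the number of items, as in DeliveryItems
	for i := 1; i <= 3; i++ {
		ch <- i // never blocks: the buffer has room for every item
	}
	close(ch) // closing does not discard buffered values

	fmt.Println(drain(ch)) // prints [1 2 3]
}
```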
Secondly, we need to create a FanoutWorker that spawns workers to handle the fan-out request items. Each worker processes items independently and fetches request items from the same channel.
Afterward, the fan-out will return a list of channels corresponding to the number of workers, and this list will be used as the input for the fan-in process.
package gomap

// FanoutWorker runs fx on request items using a fixed number of workers.
type FanoutWorker[A, B any] struct {
	workerNumber int
	fx           func(A) B
}

func NewFanoutWorker[A, B any](fx func(A) B, workerNumber int) FanoutWorker[A, B] {
	return FanoutWorker[A, B]{
		workerNumber: workerNumber,
		fx:           fx,
	}
}

// Fanout starts workerNumber goroutines that all read from items and
// returns one output channel per worker.
func (t *FanoutWorker[A, B]) Fanout(items chan RequestItem[A]) []chan ResponseItem[B] {
	workers := make([]chan ResponseItem[B], t.workerNumber)
	for i := 0; i < t.workerNumber; i++ {
		outChan := make(chan ResponseItem[B])
		go t.workerRun(items, outChan)
		workers[i] = outChan
	}
	return workers
}

// workerRun processes items until the input channel is closed, then closes
// its own output channel.
func (t *FanoutWorker[A, B]) workerRun(items chan RequestItem[A], outChan chan ResponseItem[B]) {
	defer close(outChan)
	for item := range items {
		outChan <- ResponseItem[B]{
			item:  t.fx(item.item),
			order: item.order,
		}
	}
}
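The fan-out relies on one property of Go channels: when several goroutines receive from the same channel, each value is delivered to exactly one of them. The following sketch illustrates that with a hypothetical countPerWorker helper; how the items are split between workers can differ from run to run, but nothing is lost or processed twice.

```go
package main

import (
	"fmt"
	"sync"
)

// countPerWorker is a hypothetical helper: it starts n goroutines that all
// range over the same channel and reports how many items each one received.
func countPerWorker(items <-chan int, n int) []int {
	counts := make([]int, n)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range items {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(w)
	}
	wg.Wait()
	return counts
}

func main() {
	items := make(chan int, 10)
	for i := 0; i < 10; i++ {
		items <- i
	}
	close(items)

	counts := countPerWorker(items, 3)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The per-worker split varies between runs; the total is always 10.
	fmt.Println(counts, total)
}
```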
Next, the fan-in process takes the list of channels from the fan-out as its input. Here, fan-in simply creates workers that listen to the list of channels and merge them into a new channel.
package gomap

import (
	"sync"
)

// FaninWorker merges several response channels into a single channel.
type FaninWorker[A, B any] struct {
	workerNumber int
	wgListener   sync.WaitGroup
	fx           func(A) B
}

func NewFaninWorker[A, B any](fx func(A) B, workerNumber int) FaninWorker[A, B] {
	return FaninWorker[A, B]{
		workerNumber: workerNumber,
		wgListener:   sync.WaitGroup{},
		fx:           fx,
	}
}

// Fanin returns a channel that receives every item from itemLists and is
// closed once all listeners have finished.
func (t *FaninWorker[A, B]) Fanin(itemLists []chan ResponseItem[B]) chan ResponseItem[B] {
	merged := make(chan ResponseItem[B])
	// Each worker starts one listener per channel, so the total number of
	// listeners is len(itemLists) * workerNumber.
	t.wgListener.Add(len(itemLists) * t.workerNumber)
	go func() {
		for i := 0; i < t.workerNumber; i++ {
			go t.workerRun(itemLists, merged)
		}
	}()
	// Close the merged channel after every listener is done.
	go func() {
		defer close(merged)
		t.wgListener.Wait()
	}()
	return merged
}

// workerRun starts one listener goroutine for each input channel.
func (t *FaninWorker[A, B]) workerRun(itemLists []chan ResponseItem[B], outChan chan ResponseItem[B]) {
	for _, items := range itemLists {
		go t.listenChan(items, outChan)
	}
}

// listenChan forwards items to outChan until the input channel is closed.
func (t *FaninWorker[A, B]) listenChan(items chan ResponseItem[B], outChan chan ResponseItem[B]) {
	defer t.wgListener.Done()
	for item := range items {
		outChan <- item
	}
}
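The core of the fan-in step can also be written as a standalone function. The sketch below is a simplified variant, not the FaninWorker above: it assumes one listener goroutine per input channel is enough, and the names merge and fromSlice are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical, simplified fan-in: one goroutine per input
// channel forwards values to a single output channel, which is closed
// after every input has been drained.
func merge[T any](inputs ...<-chan T) <-chan T {
	merged := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(ch <-chan T) {
			defer wg.Done()
			for v := range ch {
				merged <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

// fromSlice is a hypothetical helper that returns a closed, pre-filled channel.
func fromSlice[T any](values ...T) <-chan T {
	out := make(chan T, len(values))
	for _, v := range values {
		out <- v
	}
	close(out)
	return out
}

func main() {
	sum := 0
	for v := range merge(fromSlice(1, 2), fromSlice(3, 4)) {
		sum += v // arrival order is not guaranteed, so only the sum is checked
	}
	fmt.Println(sum) // prints 10
}
```

The arrival order on the merged channel is not deterministic, which is why each response item carries its original index.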
In this penultimate stage, the collector will be the object responsible for storing the data from the merged channel of the fan-in process.
package gomap

// Collector gathers response items back into a slice.
type Collector[B any] struct {
	collectChannel   chan ResponseItem[B]
	deliveryItemsLen int
}

func NewCollector[B any](collectChannel chan ResponseItem[B], deliveryItemsLen int) Collector[B] {
	return Collector[B]{
		collectChannel:   collectChannel,
		deliveryItemsLen: deliveryItemsLen,
	}
}

// CollectItems reads until the channel is closed and stores each result at
// the index of its original input, so the output order matches the input.
func (c *Collector[B]) CollectItems() []B {
	storage := make([]B, c.deliveryItemsLen)
	for channelItem := range c.collectChannel {
		storage[channelItem.order] = channelItem.item
	}
	return storage
}
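The collector can restore the input order because every result carries its original index. Here is a minimal sketch of that idea in isolation; the indexed type and the restoreOrder function are hypothetical stand-ins for ResponseItem and CollectItems.

```go
package main

import "fmt"

// indexed is a hypothetical stand-in for ResponseItem: a value plus the
// position of the input that produced it.
type indexed struct {
	order int
	value string
}

// restoreOrder drains results and places each value at its original index,
// so the arrival order on the channel does not matter.
func restoreOrder(results <-chan indexed, n int) []string {
	out := make([]string, n)
	for r := range results {
		out[r.order] = r.value
	}
	return out
}

func main() {
	results := make(chan indexed, 3)
	// Deliberately send the results out of order.
	results <- indexed{order: 2, value: "c"}
	results <- indexed{order: 0, value: "a"}
	results <- indexed{order: 1, value: "b"}
	close(results)

	fmt.Println(restoreOrder(results, 3)) // prints [a b c]
}
```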
Finally, create a function called AsyncMap as the entry point, which will be called from outside the package.
package gomap

// AsyncMap applies fx to every element of s using processNumber workers
// and returns the results in the same order as the inputs.
func AsyncMap[A, B any](fx func(A) B, s []A, processNumber int) []B {
	// delivery
	carrier := NewCarrier[A]()
	fanoutItems := carrier.DeliveryItems(s)
	// fanout
	fanoutWorker := NewFanoutWorker[A, B](fx, processNumber)
	fanoutItemsList := fanoutWorker.Fanout(fanoutItems)
	// fanin
	faninWorker := NewFaninWorker[A, B](fx, processNumber)
	faninRespItems := faninWorker.Fanin(fanoutItemsList)
	// collect
	collector := NewCollector(faninRespItems, len(s))
	return collector.CollectItems()
}
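One edge case is worth keeping in mind: AsyncMap passes processNumber straight through. With a value of 0, no workers are started, the merged channel closes immediately, and the result contains only zero values; a negative value makes the make call in Fanout panic. A caller could guard against this with something like the sketch below, where clampWorkers is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// clampWorkers is a hypothetical guard, not part of the gomap package.
// It keeps the worker count at 1 or more and, when there is at least one
// item, never above the item count.
func clampWorkers(requested, itemCount int) int {
	if requested < 1 {
		return 1
	}
	if itemCount > 0 && requested > itemCount {
		return itemCount
	}
	return requested
}

func main() {
	fmt.Println(clampWorkers(0, 5))  // prints 1
	fmt.Println(clampWorkers(3, 5))  // prints 3
	fmt.Println(clampWorkers(10, 5)) // prints 5
}
```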
Let’s try running a test.
package main

import (
	"fmt"
	"time"

	gomap "github.com/kemao97/awesome/gomap-faninout"
)

func main() {
	s := []int{1, 2, 3, 4, 5}
	start := time.Now()
	resp := gomap.AsyncMap(func(item int) int {
		fmt.Printf("item %d start at second: %f\n", item, time.Since(start).Seconds())
		time.Sleep(time.Second)
		return item * item
	}, s, 3)
	fmt.Println(resp)
}
And we get the result as:
```
item 1 start at second: 0.000094
item 2 start at second: 0.000098
item 3 start at second: 0.000085
item 4 start at second: 1.001298
item 5 start at second: 1.001322
[1 4 9 16 25]
```
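The start times match what we would expect from three workers: items 1 to 3 start immediately, and items 4 and 5 start about one second later, once workers become free. Assuming every call takes the same time and scheduling overhead is negligible, the number of sequential rounds is the item count divided by the worker count, rounded up. The batches function below is a hypothetical helper that only illustrates this arithmetic.

```go
package main

import "fmt"

// batches is a hypothetical helper: the number of sequential rounds needed
// when `workers` goroutines share `items` equally long tasks.
// It uses integer ceiling division and assumes workers is at least 1.
func batches(items, workers int) int {
	return (items + workers - 1) / workers
}

func main() {
	// 5 items on 3 workers need 2 rounds, so with a one-second task
	// the whole run takes roughly 2 seconds.
	fmt.Println(batches(5, 3)) // prints 2
}
```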
IV. Conclusion
Using the fan-in, fan-out pattern makes each part more clearly separated and defined. The processing resembles streaming items through a pipeline. It is easy to understand and can be extended to handle more complex tasks; the overall aim is to let you scale up when you need to while keeping a high level of maintainability. As you can see, Golang has a diverse range of features. However, some projects need that delicate touch, and that is where my colleagues at zen8labs can step in. Contact us here and let’s work together!
Hoa Le, Software Engineer