Concurrency control: Processing items safely in Golang


In the previous post, we controlled concurrent item processing in Golang with a throttle worker object. Today, we will solve the same problem using the fan-in, fan-out pattern. I will go through, step by step, how Golang's channels and goroutines help us solve it and make the processing run more smoothly.

I. The problem

To recap the problem: we have a list of items []A and a function f(A) that transforms an A into a B, and we want to produce the list of items []B. At the same time, we want to limit the maximum number of goroutines running f at any given moment.

[]A → throttle async f(A)B → []B 
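
As a point of comparison, here is a minimal sequential sketch of the same transformation, with no concurrency at all. The name mapSequential is hypothetical and not part of the package built in this post; it only shows the contract we want to keep (same length, same order) once the work is spread across workers.

```go
package main

import "fmt"

// mapSequential is a hypothetical baseline: it applies f to every item,
// one at a time, and keeps the results in the original order.
func mapSequential[A, B any](f func(A) B, items []A) []B {
	out := make([]B, len(items))
	for i, item := range items {
		out[i] = f(item)
	}
	return out
}

func main() {
	squares := mapSequential(func(n int) int { return n * n }, []int{1, 2, 3})
	fmt.Println(squares) // [1 4 9]
}
```

If f takes one second per item, this version takes about len(items) seconds, which is the cost the concurrent version is meant to reduce.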

II. Introduction

We will still keep the carrier and collector objects to start and finish the process. The difference is that this example uses the fan-in, fan-out pattern, in which channels serve as the inputs and outputs of each function. As a result, the processing resembles a pipeline, which makes it smoother and easier to understand than the previous method.

III. Implementation

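This time, we need two item models, one for the request and one for the response. Each carries an order field so that a result can later be put back at the position of the item it came from.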

package gomap 
 
type RequestItem[A any] struct { 
   item  A 
   order int 
} 
 
type ResponseItem[B any] struct { 
   item  B 
   order int 
}

Firstly, we need the carrier to create a channel that stores the items to be processed. This channel will later serve as the input for the fan-out process.

package gomap 

type Carrier[A any] struct { 
} 
 
func NewCarrier[A any]() Carrier[A] { 
   return Carrier[A]{} 
} 
 
func (c *Carrier[A]) DeliveryItems(items []A) chan RequestItem[A] { 
   resp := make(chan RequestItem[A], len(items)) 
 
   go func() { 
      defer close(resp) 
      for i, item := range items { 
         requestItem := RequestItem[A]{ 
            item:  item, 
            order: i, 
         } 
 
         resp <- requestItem 
      } 
   }() 
 
   return resp 
}
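
To see what the carrier produces, here is a small self-contained sketch. The indexed type and the deliver function are hypothetical stand-ins for RequestItem and DeliveryItems, rewritten in a main package so the example can run on its own. With a single consumer, the items come out in the order they were sent, each tagged with its index.

```go
package main

import "fmt"

// indexed is a hypothetical stand-in for RequestItem: a value plus its position.
type indexed[A any] struct {
	item  A
	order int
}

// deliver mirrors the carrier: it sends every item, tagged with its index,
// into a buffered channel and closes the channel when done.
func deliver[A any](items []A) <-chan indexed[A] {
	out := make(chan indexed[A], len(items))
	go func() {
		defer close(out)
		for i, item := range items {
			out <- indexed[A]{item: item, order: i}
		}
	}()
	return out
}

func main() {
	// Ranging over the channel ends when the carrier closes it.
	for req := range deliver([]string{"a", "b", "c"}) {
		fmt.Printf("%d:%s\n", req.order, req.item)
	}
}
```

Because the channel buffer is as large as the input, the sending goroutine never blocks, and closing the channel is what later tells the workers that there is nothing left to process.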

Secondly, we need to create a FanoutWorker that spawns workers to handle the request items. Each worker runs independently and fetches request items from the same channel.

Afterward, the fan-out returns a list of channels, one per worker, and this list is used as the input for the fan-in process.

package gomap 
 
type FanoutWorker[A, B any] struct { 
   workerNumber int 
   fx           func(A) B 
} 
 
func NewFanoutWorker[A, B any](fx func(A) B, workerNumber int) FanoutWorker[A, B] { 
   return FanoutWorker[A, B]{ 
      workerNumber: workerNumber, 
      fx:           fx, 
   } 
} 
 
func (t *FanoutWorker[A, B]) Fanout(items chan RequestItem[A]) []chan ResponseItem[B] { 
   workers := make([]chan ResponseItem[B], t.workerNumber) 
 
   for i := 0; i < t.workerNumber; i++ { 
      outChan := make(chan ResponseItem[B]) 
      go t.workerRun(items, outChan) 
      workers[i] = outChan 
   } 
 
   return workers 
} 

func (t *FanoutWorker[A, B]) workerRun(items chan RequestItem[A], outChan chan ResponseItem[B]) { 
   defer close(outChan) 
   for item := range items { 
      outChan <- ResponseItem[B]{ 
         item:  t.fx(item.item), 
         order: item.order, 
      } 
   } 
} 
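
The fan-out is what enforces the limit from the problem statement: since each worker handles one item at a time, the number of items in progress can never exceed the number of workers. The sketch below checks this with an atomic counter. The function maxInFlight and the 10 ms sleep are assumptions made for illustration and are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxInFlight starts `workers` goroutines that read from one shared channel
// holding `jobs` items, and returns the highest number of items that were
// being processed at the same moment.
func maxInFlight(workers, jobs int) int64 {
	in := make(chan int, jobs)
	for i := 0; i < jobs; i++ {
		in <- i
	}
	close(in)

	var inFlight, peak int64
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for range in {
				n := atomic.AddInt64(&inFlight, 1)
				// Record a new peak with a compare-and-swap loop.
				for {
					p := atomic.LoadInt64(&peak)
					if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
						break
					}
				}
				time.Sleep(10 * time.Millisecond) // simulated work
				atomic.AddInt64(&inFlight, -1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(maxInFlight(3, 10) <= 3) // true
}
```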

Next, the fan-in process takes the list of channels from the fan-out as its input. Here, fan-in simply creates workers that listen to the list of channels and merge them into a new channel.

package gomap 
 
import ( 
   "sync" 
) 
 
type FaninWorker[A, B any] struct { 
   workerNumber int 
   wgListener   sync.WaitGroup 
   fx           func(A) B 
} 
 
func NewFaninWorker[A, B any](fx func(A) B, workerNumber int) FaninWorker[A, B] { 
   return FaninWorker[A, B]{ 
      workerNumber: workerNumber, 
      wgListener:   sync.WaitGroup{}, 
      fx:           fx, 
   } 
} 
 
func (t *FaninWorker[A, B]) Fanin(itemLists []chan ResponseItem[B]) chan ResponseItem[B] { 
   merged := make(chan ResponseItem[B]) 
   // every worker starts one listener per input channel, so count them all up front 
   t.wgListener.Add(len(itemLists) * t.workerNumber) 
 
   for i := 0; i < t.workerNumber; i++ { 
      go t.workerRun(itemLists, merged) 
   } 
 
   // close the merged channel once every listener has finished 
   go func() { 
      defer close(merged) 
      t.wgListener.Wait() 
   }() 
 
   return merged 
} 
 
func (t *FaninWorker[A, B]) workerRun(itemLists []chan ResponseItem[B], outChan chan ResponseItem[B]) { 
   for _, items := range itemLists { 
      go t.listenChan(items, outChan) 
   } 
} 
 
func (t *FaninWorker[A, B]) listenChan(items chan ResponseItem[B], outChan chan ResponseItem[B]) { 
   defer t.wgListener.Done() 
   for item := range items { 
      outChan <- item 
   } 
} 
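
The version above starts workerNumber listeners for every input channel. One listener per channel is enough to drain it, so a simplified variant is sketched below. The merge and emit functions are hypothetical names used only for this illustration, and this is an alternative shape for the fan-in step rather than the implementation used in this post.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge is a simplified fan-in: one forwarding goroutine per input channel.
// The output channel is closed once every input has been drained.
func merge[T any](inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit is a demo helper: it returns a closed channel holding the given values.
func emit(values ...int) <-chan int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	var got []int
	for v := range merge(emit(1, 2), emit(3), emit(4, 5)) {
		got = append(got, v)
	}
	sort.Ints(got)   // arrival order across channels is not deterministic
	fmt.Println(got) // [1 2 3 4 5]
}
```

The arrival order on the merged channel depends on scheduling, which is why the response items carry their order field.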

In this penultimate stage, the collector is the object responsible for storing the data from the merged channel of the fan-in process. It writes each item at the index given by its order field, so the output keeps the order of the input.

package gomap 
 
type Collector[B any] struct { 
   collectChannel   chan ResponseItem[B] 
   deliveryItemsLen int 
} 
 
func NewCollector[B any](collectChannel chan ResponseItem[B], deliveryItemsLen int) Collector[B] { 
   return Collector[B]{ 
      collectChannel:   collectChannel, 
      deliveryItemsLen: deliveryItemsLen, 
   } 
} 
 
func (c *Collector[B]) CollectItems() []B { 
   storage := make([]B, c.deliveryItemsLen) 
 
   for channelItem := range c.collectChannel { 
      storage[channelItem.order] = channelItem.item 
   } 
 
   return storage 
} 
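
The following sketch isolates the reordering step. The result type and the collect function are hypothetical stand-ins for ResponseItem and CollectItems; the items are sent deliberately out of order to show that the order field alone decides where each one ends up.

```go
package main

import "fmt"

// result is a hypothetical stand-in for ResponseItem.
type result struct {
	item  string
	order int
}

// collect places each result at the index recorded in its order field.
func collect(in <-chan result, n int) []string {
	out := make([]string, n)
	for r := range in {
		out[r.order] = r.item
	}
	return out
}

func main() {
	in := make(chan result, 3)
	// Results arrive in a different order than the original items.
	in <- result{item: "c", order: 2}
	in <- result{item: "a", order: 0}
	in <- result{item: "b", order: 1}
	close(in)

	fmt.Println(collect(in, 3)) // [a b c]
}
```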

Finally, create a function called AsyncMap as the entry point, which will be called from outside the package.

package gomap 
 
func AsyncMap[A, B any](fx func(A) B, s []A, processNumber int) []B { 
   // delivery 
   carrier := NewCarrier[A]() 
   fanoutItems := carrier.DeliveryItems(s) 
 
   // fanout 
   fanoutWorker := NewFanoutWorker[A, B](fx, processNumber) 
   fanoutItemsList := fanoutWorker.Fanout(fanoutItems) 
 
   // fanin 
   faninWorker := NewFaninWorker[A, B](fx, processNumber) 
   faninRespItems := faninWorker.Fanin(fanoutItemsList) 
 
   // collect 
   collector := NewCollector(faninRespItems, len(s)) 
   return collector.CollectItems() 
} 
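
One detail worth guarding against, as far as I can tell from the code above: if processNumber is 0, no worker is started, the merged channel closes immediately, and AsyncMap returns a slice of zero values; a negative processNumber makes the call to make in Fanout panic. A small helper such as the one below could normalize the value before it is passed on. The name clampWorkers is hypothetical and it is not part of the package.

```go
package main

import "fmt"

// clampWorkers is a hypothetical helper: it returns a worker count that is
// at least 1 and, when there are items, no larger than the number of items.
func clampWorkers(requested, items int) int {
	if requested < 1 {
		return 1
	}
	if items > 0 && requested > items {
		return items
	}
	return requested
}

func main() {
	fmt.Println(clampWorkers(0, 5))  // 1
	fmt.Println(clampWorkers(3, 5))  // 3
	fmt.Println(clampWorkers(10, 5)) // 5
}
```

Capping the count at the number of items is optional; it only avoids starting workers that would find the channel already empty.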

Let’s try running a test.

package main 
 
import ( 
   "fmt" 
   "github.com/kemao97/awesome/gomap-faninout" 
   "time" 
) 
 
func main() { 
   s := []int{1, 2, 3, 4, 5} 
 
   start := time.Now() 
   resp := gomap.AsyncMap(func(item int) int { 
      fmt.Printf("item %d start at second: %f\n", item, time.Since(start).Seconds()) 
      time.Sleep(time.Second) 
 
      return item * item 
   }, s, 3) 
 
   fmt.Println(resp) 
} 

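And we get the result below. With three workers, the first three items start immediately and the last two start about one second later, once a worker is free: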

```
item 1 start at second: 0.000094
item 2 start at second: 0.000098
item 3 start at second: 0.000085
item 4 start at second: 1.001298
item 5 start at second: 1.001322
[1 4 9 16 25]
```

IV. Conclusion 

Using the fan-in, fan-out pattern makes each part more clearly separated and defined. The processing resembles streaming items through a pipeline. It is easy to understand and can be extended to handle more complex tasks, which helps you scale up when you need to while keeping the code maintainable. As you can see, Golang has a diverse range of functions. However, some projects need a delicate touch, and that is where my colleagues at zen8labs can step in. Contact us and let's work together!

Hoa Le, Software Engineer
