Have you ever been in a situation where there are so many tasks to do, and you have to get through them as fast as possible? Well, we are going to learn how to solve that with a worker pool. Because just like us, a computer will also get overwhelmed when you push too many tasks at it at the same time. But before we start using a worker pool, we have to understand what a worker pool is first.
What is a Worker Pool?
Well, let's just translate it: a worker means something that executes work (typically a thread or goroutine running on a CPU), and a pool means a collection. That means a worker pool is a (usually limited) collection of workers. If translating it didn't help much, think of it like this: you have a limited number of people to execute tasks, yet you have a lot of tasks to get through. That's it, you just learned what a worker pool is. Now let's move on to why you need it by looking at the benefits it gives you.
What is The Benefit of Worker Pool?
Moving on to the benefits. Maybe a better example: think of the workers as chefs and the tasks as orders, and say you have 10 chefs and 100 orders. Normally, you can just assign each chef 10 orders, or, if you like chaos, you can throw all 100 orders into the kitchen and let the chefs handle it themselves. Now let's run a similar scenario (the chaotic one, of course) to see the impact of using a worker pool versus not using one. We will use the code below to compare before and after using a worker pool:
```go
package workerpool

import (
	"runtime"
	"sync"
	"testing"
)

const NUM_TEST = 1000

func Compute() int {
	result := 0
	for i := 0; i < 1000; i++ {
		result += i * i
	}
	return result
}

func BenchmarkNonWokerPool(b *testing.B) {
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			wg.Add(NUM_TEST)
			for i := 0; i < NUM_TEST; i++ {
				go func() {
					result <- Compute()
					wg.Done()
				}()
			}
			wg.Wait()
		}
	})
}

func BenchmarkNonWokerPoolBatched(b *testing.B) {
	numCPU := runtime.NumCPU()
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			batchLength := NUM_TEST / numCPU
			wg.Add(numCPU)
			for i := 0; i < numCPU; i++ {
				go func() {
					for j := 0; j < batchLength; j++ {
						result <- Compute()
					}
					wg.Done()
				}()
			}
			wg.Wait()
		}
	})
}

func BenchmarkWorkerPool(b *testing.B) {
	numCPU := runtime.NumCPU()
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			wg.Add(NUM_TEST)
			jobChannel := make(chan bool, numCPU)
			for i := numCPU; i > 0; i-- {
				go func() {
					for do := range jobChannel {
						if do {
							result <- Compute()
							wg.Done()
						}
					}
				}()
			}

			for i := 0; i < NUM_TEST; i++ {
				jobChannel <- true
			}
			close(jobChannel)
			wg.Wait()
		}
	})
}
```
And as always, below are the results from my machine:
| Name | Ops | Time/Op | Memory | Allocation |
| --- | --- | --- | --- | --- |
| BenchmarkNonWokerPool-4 | 1696 | 797432 ns/op | 32209 B/op | 1002 allocs/op |
| BenchmarkNonWokerPoolBatched-4 | 2142 | 564663 ns/op | 8336 B/op | 6 allocs/op |
| BenchmarkWorkerPool-4 | 3442 | 362014 ns/op | 8449 B/op | 7 allocs/op |
As expected, the pooled version runs faster than the non-pooled version. We can also note that batching helps, but it still gets beaten by the worker pool (although it allocates slightly less memory). This has to do with how the CPU handles many concurrent threads. The scheduler needs to guarantee that every task eventually gets a chance to execute; otherwise unprocessed tasks would be starved, a phenomenon known as "starvation" (sometimes software engineers don't invent weird names). To prevent it, the scheduler splits execution time between processes, which is known as "time slicing". The act of switching from one process to another is known as "context switching", and it costs resources every time the OS has to do it. To prove this, let's multiply the number of CPUs returned by Go's runtime by 10 and see the effect. We will use the code below to experiment:
```go
package workerpool

import (
	"runtime"
	"sync"
	"testing"
)

const NUM_TEST = 1000

func Compute() int {
	result := 0
	for i := 0; i < 1000; i++ {
		result += i * i
	}
	return result
}

func BenchmarkNonWokerPool(b *testing.B) {
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			wg.Add(NUM_TEST)
			for i := 0; i < NUM_TEST; i++ {
				go func() {
					result <- Compute()
					wg.Done()
				}()
			}
			wg.Wait()
		}
	})
}

func BenchmarkNonWokerPoolBatched(b *testing.B) {
	numCPU := runtime.NumCPU() * 10
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			batchLength := NUM_TEST / numCPU
			wg.Add(numCPU)
			for i := 0; i < numCPU; i++ {
				go func() {
					for j := 0; j < batchLength; j++ {
						result <- Compute()
					}
					wg.Done()
				}()
			}
			wg.Wait()
		}
	})
}

func BenchmarkWorkerPool(b *testing.B) {
	numCPU := runtime.NumCPU() * 10
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			wg.Add(NUM_TEST)
			jobChannel := make(chan bool, numCPU)
			for i := numCPU; i > 0; i-- {
				go func() {
					for do := range jobChannel {
						if do {
							result <- Compute()
							wg.Done()
						}
					}
				}()
			}

			for i := 0; i < NUM_TEST; i++ {
				jobChannel <- true
			}
			close(jobChannel)
			wg.Wait()
		}
	})
}
```
And here are the results of the above benchmark:
| Name | Ops | Time/Op | Memory | Allocation |
| --- | --- | --- | --- | --- |
| BenchmarkNonWokerPool-4 | 1327 | 838959 ns/op | 32224 B/op | 1002 allocs/op |
| BenchmarkNonWokerPoolBatched-4 | 2106 | 574545 ns/op | 9488 B/op | 42 allocs/op |
| BenchmarkWorkerPool-4 | 2584 | 412715 ns/op | 9649 B/op | 43 allocs/op |
Based on the benchmark results above, we can see that memory usage increases on each run. That's fine, but what is concerning is the worker pool's throughput, which starts to drop as we grow the number of workers beyond the number of CPUs. Moral of the story: don't abuse your language; it makes concurrency easier to write, but it doesn't magically make scheduling costs disappear.
But a worker pool is not a silver bullet for everything. It is best used for CPU-intensive tasks with little waiting between operations. If you use a worker pool for an IO-heavy operation, it can actually make things worse. Let's create a simple example to demonstrate it. We will use the code below to see the result:
```go
package workerpool

import (
	"runtime"
	"sync"
	"testing"
	"time"
)

const NUM_TEST = 100

func Compute() int {
	time.Sleep(20 * time.Millisecond)
	return 1
}

func BenchmarkNonWokerPool(b *testing.B) {
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			wg.Add(NUM_TEST)
			for i := 0; i < NUM_TEST; i++ {
				go func() {
					result <- Compute()
					wg.Done()
				}()
			}
			wg.Wait()
		}
	})
}

func BenchmarkNonWokerPoolBatched(b *testing.B) {
	numCPU := runtime.NumCPU()
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			batchLength := NUM_TEST / numCPU
			wg.Add(numCPU)
			for i := 0; i < numCPU; i++ {
				go func() {
					for j := 0; j < batchLength; j++ {
						result <- Compute()
					}
					wg.Done()
				}()
			}
			wg.Wait()
		}
	})
}

func BenchmarkWorkerPool(b *testing.B) {
	numCPU := runtime.NumCPU()
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			result := make(chan int, NUM_TEST)
			wg := sync.WaitGroup{}
			wg.Add(NUM_TEST)
			jobChannel := make(chan bool, numCPU)
			for i := numCPU; i > 0; i-- {
				go func() {
					for do := range jobChannel {
						if do {
							result <- Compute()
							wg.Done()
						}
					}
				}()
			}

			for i := 0; i < NUM_TEST; i++ {
				jobChannel <- true
			}
			close(jobChannel)
			wg.Wait()
		}
	})
}
```
And here are the results for it:
| Name | Ops | Time/Op | Memory | Allocation |
| --- | --- | --- | --- | --- |
| BenchmarkNonWokerPool-4 | 213 | 5375028 ns/op | 11406 B/op | 202 allocs/op |
| BenchmarkNonWokerPoolBatched-4 | 6 | 168379661 ns/op | 1469 B/op | 12 allocs/op |
| BenchmarkWorkerPool-4 | 6 | 170648950 ns/op | 1549 B/op | 13 allocs/op |
Look at that: using a worker pool for an IO-heavy operation absolutely destroys its usefulness. Instead of speeding up the process, the pool itself becomes the bottleneck.
Remarks
- A worker pool works best for CPU-intensive operations.
- A worker pool works best when sized to the number of CPUs available in the system.
- A worker pool isn't a solution for IO-heavy operations.
Well, I think that's all about worker pools for now. Thank you for reading ~