Building High-Performance Concurrent Applications with bigopool
Introduction
Concurrency is one of Go’s greatest strengths, but managing goroutines effectively at scale can be challenging. When you have millions of tasks to process, spawning a goroutine for each one can quickly overwhelm your system with excessive memory usage and context switching overhead. This is where worker pools come to the rescue.
A few years ago, I created bigopool, a lightweight Go library that implements high-performance worker pools with elegant error and result handling. Today, I have finally decided to write about it.
The Problem: Too Many Goroutines
Imagine you need to process 100,000 images, make API calls to validate 500,000 email addresses, or perform complex calculations on a massive dataset. The naive approach might look like this:
// Don't do this!
for _, item := range hugeSlice {
    go func(item Item) {
        // process item
    }(item)
}
While this code will work for small datasets, it becomes problematic at scale:
- Memory explosion: Each goroutine consumes ~2 KB of stack space. For 1M items, that's ~2 GB in goroutine stacks alone.
- CPU thrashing: Too many goroutines lead to excessive context switching.
- Resource exhaustion: You might hit system limits on the number of threads.
The Solution: Worker Pools
Worker pools solve this by maintaining a fixed number of worker goroutines that process jobs from a shared queue. This gives you:
- Controlled resource usage: Fixed memory footprint regardless of job count
- Better performance: Optimal number of workers for your system
- Backpressure: Built-in queue management prevents overwhelming your system
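To make the pattern concrete, here is a minimal hand-rolled worker pool built from plain channels and a sync.WaitGroup. This is only a sketch for contrast (Item and process are placeholders, and this is not how bigopool is implemented internally):

// A minimal hand-rolled worker pool. Item and process are placeholders.
func processWithPool(items []Item, numWorkers int) {
    jobs := make(chan Item)
    var wg sync.WaitGroup

    // A fixed number of workers drain the shared queue.
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range jobs {
                process(item)
            }
        }()
    }

    // Feed the queue, then close it so the workers can exit.
    for _, item := range items {
        jobs <- item
    }
    close(jobs)
    wg.Wait()
}

Everything below is bigopool handling that plumbing for you, plus the error and result collection you would otherwise have to bolt on yourself.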
Introducing bigopool
bigopool was designed with simplicity and performance in mind. It provides two main abstractions:
- Worker Pool: For processing large numbers of jobs with controlled concurrency
- Parallel Execution: For running a small number of functions concurrently
Let me show you how both work in practice.
Worker Pool in Action
The worker pool is perfect when you have many similar tasks to process. Here’s how it works:
1. Define Your Job
First, implement the Job interface by defining an Execute() method:
type ImageProcessingJob struct {
    ImagePath  string
    OutputPath string
    Quality    int
}

func (j ImageProcessingJob) Execute() (bigopool.Result, error) {
    // Simulate image processing
    time.Sleep(time.Millisecond * 100)
    result := map[string]interface{}{
        "input":     j.ImagePath,
        "output":    j.OutputPath,
        "size_kb":   1024,
        "processed": true,
    }
    return result, nil
}
2. Create and Configure the Dispatcher
// Create a dispatcher with 5 workers and a queue capacity of 100
dispatcher, err := bigopool.NewDispatcher(5, 100)
if err != nil {
    log.Fatal(err)
}

// Start the workers
dispatcher.Run()
3. Enqueue Jobs and Collect Results
// Add jobs to the queue
imagePaths := []string{"image1.jpg", "image2.jpg", "image3.jpg"}
for _, path := range imagePaths {
    job := ImageProcessingJob{
        ImagePath:  path,
        OutputPath: "processed_" + path,
        Quality:    85,
    }
    dispatcher.Enqueue(job)
}

// Wait for all jobs to complete and collect results
results, errors := dispatcher.Wait()

fmt.Printf("Processed %d images\n", len(results))
if !errors.IsEmpty() {
    fmt.Printf("Encountered %d errors\n", len(errors.All()))
}
Parallel Execution for Quick Tasks
Sometimes you don’t need a full worker pool; you just want to run a few functions concurrently. The Parallel function is perfect for this:
func FetchUserProfile(userID int) error {
    var user User
    var posts []Post
    var followers []User

    // Run multiple API calls in parallel
    errs := bigopool.Parallel(
        func() error {
            var err error
            user, err = api.GetUser(userID)
            return err
        },
        func() error {
            var err error
            posts, err = api.GetUserPosts(userID)
            return err
        },
        func() error {
            var err error
            followers, err = api.GetUserFollowers(userID)
            return err
        },
    )

    if !errs.IsEmpty() {
        return errs.ToError()
    }

    // All API calls completed successfully
    fmt.Printf("User: %s, Posts: %d, Followers: %d\n",
        user.Name, len(posts), len(followers))
    return nil
}
(Almost) Real-World Example: Web Scraper
Let’s build a practical example: a web scraper that fetches multiple URLs concurrently:
type WebScrapingJob struct {
    URL       string
    Timeout   time.Duration
    UserAgent string
}

func (j WebScrapingJob) Execute() (bigopool.Result, error) {
    client := &http.Client{Timeout: j.Timeout}

    req, err := http.NewRequest("GET", j.URL, nil)
    if err != nil {
        return nil, err
    }
    req.Header.Set("User-Agent", j.UserAgent)

    resp, err := client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("failed to fetch %s: %w", j.URL, err)
    }
    defer resp.Body.Close()

    // io.ReadAll replaces the deprecated ioutil.ReadAll
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    return WebScrapingResult{
        URL:        j.URL,
        StatusCode: resp.StatusCode,
        BodySize:   len(body),
        Content:    string(body),
        FetchedAt:  time.Now(),
    }, nil
}
type WebScrapingResult struct {
    URL        string
    StatusCode int
    BodySize   int
    Content    string
    FetchedAt  time.Time
}
func ScrapeWebsites(urls []string) {
    // Configure 10 concurrent workers with a queue capacity of 500
    dispatcher, err := bigopool.NewDispatcher(10, 500)
    if err != nil {
        log.Fatal(err)
    }
    dispatcher.Run()

    // Enqueue all scraping jobs
    for _, url := range urls {
        job := WebScrapingJob{
            URL:       url,
            Timeout:   time.Second * 30,
            UserAgent: "bigopool-scraper/1.0",
        }
        dispatcher.Enqueue(job)
    }

    // Wait for completion
    results, errors := dispatcher.Wait()

    // Process results
    for _, result := range results {
        if scrapedData, ok := result.(WebScrapingResult); ok {
            fmt.Printf("✓ %s (%d bytes, status: %d)\n",
                scrapedData.URL, scrapedData.BodySize, scrapedData.StatusCode)
        }
    }

    // Handle errors
    for _, err := range errors.All() {
        fmt.Printf("✗ Error: %v\n", err)
    }
}
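A caller only needs a slice of URLs (the URLs below are placeholders):

func main() {
    urls := []string{
        "https://example.com",
        "https://example.org",
        "https://example.net",
    }
    ScrapeWebsites(urls)
}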
Performance and Benchmarks
One of the things I’m most proud of about bigopool is its performance. The library is designed to minimize allocations and overhead. Here are some benchmark results from a potato MacBook Pro from 2016:
Benchmark5Workers1000Queue    2190982    566 ns/op    98 B/op    0 allocs/op
Benchmark10Workers100Queue    2168791    559 ns/op    79 B/op    0 allocs/op
Benchmark20Workers200Queue    2159338    572 ns/op    80 B/op    0 allocs/op
These benchmarks show that bigopool can dispatch jobs with sub-microsecond per-job overhead and minimal memory allocations, making it suitable for high-throughput applications.
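If you want to reproduce numbers like these on your own hardware, a benchmark along these lines is a reasonable starting point. This is my own sketch using a no-op job, not the exact benchmark code from the repository:

// A no-op job, so the benchmark measures the pool's overhead rather than the work.
type noopJob struct{}

func (noopJob) Execute() (bigopool.Result, error) { return nil, nil }

func Benchmark5Workers1000Queue(b *testing.B) {
    dispatcher, err := bigopool.NewDispatcher(5, 1000)
    if err != nil {
        b.Fatal(err)
    }
    dispatcher.Run()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        dispatcher.Enqueue(noopJob{})
    }
    dispatcher.Wait()
}

Run it with go test -bench . -benchmem to get the ns/op and allocs/op columns shown above.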
Advanced Features
Cancelable Parallel Execution
For scenarios where you want to cancel remaining operations if one fails, use CancelableParallel:
ctx := context.Background()

errs := bigopool.CancelableParallel(ctx,
    func(ctx context.Context) error {
        return criticalOperation1(ctx)
    },
    func(ctx context.Context) error {
        return criticalOperation2(ctx)
    },
    func(ctx context.Context) error {
        return criticalOperation3(ctx)
    },
)
If any operation fails, the context is canceled and remaining operations can detect this and exit early.
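For that cooperation to work, each operation has to check the context between units of work. A hypothetical criticalOperation1 might look like this:

func criticalOperation1(ctx context.Context) error {
    for i := 0; i < 10; i++ {
        // Bail out early if a sibling operation already failed.
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        // Do one unit of work (simulated here with a sleep).
        time.Sleep(50 * time.Millisecond)
    }
    return nil
}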
Thread-Safe Error Handling
bigopool includes a thread-safe error collection mechanism. The Errors interface provides several useful methods:
results, errors := dispatcher.Wait()

// Check if any errors occurred
if !errors.IsEmpty() {
    // Get all errors as a slice
    allErrors := errors.All()
    log.Printf("Processing failed with %d errors: %v",
        len(allErrors), errors.ToError())
}
Design Philosophy
When building bigopool, I focused on several key principles:
- Simplicity: The API should be intuitive and require minimal boilerplate. Note that the library won’t automatically retry failed jobs, but it will report them back to you.
- Performance: Zero allocations in the hot path where possible.
- Safety: Thread-safe by default, with proper error handling.
- Flexibility: Support both worker pool and simple parallel execution patterns.
The library intentionally keeps the interface minimal: you implement one method (Execute()) and the library handles the rest.
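Judging from the Execute() signature used throughout this post, that interface boils down to something like this inside the library:

type Job interface {
    Execute() (Result, error)
}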
When to Use bigopool
I consider bigopool a good option for:
- High-volume job processing: Image processing, data transformation, API calls, or building email content for every one of your users in a batch
- I/O-bound operations: File processing, database operations, web scraping
- Batch processing: ETL jobs, report generation, bulk operations
- Microservice communication: Parallel API calls, data aggregation
It might be overkill for:
- One-off parallel tasks: Use regular goroutines
- CPU-bound work with an optimal worker count: Consider runtime.GOMAXPROCS(0) workers (see the sketch after this list)
- Real-time streaming: Consider channels and select statements
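If you do reach for a pool on CPU-bound work, sizing it from the scheduler takes one line. A quick sketch (the queue capacity of 100 is an arbitrary choice, not a recommendation):

// One worker per usable CPU thread is a sensible default for CPU-bound jobs.
numWorkers := runtime.GOMAXPROCS(0)
dispatcher, err := bigopool.NewDispatcher(numWorkers, 100)
if err != nil {
    log.Fatal(err)
}
dispatcher.Run()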
Conclusion
Building bigopool taught me a lot about Go’s concurrency primitives and the importance of controlling resource usage in high-throughput applications. The library has been used in production systems processing tens of millions of jobs, and I’m proud of how it balances simplicity with performance.
Try it out in your next project:
go get -u github.com/bigodines/bigopool
For more, visit bigopool’s GitHub repository at github.com/bigodines/bigopool. If you find a bug or have ideas for improvements, contributions are always welcome!
If this library saves you time or helps with your project, I’d love to hear about it. You can find me on LinkedIn @bigodines.