Go concurrency applied to data pipelines

A different approach to batch processing, and how to unleash the power of data pipelines through the use of the Go concurrency model.

Lucas Godoy
6 min read · Mar 27, 2021

Introduction to pipelines

The term pipeline, applied to the computer science field, refers to nothing more than a series of stages that take data in, perform some operation on that data, and pass the processed data back out as a result.

Thus, when using this pattern, you can encapsulate the logic of each stage and scale your features quickly by adding, removing, or modifying stages; each stage becomes easy to test, not to mention the huge benefit of leveraging concurrency, which is the motivation for this article.

Previous problem and solution

A few years back, I had the chance to work for a food and CPG delivery company as a software engineer, where I was part of a team responsible for building pieces of software that integrated retailers’ product availability into the company’s main app. After running that integration, users were able to buy groceries with less risk of stockouts.

To accomplish this feature, we built an “availability engine” in Go. So, what was it about? Long story short, it ingested several retailers’ CSV files with product availability information and executed a series of steps to enrich and filter the data based on certain business logic. At the end of the process, new files were produced with all the resulting products, to be integrated into the company’s app for users to buy.

Overall, this data processing pipe looked something like this:

Example of architecture for handling in batches

As the diagram shows, the first stage of the pipe takes in a set of CSV rows, processes them all, and puts the results in a new batch (a new slice of maps). The very same process repeats as many times as there are stages in the pipe. The particularity of this pattern was that no stage could start until the previous step in the pipe had finished processing the whole set of rows. As you can see, it was conceptually a batch processing pipeline.
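Since that engine is not public, here is a rough, hypothetical sketch of what a single batch stage conceptually looked like; the map-of-strings row type and the enrichBatch name are illustrative only, not the company’s actual code.

// A batch stage, conceptually: the whole slice goes in, a brand-new slice
// of the same capacity comes out, and the next stage cannot start until
// this function returns.
func enrichBatch(rows []map[string]string) []map[string]string {
	out := make([]map[string]string, 0, len(rows))
	for _, row := range rows {
		// ...apply this stage's business logic to row...
		out = append(out, row)
	}
	return out
}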

To speed up and optimize the job, we started using concurrency at the CSV file level, so we were able to process files concurrently. This approach fit us very well, but there is no silver bullet, as we always say…

When working on this project, I handed the concurrency work over to one of the more experienced engineers on the team, so I mostly took part in coding each of those business logic steps.

Curiosity about different languages led me to give the Go concurrency model and its patterns a try, to have more tools on my toolbelt. Hence I started reading some posts and books about concurrency in Go. While doing so, I stumbled upon a fantastic pattern: pipelines leveraged by the use of channels. Had I come across it back then, I would certainly have suggested implementing it to solve this feature!

A better approach for data pipelines: streams of data

In my team’s approach, we were using batch processing between stages, which was good enough for us, but there are certainly other options that would have suited us better and made the pipeline more performant.

In particular, we are speaking of streaming data across the different pipeline stages. This means that each stage receives and emits one element at a time instead of waiting for a full batch of results from the previous step before doing its thing.

If we compare the memory footprint of batching and streaming, the former is bigger, since each stage has to make a new slice of maps of equal length to store the results of its calculations. The streaming approach, on the contrary, receives and emits one element at a time, so the memory footprint shrinks back down to the size of the pipeline’s input.

Implementation example

To demonstrate the pipeline implementation in Go, I coded a very simple example of this pattern (you can find the source code here). This pipeline is built on top of a chain of stages whose first input is a series of UserIDs (currently just mocked in code). The pipeline stages are the following:

  • S0: Converts a slice of UserIDs into a stream of data using a generator pattern
  • S1: Fetches the users
  • S2: Filters out the inactive users
  • S3: Fetches each user’s profile and aggregates it into the payload
  • S4: Converts the overall aggregation into a plain object to be saved somewhere later on

Here below, in the main function, we can see what the pipeline invocation looks like.

The pipeline invocation
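The original snippet is embedded as an image, so here is a minimal sketch of what that invocation could look like. For readability I flatten everything into package main (the repository keeps the generator in a stream package), and the stage names FetchUsers, FilterInactive, AggregateProfiles, and ToPlainObjects are my own guesses mirroring the stage list above.

package main

import "fmt"

func main() {
	// Closing done tears the whole pipeline down (more on this below).
	done := make(chan interface{})
	defer close(done)

	userIDs := []int{1, 2, 3, 4, 5} // mocked input, as in the article

	// Each stage takes the previous stage's output channel as its input.
	results := ToPlainObjects(done,
		AggregateProfiles(done,
			FilterInactive(done,
				FetchUsers(done,
					UserIDs(done, userIDs...)))))

	// The main goroutine consumes the pipeline's final output.
	for r := range results {
		fmt.Printf("%+v\n", r)
	}
}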

Let’s start from the innermost function call of our pipeline. The first stage, stream.UserIDs(done, userIDs…), is the one that feeds the pipeline by streaming the UserID values. To accomplish this, I used a generator pattern, which receives a slice of UserIDs (input) and, by ranging over it, pushes each value into a channel (output). The returned channel will in turn be the input for the next stage.

Generator pattern to stream values by using a channel for it
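A sketch of that generator, under the same assumptions as the main function above (the real stream.UserIDs may differ in the details):

// UserIDs converts a slice of IDs into a stream: it ranges over the input
// and pushes each value onto an output channel, which in turn feeds the
// next stage. Closing done aborts the loop early.
func UserIDs(done <-chan interface{}, ids ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, id := range ids {
			select {
			case <-done: // the caller cancelled the pipeline
				return
			case out <- id:
			}
		}
	}()
	return out
}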

As I mentioned before, conceptually, every stage is fed by an input and spits out an output as its result. So the stages receive a read-only channel as input for the data processing and return a read-only channel on which the resulting outputs are posted. That way, the next stage in the pipeline takes the previous stage’s output as its input for executing its own logic, and so forth until the end of the pipe.

Because of that, the use of channels across the pipe allows us to safely execute each pipeline stage concurrently, since our inputs and outputs are safe to use in concurrent contexts.
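To make that shape concrete, here is a hypothetical skeleton every stage in these sketches follows, along with the User type the later snippets pass around (a guess at the original model, which surely carries more fields):

// User is the payload flowing through the middle of the pipeline.
type User struct {
	ID      int
	Active  bool
	Profile string
}

// passthrough shows the common stage shape: a read-only channel in, a
// read-only channel out, and a goroutine that forwards (or transforms)
// values until the input is drained or done is closed.
func passthrough(done <-chan interface{}, in <-chan User) <-chan User {
	out := make(chan User)
	go func() {
		defer close(out)
		for u := range in {
			select {
			case <-done:
				return
			case out <- u:
			}
		}
	}()
	return out
}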

Let’s have a look at the following stages in the chain, where, based on the streamed data from the first stage (the generator), we fetch the actual user data, filter inactive users out, enrich them with their profiles, and finally rip some data apart to make a plain object out of the whole aggregation/filtering process (a combined sketch of these stages follows the captions below).

Fetching users and returning them on the channel
Filtering inactive users out
Aggregating users’ profiles into the payload
Converting the payload to a reduced version of it
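The four snippets above are images in the original post; below is one combined sketch of how those stages could be written, with fake lookups and enrichment standing in for the real data sources.

// FetchUsers looks each ID up and emits a User. The in-line fake stands in
// for a real datastore or API call.
func FetchUsers(done <-chan interface{}, ids <-chan int) <-chan User {
	out := make(chan User)
	go func() {
		defer close(out)
		for id := range ids {
			u := User{ID: id, Active: id%2 == 0} // fake lookup
			select {
			case <-done:
				return
			case out <- u:
			}
		}
	}()
	return out
}

// FilterInactive only forwards active users downstream.
func FilterInactive(done <-chan interface{}, in <-chan User) <-chan User {
	out := make(chan User)
	go func() {
		defer close(out)
		for u := range in {
			if !u.Active {
				continue
			}
			select {
			case <-done:
				return
			case out <- u:
			}
		}
	}()
	return out
}

// AggregateProfiles enriches each user with its profile before passing it on.
func AggregateProfiles(done <-chan interface{}, in <-chan User) <-chan User {
	out := make(chan User)
	go func() {
		defer close(out)
		for u := range in {
			u.Profile = "premium" // fake enrichment; imagine a profile service call
			select {
			case <-done:
				return
			case out <- u:
			}
		}
	}()
	return out
}

// PlainUser is the reduced, flat object produced by the last stage.
type PlainUser struct {
	ID      int
	Profile string
}

// ToPlainObjects rips the aggregate apart into a plain object that is ready
// to be saved somewhere later on.
func ToPlainObjects(done <-chan interface{}, in <-chan User) <-chan PlainUser {
	out := make(chan PlainUser)
	go func() {
		defer close(out)
		for u := range in {
			select {
			case <-done:
				return
			case out <- PlainUser{ID: u.ID, Profile: u.Profile}:
			}
		}
	}()
	return out
}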

Surely you have noticed that I pass a done chan interface{} around the stages. What is it for? It is worth mentioning that goroutines are not garbage collected by the runtime, so as programmers we have to make sure they are all preemptable. By doing that we will not leak any goroutines (I will write more about this in a later post) and their memory gets freed up. Thus, any invocation of the pipeline can be stopped by simply closing the done channel. That action terminates all the spawned child goroutines and cleans them up.
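As a small, hypothetical illustration of that, building on the sketches above: a caller can consume only part of the stream and still shut every stage goroutine down cleanly by closing done.

// takeOne reads a single value from the pipeline and then cancels it by
// closing done, which unblocks and terminates every stage goroutine.
func takeOne(userIDs []int) PlainUser {
	done := make(chan interface{})
	defer close(done) // tear the whole pipeline down on return

	results := ToPlainObjects(done,
		AggregateProfiles(done,
			FilterInactive(done,
				FetchUsers(done,
					UserIDs(done, userIDs...)))))

	return <-results
}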

All in all, once the last stage of the pipe starts pushing data out through its output channel, another goroutine (in this case the main one) can start reading from that channel by ranging over it, as the main function does, and do something with the results.

And that is pretty much it! We implemented our first data pipeline, elegant and more efficient than its batching counterpart!

Final thoughts

In brief, if I ever get the chance to work on a similar problem again, I will go for this pattern for sure: it is not only more performant in terms of memory footprint but also faster than a batching approach, since the data gets processed concurrently.

Also, there are many other things we can do with pipelines, like rate limiting and fanning in/out. These topics will be covered in upcoming posts, where the idea is to keep iterating on this pattern by adding and combining more concurrency patterns.

This is my first time publishing an article, so any criticism is more than welcome; that way I can keep learning from all of you.

Cheers!

PS: there are a couple of things to be improved in this example; I will tackle them in an upcoming post, where we can talk about error handling for these types of concurrency pattern implementations.
