Concurrency in Golang: Building a Data Pipeline for Monitoring Microservices from Scratch

Time and resource consumption have become the driving forces of developing modern applications. While building cloud-native applications, it’s important to ensure that you have the most optimized code in place, and oftentimes that means leveraging concurrency. While writing concurrent code may sound overwhelming at first, Golang makes it extremely easy to get a handle on.

This guide explores how to build a very simple concurrency-driven data pipeline microservice that transports metrics data, selected through a YAML configuration file, to an endpoint that writes it to a file. Additionally, the guide covers the Prometheus HTTP handler for monitoring metrics, and the usage and workings of goroutines and channels. Finally, the guide examines the fan-in mechanism and timeouts in the pipeline, as well as wait groups.

Here are the steps that are covered in this guide:

  1. Collecting Microservice Metrics with Prometheus
  2. Aggregating and Transforming Data with Channels
  3. Combining Transformed Data with a Fan-In Mechanism
  4. Transporting Data and Binding with Channels and WaitGroups
  5. Receiving Data from Applications
  6. Understanding Concurrency in Golang at Scale

Before we dive in, let’s briefly discuss why using concurrency can be so beneficial.

How Does Concurrency Help Improve App Performance?

If you don’t build applications that can scale up under increased load and scale back down when the load drops, you aren’t doing justice to either your users or the application you’re building.

If your application does not scale up as the circumstances require, customers will experience service outages, increased waiting times, and low service availability. On the other hand, if your application does not scale down, you will consume more resources and will pay more money — making your application less efficient and more expensive. 

Since neither of these outcomes is acceptable, you should follow best practices, such as pod autoscaling (PA) rules, when setting up your application in orchestration services. Depending on your company’s policies and the cloud provider’s policies, scaling can either be vertical or horizontal.

Ultimately, your application will run on your native machine or a machine physically far away from you. These machines often have multiple cores, so it makes sense to write code that executes concurrently to optimize performance.

With that in mind, let’s dive into how to use concurrency in Golang!

Collecting Microservice Metrics with Prometheus

When applications run in the cloud, there’s almost no visibility into microservices. So, the paradigm of observability steps in, as it’s very important to know about the state of your application at a given point in time. Observability tools are set up to check the state of your application through the main pillars of observability: logging, metrics, and traces.

[Figure: Collecting metrics with Prometheus]

Metrics are counters that track major or minor events in your application. Unlike logs, they are not descriptions of a specific event but a numeric count of the events tracked in the application.

Based on the metrics, alerts can be set that can notify us of the increased number of errors or increased request rate on our services — which can then be analyzed before appropriate steps are taken (as discussed above for autoscaling). These give us insights into how the application is behaving in the cloud without having to manually log in to the cloud machine itself.

Today’s leading technological solutions don’t wait for customers to report the issues. Instead, they set up alerting and monitoring systems to mitigate the issues successfully before they occur.

Prometheus is an open source tool used to collect and transport metrics in a distributed environment. As a standalone and highly reliable system, Prometheus is an ideal solution for monitoring microservices. 

To work with Prometheus, you need to add the metrics you’re monitoring and handle them when events occur. These metrics are then exposed and can be queried. 

Golang provides a very simple way of doing this. These metrics can be ingested by a third-party tool or your internal application (as is the case in this guide) and can be used for analytics as desired.

In this guide, data is collected using the Prometheus HTTP handler package for Golang, promhttp. The Prometheus metrics are exposed through an HTTP endpoint on a route of our internal server. When a cURL request is sent to that endpoint, all of the exposed metrics are returned. The handler is simply added to your router:

// Adding Prometheus http handler to expose the metrics
// this will display our metrics as well as some standard metrics
mainRouter.Path("/metrics").Handler(promhttp.Handler())

In this example, there are two existing microservices that need monitoring. A reference to these two microservices can be found in my previous article on building an authentication microservice in Golang.

If you want to configure monitoring for existing microservices, you need to understand what metrics need to be set. For the signup microservice, you need to know the total number of requests to this function (this is for all the deployments and instances running), the number of successful requests, and the number of failed requests. This can be set up easily using the following code:

var (
	signupRequests = promauto.NewCounter(prometheus.CounterOpts{
		Name: "signup_total",
		Help: "Total number of signup requests",
	})
	signupSuccess = promauto.NewCounter(prometheus.CounterOpts{
		Name: "signup_success",
		Help: "Successful signup requests",
	})
	signupFail = promauto.NewCounter(prometheus.CounterOpts{
		Name: "signup_fail",
		Help: "Failed signup requests",
	})
)

Since there can also be errors for the signin microservice, it’s extremely important to set up error monitoring. So, there will be an extra metric to monitor. You can set up many alerts to get a better idea if erroneous code is running:

var (
	…
	signinError = promauto.NewCounter(prometheus.CounterOpts{
		Name: "signin_error",
		Help: "Erroneous signin requests",
	})
)

As an example scenario, you might set up an alert when there are a lot more failed requests to the signin function than the successful requests. This can be the case when someone is trying to run a brute force attack on your server. As soon as this alert is triggered, you can begin mitigating it. 

Look closely at the code block below. At the start of the handler, we increment the promSigninTotal metric. If there is a bad request (an unknown user or a wrong password), we increment the promSigninFail metric. If there is an internal server error, we increment the promSigninError metric. If all goes well, we increment the promSigninSuccess metric.

promSigninTotal = promSigninFail + promSigninSuccess + promSigninError

func (ctrl *SigninController) SigninHandler(rw http.ResponseWriter, r *http.Request) {
	// increment total signin requests
	ctrl.promSigninTotal.Inc()
	// Header.Get returns "" for a missing header instead of panicking on [0]
	email := r.Header.Get("Email")
	// let's see if the user exists
	valid, err := validateUser(email, r.Header.Get("Passwordhash"))
	if err != nil {
		// this means the user does not exist
		ctrl.logger.Warn("User does not exist", zap.String("email", email))
		ctrl.promSigninFail.Inc()
		return
	}
	if !valid {
		// this means the password is wrong
		ctrl.logger.Warn("Password is wrong", zap.String("email", email))
		ctrl.promSigninFail.Inc()
		return
	}
	tokenString, err := getSignedToken()
	if err != nil {
		ctrl.logger.Error("unable to sign the token", zap.Error(err))
		ctrl.promSigninError.Inc()
		return
	}
	ctrl.promSigninSuccess.Inc()
}
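
The counting rule above can be checked in isolation. The following is a minimal sketch that uses sync/atomic counters as stand-ins for the Prometheus counters (it assumes Go 1.19 or later for atomic.Int64). The signinCounters type, the outcome values, and the simulate helper are hypothetical names introduced only for this illustration:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// signinCounters is a hypothetical stand-in for the four Prometheus counters.
type signinCounters struct {
	total, fail, errored, success atomic.Int64
}

// outcome describes how a simulated signin request ended.
type outcome int

const (
	outcomeSuccess outcome = iota
	outcomeFail            // unknown user or wrong password
	outcomeError           // internal server error
)

// record mirrors the handler: bump total first, then exactly one outcome counter.
func (c *signinCounters) record(o outcome) {
	c.total.Add(1)
	switch o {
	case outcomeFail:
		c.fail.Add(1)
	case outcomeError:
		c.errored.Add(1)
	default:
		c.success.Add(1)
	}
}

// simulate records n requests concurrently, cycling through the three outcomes.
func simulate(n int) *signinCounters {
	c := &signinCounters{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(o outcome) {
			defer wg.Done()
			c.record(o)
		}(outcome(i % 3))
	}
	wg.Wait()
	return c
}

func main() {
	c := simulate(9)
	sum := c.fail.Load() + c.errored.Load() + c.success.Load()
	fmt.Println("total equals sum of outcomes:", c.total.Load() == sum)
}
```

Because every code path increments the total once and exactly one outcome counter once, the relation holds no matter how the concurrent requests interleave.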

The metric output when you visit localhost:9090/metrics looks like this:

[Figure: Metrics output]

Prometheus also exposes a lot of additional information, which gives us insights into our application’s resource consumption. As you can see, there are nine goroutines. These are started by the Go runtime and the HTTP server that run alongside the main code, not by the compiler. The number will increase when you add goroutines of your own.
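
To get a feel for how that count moves, here is a small sketch that measures it directly with runtime.NumGoroutine from the standard library. The spawnBlocked helper is a hypothetical name for this illustration, and the absolute count on your machine depends on what else the process is running:

```go
package main

import (
	"fmt"
	"runtime"
)

// spawnBlocked starts n goroutines that block until release is closed and
// reports how much the number of live goroutines grew.
func spawnBlocked(n int, release <-chan struct{}) int {
	before := runtime.NumGoroutine()
	for i := 0; i < n; i++ {
		go func() { <-release }()
	}
	return runtime.NumGoroutine() - before
}

func main() {
	release := make(chan struct{})
	grew := spawnBlocked(3, release)
	fmt.Println("goroutines added:", grew)
	close(release) // let the blocked goroutines exit
}
```

Each go statement adds one goroutine to the count, and it stays there until the function it runs returns.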

Aggregating and Transforming Data with Channels

[Figure: Transforming data with channels]

After understanding the basics of Prometheus metrics, it’s time to start building the data pipeline. Please refer to the above diagram to understand what you’re going to build: a concurrency-driven data pipeline that will have four steps. 

The first step is data aggregation, where each microservice we’re monitoring has a dedicated goroutine that queries the /metrics endpoint of our application every five seconds to get the Prometheus metrics. After fetching the metrics, each goroutine picks out the ones defined for its microservice in config.yaml. Only these metrics are sent to a send-only channel, the aggregator channel. The value sent to the aggregator channel is a slice of strings, each holding a metric name and its value separated by a space:

// dataAggregator combines data from the microservices provided
func dataAggregator(log *zap.Logger, trackData confMap, key string, aggregateChan chan<- []string) error {
	// data is fetched every 5 seconds from the microservice specified.
	fetchInterval := time.NewTicker(5 * time.Second)
	defer fetchInterval.Stop()
	// reflection is used to read the struct field named by key dynamically;
	// the keys do not change, so we look them up once
	reflection := reflect.ValueOf(trackData)
	reflectionField := reflect.Indirect(reflection).FieldByName(key)
	allKeys := strings.Split(reflectionField.String(), ",")
	// the ticker gives us a channel; we loop on it forever as we want to monitor always.
	for range fetchInterval.C {
		// we send the get request to prometheus http endpoint
		resp, err := http.Get(endpoint)
		// if there is an error, stop the goroutine immediately
		if err != nil {
			log.Error("Error", zap.Error(err))
			return err
		}
		// read the response body from the server, then release the connection
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			// skip this tick rather than parsing a partial body
			log.Error(err.Error())
			continue
		}
		// now we pick out the metrics that we want, based on the conf yaml file
		aggregate := []string{}
		for _, metricValue := range strings.Split(string(body), "\n") {
			// ignore blank lines and comment lines starting with '#'
			if metricValue == "" || metricValue[0] == '#' {
				continue
			}
			for _, metricKey := range allKeys {
				// check if the given line has the metric we are looking for
				if strings.Contains(metricValue, metricKey) {
					aggregate = append(aggregate, metricValue)
					break
				}
			}
		}
		// save the result in the aggregateChannel
		aggregateChan <- aggregate
	}
	return nil
}
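
The filtering step can also be pulled out into a pure function that is easy to test. The sketch below is one possible variant, not the guide’s implementation: it matches metric names exactly instead of using strings.Contains, so a key such as signup_total does not also pick up a longer name that merely contains it. The filterMetrics name and the signup_total_seconds sample line are assumptions made for the illustration:

```go
package main

import (
	"fmt"
	"strings"
)

// filterMetrics returns the sample lines of a Prometheus text exposition
// whose metric name exactly matches one of the wanted names.
func filterMetrics(body string, wanted []string) []string {
	want := make(map[string]bool, len(wanted))
	for _, name := range wanted {
		want[strings.TrimSpace(name)] = true
	}
	var picked []string
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(line)
		// Skip blank lines and "# HELP" / "# TYPE" comment lines.
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		// The metric name ends at the first space or at the label block.
		name := line
		if i := strings.IndexAny(line, " {"); i >= 0 {
			name = line[:i]
		}
		if want[name] {
			picked = append(picked, line)
		}
	}
	return picked
}

func main() {
	body := "# HELP signup_total Total number of signup requests\n" +
		"# TYPE signup_total counter\n" +
		"signup_total 7\n" +
		"signup_total_seconds 1.5\n" + // hypothetical metric, present only to show the exact match
		"signup_fail 2\n"
	for _, line := range filterMetrics(body, []string{"signup_total", "signup_fail"}) {
		fmt.Println(line)
	}
}
```

Whether exact or substring matching is the better fit depends on how the keys in config.yaml are meant to be written.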

We’ve specifically made the channel send-only for this function. As multiple goroutines will write to this channel, it’s important to declare the channel’s direction in the function parameters. We don’t want the dataAggregator function to ever read from the channel. This is done by declaring aggregateChan chan<- []string in the signature. The arrow shows that, inside this function, data can only go into the channel.

We can spin up two parallel goroutines in the main function and provide different keys to set up the monitor module (you’ll see this later on). First, let’s write the remaining pipeline functions.
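
As a stripped-down illustration of channel directions, the sketch below passes one bidirectional channel to a function that may only send and to another that may only receive. The produce, sum, and sumTo names are made up for this example:

```go
package main

import "fmt"

// produce may only send on out; receiving from it would not compile.
func produce(n int, out chan<- int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum may only receive from in; sending on it would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// sumTo wires the two together through one bidirectional channel.
func sumTo(n int) int {
	ch := make(chan int) // converts implicitly to either direction
	go produce(n, ch)
	return sum(ch)
}

func main() {
	fmt.Println(sumTo(4)) // prints 10
}
```

The direction is enforced by the compiler, so a function that accidentally reads from a send-only channel fails at build time rather than at run time.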

The data transformer takes the aggregate channel that we wrote to in the last section and reads the values there. These values are then transformed and metadata is added to them. After this, they are turned into JSON strings and sent to a channel that is unique to each goroutine. This is done using a simple struct, and we can even track which pipeline produced a record using the pipeline ID. Errors during marshaling are checked and logged here as well.

// dataTransformer reads from the aggregate channel and transforms each value into a struct. The output goes to a channel unique to this goroutine.
func dataTransformer(log *zap.Logger, aggregateChan <-chan []string, transformChan chan<- string, routineId int, region string, pipelineid int) error {
	// read from the channel
	for val := range aggregateChan {
		tr := transformationOutput{
			Time:          time.Now(),
			Region:        region,
			PipelineId:    pipelineid,
			MetricDetails: val,
		}
		// marshal into byte array
		op, err := json.Marshal(tr)
		if err != nil {
			log.Error("Error Encountered", zap.Int("routine id", routineId), zap.Error(err))
			// skip this record instead of sending an empty payload
			continue
		}
		// send the output to the respective channel
		transformChan <- string(op)
	}
	return nil
}

It’s good to note that the aggregate channel is a receive-only channel in this function. We have declared aggregateChan <-chan []string with the arrow pointing out of chan, which signifies that data can only be read from this channel. The result goes to transformChan chan<- string, which is a send-only channel. This way we restrict the direction of each channel to what the function actually needs.
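
The transformation itself is just a struct plus json.Marshal. The definition of transformationOutput is not shown in this guide, so the sketch below uses a hypothetical record type with assumed JSON field names to show the round trip:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// record is a hypothetical shape for the transformed output; the JSON field
// names are assumptions, not taken from the original project.
type record struct {
	Time          time.Time `json:"time"`
	Region        string    `json:"region"`
	PipelineID    int       `json:"pipeline_id"`
	MetricDetails []string  `json:"metric_details"`
}

// transform wraps raw metric lines with metadata and encodes them as JSON.
func transform(lines []string, region string, pipelineID int) (string, error) {
	out, err := json.Marshal(record{
		Time:          time.Now().UTC(),
		Region:        region,
		PipelineID:    pipelineID,
		MetricDetails: lines,
	})
	if err != nil {
		return "", err
	}
	return string(out), nil
}

// decode parses a JSON string produced by transform back into a record.
func decode(s string) (record, error) {
	var r record
	err := json.Unmarshal([]byte(s), &r)
	return r, err
}

func main() {
	s, err := transform([]string{"signup_total 7"}, "eu", 1)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(s)
	r, err := decode(s)
	fmt.Println(r.Region, r.PipelineID, r.MetricDetails, err)
}
```

Returning the error from transform lets the caller decide whether to skip the record or stop, instead of forwarding an empty payload.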

Combining Transformed Data with a Fan-In Mechanism

You have two goroutines that are sending their outputs to two different channels. The process of fan-in takes the values in these two channels separately and inputs them to a single channel. In this case, the order doesn’t matter. It’s even possible to impose timeouts if the previous processes are bulky and time-consuming. 

A select statement in Golang is well suited to this case. We wrap it in an infinite loop because the channels are unbuffered and never closed, so there may always be more data to wait for.

transportChan := make(chan string)
// now we need fanin of 2 channels to a single channel. This final channel will be passed for transportation
// we can write another goroutine for this
go func() {
	for {
		select {
		case op1 := <-transform1Chan:
			transportChan <- op1
		case op2 := <-transform2Chan:
			transportChan <- op2
		case <-time.After(3 * time.Second):
			// the last case imposes a timeout that may be needed if the above routines are taking time to execute
		}
	}
}()

In this example, we are imposing a timeout of three seconds if transform1Chan and transform2Chan are empty. This is a useful mechanism when the application has irregular data streams.

As you can see, the data from transform1Chan and transform2Chan are both sent to transportChan. This is again done with the help of a goroutine, which runs forever, because we don’t want to miss any data. All the channels are unbuffered, i.e., they have no capacity to store data. As such, they are blocking in nature: a send waits until a receiver is ready, and a receive waits until there is a value to process.
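
If the input channels can be closed, the fan-in goroutine can also shut down cleanly instead of running forever. The sketch below is a variant under that assumption, not the pipeline’s own code: it sets a closed input to nil so select stops considering it, and it closes the output once both inputs are done. The fanIn, emit, and mergeCount names are hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

// fanIn merges a and b into one channel and closes it once both inputs are
// closed, or once no value has arrived for the idle duration.
func fanIn(a, b <-chan string, idle time.Duration) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil // a nil channel is never selected again
					continue
				}
				out <- v
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				out <- v
			case <-time.After(idle):
				return // both inputs stayed silent for the idle period
			}
		}
	}()
	return out
}

// emit sends each value on a fresh unbuffered channel, then closes it.
func emit(vals ...string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// mergeCount drains the merged stream and returns how many values arrived.
func mergeCount(a, b <-chan string) int {
	n := 0
	for range fanIn(a, b, time.Second) {
		n++
	}
	return n
}

func main() {
	fmt.Println(mergeCount(emit("a1", "a2"), emit("b1"))) // prints 3
}
```

The order in which the three values arrive is not defined, which matches the guide’s point that ordering does not matter here. Note that if the idle timeout fires first, any producer still trying to send stays blocked, so a real pipeline would pair this with a cancellation signal.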

Transporting Data and Binding with Channels and WaitGroups

Finally, you’ll complete your data pipeline using a transport function. This section discusses binding the above functions together using goroutines and then shows a demo function that uses wait groups.

After getting the fanned-in data in your transportChan channel, you need to transport it using an HTTP POST request to an output endpoint. With multiple concurrent goroutines, it’s possible to send multiple requests simultaneously. That’s the biggest advantage because the output endpoint won’t depend on the internal working of the monitoring module.

// dataTransportation sends a POST request to our endpoint, which is on our own server
func dataTransportation(log *zap.Logger, transportChan <-chan string, successChan chan<- string) error {
	for op := range transportChan {
		// this is fanned in channel
		// we send a POST request with our metric data
		response, err := http.Post(outputEndpoint, "application/json", bytes.NewBufferString(op))
		if err != nil {
			log.Error("Error in transportation", zap.Error(err))
			// response is nil on error, so skip this record
			continue
		}
		responseBody, err := io.ReadAll(response.Body)
		// close the body so the connection can be reused
		response.Body.Close()
		if err != nil {
			log.Error("Error in Reading Output body", zap.Error(err))
			return err
		}
		// this is a temp channel to show how waitgroups function
		successChan <- string(responseBody)
	}
	return nil
}
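
The request and response handling can be exercised without a running server. The sketch below is one way to do that, assuming a stubbed http.RoundTripper so that nothing touches the network. The postMetric, stubClient, and sendOnce names, as well as the metrics.invalid URL, are hypothetical:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// postMetric sends one JSON payload and returns the response body.
func postMetric(client *http.Client, url, payload string) (string, error) {
	resp, err := client.Post(url, "application/json", strings.NewReader(payload))
	if err != nil {
		return "", err // resp is nil here, so it must not be touched
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status %s", resp.Status)
	}
	return string(body), nil
}

// roundTripFunc lets a plain function act as an http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// stubClient returns a client whose transport answers every request locally,
// so the sketch runs without a network or a real endpoint.
func stubClient() *http.Client {
	return &http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) {
		defer r.Body.Close()
		b, err := io.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		reply := fmt.Sprintf("received %d bytes", len(b))
		return &http.Response{
			StatusCode: http.StatusOK,
			Status:     "200 OK",
			Header:     make(http.Header),
			Body:       io.NopCloser(strings.NewReader(reply)),
			Request:    r,
		}, nil
	})}
}

// sendOnce posts payload through the stub client to a placeholder URL.
func sendOnce(payload string) (string, error) {
	return postMetric(stubClient(), "http://metrics.invalid/ingest", payload)
}

func main() {
	reply, err := sendOnce(`{"region":"eu"}`)
	fmt.Println(reply, err)
}
```

Passing the client in as a parameter is the design choice that makes this possible: production code can hand in http.DefaultClient, while tests hand in a stub.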

Now, let’s bind all the functions together. As you can see in the code below, we first read the config.yaml file to fetch the metrics that need to be monitored. Next, we set up a channel called aggregateChan of type []string. This is then passed to two goroutines, which monitor the signin and signup microservices respectively. The config data is parsed and used to decide which metrics end up in aggregateChan.

The two goroutines you just created will run indefinitely and call the Prometheus HTTP endpoint every five seconds to get data for each microservice. So, aggregateChan will keep receiving new values every five seconds.
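
Goroutines that run indefinitely are fine for a demo, but a long-lived service usually needs a way to stop them. One common approach, sketched below under the assumption that a context.Context is threaded through the pipeline (the guide’s code does not do this), is to select on both the ticker and ctx.Done(). The poll and runDemo names, and the short intervals, are chosen only to keep the example quick:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// poll calls fetch on every tick until ctx is cancelled, sending each result
// on out. It closes out on exit so downstream range loops can finish.
func poll(ctx context.Context, every time.Duration, fetch func() string, out chan<- string) {
	defer close(out)
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Also watch ctx while sending, in case nobody is receiving.
			select {
			case out <- fetch():
			case <-ctx.Done():
				return
			}
		}
	}
}

// runDemo polls every 5ms for 100ms and returns how many values arrived.
func runDemo() int {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	out := make(chan string)
	go poll(ctx, 5*time.Millisecond, func() string { return "signup_total 1" }, out)
	n := 0
	for range out {
		n++
	}
	return n
}

func main() {
	fmt.Println("values received:", runDemo())
}
```

Closing the output channel on exit lets each downstream stage finish its range loop in turn, so the whole pipeline can drain and stop.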

Next, we create two channels of type string. These are the channels that the transformer goroutines write to. Both transformer goroutines we spin up next read from the aggregator channel. Each value sent on a channel is received by exactly one goroutine, so once a transformer has read a record, the other one never sees it. This ensures that there is no duplication of metrics.

Each goroutine will transform the data and write to transform1Chan or transform2Chan. The order of data in the pipeline does not matter, and any goroutine can pick up any available record in aggregateChan based on the execution environment. These two channels will also get populated, so to merge them into a single stream for transport, we set up a fan-in mechanism.

The data from transform1Chan and transform2Chan is written to a transport channel. The transportChan is read by two goroutines, which then send the data to our configured endpoint. The response from the final endpoint is collected in a channel called successChan.

// simple function that prints the channels value available. It takes in the Waitgroup
func showDemo(log *zap.Logger, wg *sync.WaitGroup, finalChan <-chan string) {
	log.Info("final Chan ", zap.String("message", <-finalChan))
	wg.Done()
}

// MonitorBinder binds all the goroutines and combines the output
func MonitorBinder(log *zap.Logger) error {
	// read the yaml file and unmarshal to the metricMap
	content, err := os.ReadFile(metricConfFile)
	if err != nil {
		return err
	}
	// we read the info from config yaml file
	metricEndpoints := confMap{}
	if err := yaml.Unmarshal(content, &metricEndpoints); err != nil {
		return err
	}
	log.Info("Contents", zap.Any("Any", metricEndpoints))
	// setting up unbuffered channel
	aggregateChan := make(chan []string)
	// goroutines to monitor metrics for 2 services
	// these are signin and signup
	go dataAggregator(log, metricEndpoints, "Signin", aggregateChan)
	go dataAggregator(log, metricEndpoints, "Signup", aggregateChan)
	// these are the output channels for the transformer function
	transform1Chan := make(chan string)
	transform2Chan := make(chan string)
	// these 2 goroutines transform the data and send output to transform1Chan and transform2Chan
	go dataTransformer(log, aggregateChan, transform1Chan, 1, metricEndpoints.Metadataregion, metricEndpoints.Metadatapipelineid)
	go dataTransformer(log, aggregateChan, transform2Chan, 2, metricEndpoints.Metadataregion, metricEndpoints.Metadatapipelineid)
	transportChan := make(chan string)
	// now we need fanin of 2 channels to a single channel. This final channel will be passed for transportation
	// we can write another goroutine for this
	go func() {
		for {
			select {
			case op1 := <-transform1Chan:
				transportChan <- op1
			case op2 := <-transform2Chan:
				transportChan <- op2
			case <-time.After(3 * time.Second):
			}
		}
	}()
	// this has the output from our server
	successChan := make(chan string)
	// spin 2 goroutines in parallel to get the successChan fast
	go dataTransportation(log, transportChan, successChan)
	go dataTransportation(log, transportChan, successChan)
	// now we implement a WaitGroup to get the first 5 values only of the response body
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		// increment the WaitGroup counter before starting the goroutine
		wg.Add(1)
		go showDemo(log, &wg, successChan)
	}
	// this goroutine only continues after the first 5 responses have been printed
	go func() {
		wg.Wait()
		log.Info("=========== WE ARE DONE FOR THE DEMO ===========")
	}()
	return nil
}

As you can see, the data flow in our data pipeline is asynchronous and indefinite. As long as the Prometheus endpoint is up, our pipeline will keep on submitting data. 

The goroutines are connected by unbuffered channels, and none of them has to wait for a previous goroutine to finish. However, this will not always be the case. You may want to run certain goroutines only after another goroutine has completed. This is done using WaitGroups, which are provided by the sync package.

When you define a WaitGroup, you register a goroutine with wg.Add(1) and mark it as finished with wg.Done() once it has completed. A call to wg.Wait() blocks until every registered goroutine has called wg.Done(). Internally this is a simple counter: Add increments it, Done decrements it, and Wait returns when it reaches zero.

In the above code, we start five goroutines that each print one value from the success channel. After printing, each one calls wg.Done(). Another goroutine waits on the WaitGroup and then prints the final message, “WE ARE DONE FOR THE DEMO”. Note that once these five goroutines have finished, nothing reads from successChan any more, so a long-running deployment would need to keep draining that channel or remove it.
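
Here is the same Add, Done, and Wait pattern in a self-contained form. The squareAll function is a hypothetical example chosen because its result is easy to verify:

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll squares each input in its own goroutine and waits for all of
// them before returning the results.
func squareAll(nums []int) []int {
	results := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1) // register the goroutine before starting it
		go func(i, n int) {
			defer wg.Done()    // mark this goroutine as finished on return
			results[i] = n * n // each goroutine writes only its own index
		}(i, n)
	}
	wg.Wait() // blocks until the counter is back to zero
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // prints [1 4 9]
}
```

Calling wg.Add before the go statement matters: if Add ran inside the goroutine, Wait could return before the goroutine had a chance to register itself.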

Receiving Data from Applications

You can set up an internal endpoint that your monitor module posts to. It simply appends the received records to a file. The use case in this scenario is to dynamically filter the metrics and write only the specified ones to a text file that can be viewed by upper management.

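func simplePostHandler(rw http.ResponseWriter, r *http.Request) {
	file, err := os.OpenFile("./metricDetails.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		// report the failure to the caller instead of exiting the whole server
		http.Error(rw, "error in file ops", http.StatusInternalServerError)
		return
	}
	defer file.Close()
	reqBody, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(rw, "error reading request body", http.StatusBadRequest)
		return
	}
	// write the record and its trailing newline in a single call
	if _, err := file.Write(append(reqBody, '\n')); err != nil {
		http.Error(rw, "error writing record", http.StatusInternalServerError)
		return
	}
	rw.Write([]byte("Post Request Received for the Success"))
}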
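
Since two transport goroutines post to this endpoint at the same time, it can help to serialize the writes. The sketch below is one possible arrangement, not the guide’s handler: it writes to any io.Writer behind a mutex and is driven in memory with net/http/httptest. The sink type, the post helper, and the /ingest path are hypothetical:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// sink appends each POST body as one line to w. The mutex keeps lines from
// concurrent requests from interleaving.
type sink struct {
	mu sync.Mutex
	w  io.Writer
}

func (s *sink) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(rw, "POST only", http.StatusMethodNotAllowed)
		return
	}
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(rw, "cannot read body", http.StatusBadRequest)
		return
	}
	s.mu.Lock()
	_, err = s.w.Write(append(body, '\n'))
	s.mu.Unlock()
	if err != nil {
		http.Error(rw, "cannot store record", http.StatusInternalServerError)
		return
	}
	fmt.Fprint(rw, "stored")
}

// post drives the handler in memory and returns the status code together
// with everything the handler wrote to its sink.
func post(payload string) (int, string) {
	var buf bytes.Buffer
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/ingest", strings.NewReader(payload))
	(&sink{w: &buf}).ServeHTTP(rec, req)
	return rec.Code, buf.String()
}

func main() {
	code, stored := post("signup_total 7")
	fmt.Printf("%d %q\n", code, stored)
}
```

In a real service the io.Writer would be an *os.File opened once at startup, which also avoids reopening the file on every request.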

Understanding Concurrency in Golang at Scale

With modern machines, we’ve moved towards processors with multiple cores. Multiple cores provide the scope for parallelism and concurrency. When there is only a single processor present, context switching enables concurrency. The cloud contains thousands of machines clubbed together with the leading hardware and infrastructure to enable high computation powers. If we write code that only utilizes a single core, we won’t be making the most of the infrastructure that the cloud provides. Goroutines can be seen as extremely lightweight threads, managed by the Go runtime, that communicate by passing values over channels instead of sharing memory directly.

It should be remembered that the main function runs in a goroutine, too. In the above example, the MonitorBinder function is called from the main goroutine, which moves ahead immediately after spinning up each goroutine rather than waiting for it. So, the channels are created by, and in that sense owned by, the main goroutine. The Go scheduler manages when and where the goroutines run.

With these lightweight threads in place, we can boost the performance of our applications considerably. If we wrote the above use case as a single function without any goroutine, many parallel executions might have been delayed. When using goroutines for this case, there is a performance boost of up to 2x. 
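
The effect is easiest to see with work that mostly waits, such as network calls. The sketch below simulates that with time.Sleep and compares running four such calls one after another with running them in goroutines. The slowFetch, sequential, and concurrent names are hypothetical, and the actual speedup in a real application depends on the workload and the machine:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// slowFetch stands in for a network call that mostly waits.
func slowFetch() { time.Sleep(20 * time.Millisecond) }

// sequential runs n fetches one after another and returns the elapsed time.
func sequential(n int) time.Duration {
	start := time.Now()
	for i := 0; i < n; i++ {
		slowFetch()
	}
	return time.Since(start)
}

// concurrent runs n fetches in separate goroutines and returns the elapsed time.
func concurrent(n int) time.Duration {
	start := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slowFetch()
		}()
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	fmt.Println("sequential:", sequential(4))
	fmt.Println("concurrent:", concurrent(4))
}
```

The sequential version takes at least four times the delay of a single call, while the concurrent version overlaps the waits.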

Utilizing the power of the cloud should be the main aim of modern application development. In some cases, concurrent execution in Golang can provide a performance boost of more than 8x.

Moreover, the steady growth in the adoption of Golang is largely due to how easy concurrency is to implement in it. However, you should only optimize the code to a certain extent, as over-optimization has its own demerits.

With concurrency built in, errors and issues become more complex, for example deadlock handling in advanced scenarios. Having a scalable application makes sure that your app is highly available. At the same time, having a concurrent application makes sure that you are utilizing your resources efficiently and cutting down on additional resources.

Building Forward

This guide presents a concurrency-based data pipeline. Now that you understand the basics, you should research atomic storage in Golang and learn how to create a conflict-free mechanism that handles deadlocks and supports multiple concurrent writes. To me, it’s an interesting stage, and I am planning to write an article and extend product-micro-go for this. Stay tuned!

In the meantime, the code presented in this guide can be found here. Have a look and start building real-time data pipelines to monitor your own Golang applications. Good luck!

This blog post was created as part of the Mattermost Community Writing Program and is published under the CC BY-NC-SA 4.0 license. To learn more about the Mattermost Community Writing Program, check this out.