Taras Kushnir

Software engineer at ELEKS

Computing on the Go


Intro

For years, efficient middleware solutions have been implemented mainly in C++, because no other language allowed combining high-level abstractions with low-level system programming. That is why we at ELEKS have mainly been developing and implementing load balancing solutions in C++. Until now. Until Go, which emerged as an experiment of three Google programmers. Go offers a new approach to developing system software on multicore machines, and we just couldn’t help but try it. In this article we’ll explore how to develop a distributed Load Balancer in Go, discuss its pros and cons and compare the process with the development of a similar Load Balancer in C++.


Many of you remember Rob Pike’s “Concurrency is not Parallelism” presentation, and its Load Balancer is a bright example of how a complex problem can be solved in Go relatively easily. In that example, the Load Balancer distributed work between a number of Workers and reported results to the Requester. Today we’ll go a few steps further and develop a robust networked Load Balancer, which means, in terms of Pike’s presentation, that the Requester, the Balancer and the Workers will not necessarily run on one computer.

So, let’s define some clear objectives:

  • The Requester, the Balancer and the Worker should be able to interact via TCP/IP.
  • The connection should be kept alive, and hung or disconnected parties should be detected silently.
  • The Requester should be able to see the status of the tasks it has sent, to log them or notify the end user.
  • The Worker should be able to run computations defined as Go code or as a C++ dynamic library.
  • The overall architecture should be simple to understand, the Balancer should not hang, etc.
  • Use the Load Balancer to solve interesting problems.
  • Compare the Go Load Balancer with a C++ Load Balancer in terms of development time and code complexity.

Chapter 1. Synchronization Basics

To build a concurrent application, you have to use synchronisation primitives (which, however, may be not so primitive). In Go, the main synchronisation primitive is the channel.

A channel can be compared to a two-way pipe in Unix shells: you can both send data to it and retrieve data from it. Channels are strictly typed, so you can only send and receive data of a specific type. In the general case, writing to a channel and reading from it are blocking operations, so this is where synchronisation happens.

data := <- channel // reading from channel
channel <- data // writing to channel

Go also provides a special language construct (a synchronisation “primitive”) called select. It looks like a switch statement, but all of its cases refer to communication via channels. Select chooses whichever of several read and write operations is ready to proceed at the moment. You can refer to the official documentation of select for more information.
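
Here is a minimal, runnable sketch of select (the channel names are illustrative, not taken from the Balancer code):

package main

import (
    "fmt"
    "time"
)

func main() {
    tasks := make(chan int)
    quit := make(chan struct{})

    go func() {
        tasks <- 42 // blocks until main is ready to receive
        close(quit)
    }()

    for {
        select {
        case t := <-tasks:
            fmt.Println("got task", t) // this channel was ready first
        case <-quit:
            fmt.Println("shutting down")
            return
        case <-time.After(time.Second):
            fmt.Println("nothing happened for a second")
            return
        }
    }
}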

Synchronisation primitives usually protect some data from parallel access or modification. Such access can come from entities that execute concurrently or even in parallel: processes or threads. In Go, such entities are goroutines: ordinary functions executed concurrently with other goroutines in the same address space (Erlang has a similar model). You can think of goroutines as cheap threads.

Go encourages passing data between goroutines through channels. The main idea behind completely avoiding data races is that only one responsible goroutine gets to modify any given piece of data. We’ll talk more about that later. It is the main commandment of concurrent programming in Go: if you keep it, you won’t need any other synchronisation (atomic variables, mutexes, etc.).
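
A toy illustration of that commandment (not from the Balancer itself): a single goroutine owns the counter, and everybody else can only talk to it through channels:

package main

import "fmt"

// counter is the only goroutine allowed to touch total,
// so no mutex or atomic variable is needed.
func counter(updates <-chan int, query chan<- int) {
    total := 0
    for delta := range updates {
        total += delta
    }
    query <- total // report the final value once updates is closed
}

func main() {
    updates := make(chan int)
    query := make(chan int)
    go counter(updates, query)

    for i := 1; i <= 5; i++ {
        updates <- i // other goroutines never see total directly
    }
    close(updates)
    fmt.Println("total:", <-query) // total: 15
}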

By the way, the concepts of goroutines and channels are based on Tony Hoare’s paper “Communicating Sequential Processes” (CACM, 1978). An interested reader can refer to the history of Go for more details.

Chapter 2. Networking Basics

Our main objective is to make all parts of our distributed computation system communicate over a network, so it’s worth describing how that communication will run. A common model suggests that a server accepts connections on a listening socket in a loop and launches a goroutine to handle each of them. Typical server-side code looks like this:

ln, err := net.Listen("tcp", ":4321")
if err != nil {
    log.Fatal(err)
}
for {
    conn, err := ln.Accept()
    if err != nil {
        log.Println(err)
        continue
    }
    go handleConnection(conn)
}

We create a listening socket bound to port 4321 (with the ":4321" address it listens on all interfaces; prepend a host name to restrict it). Then, in an infinite loop, we accept connections on that socket and handle them. The function handleConnection() is launched as a goroutine, so control immediately returns to our loop and we’re able to accept the next connection.
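
handleConnection() isn’t spelled out in this chapter; a minimal echo-style sketch (the real handler reads a command byte and dispatches it to a channel, as we’ll see in Chapter 3) could look like this:

import "net"

func handleConnection(conn net.Conn) {
    defer conn.Close()
    buf := make([]byte, 1024)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            return // the client hung up or the read failed
        }
        if _, err := conn.Write(buf[:n]); err != nil {
            return
        }
    }
}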

Typical client code would contain:

conn, err := net.Dial("tcp", addr)
if err != nil {
    return err // could not connect
}
// send actual data over conn

The Go language model implies frequent error checks when calling functions, so it’s useful to find a workaround for the typical serialising and deserialising of a structure to and from a socket. The code below is the kind we’re trying to optimise:

err = binary.Write(stream, binary.BigEndian, dataField1)
if err != nil { return err }
err = binary.Write(stream, binary.BigEndian, dataField2)
if err != nil { return err }
err = binary.Write(stream, binary.BigEndian, dataField3)
if err != nil { return err }
// and so on...

To beat this ugly code, let’s introduce a wrapper over io.Writer:

type BinWriter struct {
    W    io.Writer
    Size int64
    Err  error
}

func (w *BinWriter) Write(v interface{}) {
    if w.Err != nil {
        return
    }
    if w.Err = binary.Write(w.W, binary.BigEndian, v); w.Err == nil {
        w.Size += int64(binary.Size(v))
    }
}

Now our piece of code with serialising would look like this:

bw := &BinWriter{W: someActualWriter}
bw.Write(dataField1)
bw.Write(dataField2)
bw.Write(dataField3)
return bw.Err
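
Deserialising benefits from the same trick. A symmetric reader wrapper (our own sketch, not taken from the Balancer source) could be:

import (
    "encoding/binary"
    "io"
)

// BinReader mirrors BinWriter: the first error short-circuits
// every subsequent Read call.
type BinReader struct {
    R   io.Reader
    Err error
}

func (r *BinReader) Read(v interface{}) {
    if r.Err != nil {
        return
    }
    r.Err = binary.Read(r.R, binary.BigEndian, v)
}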

This approach leaves a lot of room for improvement. For example, we could write the data to a buffer and then flush it to the socket on the all-or-nothing principle, but the purpose here was just to show the idea.
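
For instance, a hypothetical helper (writeAllOrNothing is our name, not from the original code) could serialise everything into an in-memory buffer and touch the socket only once:

import (
    "bytes"
    "io"
)

// Nothing reaches the socket until every field has serialised cleanly.
func writeAllOrNothing(sock io.Writer, fields ...interface{}) error {
    var buf bytes.Buffer
    bw := &BinWriter{W: &buf}
    for _, f := range fields {
        bw.Write(f)
    }
    if bw.Err != nil {
        return bw.Err // the socket was never touched
    }
    _, err := sock.Write(buf.Bytes())
    return err
}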

If you’re curious what more advanced networking in Go looks like, you can refer to Network Programming with Go by Jan Newmarch.

Chapter 3. Balancer Architecture

Since our Balancer is now a network service, we’ll assume it has two listening sockets: one for the Requester and another for all the Workers. The idea is that the Balancer will be a completely passive actor: only the Requester and the Workers send requests and receive responses. Such a model makes a correct Balancer implementation easier in terms of the Go synchronisation idea that only one goroutine gets to modify data. We’ll have a select (read: “lock”) on reads from the worker command channels, so even if we receive several worker requests at once (several identical handleConnection() goroutines are launched), all of them will modify data sequentially.


Talking about Worker requests, we can define the following:

  • Register (a newly launched Worker pings the Balancer, and the latter registers it, assigning it a unique ID)
  • Health check (the registered Worker establishes a pingback connection for the keep-alive state)
  • Get task (the registered Worker asks the Balancer to respond with the next task assigned to it)
  • Send result (the registered Worker sends the result of a task computation to the Balancer)

The Requester can send the following requests:

  • Register (a newly launched Requester pings the Balancer, and the latter registers it)
  • Health check (the registered Requester establishes a pingback connection for the keep-alive state)
  • Send input (the registered Requester sends input for computations)
  • Get output (the registered Requester asks the Balancer to respond with the computation results)

For each of these requests we can define a channel, then launch one goroutine with a select over the Worker commands and another with a select over the Requester commands. All handlers of Worker requests operate on the Workers Pool (registering, finding and removing Workers), so it must be protected from parallel modification, and select gives us exactly that.

Suppose we have a data structure Worker (a representation of the actual Worker) and a Socket in a package common (our wrapper over Go’s socket); then we can define the channels like this:

type WorkerChannels struct {
    addworker      chan *Worker
    healthcheck    chan common.Socket
    getTaskRequest chan common.Socket
}

The variable name goes first in its declaration (read more on Go declaration syntax in the official documentation).

Then, we will handle those channels in a loop:

func handleWorkerChannels(wch WorkerChannels) {
    log.Println("Worker channels handling started")
    defer log.Println("Worker channels handling finished")
    for {
        select {
        case w := <-wch.addworker:
            addWorker(w)
        case s := <-wch.healthcheck:
            checkHealthWorker(s)
        case s := <-wch.getTaskRequest:
            sendNextTaskToWorker(s)
        }
    }
}

which is launched as a goroutine:

workerChannels := WorkerChannels{
    addworker:      make(chan *Worker),
    healthcheck:    make(chan common.Socket),
    getTaskRequest: make(chan common.Socket),
}
go handleWorkerChannels(workerChannels)

Requester commands are handled the same way as Worker ones. (You might have noticed that we only read from the command channels. So who writes to them? That’s the job of the TCP servers we discussed earlier: one listens for Worker connections, the other for Requester connections.) Each connection handler reads a command from the connection and writes it to the appropriate channel:

func handleWorker(sock common.Socket, wch WorkerChannels) error {
    var opType byte
    err := binary.Read(sock, binary.BigEndian, &opType)
    if err != nil {
        return err
    }
    optype := common.WorkerOperation(opType)
    switch optype {
    case common.WInit:
        wch.addworker <- &Worker{ID: 0}
        go sock.Close()
    case common.WHealthCheck:
        wch.healthcheck <- sock
    case common.WGetTask:
        wch.getTaskRequest <- sock
        log.Printf("Waiting for worker connection with code [%v] to finish", optype)
        <-sock.Done
    }
    return nil
}

Where the common.Socket is defined like this:

type Socket struct {
    Conn net.Conn
    Done chan bool
}

func (s Socket) Close() error {
    s.Done <- true
    return nil
}

Actual handlers like checkHealthWorker() or sendNextTaskToWorker() close the socket when they are finished with it and notify the waiting goroutine through the Done channel. In this example, addWorker() does not receive the socket as a parameter, so the current function launches a goroutine to close the socket itself.

Chapter 4. Keep-alive State

Since health checking is common to the Worker and the Requester, let’s talk about it in more detail. We need a socket that is maintained in a health-reporting state and a way to notify somebody when a health check fails.

Health checking consists of two parts: physical and logical. The physical part involves reading from and writing to a socket; the logical part involves reply analysis and response generation. We can implement them separately using anonymous goroutines.

The physical part (simplified) can look like this:

go func(healthcheck chan int32, reply chan int32, done chan bool) {
    var heartBeat int32 // binary.Read needs a fixed-size type
    for {
        if err := binary.Read(sock, binary.BigEndian, &heartBeat); err != nil {
            return // the connection is gone
        }
        healthcheck <- heartBeat
        select {
        case healthReply := <-reply:
            binary.Write(sock, binary.BigEndian, healthReply)
        case <-done:
            return
        }
    }
}(healthcheckChannel, replyChannel, doneChannel)

And the logical part can be as simple as this:

Loop:
for {
    select {
    case heartBeat := <-healthcheck:
        setHealthStatus(heartBeat)
        reply <- getHealthReply()
    case <-time.After(5 * time.Second):
        log.Printf("Timeout in healthcheck")
        done <- true
        timeout <- true // notify the owner that this party has timed out
        break Loop
    }
}

Chapter 5. Balancing Tasks

Rob Pike’s Balancer was so simple because it interacted with Workers directly. We, however, have to deal with networking and passive behaviour (Workers may sit on network nodes that are restricted from accepting incoming connections). So let’s divide our implementation into physical and logical parts, as we did with health checking. The logical part will consist of assigning tasks to the Workers’ logical representations that are locally accessible to the Balancer: the list of registered Workers with a positive health status (meaning they are alive and can carry out tasks or process commands). The physical part is the stage of the task balancing lifecycle where the Worker requests the next task for computation.

But how will the Worker know that it should request something to compute? Remember health checking? It’s a request-response interaction, and the Balancer can easily use the response to notify the Worker that there is something for it to do.
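
A sketch of how that notification could ride on the reply (the reply codes and the pendingTasks counter are our assumptions, not names from the original code):

// Hypothetical reply codes: the health-check response doubles as a
// notification channel from the Balancer to the Worker.
const (
    ReplyIdle    int32 = 0 // nothing queued, keep pinging
    ReplyHasTask int32 = 1 // a task is waiting: issue a WGetTask request
)

var pendingTasks int // assumed per-Worker counter of queued tasks

func getHealthReply() int32 {
    if pendingTasks > 0 {
        return ReplyHasTask
    }
    return ReplyIdle
}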

For our logical model, we’ll use the same idea as the local Balancer: a heap (priority queue) of Workers sorted by the number of tasks assigned. If we want to introduce some filtering (e.g. a Worker can have a limit on simultaneously assigned tasks), we can easily implement it on this logical layer. On the physical layer, the Worker will simply download and compute the next task.

If you are wondering about the heap, it’s an arbitrary type which implements the following interface (this is heap.Interface from the standard container/heap package):

Len() int
Less(i, j int) bool
Swap(i, j int)
Push(x interface{})
Pop() interface{}
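
A minimal sketch of a Pool of Workers ordered by load (assuming the Worker struct tracks a Pending count of assigned tasks, which is our name for illustration):

// Pool orders Workers by the number of tasks currently assigned,
// so heap.Pop always yields the least-loaded Worker.
type Pool []*Worker

func (p Pool) Len() int           { return len(p) }
func (p Pool) Less(i, j int) bool { return p[i].Pending < p[j].Pending }
func (p Pool) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

func (p *Pool) Push(x interface{}) { *p = append(*p, x.(*Worker)) }

func (p *Pool) Pop() interface{} {
    old := *p
    n := len(old)
    w := old[n-1]
    *p = old[:n-1]
    return w
}

After heap.Init(&pool), the container/heap functions heap.Push() and heap.Pop() maintain the ordering for us.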

Chapter 6. Reporting Progress

Having progress reporting is important for any complex computation that takes more than a second to complete, and it is a must for our Balancer, which is supposed to handle heavy computations.

Obviously, in our case the Requester cannot ask a Worker directly. But the Worker can report its progress to the Balancer (via the health check request), and the Balancer can send an overall report to the Requester in a health check response. The Balancer should accumulate the results received from the Workers and give the overall score to the Requester. But how do we keep the extra load on the Balancer low?

Assuming our computation is heavy and time-consuming, it won’t hurt to provide the Requester with slightly delayed statistics on completed tasks. Each logical representation of a Worker will hold the number of tasks it has completed, and from time to time (at an interval we define) an accumulating goroutine will be launched to update the statistics sent to the Requester. Eventually, the Requester receives correct statistics with a delay equal to the interval at which we launch the accumulator.

This way we avoid the extra cost of pushing an update through the already busy select every time a single Worker reports progress. Go’s useful time package defines a Ticker: an object that writes to a channel at a specified interval. It fits our purpose perfectly:

statUpdateTicker := time.NewTicker(3 * time.Second)
for {
    select {
    // ... other channels ...
    case <-statUpdateTicker.C:
        refreshDoneStats()
    }
}

where the refreshDoneStats() method updates the overall statistics every 3 seconds (the interval passed to the NewTicker() “constructor”).
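
refreshDoneStats() itself isn’t shown here; under the model above it could be as simple as summing per-Worker counters (workersPool, TasksDone and overallDone are assumed names, not from the original code):

// Sum the completed-task counters of all registered Workers; the
// Requester's next health-check reply picks up the cached total.
func refreshDoneStats() {
    total := 0
    for _, w := range workersPool {
        total += w.TasksDone
    }
    overallDone = total
}

Because this runs inside the same select loop that owns the Workers Pool, no extra locking is needed.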

In the next post we’ll discuss working with C++ libraries, solve some interesting problems with Go and, finally, compare building a Load Balancer in Go with building one in C++.

Taras Kushnir

Taras Kushnir is a software engineer at ELEKS. He is enthusiastic about Ruby, C++ and .NET and likes participating in programming contests like ACM/ICPC. Competitiveness takes over his hobbies too: Taras likes long-distance running, table tennis and mountaineering. Entomology and drawing are also his hobbies. Taras has always liked writing and with enough experience to share with readers, he started blogging.

