Concurrency

Go’s concurrency model is based on what are called “goroutines” - essentially lightweight threads. To invoke a goroutine, we use the go keyword:

go func() {
    fmt.Println("hello, world")
}()

If we were to add the above to a main() function and run it, we would probably see no output. This is because the main exited before the goroutine had time to finish. To see this in action, let’s add a sleep to give the goroutine some time to run:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func main() {
 9     go func() {
10         fmt.Println("hello, world")
11     }()
12 
13     time.Sleep(1 * time.Second)
14 }
$ go run main.go
hello, world

What would happen if we had multiple goroutines? Let’s try:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func main() {
 9     go func() {
10         fmt.Println("hello, world 1")
11     }()
12 
13     go func() {
14         fmt.Println("hello, world 2")
15     }()
16 
17     go func() {
18         fmt.Println("hello, world 3")
19     }()
20 
21     time.Sleep(1 * time.Second)
22 }
$ go run main.go
hello, world 2
hello, world 1
hello, world 3

We notice the goroutines all ran, and not in the order in which they were written. This is the expected behavior.

But we can’t use time.Sleep everywhere in our code to wait for our goroutines to finish, right? In the next sections we’ll discuss how to organize our goroutines.

sync.WaitGroup

With sync.WaitGroup we can avoid using time.Sleep to wait for our goroutines to finish. Instead, we create a sync.WaitGroup and add 1 to its counter for every goroutine we expect to launch. Then, inside each goroutine we decrement the counter. Finally we call the Wait() method on the WaitGroup to wait for all of our goroutines to finish. Let’s modify our previous example to use a sync.WaitGroup:

sync.WaitGroup
 1 package main
 2 
 3 import (
 4 	"fmt"
 5 	"sync"
 6 )
 7 
 8 func main() {
 9 	var wg sync.WaitGroup
10 	wg.Add(1)
11 	go func() {
12 		defer wg.Done()
13 		fmt.Println("hello, world 1")
14 	}()
15 
16 	wg.Add(1)
17 	go func() {
18 		defer wg.Done()
19 		fmt.Println("hello, world 2")
20 	}()
21 
22 	wg.Add(1)
23 	go func() {
24 		defer wg.Done()
25 		fmt.Println("hello, world 3")
26 	}()
27 
28 	wg.Wait()
29 }

When we run this code, we get the same or similar output to when we were using time.Sleep:

$ go run main.go
hello, world 3
hello, world 1
hello, world 2

All of our goroutines finished, and we didn’t need to use a time.Sleep to wait for them.

errgroup

errgroup is similar to sync.WaitGroup but provides convenient error handling functionality.

errgroup.Group
 1 package main
 2 
 3 import (
 4 	"fmt"
 5 	"log"
 6 	"os/exec"
 7 	"strconv"
 8 	"strings"
 9 	"sync/atomic"
10 
11 	"golang.org/x/sync/errgroup"
12 )
13 
14 // lineCount returns the number of lines in a given file
15 func lineCount(filepath string) (int, error) {
16 	out, err := exec.Command("wc", "-l", filepath).Output()
17 	if err != nil {
18 		return 0, fmt.Errorf("could not run wc -l: %s", err)
19 	}
20 
21 	// wc output is like: 999 filename.go
22 	count, err := strconv.Atoi(strings.Fields(string(out))[0])
23 	if err != nil {
24 		return 0, fmt.Errorf("could not convert wc -l output to integer: %s", err)
25 	}
26 
27 	return count, nil
28 }
29 
30 func main() {
31 	var totalLineCount int64
32 	var g errgroup.Group
33 
34 	files := []string{
35 		"foo.txt",
36 		"bar.txt",
37 		"baz.txt",
38 	}
39 
40 	for _, file := range files {
41 		file := file
42 		g.Go(func() error {
43 			lc, err := lineCount(file)
44 			if err != nil {
45 				return err
46 			}
47 
48 			atomic.AddInt64(&totalLineCount, int64(lc))
49 
50 			return nil
51 		})
52 	}
53 
54 	if err := g.Wait(); err != nil {
55 		log.Fatal(err)
56 	}
57 
58 	log.Printf("total line count: %d", totalLineCount)
59 }

In this example, we have a list of files for which we want to get the word count concurrently. We loop over each filename and launch a goroutine with the errgroup’s Go function. The lineCount function calls out to the wc command. Note that due to how closures work with goroutines, we must set file := file before launching the goroutine (https://golang.org/doc/faq#closures_and_goroutines).

Inside the goroutine, we get the line count and error return values from lineCount. If we encounter an error, we return it. After launching the goroutines, we call g.Wait() to wait for them to finish. We also get an error back if any errors were returned in the process:

$ go run main.go
2020/05/21 13:51:59 total line count: 17

To force an error, let’s remove one of the files we’re checking:

$ rm foo.txt
$ go run main.go
2020/05/21 13:52:35 could not run wc -l: exit status 1
exit status 1

Channels

Channels can be used to send and receive values. They’re often used in goroutines; a goroutine will do some work and then send the result to the channel that was passed in as an argument:

Channel usage
 1 package main
 2 
 3 import "fmt"
 4 
 5 func main() {
 6 	ch := make(chan string)
 7 
 8 	go func(ch chan string) {
 9 		ch <- "hello, world 1"
10 	}(ch)
11 
12 	go func(ch chan string) {
13 		ch <- "hello, world 2"
14 	}(ch)
15 
16 	go func(ch chan string) {
17 		ch <- "hello, world 3"
18 	}(ch)
19 
20 	a, b, c := <-ch, <-ch, <-ch
21 
22 	fmt.Println(a)
23 	fmt.Println(b)
24 	fmt.Println(c)
25 }

This code is functionally equivalent to our sync.WaitGroup example in the previous section. Note that we must declare the channel before it’s used, as we see with ch := make(chan string). We create the channel, then each goroutine sends a string onto the channel. Finally we select 3 values from the channel and print them out at the end of the program.

What would happen if we only sent 2 strings but tried to select 3?

Channel usage
 1 package main
 2 
 3 import "fmt"
 4 
 5 func main() {
 6 	ch := make(chan string)
 7 
 8 	go func(ch chan string) {
 9 		ch <- "hello, world 1"
10 	}(ch)
11 
12 	go func(ch chan string) {
13 		ch <- "hello, world 2"
14 	}(ch)
15 
16 	// go func(ch chan string) {
17 	// 	ch <- "hello, world 3"
18 	// }(ch)
19 
20 	a, b, c := <-ch, <-ch, <-ch
21 
22 	fmt.Println(a)
23 	fmt.Println(b)
24 	fmt.Println(c)
25 }
$ go run main.go
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
	/Users/gopher/foo.go:20 +0xf2
exit status 2

Goroutines in web handlers

We mentioned that goroutines run until the main function exits. This means that a web handler can create goroutines to do background processing and can return before the goroutines finish. Let’s see an example:

Goroutine in web handler
 1 package main
 2 
 3 import (
 4 	"log"
 5 	"net/http"
 6 	"time"
 7 )
 8 
 9 func main() {
10 	http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
11 		go func() {
12 			time.Sleep(3 * time.Second)
13 			log.Println("hello, world")
14 		}()
15 
16 		return
17 	})
18 
19 	log.Println("Running on :8080...")
20 	log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
21 }

If we run this code in one terminal, we should see:

$ go run main.go
2018/11/02 13:21:23 Running on :8080...

Now in a second terminal, let’s do a curl:

$ curl localhost:8080
$

The curl returns almost immediately. Now if we wait a couple of seconds, we should see our print statement in the first terminal:

$ go run main.go
2018/11/02 13:21:23 Running on :8080...
2018/11/02 13:21:26 hello, world

Meaning the goroutine continued running in the background after our HTTP handler returned.

Pollers

Goroutines are useful when writing pollers as well. Let’s say we have a Go program that functions as a web server, but we also want to poll for some data in the background. We’ll keep the web server functionality small for the sake of simplicity in our example. Let’s poll for advisory information from BART, the public transportation system serving the San Francisco Bay Area:

BART advisory poller
 1 package main
 2 
 3 import (
 4 	"encoding/json"
 5 	"io/ioutil"
 6 	"log"
 7 	"net/http"
 8 	"time"
 9 )
10 
11 const bsaEndpoint = "http://api.bart.gov/api/bsa.aspx?cmd=bsa&key=MW9S-E7SL-2\
12 6DU-VV8V&json=y"
13 
14 type bsaResponse struct {
15 	Root struct {
16 		Advisories []BSA `json:"bsa"`
17 	}
18 }
19 
20 // BSA is a BART service advisory
21 type BSA struct {
22 	Station     string
23 	Description struct {
24 		Text string `json:"#cdata-section"`
25 	}
26 }
27 
28 func poll() {
29 	ticker := time.NewTicker(5 * time.Second)
30 	defer ticker.Stop()
31 	for {
32 		select {
33 		case <-ticker.C:
34 			resp, err := http.Get(bsaEndpoint)
35 			if err != nil {
36 				log.Println("ERROR: could not GET bsaEndpoint:", err)
37 			}
38 			defer resp.Body.Close()
39 
40 			b, err := ioutil.ReadAll(resp.Body)
41 			if err != nil {
42 				log.Println("ERROR: could not parse response body:", err)
43 			}
44 
45 			var br bsaResponse
46 			err = json.Unmarshal(b, &br)
47 			if err != nil {
48 				log.Println("ERROR: json.Unmarshal:", err)
49 			}
50 
51 			if len(br.Root.Advisories) > 0 {
52 				for _, adv := range br.Root.Advisories {
53 					log.Println(adv.Station, adv.Description.Text)
54 				}
55 			}
56 		}
57 	}
58 }
59 
60 func main() {
61 	http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
62 		go func() {
63 			log.Println("hello, world 1")
64 		}()
65 
66 		return
67 	})
68 
69 	go poll()
70 	log.Println("Running on :8080...")
71 	log.Fatal(http.ListenAndServe("127.0.0.1:8080", nil))
72 }

Let’s go over what this code does. We have an HTTP handler like our previous example but without the time.Sleep. If we do curl localhost:8080, we should not only see an immediate response, but also a hello, world log statement in the console running our program. After we set up our handler, we run our poll function in a goroutine.

The poll function is making a GET request every 5 seconds to a public BART API endpoint which contains advisory information, such as delays at certain stations. After making the request and unmarshalling it, we print the results.

Why does poll need to be run in a goroutine? Since it contains an infinite loop (for{ ... }), if we were to run it on its own we would never get to the code following it.

Race conditions

When writing concurrent code, we must be careful not to create any race conditions. A race condition occurs in Go when two goroutines access the same variable and at least one access is a write.6

We discuss race conditions and the race detector in detail in the [Tooling]{#racedetector} section. Here we’ll go over a simple example and fix it, but please see the Tooling section for advice on preventing data races in your build.

Consider the following example of two goroutines updating a map concurrently:

Goroutines updating same map
 1 package main
 2 
 3 import (
 4 	"log"
 5 	"time"
 6 )
 7 
 8 func main() {
 9 	m := map[string]string{}
10 
11 	go func() {
12 		m["test"] = "hello, world 1"
13 	}()
14 
15 	go func() {
16 		m["test"] = "hello, world 2"
17 	}()
18 
19 	time.Sleep(time.Second)
20 	log.Println(m["test"])
21 }

Running this code gives us the following output:

$ go run main.go
2018/11/02 17:29:27 map[test:hello, world 2]

Since these two goroutines are updating the same map concurrently, however, there is a data race. We can prevent this by using sync.Map:

sync.Map
 1 package main
 2 
 3 import (
 4 	"log"
 5 	"sync"
 6 	"time"
 7 )
 8 
 9 func main() {
10 	var m = &sync.Map{}
11 
12 	go func() {
13 		m.Store("test", "hello, world 1")
14 	}()
15 
16 	go func() {
17 		m.Store("test", "hello, world 2")
18 	}()
19 
20 	time.Sleep(time.Second)
21 
22 	val, _ := m.Load("test")
23 	log.Println(val)
24 }

This works fine, but the documentation warns us that sync.Map should only be used in the following situations:

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.7

So let’s take their advice and use map paired with a Mutex:

sync.Map
 1 package main
 2 
 3 import (
 4 	"log"
 5 	"sync"
 6 	"time"
 7 )
 8 
 9 type safeMap struct {
10 	sync.Mutex
11 	m map[string]string
12 }
13 
14 func main() {
15 	var sm = safeMap{
16 		m: make(map[string]string),
17 	}
18 
19 	go func() {
20 		sm.Lock()
21 		defer sm.Unlock()
22 		sm.m["test"] = "hello, world 1"
23 	}()
24 
25 	go func() {
26 		sm.Lock()
27 		defer sm.Unlock()
28 		sm.m["test"] = "hello, world 2"
29 	}()
30 
31 	time.Sleep(time.Second)
32 
33 	sm.Lock()
34 	log.Println(sm.m["test"])
35 	sm.Unlock()
36 }

We can add Store, Load, Delete etc. methods to our safeMap type as well:

sync.Map
 1 package main
 2 
 3 import (
 4 	"log"
 5 	"sync"
 6 	"time"
 7 )
 8 
 9 type safeMap struct {
10 	sync.Mutex
11 	m map[string]string
12 }
13 
14 func (sm *safeMap) Store(key, val string) {
15 	sm.Lock()
16 	defer sm.Unlock()
17 	sm.m[key] = val
18 }
19 
20 func (sm *safeMap) Load(key string) (val string, exists bool) {
21 	sm.Lock()
22 	defer sm.Unlock()
23 	val, ok := sm.m[key]
24 	return val, ok
25 }
26 
27 func (sm *safeMap) Delete(key, val string) {
28 	sm.Lock()
29 	defer sm.Unlock()
30 	delete(sm.m, key)
31 }
32 
33 func main() {
34 	var sm = safeMap{
35 		m: make(map[string]string),
36 	}
37 
38 	go func() {
39 		sm.Store("test", "hello, world 1")
40 	}()
41 
42 	go func() {
43 		sm.Store("test", "hello, world 2")
44 	}()
45 
46 	time.Sleep(time.Second)
47 
48 	val, _ := sm.Load("test")
49 	log.Println(val)
50 }
$ go run main.go
2018/11/02 18:45:57 hello, world 2

It also runs with the race detector enabled, no data races found:

$ go run -race main.go
2018/11/02 18:46:22 hello, world 2