Пример многопоточного программирования в Go

Возникла задача: у нас есть компилятор собственного языка программирования, которым мы компилируем некоторый диалект бейсика в исходник на C.

К сожалению, по историческим причинам, у нас не было четкого регрессионного тестирования для этого компилятора. Но сейчас, на основе исходников бизнес-приложения, написанного на этом бейсике, решили сделать полноценное тестирование.

План таков: принять какую-то текущую версию компилятора, на которую нет открытых жалоб от клиентов, за эталон. Скомпилировать этой версией приличное количество исходников, сохранить результат, и затем каждый раз при внесении в компилятор изменений прогонять все эти исходники и смотреть, генерируется ли точно такой же вывод. Это не защитит от появления ошибок в целом, но по крайне мере будет уверенность, что существующий бизнес код все еще компилируется правильно.

Несложная задача. Только есть одно “но”. Количество исходников, которые планируется использовать как эталонные - около 15 тысяч файлов, суммарным объемом чуть меньше гига (для удобства они завернуты в один TAR). Подобный “прогон” может быть весьма долгим. И есть естественное желание сделать тест максимально быстрым, используя многопроцессорную машину, ибо задача прекрасно распараллеливается.

Как вариант - можно сделать Makefile и запускать его с ключом -j в GNU Make. Но если написать специализированную многопоточную программу, то можно достичь лучшей производительности.

Итак, очевидно: вместо последовательного выполнения нужно запускать компиляцию каждого файла в параллельных потоках. Но так как файлов много (~15 тысяч), неэффективно просто одновременно запустить столько много потоков. Разумнее всего будет иметь пул потоков, где их количество будет определяться, например, количеством процессоров (например, умноженное на 2). Пул будет назначать очередную задачу на свободный поток, и если все потоки заняты, он будет блокироваться до тех пор, пока не появиться свободный.

Таким образом, мы будем поддерживать занятыми N потоков, обеспечивая оптимальную загрузку процессоров, не тратя время на лишние переключения контекста и постоянное создание и уничтожение потоков.

Сначала я решил написать все на С++ и pthreads. После нескольких часов танцов вокруг функторов, мьютексов, семафоров и условных переменных, у меня так ничего реально работающего не вышло. И тут я вспомнил про Go. Не поверите - через час работы у меня была готова первая версия, включая мелочевку типа работы с TAR, командной строкой и запуском внешнего процесса.

Итак: данная программа берет TAR с исходниками, распаковывает его, и каждый файл прогоняет через компилятор.

Сразу скажу, цель того, что я все это пишу тут, это продемонстрировать (и не более того), как просто и удобно на Go можно писать многопоточные императивные программы.

Главная концепция, которая используется в этой программе - это каналы. По каналам можно синхронно передавать данные и функции между потоками (Go-рутинами).

Далее, можно просто смотреть по исходнику. Самое интересное место там, где видно, как функция “compile()” может вызываться из нескольких потоков без каких-либо изменений.

package main

import (
        "archive/tar"
        "container/vector"
        "exec"
        "flag"
        "fmt"
        "io"
        "os"
        "strings"
)

// Два флага: количество потоков и имя компилятора.
var jobs *int = flag.Int("jobs", 0, "number of concurrent jobs")
var compiler *string = flag.String("cc", "bcom", "compiler name")

func main() {
        flag.Parse()
        os.Args = flag.Args()
        args := os.Args

        ar := args[0]
        r, err := os.Open(ar, os.O_RDONLY, 0666);
        if err != nil {
                fmt.Printf("unable to open TAR %s\n", ar)
                os.Exit(1)
        }
        // defer - это аналог "finally {}", гарантированное выполнение
        // кода при выходе из блока.
        defer r.Close()

        // Цикл распаковки TAR.
        fmt.Printf("- extracting %s\n", ar)
        // Создаем контекст для распаковки.
        tr := tar.NewReader(r)
        tests := new(vector.StringVector)
        // Последовательный проход по архиву, сохранение файлов и составление
        // списка для компиляции.
        for {
                // Получаем дескриптор следующего файла в архиве.
                hdr, _ := tr.Next()
                if hdr == nil {
                        break
                }
                name := &hdr.Name
                // Если это не заголовочный файл, сохраним имя.
                if !strings.HasPrefix(*name, "HDR_") {
                        tests.Push(*name)
                }
                // Создаем новый файл.
                w, err := os.Open("data/" + *name, os.O_CREAT | os.O_RDWR, 0666)
                if err != nil {
                        fmt.Printf("unable to create %s\n", *name)
                        os.Exit(1)
                }
                // Копируем содержимое в текущий файл.
                io.Copy(w, tr)
                w.Close()
        }

        fmt.Printf("- compiling...\n")
        *compiler , _ = exec.LookPath(*compiler)
        fmt.Printf("- compiler %s\n", *compiler)

        if *jobs == 0 {
                // Вызываем "compile()" последовательно, в основном потоке.
                fmt.Printf("- running sequentially\n")
                for i := 0; i < tests.Len(); i++ {
                        compile(tests.At(i))
                }
        } else {
                // Запускаем "compile()" в параллельных потоках.
                fmt.Printf("- running %d concurrent job(s)\n", *jobs)

                // Канал задач: в этот канал мы будем класть имена файлов,
                // которые надо скомпилировать. Потоки-runner'ы будут ждать
                // сообщений из этого канала. Канал имеет ограничение по
                // длине. Это аналог семафора, чтобы блокировать главный
                // поток, если все runner'ы заняты.
                tasks := make(chan string, *jobs)

                // Канал подтверждения полного завершение потока-runner'а.
                // Главный поток будет ждать, пока все runner'ы ответят
                // по этому каналу. Тип сообщений тут не важен.
                done := make(chan bool)

                // Запускаем runner'ы.
                for i := 0; i < *jobs; i++ {
                        go runner(tasks, done)
                }

                // Передаем в канал имена файлов для обработки. При
                // достижении максимального размера канала, главный поток
                // будет заблокирован.
                for i := 0; i < tests.Len(); i++ {
                        tasks <- tests.At(i)
                }

                // Посылаем всем потокам команду завершиться и ждем
                // подтверждения о нормальном выходе от каждого потока.
                for i := 0; i < *jobs; i++ {
                        tasks <- ""
                        <- done
                }
        }
}

// Поток-runner.
func runner(tasks chan string, done chan bool) {
        // Бесконечный цикл.
        for {
                // Ждем сообщения из канала. Обычно, поток заблокирован
                // на этом месте.
                name := <- tasks
                // Если имя пустое, нас просят завершиться.
                if len(name) == 0 {
                        break
                }
                // Компилируем файл.
                compile(name)
        }
        // Посылаем сообщение, что поток завершился.
        done <- true
}

func compile(name string) {
        // Вызываем компилятор.
        c, err := exec.Run(*compiler, []string{*compiler, name},
                           os.Environ(), "./data", exec.DevNull,
                           exec.PassThrough, exec.PassThrough)
        if err != nil {
                fmt.Printf("unable to compile %s (%s)\n", name, err.String())
                os.Exit(1)
        }
        c.Wait(0)
}

Makefile:

target = tar_extractor

all:
        6g $(target).go
        6l -o $(target) $(target).6

Я погонял это добро под Линуксом 64-бит на восьми процессорном блейде. Во время тестирования я был на машине один, так что результаты разных прогонов можно сравнивать. Файл “huge.tar” содержит ~15 тысяч исходников и имеет размер один гигабайт.

Так выглядит загрузка процессоров, когда машина ничего не делает (все процессоры почти на 100% в idle):

Cpu0  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  :  0.0%us,  0.0%sy,  0.0%ni, 99.7%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu2  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu3  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu4  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu5  :  0.0%us,  0.3%sy,  0.0%ni, 99.3%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu6  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu7  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Запускаем в последовательном режиме (-jobs 0):

make && time -p ./tar_extractor -jobs 0 huge.tar

Время работы:

real 213.81
user 187.32
sys 61.33

Практически все процессоры на 70-80% ничего не делают (все снимки я делал во время стадии компиляции):

Cpu0  : 11.9%us,  4.3%sy,  0.0%ni, 82.5%id,  1.3%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  :  9.6%us,  2.7%sy,  0.0%ni, 87.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu2  :  4.3%us,  1.3%sy,  0.0%ni, 92.7%id,  1.7%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu3  : 16.0%us,  6.0%sy,  0.0%ni, 78.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu4  : 12.6%us,  4.3%sy,  0.0%ni, 82.7%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu5  : 11.6%us,  3.3%sy,  0.0%ni, 85.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu6  :  4.7%us,  1.3%sy,  0.0%ni, 94.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu7  : 16.6%us,  6.3%sy,  0.0%ni, 77.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Суммарная загрузка процессоров - 2.7%:

 PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND

15054 tester 18 0 41420 4980 1068 S 2.7 0.1 0:02.96 tar_extractor

Теперь запускаем с пулом потоков, но только с одним каналом (-jobs 1):

Время:

real 217.87
user 191.42
sys 62.53

Процессоры:

Cpu0  :  5.7%us,  1.7%sy,  0.0%ni, 92.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  : 13.3%us,  5.3%sy,  0.0%ni, 81.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu2  :  7.0%us,  2.7%sy,  0.0%ni, 89.3%id,  0.7%wa,  0.0%hi,  0.3%si,  0.0%st
Cpu3  : 15.3%us,  5.7%sy,  0.0%ni, 77.7%id,  1.3%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu4  :  6.0%us,  2.0%sy,  0.0%ni, 92.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu5  : 14.3%us,  7.3%sy,  0.0%ni, 78.4%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu6  :  7.0%us,  2.3%sy,  0.0%ni, 90.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu7  : 15.3%us,  6.6%sy,  0.0%ni, 78.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Понятно, что картина такая же, так как реально мы также гоняем один поток.

А теперь включаем пул потоков (-jobs 32):

make && time -p ./tar_extractor -jobs 32 huge.tar

Время работы упало почти в семь раз:

real 38.38
user 195.55
sys 69.92

Общая загрузка процессоров (во время стадии компиляции) возросла до 23%:

  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
17488 tester    16   0 45900 9732 1076 S 23.6  0.1   0:06.40 tar_extractor

Видно, что все процессоры реально заняты:

Cpu0  : 56.3%us, 26.3%sy,  0.0%ni, 17.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  : 55.5%us, 27.9%sy,  0.0%ni, 15.6%id,  1.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu2  : 56.1%us, 25.9%sy,  0.0%ni, 15.0%id,  0.7%wa,  0.3%hi,  2.0%si,  0.0%st
Cpu3  : 58.1%us, 26.2%sy,  0.0%ni, 15.6%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu4  : 57.2%us, 25.8%sy,  0.0%ni, 17.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu5  : 56.8%us, 26.2%sy,  0.0%ni, 16.9%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu6  : 59.0%us, 26.3%sy,  0.0%ni, 13.0%id,  1.7%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu7  : 56.5%us, 27.2%sy,  0.0%ni, 16.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Данное тестирование предназначено исключительно для понимания, как простейший параллельный код ускоряет все в разы. И также для демонстрации, как просто и относительно безопасно можно программировать параллельные вычисления в Go.

Посты по теме Go:


Оригинальный пост | Disclaimer

Комментарии