프로그래밍 언어

chan chan 활용법 - 2. 순서가 보장되는 워커 풀 패턴으로 이용하기

해당 시리즈는 Golang Korea 페이스북 그룹 최흥배님의 번역 글에서 영감을 얻었으며 Go언어와 Go언어를 이용한 동시성 프로그래밍에 관해 어느정도 알고 있는 독자를 대상으로 한다.

이전 글에서는 채널을 변수로 취급하는 채널의 채널 활용법에 대해 알아봤다. 채널의 채널을 보게 된다면(채널의 채널 자체가 흔하지는 않겠지만) 해당 형태로 가장 많이 볼 것이다. 다음 두 글에 걸쳐 설명할 활용법들은 흔히 보이지 않을 뿐더러 해당 방법들을 몰라도 Go언어를 사용하는 데에 있어서 문제는 없을 것이다. 하지만 특정 상황에서는 요긴하게 쓰이리라 장담할 수 있다.

이번 글에서는 이 중 특정 상황에서 성능을 크게 끌어올릴만한 방법을 소개하고자 한다.

워커 풀 패턴

이에 앞서 생소할 수 있는 워커 풀 패턴에 관해 이야기해 보고자 한다.

워커 풀 패턴은 동시성 프로그래밍 패턴 중 하나이다. 일반적으로 워커 풀은 할당된 작업을 처리하려는 스레드의 집합이다. 하지만 여기서의 워커 풀은 Go언어로 구현되기 때문에 할당된 작업을 처리하려는 고루틴의 집합이라는 말이 더 적합하다. 워커 풀의 고루틴의 갯수는 사용자가 지정할 수 있고, 이렇게 생성된 고루틴들은 작업에서 나눠진 모든 부작업이 끝날 때 까지 종료되지 않고 반복적으로 처리한다. 다만, 고루틴의 특성상 일반적인 경우 순서는 보장하지 않는다.

사실 모든 부작업의 수만큼 워커를 생성하여 처리하는 것이 가장 빠르고 문법적으로 쉽다. 하지만 워커를 무제한으로 생성하면 과부화가 걸릴 수 있기 때문에 워커 풀 패턴을 통해 할당할 워커의 수를 조정해야 과부화를 방지할 수 있다.

복수 데이터 패치 서비스

여기서 사용할 예시는 여러 줄의 데이터를 외부에서 불러오는 서비스이다. 사용자가 패쳐를 실행하면 패쳐는 1000줄의 데이터를 받아들인다. 패쳐가 줄을 읽을 때 5ms가 걸리고, 읽어들인 줄을 가공할 때는 최대 99ms의 시간이 걸린다.

fetcher

fetcher는 가장 기본적인 패쳐로, 이후에 만들 패처들의 원형이다. fetcher는 단일 채널을 통해 일련의 데이터를 받고 가공한다. 생성할 때 io.Writer를 받는데, 이는 로그를 출력할 때 이용한다.

//workerpool/fetcher/simple.go
package fetcher

import (
	"fmt"
	"io"
	"math/rand"
	"time"
)

type fetcher struct {
	io.Writer
}

func Simple(writer io.Writer) *fetcher {
	return &fetcher{Writer: writer}
}

func (f *fetcher) Run() {

	readCh := make(chan string)
	fetchCh := make(chan string)

	go f.read(readCh)
	go f.fetch(readCh, fetchCh)
	f.print(fetchCh)

}

func (f *fetcher) read(outCh chan<- string){
	defer close(outCh)
	for i := 0; i < 1000; i ++ {
		time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
		outCh <- fmt.Sprintf("line: %d", i+1)
	}
}

func (f *fetcher) fetch(inCh <-chan string, outCh chan<- string) {
	defer close(outCh)
	for i := range inCh {
		f.fetchLine(i, outCh)
	}
}

func (f *fetcher) print(inCh <-chan string) {
	for line := range inCh {
		_, _ = f.Write([]byte(line))
	}
}

func (f *fetcher) fetchLine(line string, outCh chan<- string) {
	time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
	outCh <- fmt.Sprintf("%s ... fetched!\n", line)
}

전반적으로 파이프라인 패턴을 이용해 설계되었다. 파이프라인 패턴을 모른다면 해당 글을 먼저 읽는 것을 추천한다.

여기서 읽고 처리할 때 발생하는 딜레이는 간단하게 time.Sleep으로 구현했다.

ParallelFetcher

ParallelFetcher는 fetcher를 합성한 객체로, 일반적인 워커 풀 패턴을 따르는 패쳐이다.

//workerpool/fetcher/parallel.go
package fetcher

import (
	"io"
	"sync"
)

type ParallelFetcher struct {
	*fetcher
	workers int
}

func Parallel(writer io.Writer, amount int) *ParallelFetcher {
	return &ParallelFetcher{fetcher: Simple(writer), workers: amount}
}

func (p *ParallelFetcher) Run() {

	readCh := make(chan string)
	printCh := make(chan string)

	go p.read(readCh)
	go p.fetch(readCh, printCh)
	p.print(printCh)

}

func (p *ParallelFetcher) fetch(inCh <-chan string, outCh chan<- string) {

	defer close(outCh)

	var wg sync.WaitGroup

	for i := 0; i < p.workers; i ++ {
		wg.Add(1)
		go p.work(&wg, inCh, outCh)
	}

	wg.Wait()

}

func (p *ParallelFetcher) work(wg *sync.WaitGroup, inCh <- chan string, outCh chan<- string) {
	for line := range inCh {
		p.fetchLine(line, outCh)
	}
	wg.Done()
}

기존 fetcher에서 오버라이드한 fetch 부분을 보면 객체 변수인 worker만큼 work 함수를 호출한다. 즉, work 함수는 워커라고 볼 수 있다. 이렇게 호출된 워커들은 readCh가 닫힐 때까지 각자 데이터가 들어오는 대로 처리한다.

sync.WaitGroup 역시 중요한데, fetch에서는 워커의 수 만큼 Add 함수를 호출하고 기다린다. 각 워커에서는 readCh가 닫히고 워커가 마지막 일을 마칠 때마다 Done 함수를 호출해 카운트를 줄인다. 즉, 모든 워커가 작업을 끝내고 나서야 fetch함수가 종료된다. 이렇게 함으로써 모든 worker가 작업을 마치기 전에 fetch 함수가 끝나는 불상사를 막을 수 있다.

MultiFetcher

MultiFetcher역시 fetcher를 합성하여 만든 객체이다.

//workerpool/fetcher/multi.go
package fetcher

import (
	"io"
)

type MultiFetcher struct {
	*fetcher
	workers int
}

func Multi(writer io.Writer, amount int) *MultiFetcher {
	return &MultiFetcher{fetcher: Simple(writer), workers: amount}
}

func (m *MultiFetcher) Run() {

	readCh := make(chan string)
	fetchCh := make(chan chan string, m.workers)
	distributeCh := make(chan string)


	go m.read(readCh)
	go m.fetch(readCh, fetchCh)
	go m.distribute(fetchCh, distributeCh)
	m.print(distributeCh)

}

func (m *MultiFetcher) fetch(inCh <-chan string, outChCh chan<- chan string) {

	defer close(outChCh)

	for line := range inCh {

		outCh := make(chan string)

		go m.fetchLine(line, outCh)
		outChCh <- outCh

	}

}

func (m *MultiFetcher) distribute(inCh <-chan chan string, outCh chan<- string) {
	defer close(outCh)
	for ch := range inCh {
		outCh <- <- ch
	}
}

func (m *MultiFetcher) fetchLine(line string, outCh chan<- string) {
	defer close(outCh)
	m.fetcher.fetchLine(line, outCh)
}

채널의 채널을 쓰는 것에서 알 수 있듯이 MultiFetcher의 fetch 함수와 추가된 distribute 함수가 이번 글의 핵심이다.

Run 함수에서 채널의 채널 하나가 추가로 선언된 것을 알 수 있다. 이 때 해당 채널은 버퍼 채널로 선언되어 객체가 생성될 때 받은 값만큼의 용량을 지니게 된다. 이 채널의 채널이 워커 풀이며, 채널의 버퍼 크기가 워커의 수(후술하겠지만 정확히는 워커의 수 + 1)이다.

우선 MultiFetcher에서 fetch 함수의 두번째 매개변수가 chan chan형이라는 것에 주목해보자. fetch 함수가 각 줄을 inCh로 받는 것은 기존과 동일하다. MultiFetcher의 fetch 함수는 여기에 추가로 채널(이하 가공 채널)을 생성한다. 고루틴으로 생성된 fetchLine 함수는 inCh에서 받은 값을 가공 채널로 보낸다. 이와 동시에 버퍼 채널로 생성된 outChCh에 공간이 남으면 가공 채널을 그 쪽으로 보내고 반복문의 첫 부분으로 이동한다. 이를 inCh가 닫힐 때까지 반복한다.

fetchLine는 기존의 fetch가 오버로드된 형태인데 이는 fetchLine의 역할이 기존의 객체와 다르기 때문이다. 기존의 outCh는 printCh로, 모든 데이터를 outCh로 보내 출력한 후에 닫는다. 즉, readCh가 닫히고 난 후 닫는다. 반면 여기서의 outCh는 가공 채널로, fetchLine이 가공을 마치면 채널을 닫아 가공이 끝났음을 알려준다. 즉, 한 줄을 읽고난 후 닫는다.

distribute는 inCh에서 가공 채널을 꺼내 해당 채널의 값을 꺼낸 후 outCh로 그 값을 내보내는 역할을 한다. 반복문에 있는 outCh <- <-ch라는 문법이 조금 이상해 보일 수 있는데, ch(가공 채널)의 데이터를 꺼낸 후 outCh에 집어넣는다는 뜻으로 outCh <- (<-ch)로 보는 것이 이해하기 좀 더 쉽다.

결과

해당 프로그램은 위에서 만든 패쳐의 속도를 측정한다. 기본적으로 각 패쳐의 총 소요시간이 출력된다. 사용자는 여기에 패쳐의 로그 출력 여부를 결정할 수 있다.

//workerpool/main.go
package main

import (
	"fmt"
	"github.com/simp7/playground/double_chan/workerpool/fetcher"
	"io"
	"os"
	"time"
)

type Fetcher interface {
	Run()
}

func Bench(f Fetcher) {
	start := time.Now()
	f.Run()
	fmt.Println("Fetching done:", time.Now().Sub(start))
}

func BenchAll(f ...Fetcher) {
	for i, target := range f {
		fmt.Printf("Bench fetcher %d\n", i+1)
		Bench(target)
	}
}

func main() {

	logAll := false
	var logger io.Writer
	logger, _ = os.Open("/dev/null")

	if logAll {
		logger = os.Stdout
	}

	f1 := fetcher.Simple(logger)
	f2 := fetcher.Parallel(logger, 1)
	f3 := fetcher.Parallel(logger, 30)
	f4 := fetcher.Multi(logger, 1)
	f5 := fetcher.Multi(logger, 30)
	BenchAll(f1, f2, f3, f4, f5)

}

logAll 변수는 로그의 출력 여부를 결정한다. logAll이 false인 경우 프로그램의 출력은 "/dev/null"로 보내진다. 이 "/dev/null"이란 리눅스 계열의 시스템에서 출력을 버릴 때 사용하는 파일(장치)이다.

Bench 함수는 패쳐가 작동하기 시작한 때를 저장한다. 이후 패쳐의 실행이 끝날 때 까지 기다렸다가 마친 시간에 미리 저장해 놓은 시작 시간을 빼서 총 작동 시간을 구한다.

이제 실행해보자.

Bench fetcher 1
Fetching done: 51.195381423s
Bench fetcher 2
Fetching done: 52.405077857s
Bench fetcher 3
Fetching done: 2.36847305s
Bench fetcher 4
Fetching done: 22.623192478s
Bench fetcher 5
Fetching done: 3.01325228s

워커를 1개 생성한 경우를 보면 뭔가 이상하다는 것을 느낄 수 있다. ParallelFetcher는 fetcher보다 느린데, 워커 생성과 채널을 한번 더 거치는 것을 감안하면 납득이 된다. 문제는 MultiFetcher인데, fetcher보다 오히려 2배 이상 빠르다는 것이다. 이는 MultiFetcher의 작동 방식 때문인데, 워커가 채널의 채널에 들어가 데이터를 마저 처리하는 동안 다른 워커도 채널의 채널에 들어가있지만 않을 뿐 데이터를 처리하고 있기 때문이다. 결과적으로 총 워커의 수는 버퍼의 크기에 한개를 더한 형태가 되는 것이다.

워커가 충분히 많은 경우 ParallelFetcher가 MultiFetcher보다 빠른 것을 확인할 수 있다. 이런 현상의 이유로는 두 가지를 들 수 있다.

우선 ParallelFetcher는 시간이 많이 드는 작업이 적게 드는 작업을 막지 않는다. 즉, 한 워커가 비교적 오래 걸리는 작업을 처리해도 그 동안 다른 워커가 여러 개의 짧은 작업들을 처리하는 것이다. 반면 MultiFetcher는 버퍼가 꽉 차 있을 때 첫 워커가 작업을 아직 처리 중이라면 다른 워커는 작업을 마쳐도 대기할 수 밖에 없게 된다.

이전 이유보다는 사소하지만 워커의 생명주기 역시 영향을 끼친다. ParallelFetcher는 일단 워커를 생성하면 작업이 모두 끝날 때까지 해당 워커를 재사용한다. 반면 MultiFetcher는 새로운 작업을 시작하면 워커를 생성하고 작업이 끝날 때 소멸시킨다. 즉 MultiFetcher의 워커는 각 작업과 생명주기가 같다. 보통 워커보다 작업의 수가 훨씬 많기 때문에 MultiFetcher는 ParallelFetcher보다 워커를 더 많이 생성하고 소멸시킨다.

이번엔 logAll을 true로 바꾸어보자. 이제 각 패쳐는 로그를 보여준다. 실행해보면 여러 워커가 있는 ParallelFetcher를 실행할 때 순서가 뒤죽박죽인 것을 알 수 있다. 이외에는 별 차이가 없기 때문에 해당 경우의 출력은 생략한다.

결론

워커 풀의 두 가지 구현법에 대해 알아보았다. 해당 기법의 유일한 단점은 채널을 이용한 데이터의 이동을 강제한다는 점이다. 다만, 워커 풀을 쓸 상황이면 동시성 프로그래밍을 사용했다는 것이고, Go언어에서 동시성 프로그래밍에는 채널을 이용하는 것이 일반적이기 때문에 큰 단점은 아니다. 해당 기법에 파이프라인 패턴을 같이 사용하는 것을 적극적으로 추천한다. 코드가 더 깔끔해질 것이다.

chan chan을 이용한 워커 풀은 필자가 매우 좋아하는 기법이다. Go언어만의 문법과 특징을 이용해 성능을 비약적으로 끌어내는 기법이기 때문이다. 게다가 원리만 잘 이해한다면 구현이 그렇게 어렵지도 않기 때문에 이론의 영역에만 머물지 않는다. Go언어로 프로젝트를 진행하다 동시성 프로그래밍으로 속도를 향상시키고 싶은 구간이 있다면 이 글을 떠올려 시도해봤으면 좋겠다.

참고자료