파이프라인 패턴
소프트웨어 공학

파이프라인 패턴

들어가기에 앞서...

해당 패턴은 동시성 프로그래밍에 기반한 디자인 패턴이기 때문에 동시성 프로그래밍을 지원하는 언어가 아닌 경우 이 글이 별로 도움이 되지 않을 것이다. 또한 해당 글은 Go언어만의 문법을 쓰기 때문에 동시성 프로그래밍을 지원한다 하더라도 다른 언어에서는 구현 방법이 다를 수 있다. 즉, 해당 글은 Go언어 사용자 중 동시성 프로그래밍에 대한 기본 개념(고루틴, 채널)에 대한 기본 지식이 있는 독자를 대상으로 한다.

출처 : https://www.pexels.com/ko-kr/photo/257770/

파이프라인 패턴은 디자인 패턴 중 실행 패턴이다. 디자인 패턴 목록에 나열한 글에 없다고 의아해할 수 있다. 맞다. 해당 패턴은 전통적인 디자인 패턴('GoF의 디자인 패턴'에 포함된 패턴)은 아니다. 이는 해당 패턴이 일반화되지 않았다거나 바람직하지 않다는 것을 뜻하는 것은 아니다. 단지 동시성 프로그래밍을 지원하는 환경에서만 활용되기 때문에 포함되지 않은 것이다. Go언어는 설계부터 동시성 프로그래밍 환경에 최적화되어 있기 때문에 파이프라인 패턴을 익히기 좋고 또 유익하다.

해당 패턴의 예시를 들기 위한 코드로는 이 글의 마지막 예제를 이용했다. 해당 글에서 다루고 있는 채널의 채널 역시 조만간 다룰 예정이다.

파이프라인 패턴

파이프라인 패턴은 데이터의 흐름이 마치 물이 파이프를 타고 나가는 것처럼 생겨서 파이프라인 패턴이라 불린다. 파이프라인 패턴에는 해당 패턴 전체를 실행하기 위한 함수(pipeline)와 그 함수에서 호출되는 함수/채널(pipe)로 이루어져 있다. 각 pipe 함수는 매개변수나 리턴값으로 채널을 가진다. pipeline 함수는 직접 데이터를 생성하거나 외부로부터 받아서 첫번째 pipe 함수에게 전달한다. 각 pipe 함수는 다음 pipe 함수로 전달하면서 작업을 처리한다. 즉, 연속적인 데이터의 흐름을 처리하는 일련의 함수들을 나열한 것이 파이프라인 패턴이다.

장점

  • 동시성 프로그래밍에서 주로 발생하는 문제들(교착 상태, 경쟁 상태)을 탐지 및 예방하기가 쉽다.
  • 파이프라인에 있는 작업들을 교체하거나 재사용하기가 쉽다. 즉, 코드의 재사용성이 높아진다. 

단점

  • 해당 패턴의 사용처가 한정적이다. 위에서 언급한대로 동시성 프로그래밍 환경 외의 곳에는 쓰기 어렵고 쓸 필요도 없다.
  • 동시성 프로그래밍에 익숙하지 않으면 알 수 없는 문제를 일으킬 수 있다. 특히 채널을 여닫는 부분이나 고루틴을 생성하는 부분에서 조심해야 한다. 다만 이는 패턴 보다는 동시성 프로그래밍 자체의 난점으로 보는 것이 옳다.

구현

Go언어의 동시성 프로그래밍 환경에서는 채널을 이용해 데이터를 주고받는다. 파이프라인 패턴에서는 pipe 함수에 데이터를 주고 받을 채널을 넣을 방법에 따라 구현 방법이 나뉜다.

함수에서 매개변수나 반환값으로 이용하는 채널은 보통 단방향 채널로 이용한다. 따라서 채널 방향을 지정하여 함수를 정의하는 것이 오류 방지에 좋다. 데이터 타입을 미리 정하는 것과 같은 원리이다. 또한 특별한 상황이 아니면 pipe 함수 내에서 채널을 닫는 것이 바람직하다. 이는 한 pipe 함수가 대체 혹은 삭제가 되더라도 다른 pipe 함수에 영향을 끼치지 않기 위함이다. 다만, 다른 곳에서 채널을 닫아야 하는 경우도 가끔 있기 때문에 사용자의 역량에 달려 있는 문제이다.

매개변수-반환값

입력 채널은 매개변수로, 출력 채널은 반환값으로 받는 방법이다. 이 구현은 우리가 일반적으로 구현하는 함수의 형태, 즉 매개변수로 입력을 받고 출력을 반환하는 형태이기 때문에 매우 직관적이다.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func readSomething() <-chan string {

	outCh := make(chan string)

	go func() {
		defer close(outCh)
		for i := 0; i < 1000; i++ {
			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
			outCh <- fmt.Sprintf("line: %d", i)
		}
	}()

	return outCh

}

func fetchSomething(inCh <-chan string, intensity int) <-chan chan string {

	outChCh := make(chan chan string, intensity)

	go func() {

		defer close(outChCh)

		for line := range inCh {
			outChCh <- getLine(line)
		}

	}()

	return outChCh

}

func getLine(line string) chan string {

	outCh := make(chan string)
	
	go func() {
		time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
		outCh <- fmt.Sprintf("%s ... fetched!", line)
		close(outCh)
	}()

	return outCh

}

func distribute(inCh <-chan chan string) <-chan string {

	outCh := make(chan string)

	go func() {
		defer close(outCh)
		for ch := range inCh {
			outCh <- <- ch
		}
	}()

	return outCh

}

func printSomething(inCh <-chan string) {
	for line := range inCh {
		fmt.Println(line)
	}
}

func test(intensity int) {

	start := time.Now()

	reader := readSomething()
	fetcher := fetchSomething(reader, intensity)
	distributor := distribute(fetcher)
	printSomething(distributor)

	fmt.Println("done", time.Now().Sub(start))

}

func main() {
	test(20)
	test(40)
}

해당 구현의 장점으로 pipeline 함수 부분이 깔끔하다는 것에 있다. 대신 각 pipe 함수에서 채널을 관리해야 하기 때문에 그 과정에서 실수할 여지가 좀더 있다.

위에서 distribute 함수의 리턴 값으로는 방향을 지정하지 않았는데 만약 여기에 방향을 지정하면 다음과 같은 오류가 발생한다.

cannot use getLine(line) (type <-chan string) as type chan string in send

언어 자체의 버그인지 필자가 완전히 이해하지 않아서인지는 모르겠지만 <-chan string은 chan string으로 변환할 수 없다고 나온다. 여기에 관해서는 좀 더 공부한 후에 수정할 예정이다.

매개변수 - 매개변수

입력 채널과 출력채널 모두 매개변수로 받는 방법이다. 해당 방법은 매개변수로 일반 매개변수와 포인터 매개변수를 하나씩 받는 함수와 같은 형태이다.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func readSomething(outCh chan<- string){

	defer close(outCh)

	for i := 0; i < 1000; i ++ {
		time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
		outCh <- fmt.Sprintf("line: %d", i)
	}

}

func fetchSomething(inCh <-chan string, outChCh chan<- chan string) {

	defer close(outChCh)

	for line := range inCh {

		outCh := make(chan string)

		go getLine(line, outCh)
		outChCh <- outCh

	}

}

func getLine(line string, outCh chan<- string) {
	defer close(outCh)
	time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
	outCh <- fmt.Sprintf("%s ... fetched!", line)
}

func distribute(inCh <-chan chan string, outCh chan<- string) {
	defer close(outCh)
	for ch := range inCh {
		outCh <- <- ch
	}
}

func printSomething(inCh <-chan string) {
	for line := range inCh {
		fmt.Println(line)
	}
}

func test(intensity int) {

	start := time.Now()

	readCh := make(chan string)
	fetchCh := make(chan chan string, intensity)
	distributeCh := make(chan string)

	go readSomething(readCh)
	go fetchSomething(readCh, fetchCh)
	go distribute(fetchCh, distributeCh)
	printSomething(distributeCh)

	fmt.Println("done", time.Now().Sub(start))

}

func main() {
	test(20)
	test(40)
}

해당 방법은 위와 반대로 pipe함수가 깔끔한 대신 pipeline 함수의 복잡성이 증가했다. 시각적으로는 해당 방법이 좀 더 깔끔하다. 다만 pipeline 함수에 파이프를 변경하거나 제거할 때 따로 채널을 정의하고 앞뒤 파이프의 매개변수를 바꾸는 작업이 필요하다.

두 구현 방법의 코드 구조를 보면 재밌는 점을 발견할 수 있을 것이다. 첫번째 방법은 분산형 구조로 pipeline 함수는 그저 pipe들을 연결하는 역할만 담당하고 고루틴이나 채널의 관리는 각 pipe 함수에 일임한다. 반면 두번째 방법은 중앙 제어형 구조로 pipeline함수가 pipe들을 연결할 뿐만 아니라 각 pipe함수가 사용할 채널을 관리하고 고루틴 역시 직접 호출한다. 

위의 구현 방법들 중 어떤 것을 써야할 지 고민된다면 코드를 수정할 때를 생각해보자. 만약 pipe 함수를 추가하거나 삭제할 일이 많을 것 같으면 첫번째 구현 방법이 낫다. 하지만 pipe함수 내부를 변경할 일이 많다면 두번째 구현 방법이 낫다. 물론 자신에게 맞는 방법이 있다면 그 방법으로 하는 것이 제일 좋다.

마치며...

위의 두 구현 방법은 서로 섞어서 쓸 수는 있지만 되도록 한 구현만 쓰는 것을 추천한다. 두 구현의 장점을 모두 없애버리는 결과만 남게 된다.

사실 해당 글은 파이프라인 패턴의 아주 기초적인 부분이다. 팬인-팬아웃, 에러 처리 등 해당 패턴에서 발전한 여러가지 기술들이 있다. 필자는 아직 이에 관해서는 공부가 부족하다고 판단해 넣지 않았다. 해당 부분을 자세히 공부하고 싶다면 참고 자료의 smpl님의 글이 도움이 될 것이다.

참고 자료