8장 CSP와 인터프리터 패턴

6장 코루틴에서 Go의 고루틴이 코루틴과 유사하다는 점을 이야기했다. 사실 Go의 고루틴과 채널의 이론적 뿌리는 CSP(Communcating Sequential Processes) 모델이다. CSP의 핵심은 순차적 실행 단위인 프로세스들이 채널이라고 하는 통신 매개를 통해서만 동기적으로 메시지를 주고 받는 것으로 동시성을 표현하는 것이다. CSP에서는 프로세스를 “이벤트의 나열로 표현되는 어떤 객체의 행동 패턴”이라고 다소 딱딱하게 정의하였지만, 우리는 쉽게 OS 상의 프로세스나 쓰레드 혹은 Go의 고루틴이라고 보아도 될 것이다. 프로세스는 채널을 통해 다른 프로세스에 메시지(혹은 데이터)를 전달한다.

우리는 이미 7장에서 거의 유사한 모델을 구현해 봤다. 7장에서 구현한 프로세스는 읽기쓰기라는 이벤트들로만 구성되며, 모든 프로세스는 읽기 채널과 쓰기 채널이 하나씩 있는 제한된 형태였다. 아래 그림은 개미 수열의 2번 줄을 출력하는 프로세스 네트워크를 보여준다. 파이프처럼 두 프로세스를 연결하고 c0과 c1이 채널 역할을 하고 있다.

ant(2)에 해당하는 프로세스 네트워크

이번 장에서는 채널을 추가하여 CSP의 동시성 모델을 온전히 지원하도록 확장하고, 이를 이용하여 개미 수열을 구현해 본다. 말하자면 Go의 동작을 따라해 보자는 것이다.

Go의 동시성 요소들

Go에서 지원하는 동시성 요소들은 채널과 고루틴이다. 채널 및 고루틴과 관련된 기본 함수, 연산자, 키워드는 다음과 같다.

c := make(chan int)
int 타입의 채널을 만든다. 버퍼 크기를 지정할 수 있으나, 버퍼가 없는 동기 채널이 기본이다.
c <- 1
c 채널로 1을 보낸다. 현재 고루틴은 다른 고루틴이 값을 받을 때까지 블록될 수 있다.
<-c
c 채널에서 값을 받는다. 다른 고루틴이 값을 보낼 때까지 블록될 수 있다.
close(c)
c 채널을 더이상 사용하지 않는 것으로 표시한다. 닫힌 채널에 값을 쓰는 것은 허용하지 않는다.
go f()
새로운 고루틴을 만들어서 f()를 실행시킨다.

위의 다섯 가지 연산을 지원한다면 개미 수열을 CSP 모델로 구현하는 데 문제가 없을 것 같다. CPS 구현에서 이미 읽기/쓰기를 구현해 보았고, 특히 Java 버전에서 추가한 Pipe 명령을 참고한다면 go f()를 구현할 수 있을 것이다. 남은 것은 채널이다.

CSP를 위한 미니 언어

Go의 동시성 기능을 참고하여 CSP를 위한 미니 언어를 아래와 같이 정하였다.

  • chan(type)
  • send(ch, value)
  • recv(ch)
  • close(ch)
  • go(command)
  • print(value)
  • done(value)
  • command.then(v -> command)

목록 중 처음 다섯 가지 명령은 이미 설명한 Go의 동시성 요소들과 일대일 매치된다. print(value)는 미니 언어에서 출력을 지원하기 위한 명령이고, done(value)는 프로세스의 계산 결과를 담아 종료하는 명령이다. 마지막 then()은 프로세스들을 순차적으로 결합시켜 주는 역할을 한다. 7장의 컨티뉴에이션 패싱 스타일을 대신하는 것으로 볼 수 있다.

recv(ch, v -> print(v))
// vs.
recv(ch).then(v -> print(v))

명령 클래스 구조 (Java)

CSP 미니 언어로 작성된 프로그램을 표현할 클래스 구조는 각 명령을 노드로 하여 트리를 만들 수 있게 설계한다. 7장에서 Java로 구현한 결과가 pipe(a, b)라는 명령 덕분에 트리 형태를 가질 수 있었던 것과 비슷하다. 여기서는 go() 명령이 pipe()와 비슷하게 가지 역할을 한다. 하지만 의미적으로는 조금 다르다. 7장의 pipe(a,b) 명령이 a와 b 코루틴을 스케줄러에 추가하면서 입출력을 연동시킨 반면, CSP의 go(a)는 a를 스케줄러에 추가할 뿐이다. 고루틴 혹은 프로세스의 입출력이나 실행 의존성은 채널에 의해 결정된다.

CSP 미니 언어의 각 명령에 해당하는 클래스는 명령 실행에 필요한 정보와 다음 명령에 대한 링크를 가진다. 특히 chan()recv() 명령은 실행 중 반환되는 값에 따라 다음 명령이 결정되는 느슨한 링크를 가진다. 명령들의 공통 부모 클래스를 Command라고 하자. 미니 언어를 위한 생성 메쏘드들을 제공하고 명령 클래스들은 감춘다.

CSP 명령 클래스 구조
class Command<A> {
  // instance method
  public <B> Command<B> then(Function<A, Command<B>> f) {
    return new Then(this, f); }

  // static methods for DSL
  public static <A> Command<Channel<A>> chan(Class<A> cls) {
    return new Chan<>(cls, ch -> done(ch); }
  public static <A> Command<Void> send(Channel<A> ch, A value) {
    return new Send<>(ch, done(null)); }
  public static <A> Command<Optional<A>> recv(Channel<A> ch) {
    return new Receive<>(ch, v -> done(v)); }
  public static <A> Command<Void> close(Channel<A> ch) {
    return new Close<>(ch, done(null)); }
  public static <A> Command<Void> go(Command<A> forked) {
    return new Go<>(forked, done(null)); }
  public static Command<Void> print(String s) {
    return new Print<>(s, done(null)); }
  public static <A> Command<A> done(A value) {
    return new Done<>(value); }

  // command sub-classes
  private static class Chan<C,A> extends Command<A> { .. }
  private static class Send<C,A> extends Command<A>{ .. }
  private static class Receive<C,A> extends Command<A> { .. }
  private static class Close<C,A> { .. }
  private static class Go<B,A> extends Command<A> { .. }
  private static class Print<A> extends Command<A> { .. }
  private static class Done<A> extends Command<A> { .. }
  private static class Then<B,A> extends Command<A> { .. }
}

send/recv는 7장의 read/write와 거의 같다. 차이 점은 채널 타입을 인자로 받아서 데이터 의존성을 명시적으로 드러낸 것이다. 채널은 chan()으로 생성하고 close()로 닫는다. 동시 실행되는 루틴은 go()로 전달하면 된다.

CPS 구현과 달리 then()으로 컨티뉴에이션을 나중에 연결시킬 수 있기 때문에 각 명령 노드를 생성할 때에는 항상 기본 값으로 done()을 사용할 수 있다.

핑퐁 예제

클래스 설계를 기초로 간단한 동시성 프로그램을 작성할 수 있다. Go 발표에 몇 번 등장했던 핑퐁 예제를 옮겨보려고 한다. Go 코드로 동작을 먼저 이해하자.

핑퐁 예제 (Go)
func main() {
  table := make(chan *Ball)
  go player("ping", table)
  go player("pong", table)

  table <- new(Ball) // game starts by throwing in a ball
  time.Sleep(1 * time.Second)
  <-table            // game ends by taking the ball
}

func player(name string, table chan *Ball) {
  for {
    ball := <-table
    ball.hits++
    fmt.Println(name, ball.hits)
    time.Sleep(100 * time.Millisecond)
    table <- ball
  }
}

이 프로그램은 메인 고루틴에서 “ping”과 “pong” 두 개의 고루틴을 파생시킨다. 세 고루틴은 table 채널을 공유한다. “ping”과 “pong” 플레이어 고루틴은 무한 루프를 돌면서 테이블에서 공(Ball)을 받아서 100ms 쉬었다가 다시 테이블로 던져보낸다. 처음에 두 플레이어 고루틴이 모두 공을 받기 위해 블록되고, 메인 고루틴이 테이블에 공을 던져 넣으면서 어느 한 플레이어가 공을 가져가면서 핑퐁이 시작된다. 어느 플레이어가 먼저 공을 가져갈 것인지는 정해지지 않았다. 1초 뒤에 메인 고루틴이 공을 뺏으면서 종료하면 프로그램이 종료한다.

핑퐁 예제에서 main()을 CSP 미니 언어로 옮겨보자. (여기 사용된 sleep() 명령은 지정 시간동안 고루틴의 실행을 멈추는 일을 한다.)

핑퐁 예제 (Java)
Command<Void> pingpong() {
  return chan(Ball.class)
      .then(table -> go(player("ping", table))
          .then(x -> go(player("pong", table)))
          .then(x -> send(table, new Ball()))
          .then(x -> sleep(1000))
          .then(x -> recv(table)))
      .then(x -> done(null));
}

pingpong() 메쏘드의 반환 타입은 Command<Void>이다. 이는 원래 Go 프로그램의 main()에 해당한다. 원래 Go 프로그램의 각 줄은 일대일로 변환되는데, 단위 명령들이 반환하는 Command 들은 then()으로 연결되어 전체 프로그램이 만들어진다. 만약 then() 대신 기존의 콜백 전달 방식을 사용했다면 다음과 같았을 것이다.

then() 없는 핑퐁 예제 (Java)
Command<Void> pingpong() {
  return chan(Ball.class, table ->
          go(player("ping", table), () ->
            go(player("pong", table), () ->
              send(table, new Ball(), () ->
                sleep(1000, () ->
                  recv(table, x ->
                    done(null)))))));
}

Then 노드를 무시한다면 메인 프로그램의 트리는 아래 그림과 같다.

핑퐁 프로그램의 노드 트리

Java #6 - CSP 개미 수열

6장에서 이미 Go로 구현했던 개미 수열을 CSP 미니 언어로 옮겨보자.

CSP 개미 수열 (Java) - 1
  private static Command<Channel<Integer>> ant(int n) {
    return init().then(ch -> loop0(0, n, ch));
  }

  private static Command<Channel<Integer>> init() {
    return chan(int.class)
        .then(ch -> go(send(ch, 1)
            .then(x -> close(ch)))
            .then(x -> done(ch)));
  }

  private static Command<Channel<Integer>> loop0(int i, int n,
                                                 Channel<Integer> ch) {
    if (i < n) {
      return next(ch).then(c -> loop0(i + 1, n, c));
    } else {
      return done(ch);
    }
  }

init() 함수는 제너레이터 패턴으로 되어 있고, 자신의 고루틴에서 1을 출력한 뒤 채널을 닫는다. init() 함수의 반환값이 Command<Channel<Integer>>이다. then() 메쏘드로 체이닝이 가능하며, 여기서 채널을 인자로 받아서 연산이 다음 명령을 연결시킨다. 반복문 대신 loop0() 재귀 도움 함수를 이용하여 next()를 n번 호출한다.

CSP 개미 수열 (Java) - 2
  private static Command<Channel<Integer>> next(Channel<Integer> i) {
    return chan(int.class)
        .then(o -> go(recv(i)
            .then(c -> loop1(c.get(), 1, i, o))
            .then(x -> close(o)))
            .then(x -> done(o)));
  }

  private static Command<Void> loop1(int prev, int count,
                                     Channel<Integer> i, Channel<Integer> o) {
    return recv(i)
        .then(value -> {
          if (value.isPresent()) {
            int v = value.get();
            if (v == prev)
              return loop1(prev, count + 1, i, o);
            else
              return send(o, count)
                  .then(x -> send(o, prev))
                  .then(x -> loop1(v, 1, i, o));
          } else {
            return send(o, count)
                .then(x -> send(o, prev));
          }
        });
  }

next() 함수 역시 제너레이터 패턴이며, loop1() 재귀 도움 함수를 이용하여 채널의 값을 하나 씩 읽으면서 다음 줄을 계산한다.

도움 함수들

CPS 구현 때와 마찬가지로 기본 반복문을 사용할 수 없다는 점이 아쉽다. 이를 위해 CSP 미니 언어에 반복문을 추가할 수도 있겠지만, 반복문의 패턴을 몇가지 도움 함수로 일반화하여 재사용하는 것이 더 편리하다. 예를 들어 ant()에서 사용된 loop0()init()의 결과인 채널을 next()로 n번 반복 호출한다. 어떤 값 a에 함수 f를 n번 거듭 호출하는 것을 applyN(n, f, a)로 일반화할 수 있다.

반복문 도움 함수 - applyN (Java)
static Command<Channel<Integer>> ant(int n) {
  return init().then(ch -> applyN(n, LookAndSay::next, ch));
}

static <A> Command<A> applyN(int n, Function<A, Command<A>> f, A initial) {
  if (n == 0)
    return done(initial);
  else
    return f.apply(initial).then(next -> applyN(n - 1, f, next));
}

CSP 미니 언어 인터프리터

CSP 인터프리터는 CSP 명령어 트리를 순회하며 명령을 실행시키면 된다. 실행 중인 프로세스를 리스트로 관리하면서 라운드 로빈 방식으로 한 번에 하나의 프로세스를 한 단계씩 실행하면 동시성을 구현할 수 있다.

다음 코드는 채널 생성 명령을 처리한다. 현재 프로세스의 실행할 명령이 Chan인 경우, Channel을 생성하고, 현재 프로세스의 다음 실행할 명령을 계산한 다음 실행 대기열의 끝에 추가한다. 다음 번에 이 프로세스가 실행될 차례가 되면 그 다음 명령이 이어서 실행될 것이다.

CSP 인터프리터의 기본 뼈대 (Java)
public class Interpreter {
  public static void run(Command mainCommand) {
    Deque<Process> processes = new LinkedList<>();
    processes.addLast(new Process(true, mainCommand));

    while (!processes.isEmpty()) {
      Process p = processes.removeFirst();
      Command c = p.command;

      if (c instanceof Command.Chan) {
        Command.Chan chan = (Command.Chan) c;
        p.command = (Command) chan.next.apply(new Channel(chan.cls));
        processes.addLast(p);
      } else {
        ...
      }
    }
  }
}

CSP 미니 언어는 Java 제너릭스를 이용하여 타입 안정성을 확보하였지만, 인터프리터는 런타임 타입 정보를 이용할 수 없기 때문에 rawtype으로 구현하였다.

send()/recv() 처리하기

send(ch, value) 명령은 채널에서 값을 받기 위해 대기 중인 프로세스가 있는 경우 대기 중인 프로세스를 깨우면서 값을 전달하고, 그렇지 않은 경우에는 현재 프로세스를 보내기 대기열에 추가한다. recv(ch) 명령은 반대로 동작하여 채널에 보내기 대기 중인 프로세스가 있으면 대기 중인 프로세스를 깨우면서 값을 전달 받고, 그렇지 않으면 현재 프로세스를 받기 대기열에 추가한다.

이를 위해 Channel은 보내기/받기 각각의 프로세스 리스트를 가진다. 이 리스트에 추가되는 프로세스는 블록되어 실행이 멈추고, 값의 보내기/받기가 동기적으로 이뤄지면서 다시 실행 대기열에 들어간다.

send() 명령 처리 (Java)
if (c instanceof Command.Send) {
  Command.Send send = (Command.Send) c;
  Channel ch = send.ch;
  if (ch.isClosed) {
    throw new RuntimeException("Send to a closed channel");
  } else if (ch.receivers.isEmpty()) {
    ch.senders.add(p);
  } else {
    Process receiver = (Process) removeRandom(ch.receivers);
    Command.Recv recv = (Command.Recv) receiver.command;
    receiver.command = (Command) recv.next.apply(Optional.of(send.value));
    processes.addLast(receiver);

    p.command = send.next;
    processes.addLast(p);
  }
}

recv()send()와 거의 비슷하게 구현할 수 있다. 차이점은 채널이 닫힌 경우에 대한 처리이다. Go의 스펙을 참고하면, 닫힌 채널에 값을 보내는 경우 패닉(panic)을 발생시키고, 닫힌 채널에서 값을 받으려 하면 제로 값을 받는 것으로 되어 있다. 여기서는 제로 값을 대신하여 Optional.empty()를 사용하기로 했다.

마지막 줄의 processes.addLast()addFirst()로 바꾼다면 어떻게 될까? 라운드 로빈 방식이 아니게 되지만, 동작에는 차이가 없다. 새로 깨어난 프로세스를 실행 대기열의 맨 앞에 추가하더라도 동작에 문제가 없다.

go() 처리하기

동시성 지원 명령 중에서 go()는 새로운 프로세스를 생성하여 실행 프로세스 대기열에 추가한다. 물론 현재 진행 중인 프로세스도 계속 진행되어야 하므로 실행 대기열에 추가해야 한다.

go() 명령 처리 (Java)
if (c instanceof Command.Go) {
  Command.Go go = (Command.Go) c;
  processes.addLast(new Process(false, go.forked));
  p.command = go.next;
  processes.addLast(p);
}

done() 처리하기

이쯤에서 Process 클래스를 살펴봐야겠다. 생성자에서 Command뿐만 아니라 불린 값을 하나 받는다. 이 불린 값은 이 프로세스가 메인 프로세스인지 여부를 나타낸다. 메인 프로세스를 표시해 두는 이유는 다른 프로세스들과 달리 메인 프로세스가 종료되었을 때 전체 실행을 중단하기 위해서다.

Process 클래스 (Java)
public class Interpreter {
  ...
  static class Process {
    final boolean isMain;
    Command command; // current process' next command ptr

    Process(boolean isMain, Command command) {
      this.isMain = isMain;
      this.command = command;
    }
  }
}

이제 done()을 처리하는 것은 간단하다. 메인 프로세스가 종료되면 전체 실행 루프를 끝낸다. 다른 프로세스가 종료되면 실행 대기열에 더 이상 추가하지 않으면 된다.

done() 명령 처리 (Java)
if (c instanceof Command.Done) {
  if (p.isMain) {
    break;
  }
}

then() 처리하기

CSP 미니 언어에서 프로세스는 명령의 순차적 리스트이다. 여기서 then()의 역할은 두 개의 명령을 이어 붙여서 (심지어 done()에 대해서도) 일반적인 프로그램의 함수 호출과 같은 구조화를 가능하게 한다.

Then 명령 클래스 (Java)
class Command<A> {
  <B> Command<B> then(Function<A, Command<B>> f) {
    return new Then<>(this, f);
  }
}

class Then<A, B> extends Command<B> {
  final Command<A> sub;
  final Function<A, Command<B>> k;

  Then(Command<A> sub, Function<A, Command<B>> k) {
    this.sub = sub;
    this.k = k;
  }

  @Override
  public <C> Command<C> then(Function<B, Command<C>> f) {
    return new Then<>(sub, a -> k.apply(a).then(f));
  }
}

Then 클래스는 then() 메쏘드를 오버라이드하는데, 그 이유는 불필요한 중첩을 사전에 막기 위한 것이다.

앞서 미니 언어로 구현한 개미 수열의 ant() 함수는 다른 함수 init()을 호출하는데, 이 함수는 done()으로 종료하는 작은 명령 트리를 반환한다. done()은 원래 프로세스가 종료되고 실행할 다음 명령이 없는 것을 의미하지만, ant()에서는 마치 서브 루틴을 호출한 것처럼 then()으로 다음 동작을 이어간다.

ant()에서 init()loop0()을 호출하는 모양

즉, then() 명령으로 생성되는 노드는 실제 어떤 행위를 나타낸다기보다는 리스트 두 개를 이어주는 역할을 할 뿐이므로 인터프리터는 다음 실행할 명령을 얻어올 때 then() 노드를 모두 건너뛰어야 한다. (만약 Then.subrun()으로 실행하여 결과를 받아와서 Then.k를 실행시킨다면 콜스택을 사용하는 문제가 다시 생겨난다.)

Interpreterthen()을 스킵 (Java)
public static void run(Command mainCommand) {
  Deque<Process> processes = new LinkedList<>();
  processes.addLast(new Process(true, mainCommand));

  while (!processes.isEmpty()) {
    Process p = processes.removeFirst();
    Command c = next(p.command);
    ...
  }
}

private static Command next(Command command) {
  Command c = command;
  while (c instanceof Command.Then) {    // skip `Then`s
    Command.Then then = (Command.Then) c;
    if (then.sub instanceof Command.Done) {
      Command.Done done = (Command.Done) then.sub;
      c = (Command) then.k.apply(done.value);
    }
    ...
  }
  return c;
}

여기서 특이한 케이스는 then()에 의해 선행 실행되어야 하는 명령이 다시 then()인 경우다. 이건 마치 서브 루틴을 중첩 호출한 것과 같아서 다음 명령 찾기 루프를 계속 진행해야 한다. 이때, 콜스택에 해당하는 컨티뉴에이션 k가 누적되지 않도록 중첩된 컨티뉴에이션(sub.k) 다음으로 이어주어야 한다.

중첩 then() 건너뛰기 (Java)
while (c instanceof Command.Then) {   // skip `Then`s
  Command.Then then = (Command.Then) c;
  if (... Done) {
    ...
  } else if (then.sub instanceof Command.Then) { // nested Then
    Command.Then sub = (Command.Then) then.sub;
    c = sub.sub.then(a -> ((Command) sub.k.apply(a)).then(then.k));
  }

조금 복잡한 7번 줄을 그림으로 옮겨보면 다음과 같다. A, B, C의 실행 순서를 지키면서 then() 중첩을 컨티뉴에이션 쪽으로 옮긴다. 다음 루프를 통해 A를 탐색한다.

중첩된 then() 처리하기

CSP 인터프리터 리뷰

CSP는 동시에 진행되는 여러 프로세스들의 동작을 기술할 수 있는 언어와 동작을 검증할 수 있는 수학적 토대를 제공한다. 쓰레드와 락 중심의 동시성 프로그래밍 대신 동기화 수단으로서 채널을 이용하는 방법은 동시성 프로그램을 새롭게 설계할 수 있게 도와준다.

이 장에서는 Go를 본보기로 하여 CSP 미니 언어를 디자인하고, 인터프리터를 만들어 보았다. 더이상 개미 수열의 1,000,000번 줄을 출력하는 것은 문제가 되지 않는다. 멀티 쓰레드를 사용하지 않지만 코루틴의 메커니즘으로 동시성 프로그래밍까지 가능하다는 것을 확인했다.

하지만 이 장에서 구현한 CSP 미니 언어는 아직 제약이 많다. Go와 비교해 보면 아래의 몇 가지 기본 기능이 빠져있다.

  1. 버퍼 채널 - 버퍼가 지정된 채널은 버퍼 크기만큼 보내기만 먼저 발생할 수 있다.
  2. sleep() - 프로세스를 일정 시간만큼 잠 재운다.
  3. select() - 두 개 이상의 채널 연산 중 먼저 발생하는 이벤트로 흐름을 진행한다.

코루틴이라는 아이디어에서 출발하여 여러가지 방식으로 개미 수열을 풀어보았다. 하지만 Go처럼 런타임 스케줄러가 없는 경우, 즉 우리가 코루틴을 흉내낸 경우는 한결같이 디스패쳐가 필요했다. 아이디어를 확인하고 연습하기에는 적절하지만 실용적이라고 하기는 어려울 것 같다.

디스패쳐는 명령 노드들의 느슨한 연결 리스트를 하나씩 따라가는 역할을 한다. 이 부분에서 우리는 느슨한 연결 리스트라는 새로운 도구를 얻었다. 이를 일반화하여 지연 리스트를 만들 수 있다. 9장에서 다룰 내용이다.