2016년 2월 27일 토요일

[일기] Java 동시성(Concurrency) Threads and Executors

출처 : http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/

 Java 8 병행 지침서의 첫 시작에 눈독을 들인 것을 환영한다. 이 지침서는 병행 프로그래밍을 자바 8에서 어떻게 수행하는지를 이해하기 쉬운 예제 코드를 이용하여 알려준다. Java Concurrency API를 커버하는 지침서의 여러 시리즈 중 첫번째가 이 글이다. 다음 약 15분동안 어떻게 병렬로 스레드와 작업 및 실행자 서비스를 통해 코드를 실행시키는지 알아보게 된다.

 Part 1. 스레드와 실행자(Threads and Executors)
 Part 2. 동기화와 락(Synchronization and Locks)
 Part 3. 원자적 변수 및 동시성맵(Atomic Variables and ConcurrentMap)

 Concurrency API는 Java 5에서 처음 도입되었으며 점진적으로 매 버전 릴리즈마다 보강되었다. 이 글에서 보여질 대부분의 코드 코딩 컨셉은 초기의 자바에서도 물론 적용 가능하지만 이 글에서 제공되는 예제 코드는 Java 8에 집중하여 많은 람다식과 새 문법들이 사용될 것이다. 만일 람다식에 익숙하지 않다면, Java 8 Tutorial을 먼저 보기를 권장한다.

 Threads and Runnable
 모든 현대 운영체제는 프로세스와 스레드 두 가지를 통해 병렬처리를 지원한다. 프로세스는 서로에게 독립적인 즉, 전형적으로 실행되는 프로그램의 인스턴스이다. 만약 특정 자바 프로그램을 실행시킨다면 운영체제는 새로운 하나의 프로세스를 생성할 것이고 이 프로세스는 다른 프로그램들과 함께 병렬적으로 실행될 것이다. 이 프로세스 내부에서 우리는 스레드를 이용해서 코드를 동시적으로 실행되도록 할 수 있으며 따라서 CPU의 사용가능한 코어들을 최대한 이용할 수 있다.

 자바는 JDK 1.0 부터 Threads를 문법적으로 지원한다. 새로운 스레드를 하나 실행시키기 전에 이 스레드가 어떤 코드를 실행시켜야 하는지를 반드시 먼저 알려줘야 한다. 그리고 이를 우리는 작업(task)이라 부른다. 간단히 Runnable - 흔히 기능성 인터페이스(functional interface)라 하는 것들 중 하나인 - 을 구현하기만 하면 된다. 이 인터페이스는 파라미터를 받지 않는 메서드 run() 하나만 가지고 있으며 아래와 같이 구현 가능하다.

public static void main(String[] args) {
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
};
task.run();
Thread thread = new Thread(task);
thread.start();
System.out.println("Done!");
}
 Runnable은 기능성 인터페이스다. 따라서 위 예제처럼 현재 스레드의 이름을 콘솔에 출력하도록 람다를 이용한 코딩이 가능하다. 코드에서 보이듯 메인 스레드에서 직접적으로 runnable을 실행시키고 있으며 이후에 새로 만들어진 스레드에서 runnable이 구동되고 있다.

 결과는 예상할 것도 없이,
Hello main
Done!
Hello Thread-0
  또는
Hello main
Hello Thread-0
Done!
 이 된다.

 어느 코드가 언제 실행될지는 예측이 불가능하므로 'Done!'을 출력하는 코드가 먼저 수행될지 나중에 수행될지는 아무도 모른다. 즉, 실행 순서는 비결정적이며 따라서 불확실성이 기본이 되는 상황에서 큰 프로그램을 만들 때 병렬적으로 프로그래밍을 하는 것은 무척이나 머리아픈 일이 된다.
스레드는 특정 기간동안 대기(sleep)할 수 있는데 이를 이용하면 긴시간이 걸리는 작업을 다음 코드와 같이 흉내내 볼 수 있다.
Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Foo " + name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Bar " + name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread thread = new Thread(runnable);
thread.start();
위 코드는 첫 번재 출력과 두 번째 출력 사이에 1초를 쉰다. 여기까지는 다 아는 내용이다. 그러나 이와 같이 Thread를 이용하게 되면 반복적인 코드에 지루할 뿐만 아니라 에러가 나기 쉽다(대부분). 이러한 이유로 인해 2004년 자바 5의 출현 때 Concurrency API가 재소개되었다. 이 java.util.concurrent 패키지에 속한 API들은 매우 유용한 클래스를 가지게 됐다. 이 이후로 매 자바 릴리즈마다 Concurrency API는 보강되었고 Java 8에서는 더 새로운 클래스와 메서드를 이용하여 병렬성을 다룰 수 있게 되었다.

 그런고로 그 API 중 하나인 executor services를 더 자세히 살펴보자.

 Executors

 Concurrent API에서 ExecutorService라는 개념이 도입됐다. 스레드를 직접적으로 다루는 가장 최상위 API로 앞으론 Thread 대신 이놈을 사용한다. Thread 다음 버전이라 생각하면 된다. Executors는 작업(task)들을 비동기적으로 실행시킬 수 있으며 기본적으로 스레드 풀을 운영한다. 따라서 우리는 스스로 스레드를 만들 필요가 전.혀. 없다. 스레드 풀의 스레드를은 자신의 임무를 다 마친 스레드들을 응당 재사용한다. 그렇기에 하나의 executor service를 이용하여 응용프로그램이 시작하고 끝날 때까지 우리가 원하는 만큼 병렬용 작업을 만들고 실행시킬 수 있다.

 아래는 한 예시이다.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("Hello " + threadName);
});

// => Hello pool-1-thread-1 
 잘 보면 Executors 가 팩토리메서드를 제공하므로 ExecutorService 의 인스턴스를 얻는 것은 그저 원하는 메서드를 호출하기만 하면 이뤄진다. 위에서는 사이즈 1 크기의 스레드 풀을 갖는 executor를 생성하였다. submit으로 실행시킬 작업을 추가한다.

 결과는 위와 유사할 것이지만 차이가 존재한다. 위 프로그램은 절대 종료되지 않는다는 거다. Executors는 반드시 명시적으로 종료시켜야만 한다. 그러지 않으면 다른 작업을 기다리며 종료되지 않는다.

 ExecutorService 는 종료를 위한 목적으로 2개의 메서드를 제공하는데 하나는 shutdown()이고 다른 하나는 shutdownNow()이다. 전자는 현재 진행중인 작업이 남아있다면 모두 끝날때까지 기다렸다가 종료시키고 후자는 진행되는 작업이 있건말건 종료시킨다.

 아래에 권장되는 executors 종료 방법을 기술하였다.
try {
    System.out.println("attempt to shutdown executor");
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    System.err.println("tasks interrupted");
}
finally {
    if (!executor.isTerminated()) {
        System.err.println("cancel non-finished tasks");
    }
    executor.shutdownNow();
    System.out.println("shutdown finished");
}
  첫째로 executors는 현재 실행중인 작업들을 위해 특정 시간동안 기다려 준다(위에선 5초). 그러다가 인내심이 바닥나면 모든 것을 강제 종료시킨다.

 Callables and Futures

  친숙한 Runnable과 함께, executors는 다른 종류의 작업인 Callable을 지원한다. 이놈 또한 기능적 인터페이스이고 Runnable과 99% 동일하다. 1%의 차이는 Runnable과 달리 이놈은 리턴 타입을 갖는다.

 백문이 불여백견. 1초간 잠을 잔 후에 정수를 리턴하는 Callable을 람다를 이용해 구현한 코드다.
Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};
 Runnalbe처럼 Callable도 executor service에 들어가 실행될 수 있다. 그러면 의문이 생긴다. 리턴 값은 어떻게 받느냐는 거다. 아니 submit은 작업이 끝날때까지 기다려주는 메서드도 아닌데 추가하고 멋대로 실행되게 될 Callable 작업의 리턴 값을 executor service가 제공해 줄 방법이 없지 않은가. 라고 고민해 봐야 헛수고다. 이미 executor가 리턴값을 주기 위해 Future라는 새 친구를 준비해 뒀다. 이 Future로부터 특정 시간의 흐름 이후에 리턴값을 돌려받을 수 있다. 즉 저넘은 리턴 값 가져다 주는 심부름꾼이다.

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result); 
 주목해야 할 부분은 submit을 통해 작업을 등록시키고 난 뒤다. isDone() 메서드를 잘 보길 바란다. 심부름꾼에게 우린 Callable이 작업 끝내고 리턴 값 줬냐? 라고 물어보는 것이다. 아직 그렇지 않다면(false) 더 기다렸다가 물어보면 된다. 하지만 위 코드에서 보면 get()을 호출 하고 있다. 이건 심부름꾼에게 이렇게 말하는 것과 같다. 리턴 값 줄때까지 무한정 기다리겠다. 즉, 리턴받을 때까지 코드의 실행은 멈춘다.
 끝나면 결과인 123을 가져올 수 있게 된다.

future done? false
future done? true
result: 123 
 결과다. System.out.println 부분이 1초 이상 걸린다면 위 결과는 안 나올거다(그럴 가능성이 요샌 없다).

 Future는 executor가 구동하고 있는 작업과 굉장히 연관이 높기 때문에 아직 종료되지 않은 작업의 리턴값을 기다리는 Future가 있다면 executor를 shutdownNow와 같은 작업으로 종료시키려 할 때 예외가 발생시킨다는 사실을 명심해야 한다. 나 안 끝났는데 왜 종료시키느냐고 따지는 거다.

 눈매가 뭉툭한 사람이라면 아마 executor를 만든 방법이 약간 달랐다는 것을 몰랐을 거다. 이번에 새로 사용한 메서드는 newFixedThreadPool(1) 으로 기능상으론 newSingleThreadExecutor()와 같다. 우리가 2 이상의 인자를 넣지 않는다면 말이다.

 Timeouts
 future.get() 을 해버렸다고 치자. future에 대응한는 Callable 작업이 끝나서 값을 반환하지 않는 이상 컴퓨터에 전기를 끊어주지 않고는 실행을 지속시킬 수 없다. 여기에 카운터어택을 날릴 방법이 있다면 대기할 시간을 get 메서드의 인자로 넣어주는 것이다.

ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
});

future.get(1, TimeUnit.SECONDS);
 분명 Callable 작업은 2초를 기다리지만 get은 1초밖에 안 기다리겠다고 한다. 결과적으로 TimeoutException이 발생한다.
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(클라스.java:1024)
  왜 이 예외가 발생하는지 원인을 추측했다면 스스로 감탄해도 좋다. 2초 > 1초다.

 Executors 는 invokeAll() 메서드를 통해 일괄적으로 다수의 callables을 등록할 수 있다 이 메서드는 callables의 컬렉션을 받아들이고 future들의 리스트를 반환한다.

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "task1",
        () -> "task2",
        () -> "task3");

executor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    })
    .forEach(System.out::println); 
 같은 컬렉션임을 알리기 위해 callables 에 진한 표시 한거는 이해 되는데 리턴 값이라는 리스트라는 future가 왜 저기에 표시되었는지 궁금한 사람은 Java 8의 새로운 stream API를 몰라서 그런거다. 나중에 다룬다. 단지, List 는 .stream() 메서드를 호출할 수 있다고만 알아둬라. 위 코드는 모든 future 리스트를 가져와서 리턴값을 가져와 forEach에서 출력한다.

 추가적으로 InvokeAny() 도 있다. 이 메서드는 invokeAll()과는 약간 동작방식이 다르다. 이 메서드는 동일하게 Callable의 컬렉션을 인자로 가져가지만, 오로지 가장 먼저 실행 완료되어 리턴값을 뱉어내는 Callable의 리턴값만을 반환한다.

 과연 그런가 확인해보기 위해 Callable들 몇개는 3초 걸리고 하나만 1초 걸리게 만들어 테스트 해보자. 각각 a1, a2, a3 을 리턴하도록 하면 1초가 먼저 끝나니 a2가 리턴되어야 마땅할 것이다.
Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}
즉, 첫번째 인자로 리턴값을 정하고, 두번째 인자로 기다릴 시간을 정하도록 만들었다.
ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
    callable("a1", 3),
    callable("a2", 1),
    callable("a3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

// => a2
 오오.. 좋다.

 그런데 이번엔 newWorkStealingPool() 이 사용되었다!!(이런 식으로 설명하는 것을 기회주의자식(틈새형) 설명이라 한다). 이넘은 ForkJoinPool 타입의 executor인데, 일반적인 executor와 동작방식이 약간 다르다. 고정된 크기의 풀 사이즈를 사용하는게 아니라 호스트 컴퓨터의 CPU의 가용한 코어의 갯수에 대응하는 크기로 풀 사이즈를 챙겨간다. 이미 Java 7부터 존재했고 이후 Part2, Part 3에서 다룰지도 모른다.

Scheduled Executors

한 executor에 한번 특정 작업을 등록시키고 실행시키는 방법은 알아냈다. 그러면 한번 등록한 작업을 여러번 주기적으로 실행시키고 싶을 땐 어떻게 하는지 알아봐야 한다고 치자. 이때 사용하는 것이 scheduled thread pool이다.

 ScheduledExecutorService 는 작업을 주기적으로나 특정 시간이 지나 만료되었을 때 한번 실행시킬 수 있는 능력을 겸비하고 있다.
 아래 코드는 한 작업을 3초의 초기 지연 시간이 지난 이후에 실행되도록 예정해 놓는 내용을 담고 있다.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 3, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(1337);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);
여기서 새로운 심부름꾼이 등장한다. ScheduledFuture다. getDelay() 를 제공하여 남은 지연을 알려준다. 지연이 지나면 작업은 병렬적으로 실행된다.

 주기적으로 실행되게 하려면 executor가 제공하는 2개의 메서드 중 하나를 사용하면 된다. scheduleAtFixedRate()와 scheduleWithFixedDelay()다. 전자는 고정된 시간 비율로 작업을 실행한다. 즉 2초에 1번 실행과 같은 식이다.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
 또한 전자의 메서드는 처음으로 실행되기 전까지 기다릴 시간을 인자로 받을 수 있다.

 하나 주의해야 하는 것이 있는데 전자의 메서드는 실행될 작업의 수행시간은 전혀 고려되지 않는다는 점이다. 만약 특정 작업을 2초에 한번씩 실행되도록 해놨는데 작업이 수행되는데 모두 3초가 걸린다고 하면 실행은 다 못했는데 새 작업이 수행되고 스레드 풀은 곧 꽉 차게 된다.

 따라서 이러한 경우 고려해야 할 메서드가 후자의 메서드이다. 이놈은 전자의 반대로 행동한다. 즉 수행 시간을 고려한다. 그것이 차이점이며 작업이 끝난 후부터 수행될 시간을 측정한다. 즉 2초에 한번 그러나 3초 걸리는 작업이면 5초에 한번씩 수행될 거다.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 01, TimeUnit.SECONDS);
이 경우 초기 실행 지연은 0이며 수행시간 2초고 매순간 고정 지연은 1초다. 따라서 0초 3초 6초 9초 ... 마다 작업이 실행될 것이다.

댓글 3개: