2016년 2월 27일 토요일

[일기] Java 동시성(Concurrentcy) Atomic and ConcurrencyMap Part 1

출처 : http://winterbe.com/posts/2015/05/22/java8-concurrency-tutorial-atomic-concurrent-map-examples/

 마지막이다. 이제 자바의 Conncurecy API에서 중요한 두 부분을 다룬다. Atomic Variables과 Concurrent Map 이 그것이다. 두 개념 모두 자바 8에서 람다식과 함수형 프로그래밍의 도입과 함께 상당히 개선되었다. 새로운 특징들을 쉬운 예제와 함께 살펴보게 된다.

 Part 1. Threads and Executors (완)
 Part 2. Synchronzation and Locks (완)
 Part 3. Atomic Variables and ConcurrentMap

 여기서도 Part 2에서와 마찬가지로 Sleep과 stop의 메서드를 이용할 것이다.
public static void stop(ExecutorService executor) {
        try {
            executor.shutdown();
            executor.awaitTermination(60, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            System.err.println("termination interrupted");
        }
        finally {
            if (!executor.isTerminated()) {
                System.err.println("killing non-finished tasks");
            }
            executor.shutdownNow();
        }
    }

    public static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    } 
AtomicInteger
 패키지 중 java.concurrent.atomic 을 살펴보면 원자적 연산을 수행할 수 있는 유용한 클래스들을 확인할 수 있다. 어떤 연신이 Part 2에서 다뤄봤던 Lock이나 Synchronized 키워드 없이도 여러 스레드들에 의해 병렬적으로 수행되어도 결과의 안전성을 보장받을 수 있다면 그 연산은 원자적이라고 한다.

 내부적으로 원자적 클래스들은 CAS를( compare-and-swap) 십분 활용한다. 간단히 설명하자면, 값을 변경할 때 자신이 읽었던 변수의 값을 기억하고 있다가 변경 직전에 변수의 메모리 내의 값을 확인하여 이전에 기억해놓은 값과 같은 경우에만 처리를 진행하고 그렇지 않은 경우는 무산시키는 방식이다. 이러한 명령은 현대 CPU에 의해 직접적으로 지원된다(하드웨어적으로). 그래야 원자적 연산이 가능하다. 운영체제 시간에 배우겠지만 하드웨어의 지원이 없이는 완벽한 원자적 연산을 지원하는 것이 불가능하다. 따라서 요새는 CPU 차원에서(하드웨어 차원에서) 이러한 연산을 지원하므로 특정 편수에 여러 스레드들이 동시에 접근하여 작업하는 경우가 있는 경우 이러한 원자적 연산을 이용하는 클래스를 이용하는 것이 권장된다.

 이제 한 원자적 클래스의 예를 하나 살펴보도록 하자. 그 첫번째 타자는 AtomicInteger이다.
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(atomicInt::incrementAndGet));

stop(executor);

System.out.println(atomicInt.get());    // => 1000
  단지 Integer 대신에 AtomicInteger를 사용했을 뿐이다. Integer는 스레드-안전 하지 않으며 따라서 변수에 접근할 때 동기화를 신경써주지 않는다면 값의 안정적인 결과를 보장받을 수 없다. 위에서 사용한 incrementAndGet 메서드는 원자적 연산을 수행하며 따라서 우리는 아주 손쉽게 멀티 스레드로부터 안전한 연산을 수행할 수 있다.

 AtomicInteger는 다양한 원자적 연산을 지원한다. updateAndGet메서드는 람다 표현식을 인자로 받을 수 있는데 이 람다식을 통해 어떤 연산을 수행할지를 입맛대로 정의할 수 있다.
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.updateAndGet(n -> n + 2);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 2000
 다음 살펴볼 메서드는 accumulateAndGet() 이다. 이 메서드도 람다식을 인자로 받지만 들어가는 람다식은 IntBinaryoperator 타입이다. 다음 예제에서 이 메서드를 이용해 0부터 1000의 값을 모두 더한 값을 구할 수 있다.
AtomicInteger atomicInt = new AtomicInteger(0);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> {
        Runnable task = () ->
            atomicInt.accumulateAndGet(i, (n, m) -> n + m);
        executor.submit(task);
    });

stop(executor);

System.out.println(atomicInt.get());    // => 499500
 이 외에도 유용한 클래스로는 AtomicBoolean, AotmicLong 그리고 AtomicReferece가 있다.

LongAdder
 이 클래스는 AtomicLong에 상응하는 클래스다. 연속적으로 숫자를 더할 때 사용할 수 있다.
ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(adder::increment));

stop(executor);

System.out.println(adder.sumThenReset());   // => 1000
 LongAdder는 add()와 increment() 메서드를 다른 원자적 숫자 클래스와 마찬가지로 제공한다. 그리고 역시 스레드-안전하다. 그러나 하나의 결과만을 도출하는 대신에 이 클래스는 내부적으로 다른 스레드들의 경쟁을 감소시키기 위해 변수들의 집합을 유지하고 있다. 그 실제 결과는 sum()과 sumThenReset() 을 통해 얻을 수 있다.

 이 클래스는 대개 여러 스레드에 의해 읽기보다 업데이트가 많이 일어나는 원자적 숫자를 다룰때 유용하고 바람직하게 여겨진다. 특히 통계 데이터를 뽑아낼 때와 같은 경우가 있다. 특정 웹 서버가 처리한 요청의 숫자를 셈한다거나 할 수 있다. 결점이 있다면 굉장하게 메모리를 소모한다는 점인데 전술하였듯이 내부적으로 수많은 값들의 집합을 유지하고 있기 때문이다.

LongAccumulator

  이놈은 LongAdder의 일반화된 버전이다. 단순히 덧셈을 수행하는 대신하기 보다 LongBinaryOperator의 람다식을 이용한다.
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10)
    .forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

stop(executor);

System.out.println(accumulator.getThenReset());     // => 2539
 LongAccumulator를 하나 만들고 2 * x + y의 람다식을 인자로 주었다. 그리고 초기값을 1로 지정했다. 매순간 accumulate(i) 가 호출될 때마다 현재의 결과와 i의 값은 람다식의 인자로 넘어간다. 이 클래스도 LongAdder 와 마찬가지로 내부적으로 스레드간의 경합을 줄이기 위해 변수의 집합을 유지한다.

 그런데 결과과 왜 2539일까. 이건 이것대로 꽤나 머리가 아프다. i가 y의 값으로 리턴된 현재 값이 x로 들어간다면 우리가 차근차근 계산하면 다음과 같이 나와야 한다.
x y   result
1 0 > 2
2 1 >5
5 2 > 12
12 3 > 27
27 4 >  58
58 5 > 121
121 6 > 248
248 7 > 503
503 8 > 1014
1014 9 > 2037
 아니 그런데 왜 2539인가.. 이유는 다음과 같다. 우리는 싱글 스레드가 아니라 멀티 스레드로 돌렸기 때문이다. 잘 생각해보면 스레드가 2개가 돈다. 따라서 다음과 같이 들어간다.
x : y >1, 0 result : 2
x : y >1, 1 result : 3
x : y >2, 2 result : 6
x : y >6, 3 result : 15
x : y >15, 4 result : 34
x : y >34, 5 result : 73
x : y >73, 6 result : 152
x : y >152, 7 result : 311
x : y >311, 8 result : 630
x : y >630, 9 result : 1269
x : y >1269, 1 result : 2539
2539
또는
x : y >1, 1 result : 3
x : y >1, 0 result : 2
x : y >3, 2 result : 8
x : y >8, 3 result : 19
x : y >19, 4 result : 42
x : y >42, 5 result : 89
x : y >89, 6 result : 184
x : y >184, 7 result : 375
x : y >375, 8 result : 758
x : y >758, 9 result : 1525
x : y >1525, 0 result : 3050
3050
중요하게 넘어갈 것이 있다. java doc에 따르면 accumulator 의 동작은 보장되지 않는다. 따라서 연산의 순서가 문제되지 않는 현상에서 응용가능하다고 적혀 있다. 즉 (2 * x + y)는 순서에 의존적이므로 문제(교환/결합법칙 불가능)가 된다. (x + 2* y)라면 순서가 문제가 되지 않을 것이다.
0 에서 10은 언제 어떻게 순서를 바꿔서 더하든 결과는 같다. 1 + 2 + 5 + 3 + 4 = 1 + 5 + 2 + 4 + 3. 그러나 곱하는 건 다르다.

시간이 늦어서 나머진 나중에 적기로 한다.

댓글 없음:

댓글 쓰기