2014년 1월 16일 목요일

java concurrency in practice ch5

5.3.3.Deques and Work Stealing
디큐란?
앞뒤에서 뽑아 쓸수 있음(큐는 앞만 가능)
장점
컨슈머당 하나의 디뷰를 줌
경쟁이 일어나지 않음

컨슈머가 죽을 경우 해당 디큐에서 워크를 가져오면됨

너무 느릴경우 다른 디큐에 넣어도됨

생산자나 컨슈머가 디큐에 붙을때는 뒤에서 붙음(테일)

5.4. Blocking and Interruptible Methods
인터럽트를 먹으면 안된다
왜냐? 상위 코드에서 문제가 있는지 없는지 알수 없기 때문이다

그렇기 때문에 둘중에 하나의 로직을 사용한다.
1.외부로 집어 던지거나
2.Thread.currentThread.Intrurrept() 를 불러서 쓰레드의 상태값을 바꾼다
이렇게 한다면 상위 코드에서 필요하다면 상태값을 확인 할 수 있기 때문이다.

public class TaskRunnable implements Runnable {
 BlockingQueue queue;
 ...
 public void run() {
  try {
   processTask(queue.take());
  } catch (InterruptedException e) {
   // restore interrupted status
   Thread.currentThread().interrupt();
  }
 }
}


5.5. Synchronizers
Synchronizers는 쓰레드의 컨트롤 플루우에 참여하는 모든 것들을 지칭한다.

All synchronizers share certain structural properties:
they encapsulate state that determines whether threads arriving at
the synchronizer should be allowed to pass or forced to wait
, provide methods to manipulate that state, and provide methods to wait efficiently
for the synchronizer to enter the desired state.
5.5.1 Latches
여러개의 쓰레드가 레치의 마지막 상태값이 될때까지 기다리게 하는 Synchronizer
문처럼 움직인다 마지막 상태에 도달하기 전까지는 문이 닫혀있다
상태가 마지막 값에 도달한다면 문이 열리고 쓰레드들이 통과한다.

어떤 상태가 되기 전까지 모든 쓰레드들을 기다리게 하고 이후 한번에 실행하게 한다.

사용하는곳
리소스가 초기화대기 전까지 다른애들을 접근못하게
의존 관계에 있는 서비스들이 의존관계가 다 만들어지기 전까지 시작하지 못하게 할때
멀티플레이 게임에서 모든 사람들이 레디를 하기 전에는 시작할수 없게 하는대

CountDownLatch 말그대로 처음 시작 숫자를 주어 지고 하나씩 내려서 0이되면 실행 되게 하는 latch 의 한 종류

5.5.2 Future task
 래치와 비슷하게 블락한다 하지만 다른점은 callble 로 부르고 계산이 긴 애들을 단한번
계산하기 위해 사용되어진다. 
상태는 waiting to run, running, completed 가 있으며 한번 copmleted 되면 상태는 계속 유지 된다.

f.get 을 할시 한번 계산된 결과는 결과를 계속 리턴한다.
*캐슁할때 참 좋은거 같다.

5.5.3 Semaphore
  한번에 접근 할 수 있는 쓰레드의 수를 제한하기 위해 사용한다.
  세마포어는 여러가의 접근 권한을 가지고 있다
  예를 들어 3개라고 하면 쓰레드가 접근할때 하나씩 준다.
  즉 3개이상은 접근하지 못한다. 한명이 release 하게 되면 다른애가 접근 할 수 있다
  * set 등에 몇게까지의 저장 할 수 있는가 등에 사용된다.



public class BoundedHashSet {
 private final Set set;
 private final Semaphore sem;

 public BoundedHashSet(int bound) {
  this.set = Collections.synchronizedSet(new HashSet());
  sem = new Semaphore(bound);
 }

 public boolean add(T o) throws InterruptedException {
  sem.acquire();
  boolean wasAdded = false;
  try {
   wasAdded = set.add(o);
   return wasAdded;
  } finally {
   if (!wasAdded)
    sem.release();
  }
 }

 public boolean remove(Object o) {
  boolean wasRemoved = set.remove(o);
  if (wasRemoved)
   sem.release();
  return wasRemoved;
 }
}
5.5.4 Barriers
  레치는 이벤트 기반이다 즉 카운트를 만들고 하나씩 줄여 0이되면 실행된다.
  베리어는 인원기반이라고 생각해도 된다 즉 정해 놓은 모든 쓰레드가 도착하기 전까지
  실행 시키지 않는다.
  또한 한번 실행된 후에 다시 사용할수 있다
  예를 들면 집결지에 모이고 인원이 다오면 그 후 출발한다 라고 생각하면된다.


public class CellularAutomata {
 private final Board mainBoard;
 private final CyclicBarrier barrier;
 private final Worker[] workers;
 
 public CellularAutomata(Board board) {
  this.mainBoard = board;
  int count = Runtime.getRuntime().availableProcessors();
  this.barrier = new CyclicBarrier(count,new Runnable() {
      public void run() {
       mainBoard.commitNewValues();
      }});
  this.workers = new Worker[count];
  for (int i = 0; i < count; i++)
    workers[i] = new Worker(mainBoard.getSubBoard(count, i));
  }
 
 private class Worker implements Runnable {
  private final Board board;
  
  public Worker(Board board) { this.board = board; }
  
  public void run() {
   while (!board.hasConverged()) {
    for (int x = 0; x < board.getMaxX(); x++)
     for (int y = 0; y < board.getMaxY(); y++)
      board.setNewValue(x, y, computeValue(x, y));
    try {
     barrier.await();
    } catch (InterruptedException ex) {
     return;
    } catch (BrokenBarrierException ex) {
     return;
    }
   }
  }
 }
 public void start() {
  for (int i = 0; i < workers.length; i++)
    new Thread(workers[i]).start();
  mainBoard.waitForConvergence();}
 }
}
}
캐쉬 예제

package cache;

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import annotation.ThreadSafe;


import Util.ThrowUtil;

@ThreadSafe
public class Memorizer[A, V] implements Computable[A, V] {
 private final ConcurrentHashMap[A, Future[V]] cache = new ConcurrentHashMap[A, Future[V]]();
 private final Computable[A, V] c;

 public Memorizer(Computable c) {
  this.c = c;
 }

 @Override
 public V compute(final A key) throws InterruptedException {
  while (true) { //cahce pollution 이 일어 날수 있음
   Future f = cache.get(key);
   if (f == null) {
    Callable eval = new Callable() {
     @Override
     public V call() throws Exception {
      return c.compute(key);
     }
    };

    FutureTask ft = new FutureTask<>(eval);
    f = cache.putIfAbsent(key, ft); // atomic
    if (f == null) {
     f = ft;
     ft.run();
    };
   }

   try {
    return f.get(); // while 문 아웃
   } catch (CancellationException e) {
    cache.remove(key, f); // 삭제하고 while 문으로
   } catch (ExecutionException e) {
    throw ThrowUtil.launderThrowable(e.getCause());
   }
  }

 }

}


1.future 를 이용해서 동일한 키값이 몇번 호출되지 않게 한다.
2.해쉬맵을 사용해서 쓰레드 세이프 분제를 캐쉬에 넘긴다.
3.put if absent 를 이용해서 잠깐 사이에 2번 불릴수 있는 문제를 해결한다.
4.while 문을 사용해서 캔슬 익셉션이 날경우 삭제후 다시 적재한다.(캐쉬 오염 막기)