2014년 1월 21일 화요일

java concurrency in practice ch7 Cancellation and shutdown

쓰레드가 stop 할때 바로 정지 된다면 작업 중이던 공유 변수들에 문제가 생길 수있다
그러므로 처음부터 thread 가 gracefully 하게 stop 될 수 있도록 하는게 중요하다

7.1 Task Cancellation
자바에서는 쓰레드를 직접적으로 바로 stop 하는 방법은 존재하지 않는다
다른 쓰레드가 interrupt 를 걸어서 cancel 을 요청하는 방법이나
코드로 구현하는 방법이 존재한다.

아래는 cancel 이 호출 될 경우 flag 를 변경하고 쓰레드에서 일을 처리하기 직전에 한번씩
확인 하는 방법으로 구현한 코드이다.
@ThreadSafe
public class PrimeGenerator implements Runnable {
 @GuardedBy("this")
 private final List[biginteger] primes = new ArrayList[biginteger]();
 private  volatile boolean cancelled;
 
 public void run() {
  BigInteger p = BigInteger.ONE;
  while (!cancelled ) {  
   p = p.nextProbablePrime();
   synchronized (this) {
    primes.add(p);
   }
  }
 }
 
 public void cancel() { cancelled = true;  }
 public synchronized List[biginteger] get() {
  return new ArrayList[biginteger](primes);
 } 
}

public List[biginteger] aSecondOfPrimes() throws InterruptedException {
 PrimeGenerator generator = new PrimeGenerator();
 
 new Thread(generator).start();
 try {
  SECONDS.sleep(1);
 } finally {
  generator.cancel();
 }

 return generator.get();
}

매번 숫자를 증가 시키기전에 flag 를 확인한다. (꼭 volatile 을 사용하자 -변경을 다른쓰레드에서 한다.)

쓰레드에서 사용하는 인스턴스 변수들은 동기화를 유념하자

두번째 코드는 실행 시키는 코드인대  prime 이 1초 간 돌게 this(두번째 코드를 호출하는) 를 재운다 그 후 finally 에서 무조건 cancel을 호출해서 정지하게 만든다.

7.1.1 Interruption
문제는 blockingQueue.put  같은 경우는 블락이기 떄문에 체크하는 로직을 넣을 수 가 없다.

만약 생산자가 소비자 보다 빨라서 큐를 다 채우고 put 을 호출 한다면 블락 될것이다.
(즉 체킹로직으로 가지 못한다)

There is nothing in the API or language specification that ties interruption to any specific cancellation semantics, but in practice, using interruption for anything but cancellation is fragile and difficult to sustain in larger application

언어 스펙 자체에는 캔슬 하기 위해 인터럽트를 사용 할수 있다는 말은 없다 하지만 실전에서는 사용할수 있지만 사용한다면 대규모 어플리케이션을 유지하기 어렵고 쉽게 부셔지게 한다.

각각의 쓰레드는 boolean 값의 interrupted status 값을 가지고 있다.
isInterrupted 는 현재 인터럽트 상태값을 리턴
static interrupted 는 인터럽트 상태값을 지우고 이전 상태값을 리턴한다(주의하자 만약 인터럽트 상태를 지울께 아니라면 익셉션을 던지던 무엇가를 하자 )

인터럽트 요청을 하는건 목표 쓰레드가 멈춘다는 뜻은 아니다 단지 멈출래 라는 메시지를 전달할 뿐이다.

인터럽트 당한 쓰레드는 바로 멈추는게 아니라 자기가 편할때 해당 상태를 채크하고 멈춘다.
wait, sleep join 같은 경우는 인터럽트 요청을 심각하게 고려하고 리퀘스트가 들어오면 익셉션을 던지려고 한다.

Interruption is usually the most sensible way to implement cancellation

인터럽트는 캔슬을 구현하기 위한 가장 실용적인 방법일때가 많다.

아래 코드는 인터럽트를 사용한 코드이다
1.while 문 조거네서 체크되면
2.put 콜을 부를때 체크된다.

블락킹 메써드등을 인터럽트용으로 사용할때 충분한 반응이 오지 않은다면 1번같이
선언에서 사용하는것도 좋을수 있다

class PrimeProducer extends Thread {
 private final BolcingQueue[BigInteger] queue;

 primeProducer(BlockingQueue[BigInteger] queue{
  this.queue = queue;
 }

 public void run(){
  try{
   BigInteger p = BigInteger.ONE;
   while(Thread.currentThread().isIntruppted()){
    queue.put(p = p.nextPrablePrime());
   }
  }catch(InterruptedException consumed){
   /* Allow thread to exit */
  }
 }

 public void cancel() { interrupt();}
}


7.1.2 Interruption Policies
task 가 취소 정책을 가지고 있듣이 thread 도 인터럽션 정책을 가지고 있어야 한다.
인터럽션 정책이란? 쓰레드가 인터럽트 상태를 확인했을떄 어떻게 대처 할꺼냐 라는거다

인터럽션 정책의 예러 바로 종료하거나 , 필요한걸 지우고 종료하거나, 다른 애한태 전달하거나 등의 방법이 있다

쓰레드가 인터럽트에 반응하는 방법, 테스크가 인터럽트에 반응하는 방법 즉 2개로 구분하는게 중요하다.

인터럽트는 cancel the current task, shut down the worker thread 즉 2개의 중의적 으미를 가진다.

쓰레드 풀에서 쓰레드를 가져다 쓸때는 이 인터럽트가 해당 쓰레드에게 책임이 있는지 확인해야한다. 만약 자기께 아니면 그냥 내버려둬야 한다.
(내가 남의 집을 봐주고 있을때 메일이 오면 버리지 않고 나중에 집 주인이 오면 처리하게 모아두는것과 같은 이치)

대부분의 블락킹 라이블러리 들이 그냥 인터럽트 익셉션을 던져버리는건 위와 같은 이유다

테스크의 같은경우 인터럽트되었을때 취소가 아니라 연기일 수도 있다

정책이 여러개 일수 있음으로 확신할수 없으면 익셉션을 던지고 Thread.currentThread().interrupt()로 다시 상태를 복원한다.

테스크가 쓰레드가 인터럽트 를 어떻게 처리할지 모르는것처럼 쓰레드도 테스크가 어떻게 처리 할지 모른다.

즉 인터럽션 정책은 쓰레드를 소유한 애만 처리하고 해당 부분을 encapsulation 하는게 좋다

각각의 쓰레드는 자신만의 인터럽션 정책을 가지고 있음으로 그 쓰레드를 인터럽트 했을때
정확이 어떤일이 행해질지 모른다면 인터럽트 걸지 말자


7.1.3 Responding to Interruption
인터럽트 블락킹 메서드 (thread.sleep, BlockingQueue.put) 를 호출할때 아래와 같이 두가 전략을 상용 할 수 있다

1. propagate the exception
2. restore the interruption status so that code higher up on the call stack can deal with it

1.번 전략은 아래와 같이 쉽게 구현 할수있다
BloclingQueue<Task> queue;
public Task getNextTask() throws InterruptionException{
 return queue.take()
}

인터럽트 익셉션을 전파 할수없다면(하기 싫거나 Runnable 일 경우)
가장 간단한 방법은 Thread.currentThread.interrupt 를 호출 하는거다
확신 할 수 없다면 인터럽트 익셉션을 먹으면 안된다.
대부분의 쓰레드는 인터럽트 상태를 유지 하는게 맞다

Only code that implements a thread's interruption policy may swallow an interruption request. General-purpose task and library code should never swallow interruption requests

쓰레드중 인터럽트 정책을 구현한 애들만 인터럽트 요청을 무시 할수  있다 일반적인 목적의 일 또는 라이블러리들은 절대로 인터럽트 요청을 무시하면 안된다.

캔슬레이션 정책을 가지지 않지만 루프 안에서 인터럽트를 발생 시킬수 있는 콜을 호출 할 수 있는 애들은 로컬 변수를 두고 마지막에 finally 블락으로 상태값을 변경해야한다.

왜냐하면 무한 루프에 걸릴수 있기 때문이다.

public Task getNextTask(BlockingQueue[Task] queue) {
 boolean interrupted = false;
 try {
  while (true) {
   try {
    return queue.take();
   } catch (InterruptedException e) {
    interrupted = true;
    // fall through and retry
   }
  }
 } finally {
  if (interrupted)
   Thread.currentThread().interrupt();
 }
}

언제 인터럽트 상태를 체크 하냐는 적시성이 중요하냐 성능이 중요하냐로 결정해야 된다.
만약 인터럽트 상태를 체크하는 로직이 있다면 꼭 동기화 해라

7.1.4 Examlpe:TimedRun
아래의 코드는 ScheduledExecutorService의 예제이다 1시간동안 10초에 한번씩 삑 이라고
울리는 코드를 작성하려면 아래와 같이 하면된다.

public class SchedulExecuterTester {
 private static ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

 public void beepForAnHour() {
  final Runnable beeper = new Runnable() {
   @Override
   public void run() {
    System.out.println("beep");
   }
  };

  final ScheduledFuture[?] beeperHandle = cancelExec.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);
  cancelExec.schedule(new Runnable() {
   @Override
   public void run() {
    beeperHandle.cancel(true);
   }
  }, 60 * 60, TimeUnit.SECONDS);
 }
}
1

아래는 쓰레드에서 정해진 시간후에 일을 취소 하려고 시도한 코드이다

private static final ScheduledExecutorService cancelExec;

public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
 final Thread taskThread = Thread.currentThread();
 cancelExec.schedule(new Runnable() {
  public void run() {
   taskThread.interrupt();
  }
 }, timeout, unit);
 r.run();
}
}

위코드에서 문제는 여러곳에서 발생한다.
1.현재 쓰레드의 인터럽트 정책을 알지 못함으로  인터럽트를 하면 안된다.
2.만약 현재 쓰레드가 정해진 시간보다 먼저 끝나면 콜러에게 결과값이 리턴된 후 그 후 인터럽트가 호출 된다.
   이렇게 될 경우 어떤 일이 일어날지 알수 없다.

아래 방식은 1.익셉션을 전파하고 2.완료될때까지 기달리고 리턴하는 코드다 (join  쓰레드가 죽을때까지 블락이 걸림)

 public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
  class RethrowableTask implements Runnable {
   private volatile Throwable t;

   public void run() {
    try {
     r.run();
    } catch (Throwable t) {
     this.t = t;
    }
   }

   void rethrow() {
    if (t != null)
     throw launderThrowable(t);
   }
  }
  RethrowableTask task = new RethrowableTask();
  final Thread taskThread = new Thread(task);
  taskThread.start();
  cancelExec.schedule(new Runnable() {
   public void run() {
    taskThread.interrupt();
   }
  }, timeout, unit);
  taskThread.join(unit.toMillis(timeout));
  task.rethrow();
 }

code2

public static void timedRun2(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
  Future task = cancelExec.submit(r);
  try {
   task.get(timeout, unit);
  } catch (TimeoutException e) {
   // task will be cancelled below
  } catch (ExecutionException e) {
   // exception thrown in task; rethrow
   throw ThrowUtil.launderThrowable(e.getCause());
  } finally {
   // Harmless if task already completed
   System.out.println("cancel : " + task.cancel(true));; // interrupt if running
   System.out.println("isCancelled : " + task.isCancelled());
  }
 }

 public static void main(String[] args) throws Exception{
  Runnable r = new Runnable() {
   @Override
   public void run() {
    System.out.println("thread start");
     int waitMilesencondes = 3;
     long start_time = System.currentTimeMillis();
     while (System.currentTimeMillis() - start_time < waitMilesencondes) {


     }

    System.out.println("thread end");
   }
  };

  try {
   timedRun2(r, 1, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }

future.cancel 은 해당 쓰레드가 실행되고 있으면 interrupt를 완료 되었다면 아무것도
안한다.

캔슬이 불렸다고 캔슬되는게 아니다 단지 interrupt 요청을 할뿐이다.

Executer 의 interruption police 는 task 가 필요없을 캔슬을 사용해 interrupt 거는걸 허용한다.

그러므로 Executer 서비시를 사용할때 캔슬을 사용하는건 괞찬다.


public static void timedRun3(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
  Future task = cancelExec.submit(r);
  try {
   task.get(timeout, unit);
  } catch (TimeoutException e) {
   // task will be cancelled below
  } catch (ExecutionException e) {
   // exception thrown in task; rethrow
   throw ThrowUtil.launderThrowable(e.getCause());
  } finally {
   // Harmless if task already completed
   task.cancel(true); // interrupt if running
  }
 }
위의 코드는 좋은 예제이다 시간안에 캔슬을 구현할때 위와 같이 구현하자.

7.2.Stopping a Thread based Service
쓰레드를 멈출때 절때 쓰레드를 소유하고 있지 않은애들은 멈추지 말자
executer 서비스를 사용한다면 무조건 executer service 가 처리하게 해야한다.

7.2.1.Example: A Logging Service
어플리케이션에서 로깅을 쓴다고 해보자
여러명의 생산자와 한명의 소비자 모델로 구현한 로깅이다.
아래 코드는 그냥 interrupt 요청을 받으면 바로 writer 를 받고 에러를 뱉으면서 빠져나간다.


package thread.cancel;

import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LogWriter {
 private final BlockingQueue[String] queue;
 private final LoggerThread logger;

 public LogWriter(Writer writer) {
  this.queue = new LinkedBlockingQueue[String](10);
  this.logger = new LoggerThread(writer);
 }

 public void start() {
  logger.start();
 }

 public void log(String msg) throws InterruptedException {
  queue.put(msg);
 }

 private class LoggerThread extends Thread {
  private final PrintWriter writer;

  public LoggerThread(Writer writer) {
   this.writer = (PrintWriter) writer;
  }

  public void run() {
   try {
    while (true)
     writer.println(queue.take());
   } catch (InterruptedException ignored) {} finally {
    writer.close();
   }
  }
 }
}

1. 캔슬하면 큐에 있는 모든 로그가 버려질수 있다
2. 소비자에게만 interrupt 가 있다 (생산자의 경우 멀티기때문에 어렵다)

package thread.cancel;

import java.io.PrintWriter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import Util.GuardedBy;

public class LogService2 {
 private final BlockingQueue[String] queue;
 private final LoggerThread loggerThread;
 private final PrintWriter writer;
 @GuardedBy("this")
 private int reservation;
 @GuardedBy("this")
 private boolean isShutDown;

 public LogService2(PrintWriter writer, BlockingQueue[String] queue) {
  this.queue = queue;
  this.writer = writer;
  this.loggerThread = new LoggerThread();
 }

 public void start() {
  System.out.println(this.getClass().getName());
  loggerThread.start();
 }

 public void stop() {
  synchronized (this) {

   isShutDown = true;
  }
  loggerThread.interrupt();
 }

 public void log(String msg) throws InterruptedException {
  synchronized (this) {
   if (isShutDown)
    throw new IllegalStateException();
   ++reservation;
  }
  queue.put(msg);
 }

 private class LoggerThread extends Thread {

  public void run() {
   try {
    while (true) {
     try {
      synchronized (this) {
       System.out.println(this.getClass().getName());
       if (isShutDown && reservation == 0)
        break;
      }
      String msg = queue.take();
      synchronized (this) {
       --reservation;
      }
      writer.println(msg);

     } catch (InterruptedException e) {
      /* retry */
     }
    }
   } finally {
    writer.close();
   }

  }
 }

 public static void main(String[] args) throws Exception {

  LogService2 service = new LogService2(new PrintWriter("test"), new LinkedBlockingQueue[String]());
  service.start();
 }
}


위는 다운 시킬시 좀 이쁘게 다운할려고 노력한 클래스이다. 여기서 안전하게 다운되면
조금 느리고 바로 다운 되면 조금 빠르다 트레이드 오프다 .
*문제는 내부 쓰레드에서 락을 this 에 거는 코드인대.. name 을 찍어보면 내부 쓰레드에 걸린다. 해당 락 먼가 이상하다 나중에 찾아보자

아래는 아예 executerService 에게 밀어 버린 코드이다.
public class LogService {
  private final ExecutorService exec = newSingleThreadExecutor();

  public void start() {}

  public void stop() throws InterruptedException {
   try {
    exec.shutdown();
    exec.awaitTermination(TIMEOUT, UNIT);
   } finally {
    writer.close();
   }
  }

  public void log(String msg) {
   try {
    exec.execute(new WriteTask(msg));
   } catch (RejectedExecutionException ignored) {}
  }
 }

포이즌 필
말그대로 큐에 특정 메시지가 들어오면 생산자는 독약을 넣고 더이상 큐에 안보내고
소비자는 독약을 보면 더이상 안들어오니 다운되자 라고 생각하는 방법

생산자 가 몇명이고 소비자가 몇명인지 알때 쓸수 있다
예를 들어 생산자가 n 명이면 n개의 필이 소비자가 죽는다.
또는 소비자가 n명이면 생산자가 n개의 큐에 집어넣으면 모든 n 소비자가 죽는다.

독약 패턴은 큐가 unbounded 일때만 사용가능하다.

7.2.4 Example: A One-shot Execution Service
만약 메서드가 베치성 일을 하고 있고 모든 테스크가 끝난 후 결과 같이 리턴된다면 메서드에 묶인 Executer 서비스를 사용해서 쉽게 처리 가능하다.
public class CheckMail {

 public boolean checkMail(Set[String] hosts, TimeUnit unit) throws InterruptedException {
  ExecutorService exec = Executors.newCachedThreadPool();
  final AtomicBoolean hasNewMail = new AtomicBoolean(false);
  try {
   for (final String host : hosts) {
    exec.submit(new Runnable() {
     @Override
     public void run() {
      if (checkMail(host))
       hasNewMail.set(true);
     }

     private boolean checkMail(String host) {
      //*do something*/
      return false;
     }
    });
   }
  } finally {
   exec.shutdown();
   exec.awaitTermination(10, unit);
  }

  return hasNewMail.get();
 }
}
전부 쓰레드가 시작 된 후 shutdown()이 호출되면 해당 메서드가 완료 될때까지 기다린다.
해당 메서드들의 결과를 리턴하기 전까지 블락 되어야 하므로 exex.await를 호출하면 모든 결과같이 리턴될때까지 블락된다 :)

7.2.5 Limitations of shutdownNow the task as complete
ExecuterService 가 shutdhowNow 로 죽을때 실행 시키지 않은 테스크를 리턴한다.

하지만 실행 중이지만 완료가 되지 않은 테스크를 리턴하지 않는다.
아래는 해당 이슈를 피하기 위해 구현한 코드이다.
문제는 실제로는 완료 되었지만 완료되지 않았다 라고 나올수 있다
(마지막 명령어가 실행되고 쓰레드풀은 완료 되었다고 했는대 그순간 셋에도 더해질수 있기 때문에)

즉 2번 실행해도 문제없는 코드에만 사용해야 하는 코드이다.
public class TaskTrackExecuter extends AbstractExecutorService{
 private final ExecutorService exec = Executors.newCachedThreadPool();
 private final Set[runnable] taskCancelledAtShutdown = (Set[runnable]) Collections.synchronizedCollection(new HashSet[runnable]());

 public List[runnable] getCancelledTasks() {
  if (exec.isTerminated())
   throw new IllegalStateException();
  return new ArrayList<>(taskCancelledAtShutdown);
 }

 public void execute(final Runnable runnable) {

  exec.execute(new Runnable() {

   @Override
   public void run() {
    try {
     runnable.run();
    } finally {
     if (isShutdown() && Thread.currentThread().isInterrupted())
      taskCancelledAtShutdown.add(runnable);
    }
   }

  });
 }

 //delegate other ExecutorMethod to exec
아래는 위의 TaskTrackExecuter 를 이용한 webCrwaler 예제 이다.
public class WebCrawler {
 private volatile TaskTrackExecuter exec;
 private final Set[url] urlsToCrwal = new HashSet[url]();

 public synchronized void start(){
  exec = new TaskTrackExecuter();
  for (URL url : urlsToCrwal)
   submitCrwalTask(url);

  urlsToCrwal.clear();
 }

 public synchronized void stop() throws InterruptedException{
  try {
   saveUncrwaled(exec.shutdownNow());
   if(exec.awaitTermination(10, TimeUnit.MICROSECONDS));
    saveUncrwaled(exec.getCancelledTasks());
  }finally{
   exec = null;
  }
 }

 private void saveUncrwaled(List uncrawled) {
  for (Runnable task : uncrawled)
   urlsToCrwal.add(((CrwalTask)task).getPage());
 }

 private void submitCrwalTask(URL url) {
  exec.execute(new CrwalTask(url));
 }

 private class CrwalTask implements Runnable{
  private final URL url;

  CrwalTask(URL url){
   this.url = url;
  }

  @Override
  public void run() {
   for(URL link: processPage(url)){
    if(Thread.currentThread().isInterrupted())
     return;
    submitCrwalTask(link);
   }
  }

  private List processPage(URL url) {
   /* add at queue */
   return null;
  }
  public URL getPage(){return url;}
 }
}

/* 만약 쓰레드에서 실행되는 애들이 믿을수 없는 코드라면(플러그인 같은) 아래와 같이 코딩하자 */
public void run() {
 Throwable thrown = null;
 try {
  while (!isInterrupted())
  runTask(getTaskFromWorkQueue());
 } catch (Throwable e) {
  thrown = e;
 } finally {
  threadExited(this, thrown);
 }
}

/* unchecked Exception 이 나면 JVM 은 자동으로 UncaughtExceptionHandler 를 호출한다. 없으면 그냥 콘솔에 로깅) */
public class UEHLogger implements Thread.UncaughtExceptionHandler {
 public void uncaughtException(Thread t, Throwable e) {
  Logger logger = Logger.getAnonymousLogger();
  logger.log(Level.SEVERE,
  "Thread terminated with exception: " + t.getName(),e);
 }
}

/*아래 같은 방버으로 shutdown hook 을 걸수 있다 jvm 이 죽을때 호출되면 서로 락이 걸리는걸 막기 때문에 서비스당이 아니라 하나의 jvm 에 하나의 shutdown hook 을 거는걸 추천한다.*/
public void start() {
 Runtime.getRuntime().addShutdownHook(new Thread() {
  public void run() {
   try { LogService.this.stop(); }
   catch (InterruptedException ignored) {}
  }
 });
}

요약 : 자바는 쓰레드를 죽이는데 정확한 메커니즘이 존재하지 않는다.
사용자가 여러 메커니즘을 합쳐 안전하게 죽도록 해야한다.

 FuterTask, Executer 가 해당 작업을 조금 쉽게 해준다.
... 진짜 길다  ch7...;;;