-
[JAVA] 쓰레드(Thread)의 동기화 / synchronized / Lock과 Condition / fork & join 프레임워크DEV/JAVA 2024. 4. 23. 20:27
멀티쓰레드 프로세스의 경우 여러 쓰레드가 같은 프로세스 내의 자원을 공유해서 작업하기 때문에 서로의 작업에 영향을 주게된다. 한 쓰레드가 특정 작업을 끝마치기 전까지 다른 쓰레드에 의해 방해받지 않도록 하는것이 필요한데, 그래서 도입된 개념이 '임계 영역'과 '잠금(락,lock)'이다.
- 공유데이터(객체)를 사용하는 코드 영역 : 임계 영역으로 지정
- 공유데이터(객체)가 가지고 있는 lock을 획득한 단 하나의 쓰레드만 임계영역 내 코드를 수행,
임계영역에서 벗어나서 lock을 반납해야 다른 쓰레드가 반납된 lock을 획득하여 임계영역 내 코드 수행 가능
→ 즉, 쓰레드의 동기화란 한 쓰레드가 진행중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는것
방법
- synchronized 블럭 사용
- Lock과 Condition 사용
1) synchronized를 이용한 동기화
임계 영역 설정 : synchronized가 호출된 시점부터 해당 영역에 포함된 객체의 lock을 얻어 작업을 수행하고, 영역에서 벗어나면 lock을 반납한다.
//1.메서드 전체를 임계 영역으로 지정 public synchronized void calcSum() { //... } //2.특정한 영역을 임계 영역으로 지정 synchronized(참조변수) { //... }
- lock의 획득과 반납이 자동으로 이루어짐
- 임계 영역은 멀티쓰레드 프로그램의 성능을 좌우하기 때문에 가능한 메서드 전체보다는 synchronized블럭으로 임계 영역을 최소화해서 보다 효율적인 프로그램이 되도록 해야한다.
ex1) 계좌 출금
public class StreamEX { public static void main(String[] args) { Runnable r = new RunnableEx(); new Thread(r).start(); new Thread(r).start(); } } class Account { private int balance = 1000; //private으로 설정 필수 public int getBalance() { return balance; } public synchronized void withdraw(int money) { if(balance >= money) { try { Thread.sleep(1000); } catch (InterruptedException e) { } balance -= money; } } } class RunnableEx implements Runnable { Account acc = new Account(); @Override public void run() { while (acc.getBalance() > 0) { int money = (int) (Math.random() * 3 + 1) * 100; //100,200,300중 랜덤 출금 acc.withdraw(money); System.out.println("balance:" + acc.getBalance()); } } }
ex1 결과 )
balance:900 balance:600 balance:300 balance:100 balance:100 balance:100 balance:100 balance:100 balance:0 balance:0
- synchronized를 사용하지 않으면 출금하기 전 다른쓰레드가 조건문을 통과하여 먼저 출금을 하는 경우가 발생하여 잔고가 마이너스가 되는 상황이 발생함
- Account 클래스의 balance 변수가 private로 설정되지않으면 외부에서 직접 접근이 가능하여 동기화를 사용하더라도 이 값의 변경을 막을 수 없다. → synchronized는 지정된 영역의 코드를 한번에 하나의 쓰레드가 수행하는것을 보장하는 것일 뿐이기 때문
wait()과 notify()
: 특정 쓰레드가 객체의 락을 가진채로 오랜시간을 보내지 않도록 해야함
→ 다른쓰레드가 기다리느라 다른 작업을 하지 못함.
- wait() : 실행중이던 쓰레드는 해당 객체의 waiting pool에서 기다림
- notify() : waiting pool에 있던 모든 쓰레드 중 임의의 쓰레드를 깨움
- notifyAll() : wating pool에 있던 모든 쓰레드를 깨움 ~> 그러나 lock을 얻을 수 있는건 하나의 쓰레드일 뿐, 나머지는 다시 waiting pool에서 기다림
wait()은 notify() 또는 notifyAll()이 호출될 때까지 기다리고,
매개변수가 있는 wait()은 지정된 시간동안만 기다린다.
(지정된 시간이 지난 후 자동적으로 notify()가 호출되는 것과 같다.)
- Object에 정의되어있음
- 동기화 블록(synchronized 블록) 내에서만 사용할 수 있음
- 보다 효율적인 동기화를 가능하게함
ex2) 식당
import java.util.ArrayList; public class ThreadWaitEx1 { public static void main(String[] args) throws Exception { Table table = new Table(); //여러 쓰레드가 공유하는 객체 new Thread(new Cook(table), "COOK1").start(); new Thread(new Customer(table, "donut"), "CUST1").start(); new Thread(new Customer(table, "burger"), "CUST2").start(); //0.1초 후 깅제종료 Thread.sleep(100); System.exit(0); } } class Customer implements Runnable { //손님 private Table table; private String food; Customer(Table table, String food) { this.table = table; this.food = food; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) {} String name = Thread.currentThread().getName(); if(eatFood()) { System.out.println(name + " ate a " + food); } else { System.out.println(name + " failed to eat."); } } } boolean eatFood() { return table.remove(food); } } class Cook implements Runnable { //요리사 private Table table; Cook(Table table) { this.table = table; } @Override public void run() { while (true) { //임의의 요리 하나를 table에 추가 int idx = (int)(Math.random() * table.dishNum()); table.add(table.dishNames[idx]); try { Thread.sleep(1); } catch (InterruptedException e) {} } } } class Table { //공유 테이블 String[] dishNames = { "donut", "donut", "burger"}; //도넛이 더 자주 나옴 final int MAX_FOOD = 6; //테이블에 놓을 수 있는 최대 음식 개수 private ArrayList<String> dishes = new ArrayList<>(); public void add(String dish) { //테이블에 음식이 가득 차면 테이블에 음식을 추가하지 않음 if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes:" + dishes.toString()); } public boolean remove(String dishName) { //지정된 요리와 일치하는 요리를 테이블에서 제거 for(int i=0; i<dishes.size(); i++) { if(dishName.equals(dishes.get(i))) { dishes.remove(i); return true; } } return false; } public int dishNum() { return dishNames.length; } }
실행할때마다 다른 결과가 도출 되는데, 2가지 예외가 발생할 수 있다.
- ConcurrentModificationException : Cook 쓰레드가 테이블에 음식을 놓는 도중 Customer 쓰레드가 음식을 가져가려한 경우
- IndexOutOfBoundsException : Customer쓰레드가 테이블의 마지막 남은 음식을 가져가는 도중 다른 Customer 쓰레드가 먼저 음식을 낚아채려하여 테이블에 음식이 없는데 제거하려한 경우. → 여러번 시도해봤지만 나오지 않음, PC성능(코어 수)에 따라 차이가 있는듯함
ex2 결과 )
Dishes:[burger] Dishes:[burger, burger] Dishes:[burger, burger, donut] Dishes:[burger, burger, donut, donut] Dishes:[burger, burger, donut, donut, donut] Dishes:[burger, burger, donut, donut, donut, burger] Dishes:[burger, donut, donut, burger, donut] Dishes:[burger, donut, donut, burger, donut, burger] CUST2 ate a burger CUST1 ate a donut CUST2 ate a burger CUST1 ate a donut CUST2 ate a burger Exception in thread "COOK1" java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967) at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:456) at Table.add(ThreadWaitEx1.java:71) at Cook.run(ThreadWaitEx1.java:53) at java.base/java.lang.Thread.run(Thread.java:842) CUST2 ate a burger CUST1 ate a donut CUST2 ate a burger CUST1 ate a donut
ex2에 공유객체 Table에 동기화 적용
class Table { //공유 테이블 String[] dishNames = { "donut", "donut", "burger"}; //도넛이 더 자주 나옴 final int MAX_FOOD = 6; //테이블에 놓을 수 있는 최대 음식 개수 private ArrayList<String> dishes = new ArrayList<>(); public synchronized void add(String dish) { //테이블에 음식이 가득 차면 테이블에 음식을 추가하지 않음 if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes:" + dishes.toString()); } public boolean remove(String dishName) { //지정된 요리와 일치하는 요리를 테이블에서 제거 synchronized (this) { while (dishes.size() == 0) { String name = Thread.currentThread().getName(); System.out.println(name + " is waiting."); try { Thread.sleep(500); } catch (InterruptedException e) {} } for(int i=0; i<dishes.size(); i++) { if(dishName.equals(dishes.get(i))) { dishes.remove(i); return true; } } } return false; } public int dishNum() { return dishNames.length; } }
결과) Customer쓰레드가 테이블 객체의 lock을 잡고 기다리고있어 Cook 쓰레드가 lock이 없어 음식을 추가 할 수 없음. 이럴때 사용하는것이 wait와 notify.
Dishes:[burger] CUST1 is waiting. //donut이 없어서 기다림 CUST2 ate a burger CUST1 is waiting. //donut이 없어서 테이블에 lock을 건채로 기다림 CUST1 is waiting. CUST1 is waiting. CUST1 is waiting. CUST1 is waiting. ... 중간 생략 ...
ex2에 wait & notify 적용
import java.util.ArrayList; public class ThreadWaitEx1 { public static void main(String[] args) throws Exception { Table table = new Table(); //여러 쓰레드가 공유하는 객체 new Thread(new Cook(table), "COOK1").start(); new Thread(new Customer(table, "donut"), "CUST1").start(); new Thread(new Customer(table, "burger"), "CUST2").start(); //0.1초 후 강제종료 Thread.sleep(1000); System.exit(0); } } class Customer implements Runnable { //손님 private Table table; private String food; Customer(Table table, String food) { this.table = table; this.food = food; } @Override public void run() { while (true) { try { Thread.sleep(100); } catch (InterruptedException e) {} String name = Thread.currentThread().getName(); table.remove(food); System.out.println(name + " ate a " + food); } } } class Cook implements Runnable { //요리사 private Table table; Cook(Table table) { this.table = table; } @Override public void run() { while (true) { //임의의 요리 하나를 table에 추가 int idx = (int)(Math.random() * table.dishNum()); table.add(table.dishNames[idx]); try { Thread.sleep(10); } catch (InterruptedException e) {} } } } class Table { //공유 테이블 String[] dishNames = { "donut", "donut", "burger"}; //도넛이 더 자주 나옴 final int MAX_FOOD = 6; //테이블에 놓을 수 있는 최대 음식 개수 private ArrayList<String> dishes = new ArrayList<>(); public synchronized void add(String dish) { //테이블에 음식이 가득 차면 테이블에 음식을 추가하지 않음 if(dishes.size() >= MAX_FOOD) { String name = Thread.currentThread().getName(); System.out.println(name + " is waiting."); try { wait(); //COOK 쓰레드를 기다리게함 Thread.sleep(500); } catch (InterruptedException e) {} } dishes.add(dish); notify(); //기다리고있는 CUST 쓰레드 깨움 System.out.println("Dishes:" + dishes.toString()); } public void remove(String dishName) { String name = Thread.currentThread().getName(); synchronized (this) { while (dishes.size() == 0) { System.out.println(name + " is waiting."); try { wait(); //CUST 쓰레드 기다리게함 Thread.sleep(500); } catch (InterruptedException e) {} } while (true) { for(int i=0; i<dishes.size(); i++) { if(dishName.equals(dishes.get(i))) { dishes.remove(i); notify(); //기다리고있는 COOK 쓰레드 깨움 return; } } try { System.out.println(name + " is waiting"); wait(); //원하는 음식이 없는 CUST 쓰레드를 기다리게함 Thread.sleep(500); } catch (InterruptedException e) {} } } } public int dishNum() { return dishNames.length; } }
결과)
Dishes:[donut] Dishes:[donut, donut] Dishes:[donut, donut, donut] Dishes:[donut, donut, donut, burger] Dishes:[donut, donut, donut, burger, donut] Dishes:[donut, donut, donut, burger, donut, donut] COOK1 is waiting. // 테이블이 가득차 요리사가 기다림 CUST1 ate a donut CUST2 ate a burger Dishes:[donut, donut, donut, donut, donut] CUST2 is waiting // 원하는 음식이 없어 손님이 기다림 CUST1 ate a donut // 테이블의 음식이 소비되어 notify 호출 CUST2 is waiting // 요리사가 아닌 손님 쓰레드가 깨워짐. 원하는 음식이 없어 다시 기다림 CUST1 ate a donut // 테이블의 음식이 소비되어 notify 호출 Dishes:[donut, donut, donut, burger] // 이번엔 요리사 쓰레드가 깨어나 음식 추가 CUST1 ate a donut Dishes:[donut, donut, donut] CUST2 ate a burger Dishes:[donut, donut, donut, burger] Dishes:[donut, donut, donut, burger, burger] Dishes:[donut, donut, donut, burger, burger, burger] COOK1 is waiting. CUST1 ate a donut
- wait과 notify를 추가하여 테이블에 음식이 없을 때와 원하는 음식이 없을 때 손님이 기다리도록 변경
- 테이블 객체의 waiting pool에 Cook 쓰레드와 Customer 쓰레드가 같이 기다리게 되어 notify가 임의의 쓰레드를 호출하여 불필요한 쓰레드가 깨워지게됨.
- 운이 좋지 않아 한 쓰레드가 계속 오랫동안 기다리게 되는 현상을 ‘기아현상’이라고 한다. 이 현상을 막으려면 notifyAll()을 사용해서 모든 쓰레드를 깨워야한다. 위 예시에서 요리사 쓰레드가 계속 기다리는 경우 notifyAll을 통해 손님 쓰레드는 다시 waiting pool에 들어가더라도 요리사 쓰레드는 lock을 얻어 작업을 진행 할 수 있다.
- 기아현상은 막았지만, 손님 쓰레드까지 깨워 불필요하게 요리사 쓰레드와 lock을 얻기 위해 경쟁하게 되는데 이처럼 여러 쓰레드가 lock을 얻기위해 서로 경쟁하는 것을 ‘경쟁상태’라고한다.
2) Lock과 Condition을 이용한 동기화
경쟁상태를 개선하기위해 선별적으로 쓰레드 깨우기위한것, JDK1.5부터 추가됨
- ReentrantLock : 재진입이 가능한 lock. 가장 일반적인 배타 lock
- ReentrantReadWriteLock : 읽기에는 공유적, 쓰기에는 배타적인 lock
- StampedLock : ReentrantReadWriteLock에 낙관적인 lock의 가능을 추가 (JDK1.8부터 추가)
ReentrantLock의 생성자, 메서드
- ReentrantLock()
- ReentrantLock(boolean fair)
: 생성자 매개변수를 true로 주면, lock이 풀렸을 때 가장 오래 기다린 쓰레드가 lock을 획득할 수 있게
즉, 공정하게 처리한다. 그러나 공정하게 처리하려면 어떤 쓰레드가 가장 오래 기다렸는지 확인하는 과정을 거칠 수 밖에 없으므로 성능은 떨어진다.
- void lock() : lock을 잠근다.
- void unlock() : lock을 해지한다.
- boolean isLocked() : lock이 잠겼는지 확인한다.
- boolean tryLock() : lock이 걸려있으면 lock을 얻으려고 기다리지 않음. lock을 얻으면 true, 얻지못하면 false 반환 (응답성이 중요한 경우 lock을 얻지못하면 다시 작업을 시도할 것인지 포기할것인지 사용자가 결정할수있게 할때 사용)
- boolean tryLock(long timeout, TimeUnit unit) : 지정된 시간만큼만 기다림
: 자동으로 lock의 잠금과 해제가 관리되는 synchronized블럭과 달리 lock클래스들은 수동으로 lock을 잠그고 해제해야한다.
ReentrantLock lock = new ReentrantLock(); lock.lock(); try { //임계영역 } finally { lock.unlock(); }
임계 영역 내에서 예외가 발생하거나 return문으로 빠져나가게되면 lock이 풀리지 않을 수 있으므로
unlock()은 try-finally문으로 감싸는 것이 일반적이다.
대부분의 lock() & unlock() 대신 synchronized 블럭을 사용할수 있으며, 그럴 땐 그냥 synchronized 블럭을 사용하는 것이 더 나을 수 있다.
ReentrantLock과 Condition
- 쓰레드 각자의 Condition을 만들어서 각각의 waiting pool에서 따로 기다리게 함
- 이미 생성된 lock으로부터 newCondition()을 호출하여 생성
private ReentrantLock lock = new ReentrantLock(); //lock으로 Condition생성 private Condition forCook = lock.newCondition(); private Condition forCust = lock.newCondition();
- wait() & notify() 대신 Condition의 await() & signal() 사용
ex2에 Condition 적용
private ReentrantLock lock = new ReentrantLock(); private Condition forCook = lock.newCondition(); private Condition forCust = lock.newCondition(); public void add(String dish) { lock.lock(); try { while (dishes.size() >= MAX_FOOD) { String name = Thread.currentThread().getName(); System.out.println(name + " is waiting."); try { forCook.await(); //COOK 쓰레드를 기다리게함 Thread.sleep(500); } catch (InterruptedException e) {} } dishes.add(dish); forCust.signal(); //기다리고있는 CUST 쓰레드 깨움 System.out.println("Dishes:" + dishes.toString()); } finally { lock.unlock(); } }
→ 쓰레드의 대기와 통지의 대상이 명확히 구분됨.
fork & join 프레임워크
하나의 작업을 작은 단위로 나눠서 여러 쓰레드가 동시에 처리하는 것을 쉽게 만들어준다.
수행할 작업에 따라 RecursiveAction, RecursiveTask 두 클래스 중 하나를 상속 받아 compute() 구현
- RecursiveAction : 반환값이 없는 작업 구현
- RecursiveTask : 반환값이 있는 작업 구현
다음처럼 쓰레드풀과 수행할 작업을 생성하고, invoke()로 작업을 시작한다.
쓰레드를 시작할때 run()이 아닌 start()를 호출하는것처럼
fork&join 프레임웍으로 수행할 작업도 compute()가 아닌 invoke()로 시작한다.
ForkJoinPool pool = new ForkJoinPool(); //쓰레드 풀 생성 SumTask task = new SumTask(from, to); //수행할 작업 생성 Long result = pool.invoke(task) //작업 시작
ForkJoinPool은 지정된 수의 쓰레드를 생성해서 미리 만들어 놓고 반복해서 재사용할수 있게한다.
수행해야하는 작업이 담긴 큐를 제공하며, 각 쓰레드는 자신의 작업 큐에 담긴 작업을 순서대로 처리.
장점
- 쓰레드를 반복해서 생성하지 않아도됨
- 너무많은 쓰레드가 생성되어 성능이 저하되는것을 막아줌 (기본적으로 코어 개수와 동일한 개수의 쓰레드를 생성)
compute()의 구현
수행할 작업 외에도 작업을 어떻게 나눌 것인가에 대해서도 알려줘야한다.
🌟 구조가 일반적인 재귀호출 메서드와 동일함
public Long compute() { long size = to - from + 1; //from <= i <= to if(size <= 5) //더할 숫자가 5개 이하면 return sum(); // from부터 to까지 숫자의 합을 반환 //범위를 반으로 나누어 두개의 작업을 생성 long half = (from+to)/2; SumTask leftSum = new SumTask(from, half); SumTask rightSum = new SumTask(half+1,to); leftSum.fork(); //작업(leftSum)을 작업 큐에 넣는다. return rightSum.compute() + leftSum.join(); }
다른 쓰레드의 작업 훔쳐오기
fork()가 호출되어 작업 큐에 추가된 작업 역시, compute()에 의해 더 이상 나눌 수 없을때까지 반복해서 나뉘고, 자신의 작업 큐가 비어있는 쓰레드는 다른 쓰레드의 작업 큐에서 작업을 가져와서 수행한다. 이것을 작업 훔쳐오기(work stealing)라고 하며, 이 과정은 모두 쓰레드풀에 의해 자동적으로 이루어진다. 이 과정을 통해, 여러 쓰레드가 골고루 작업을 나누어 처리하게 된다.
fork()와 join()
fork()는 작업을 쓰레드의 작업 큐에 넣는것. 작업 큐에 들어간 작업은 compute()로 더이상 나눌 수 없을때까지 나뉜다. 나눠진 작업은 각 쓰레드가 골고루 나눠서 처리하고, 작업의 결과는 join()을 호출해서 얻을 수 있다.
- fork() : 해당 작업을 쓰레드 풀의 작업 큐에 넣는다. 비동기메서드
- join() : 해당 작업의 수행이 끝날 때까지 기다렸다가, 수행이 끝나면 그 결과를 반환. 동기메서드
'DEV > JAVA' 카테고리의 다른 글