Producer-Consumer Problem
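producer : Generates data and puts it into the buffer. Multiple producers may generate data and push it into the buffer concurrently.
consumer : Takes data out of the buffer and consumes it. Only one consumer at a time may remove data from the buffer.
There is a fixed-size buffer, implemented as a queue. When the buffer is full, producers wait; when the buffer is empty, consumers wait.
Mutual exclusion can be implemented by guarding the buffer's enqueue and dequeue operations with a synchronized object. Each thread then spins in a while loop, repeatedly asking whether enqueue or dequeue is possible yet.
However, this is not a good approach, because the threads keep consuming CPU time while they wait (busy waiting).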
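The polling approach described above might look like the following minimal sketch. The names `PollingBuffer`, `tryEnqueue`, `tryDequeue`, and `BusyWaitDemo` are assumptions made for illustration and do not come from the original post. The result is correct, but both threads stay runnable and spin while they have nothing to do.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical buffer for illustration: operations never block, they just report failure.
class PollingBuffer {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int capacity;

    PollingBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Returns false instead of waiting when the buffer is full.
    synchronized boolean tryEnqueue(int value) {
        if (queue.size() == capacity) {
            return false;
        }
        queue.add(value);
        return true;
    }

    // Returns null instead of waiting when the buffer is empty.
    synchronized Integer tryDequeue() {
        return queue.poll();
    }
}

class BusyWaitDemo {
    // Moves the integers 1..items through a small buffer and returns the sum the consumer saw.
    static long run(int items) {
        PollingBuffer buffer = new PollingBuffer(4);
        long[] sum = {0};
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= items; i++) {
                while (!buffer.tryEnqueue(i)) {
                    // busy waiting: keep asking until there is room
                }
            }
        });
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < items; i++) {
                Integer value;
                while ((value = buffer.tryDequeue()) == null) {
                    // busy waiting: keep asking until data arrives
                }
                sum[0] += value;
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run(1000));
    }
}
```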
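Instead, use wait() to put a thread that has to wait for a resource into the blocked (waiting) state, and call notifyAll() to wake all sleeping threads once the resource becomes available.
In enqueue, after the while loop: if the buffer was empty before the insert, call notifyAll() to wake every sleeping thread (the goal is to wake the consumers that were waiting for data).
In dequeue, after the while loop: if the buffer was full before the removal, call notifyAll() to wake every sleeping thread (the goal is to wake the producers that were waiting for free space).
* Using an if statement instead of a while loop is a problem. Suppose thread1 waits, is woken by thread2, and is about to add data to the buffer; thread3 may have filled the buffer in the meantime. Another thread can enter between the wake-up and the moment the woken thread re-acquires the monitor, so always re-check the condition in a while loop.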
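The wait()/notifyAll() scheme can be sketched as follows. This is a minimal illustration under assumptions, not the original author's code: `BoundedBuffer`, `WaitNotifyDemo`, and the capacity of 2 are hypothetical. Both methods re-check their condition in a while loop, and each notifies only when the buffer was empty (enqueue) or full (dequeue) beforehand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical fixed-size buffer guarded by its own monitor.
class BoundedBuffer {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void enqueue(int value) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // releases the monitor and sleeps; condition is re-checked on wake-up
        }
        boolean wasEmpty = queue.isEmpty();
        queue.add(value);
        if (wasEmpty) {
            notifyAll(); // consumers may be sleeping on an empty buffer
        }
    }

    synchronized int dequeue() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        boolean wasFull = queue.size() == capacity;
        int value = queue.remove();
        if (wasFull) {
            notifyAll(); // producers may be sleeping on a full buffer
        }
        return value;
    }
}

class WaitNotifyDemo {
    // Starts `pairs` producers and `pairs` consumers; each producer enqueues 1..itemsEach.
    // Returns the sum of everything the consumers dequeued.
    static long run(int pairs, int itemsEach) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        AtomicLong total = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < pairs; p++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsEach; i++) {
                        buffer.enqueue(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        total.addAndGet(buffer.dequeue());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        threads.forEach(Thread::start);
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("total = " + run(3, 1000));
    }
}
```

Replacing either `while` with `if` would let a woken thread continue even though another thread had already changed the buffer, which is the failure described above.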
Example
public class NoLockTest {
public static void main(String[] args) {
CounterLock c_lock = new CounterLock();
int inc_num = 10001234;
int dec_num = 10000000;
long start = System.currentTimeMillis();
Thread p = new Thread (new Producer(c_lock, inc_num));
p.start();
Thread c = new Thread (new Consumer(c_lock, dec_num));
c.start();
try {
p.join();
} catch (InterruptedException e) {}
try {
c.join();
} catch (InterruptedException e) {}
long finish = System.currentTimeMillis();
System.out.println(inc_num+" inc() calls, "+dec_num+" dec() calls = " + c_lock.getCount());
System.out.println("No-Lock time: "+(finish-start)+"ms");
}
}
class Producer implements Runnable{
private CounterLock myCounter;
int num;
public Producer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.inc();
}
}
}
class Consumer implements Runnable{
private CounterLock myCounter;
int num;
public Consumer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.dec();
}
}
}
class CounterLock {
private long count = 0;
public void inc() {
this.count++;
}
public void dec() {
this.count--;
}
public long getCount() {
return this.count;
}
}
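This is a producer-consumer example with no synchronization applied at all. Because count++ and count-- are not atomic, concurrent updates can be lost, so the final count may differ from the expected 1234 (10001234 - 10000000). The following sections apply synchronization using a Lock, synchronized, a semaphore, and an atomic variable.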
Lock
package test;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
public static void main(String[] args) {
CounterLock c_lock = new CounterLock();
int inc_num = 10001234;
int dec_num = 10000000;
long start = System.currentTimeMillis();
Thread p = new Thread (new Producer(c_lock, inc_num));
p.start();
Thread c = new Thread (new Consumer(c_lock, dec_num));
c.start();
try {
p.join();
} catch (InterruptedException e) {}
try {
c.join();
} catch (InterruptedException e) {}
long finish = System.currentTimeMillis();
System.out.println(inc_num+" inc() calls, "+dec_num+" dec() calls = " + c_lock.getCount());
System.out.println("With-Lock Time: "+(finish-start)+"ms");
}
}
class Producer implements Runnable{
private CounterLock myCounter;
int num;
public Producer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.inc();
}
}
}
class Consumer implements Runnable{
private CounterLock myCounter;
int num;
public Consumer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.dec();
}
}
}
class CounterLock {
private long count = 0;
private Lock lock = new ReentrantLock();
public void inc() {
lock.lock(); // acquire before try so that finally never unlocks a lock that is not held
try {
this.count++;
} finally {
lock.unlock();
}
}
public void dec() {
lock.lock();
try {
this.count--;
} finally {
lock.unlock();
}
}
public long getCount() {
lock.lock();
try {
return this.count;
} finally {
lock.unlock();
}
}
}
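The CounterLock class owns a lock. The Producer and the Consumer access the same CounterLock object concurrently. inc(), dec(), and getCount() each acquire the lock first, which guarantees that only one thread at a time runs the guarded code. When the work is done, unlock() in the finally block releases the lock so that a waiting thread can acquire it.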
Synchronized method
public class SyncMethodTest {
public static void main(String[] args) {
CounterLock c_lock = new CounterLock();
int inc_num = 10001234;
int dec_num = 10000000;
long start = System.currentTimeMillis();
Thread p = new Thread (new Producer(c_lock, inc_num));
p.start();
Thread c = new Thread (new Consumer(c_lock, dec_num));
c.start();
try {
p.join();
} catch (InterruptedException e) {}
try {
c.join();
} catch (InterruptedException e) {}
long finish = System.currentTimeMillis();
System.out.println(inc_num+" inc() calls, "+dec_num+" dec() calls = " + c_lock.getCount());
System.out.println("Synchronized-method time: "+(finish-start)+"ms");
}
}
class Producer implements Runnable{
private CounterLock myCounter;
int num;
public Producer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.inc();
}
}
}
class Consumer implements Runnable{
private CounterLock myCounter;
int num;
public Consumer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.dec();
}
}
}
class CounterLock {
private long count = 0;
public synchronized void inc() {
this.count++;
}
public synchronized void dec() {
this.count--;
}
public synchronized long getCount() {
return this.count;
}
}
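Every method of the CounterLock object that the producer and consumer access concurrently is synchronized. This guarantees that at most one thread at a time is executing any of these methods on the same object.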
Synchronized object
public class SyncObjTest {
public static void main(String[] args) {
CounterLock c_lock = new CounterLock();
int inc_num = 10001234;
int dec_num = 10000000;
long start = System.currentTimeMillis();
Thread p = new Thread (new Producer(c_lock, inc_num));
p.start();
Thread c = new Thread (new Consumer(c_lock, dec_num));
c.start();
try {
p.join();
} catch (InterruptedException e) {}
try {
c.join();
} catch (InterruptedException e) {}
long finish = System.currentTimeMillis();
System.out.println(inc_num+" inc() calls, "+dec_num+" dec() calls = " + c_lock.getCount());
System.out.println("Synchronized-block time: "+(finish-start)+"ms");
}
}
class Producer implements Runnable{
private CounterLock myCounter;
int num;
public Producer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.inc();
}
}
}
class Consumer implements Runnable{
private CounterLock myCounter;
int num;
public Consumer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.dec();
}
}
}
class CounterLock {
private long count = 0;
public void inc() {
synchronized (this) {
this.count++;
}
}
public void dec() {
synchronized (this) {
this.count--;
}
}
public long getCount() {
synchronized (this) {
return this.count;
}
}
}
synchronized (the object that the producer and consumer access concurrently) { ... } guarantees that only one thread at a time enters the critical section inside the block.
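A synchronized instance method and a synchronized (this) block acquire the same monitor, namely the monitor of the object itself, so the two forms exclude each other in the same way. The small check below illustrates this with `Thread.holdsLock`; the class name `MonitorCheck` and its method names are hypothetical.

```java
// Hypothetical class for illustration: reports whether the calling thread holds this object's monitor.
class MonitorCheck {
    // A synchronized instance method locks `this` for the duration of the call.
    synchronized boolean insideSynchronizedMethod() {
        return Thread.holdsLock(this);
    }

    // A synchronized (this) block locks the same monitor.
    boolean insideSynchronizedBlock() {
        synchronized (this) {
            return Thread.holdsLock(this);
        }
    }

    // No synchronization: the monitor is not held.
    boolean outsideAnyLock() {
        return Thread.holdsLock(this);
    }

    public static void main(String[] args) {
        MonitorCheck check = new MonitorCheck();
        System.out.println("method: " + check.insideSynchronizedMethod());
        System.out.println("block:  " + check.insideSynchronizedBlock());
        System.out.println("none:   " + check.outsideAnyLock());
    }
}
```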
Semaphore
import java.util.concurrent.*;
public class SemaphoreTest {
public static void main(String[] args) {
CounterLock c_lock = new CounterLock();
int inc_num = 10001234;
int dec_num = 10000000;
long start = System.currentTimeMillis();
Thread p = new Thread (new Producer(c_lock, inc_num));
p.start();
Thread c = new Thread (new Consumer(c_lock, dec_num));
c.start();
try {
p.join();
} catch (InterruptedException e) {}
try {
c.join();
} catch (InterruptedException e) {}
long finish = System.currentTimeMillis();
System.out.println(inc_num+" inc() calls, "+dec_num+" dec() calls = " + c_lock.getCount());
System.out.println("Semaphore time: "+(finish-start)+"ms");
}
}
class Producer implements Runnable{
private CounterLock myCounter;
int num;
public Producer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.inc();
}
}
}
class Consumer implements Runnable{
private CounterLock myCounter;
int num;
public Consumer(CounterLock x, int Num) {
this.num=Num;
this.myCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myCounter.dec();
}
}
}
class CounterLock {
private long count = 0;
private Semaphore sema = new Semaphore(1);
public void inc() {
sema.acquireUninterruptibly(); // acquire outside try so release() only runs when a permit is held
try {
this.count++;
} finally {
sema.release();
}
}
public void dec() {
sema.acquireUninterruptibly();
try {
this.count--;
} finally {
sema.release();
}
}
public long getCount() {
sema.acquireUninterruptibly();
try {
return this.count;
} finally {
sema.release();
}
}
}
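The counter above uses a semaphore with a single permit purely as a mutual-exclusion lock. Counting semaphores can also express the "buffer full" and "buffer empty" waiting conditions of the bounded-buffer problem directly. The following is a minimal sketch under assumptions; `SemaphoreBuffer`, `SemaphoreBufferDemo`, and the capacity of 2 are hypothetical and not part of the original post.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical bounded buffer built from three semaphores.
class SemaphoreBuffer {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final Semaphore emptySlots;                      // permits = free slots
    private final Semaphore filledSlots = new Semaphore(0);  // permits = stored items
    private final Semaphore mutex = new Semaphore(1);        // protects the queue itself

    SemaphoreBuffer(int capacity) {
        this.emptySlots = new Semaphore(capacity);
    }

    void enqueue(int value) throws InterruptedException {
        emptySlots.acquire(); // blocks while the buffer is full
        mutex.acquireUninterruptibly();
        try {
            queue.add(value);
        } finally {
            mutex.release();
        }
        filledSlots.release(); // one more item is available to consumers
    }

    int dequeue() throws InterruptedException {
        filledSlots.acquire(); // blocks while the buffer is empty
        int value;
        mutex.acquireUninterruptibly();
        try {
            value = queue.remove();
        } finally {
            mutex.release();
        }
        emptySlots.release(); // one more slot is available to producers
        return value;
    }
}

class SemaphoreBufferDemo {
    // One producer enqueues 1..items, one consumer sums what it dequeues.
    static long run(int items) {
        SemaphoreBuffer buffer = new SemaphoreBuffer(2);
        long[] sum = {0};
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    buffer.enqueue(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    sum[0] += buffer.dequeue();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run(1000));
    }
}
```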
AtomicTest
import java.util.concurrent.atomic.*;
public class AtomicTest {
public static void main(String[] args) {
AtomicInteger atomic_int = new AtomicInteger(0);
int inc_num = 10001234;
int dec_num = 10000000;
long start = System.currentTimeMillis();
Thread p = new Thread (new Producer(atomic_int, inc_num));
p.start();
Thread c = new Thread (new Consumer(atomic_int, dec_num));
c.start();
try {
p.join();
} catch (InterruptedException e) {}
try {
c.join();
} catch (InterruptedException e) {}
long finish = System.currentTimeMillis();
System.out.println(inc_num+" inc() calls, "+dec_num+" dec() calls = " + atomic_int.get());
System.out.println("Atomic time: "+(finish-start)+"ms");
}
}
class Producer implements Runnable{
private AtomicInteger myAtomicCounter;
int num;
public Producer(AtomicInteger x, int Num) {
this.num=Num;
this.myAtomicCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myAtomicCounter.incrementAndGet(); // myAtomicCounter++
}
}
}
class Consumer implements Runnable{
private AtomicInteger myAtomicCounter;
int num;
public Consumer(AtomicInteger x, int Num) {
this.num=Num;
this.myAtomicCounter = x;
}
@Override
public void run() {
for (int i = 0; i < num; i++) {
myAtomicCounter.decrementAndGet(); // myAtomicCounter--
}
}
}
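This approach uses an atomic variable (here an AtomicInteger). Each operation such as incrementAndGet() or decrementAndGet() executes as a single indivisible step, so no explicit lock is needed.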
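To give a feel for how an atomic update can work without a lock, the sketch below writes an increment as an explicit compare-and-set retry loop. This is a conceptual illustration only: `CasSketch` and `incrementWithCas` are hypothetical names, and the JDK's own incrementAndGet() may be implemented differently (for example with hardware intrinsics).

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSketch {
    // Retries until no other thread changed the value between the read and the write.
    static int incrementWithCas(AtomicInteger counter) {
        while (true) {
            int current = counter.get();
            int next = current + 1;
            if (counter.compareAndSet(current, next)) {
                return next; // our update won
            }
            // another thread updated the counter first; read again and retry
        }
    }

    // Runs `threads` workers, each incrementing the shared counter `incrementsEach` times.
    static int run(int threads, int incrementsEach) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsEach; i++) {
                    incrementWithCas(counter);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("count = " + run(4, 10000));
    }
}
```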