직업은 선물 트레이더

[과제]고급프로그래밍. 멀티쓰레드 시뮬레이션

잊어버린 과거

이번 과제는 뭐랄까.. 너무 오랜만에 봐서 그런지 기억이 안난달까?

 

나름 당시에는 열심히 한다고 했던 과제같다. 사실 쓰레드 다루는 것과 관련해서는 이전에 컴퓨터네트워크 강의에서 거의 다 배우긴 했다. 그래서 나름 쉽다쉽다 하면서 했던 것 같은데, 지금 보니까 어째 완성도 면에서는 영...

 

플로우 차트부터 보는 게 이해가 빠를 듯 싶다. Producer 쓰레드가 만들고 Consumer 쓰레드가 소비하는 상황에서 중간에 어케어케 해가지고 사용자 입력값이 큐에 들어가고 그러면 그걸 Monitor 쓰레드가 출력했던가.. 그냥 쓰레드 사용정도와 관련된 시뮬레이션 프로그램이다.

 

이번 것도 유닉스/리눅스 계열의 운영체제에서 컴파일 후 돌리면 된다.

 

 

(2012. 11. 12(월))



Producer/Consumer/Monitor 

Thread Program


- Design & execution -









Dept. Computer Engineering

2010151035

장용하

 

 


1. Design

  1.1 Data Structure Modeling


  Producer/Consumer/Monitor Thread 프로그램에서의 자료구조는 10개의 데이터가 들어갈 수 있는 배열입니다. 이 배열은 선입선출구조의 Queue를 구현하며 순환(Circular)될 수 있습니다. 다음의 <그림 1>은 이 자료구조를 보이고 있습니다.


 

 

<그림 1>


이 자료구조는 ReadPointer(rp) 와 WritePointer(wp) 두 가지를 통해 각 데이터의 위치를 표현하는데, rp는 다음의 읽은 데이터의 위치를 그리고 wp는 다음에 쓸 (빈)공간의 위치를 나타냅니다. 실제로 wp가 위치하는 곳에 데이터가 있을지 몰라도 rp와 wp의 상대적인 위치를 고려하여 필요 없는 자료로 판단될 경우 데이터를 덮어씌우는 식으로 진행됩니다.


  실제 코드상에서는 다음 <그림 2>와 같이 표현됩니다. Int형으로 지정한 이유는 어느정도 적정한 범위를 가진 수치형 데이터이기 때문입니다.




 

 

<그림 2>



이 자료구조는 실제로 전역변수로 선언되어있어서 쓰레드들의 접근이 용이합니다.


 

  1.2 Operation Modeling


이 프로그램에서 제공하는 대표적인 함수들은 다음과 같습니다.


Operation

Role

Producer_thread1()

 1부터 100까지의 숫자를 Circular Queue에 순서대로 저장합니다. 이 때 Queue가 꽉 차면 Queue가 완전히 빌 때까지 기다립니다.

Producer_thread2()

 standard input으로부터 사용자가 입력한 숫자(=0이상의 양의 정수)를 받아 100을 더한 뒤 Circular Queue에 저장하는 기능을 수행합니다.

Consumer_thread()

Circular Queue에 저장된 값을 읽어서 출력하는 기능을 수행합니다. 이 때 Queue가 비어있으면 완전히 찰 때까지 기다립니다.

Monitor_thread()

consumer가 standard input의 데이터를 읽을 때마다 깨어나서 해당 데이터 값을 출력하는 기능을 수행합니다.

 




 1.2.1 Definition of Major Operations


Producer_thread1()

 형  식

 void *Producer_thread1(void *param);

 리턴값

 없음

 설  명

 wp가 가리키는 순환큐의 위치에 숫자를 넣습니다. 숫자는 1부터 100까지 있으며, 1부터 증가하는 순으로 들어갑니다. 그리고 wp의 위치를 하나 증가시킵니다. 그러나 wp의 위치가 큐 밖을 가리키지는 않도록 조절됩니다. wp와 순환큐에 접근에는 mutex_lock 과 unlock을 통해 상호배제가 지켜집니다.

 복잡도

 O(1)


Producer_thread2()

 형  식

 void *Producer_thread2(void *param);

 리턴값

 없음

 설  명

 wp가 가리키는 순환큐의 위치에 숫자를 넣습니다. 여기서 들어가는 숫자는 사용자가 입력하는 (양의 정수의 값+100)입니다. 그리고 wp의 위치를 하나 증가시킵니다. 그러나 wp의 위치가 큐 밖을 가리키지는 않도록 조절됩니다. wp와 순환큐에 접근에는 mutex_lock 과 unlock을 통해 상호배제가 지켜집니다.

 복잡도

 O(1)


Consumer_thread2()

 형  식

 void *Consumer_thread(void *param)

 리턴값

 없음

 설  명

 게가 가리키는 순환큐의 위치에 숫자를 Standard Output에 출력합니다. 순환큐에 데이터가 거의 꽉 찼을 경우 거의 바닥이 드러날 때 까지 출력을하며 출력이후 rp의 위치를 하나 증가시킵니다. 그러나 rp의 위치가 큐 밖을 가리키지는 않도록 조절됩니다. 그러나 사용자가 입력한 데이터에 대해서는 pthread_cond_signal()함수를 호출하여 즉시 다음번의 일어날 반응이 시작되도록 돕습니다. rp와 순환큐에 접근에는 mutex_lock 과 unlock을 통해 상호배제가 지켜집니다.

 복잡도

 O(1)


Monitor_thread()

 형  식

 void *Monitor_thread(void *param)

 리턴값

 없음

 설  명

 Consumer_thread()함수에 의해 pthread_cond_signal() 함수가 호출되면 즉시 반응하여 순환큐에 있던 데이터를 출력합니다. 이 데이터는 사용자가 입력한 정수+100의 값입니다. 당연히 이 순환큐에 접근할 때에는 mutex_lock 과 unlock을 통해 상호배제가 지켜집니다.

 복잡도

 O(1)

 

 

1.3 Flow-chart Modeling


  이 프로그램의 순서도는 다음의 <그림 3>과 같습니다. 프로그램이 진행되는 흐름에 따라 화살표를 배치하였습니다. 화살표 위에 있는 4개의 이름은 병렬적으로 진행되는 4개의 쓰레드를 의미하며, 이들은 뮤텍스를 통해 상호배제를 보장받습니다.



<그림 3>

 


2. Code


#include <stdio.h>

#include <pthread.h>


void *Producer_thread1(void *param);

void *Producer_thread2(void *param);

void *Consumer_thread(void *param);

void *Monitor_thread(void *param);

int isEmpty();

int isFull();


#define bufSize 10


int queue[10];

int rp=0;

int wp=1;

int picked;

pthread_mutex_t mutx;

pthread_cond_t cond;


int main() {

    int param;

    int state;

    void * thread_return;


    pthread_t Producer[2], Consumer, Monitor;


    state = pthread_create(&Producer[0], NULL, Producer_thread1, (void*)&param);

    if(state) {

            printf("(Notice)Error is occured : %d \n", state); return 0;

    }

    

    state = pthread_create(&Producer[1], NULL, Producer_thread2, (void*)&param);

    if(state) {

            printf("(Notice)Error is occured : %d \n", state); return 0;

    }


    state = pthread_create(&Consumer, NULL, Consumer_thread, (void*)&param);

    if(state) {

            printf("(Notice)Error is occured : %d \n", state); return 0;

    }


    state = pthread_create(&Monitor, NULL, Monitor_thread, (void*)&param);

    if(state) {

            printf("(Notice)Error is occured : %d \n", state); return 0;

    }

    pthread_join(Producer[0], &thread_return);

    pthread_join(Producer[1], &thread_return);

    pthread_join(Consumer, &thread_return);

    pthread_join(Monitor, &thread_return);

    return 0;

}

void *Producer_thread1(void *param) {

    int num=1;

    int i=0;


    while(1) {

            

            if(isEmpty()) { // queue가 완전히 비면

                    while(!isFull()) {


                            pthread_mutex_lock(&mutx);

                            queue[wp]= num;

                            wp = (wp+1)%bufSize;

                            pthread_mutex_unlock(&mutx);

                            printf("[num] : %d\n", num);


                            num = num + 1;

                            sleep(1);

                            if(num >100)

                                    return 0;

                    }

            }

    }

}


void *Producer_thread2(void *param) {

    int number;


    while(1) {


            printf("\n[Insert Number(with larger Integer than 0) : \n");

            scanf("%d", &number);


            number = number + 100;


            pthread_mutex_lock(&mutx);

            queue[wp] = number;

            wp = (wp+1)%bufSize;

            pthread_mutex_unlock(&mutx); 

    }

}


void *Consumer_thread(void *param) {

    while(1) {

            if(isFull()) { //꽉차면 소비

                    while(!isEmpty()) { // 거의 바닥날 때 까지 소비

                    

                            pthread_mutex_lock(&mutx);

                            

                            if(queue[rp] > 100) {

                                    picked = rp;

                                    //pthread_cond_signal(&cond);

                            }

                            

                            printf("queue[%d] : %d | [wp] : %d, | [rp] : %d\n", rp, queue[rp], wp, rp);

                            rp = (rp+1)%bufSize;

                            pthread_mutex_unlock(&mutx);


                            sleep(1);

                    }

            }

    }

}


void *Monitor_thread(void *param) {


    while(1) {

            pthread_mutex_lock(&mutx);


            pthread_cond_wait(&cond, &mutx);

            printf("\nThis number : %d\n", queue[picked]);


            pthread_mutex_unlock(&mutx);

    }

}


int isEmpty() {

    if((rp+1)%bufSize == wp)

            return 1;

    else

            return 0;

}

int isFull() {

    if(rp == (wp+1)%bufSize)

            return 1;

    else

            return 0;

}

 

 

2. Excution

  1.1 실패한 경우[1]

 

  다음의 <그림 4>는 이 프로그램을 일부러 오류가 나도록 조치한 프로그램의 캡쳐 화면입니다. Thread Synchronization Tool을 인위적으로 제거했을 때의 모습입니다.

 

 


<그림 4>


  Synchronization Tool이 없어지자 Monitor Thread가 무분별하게 무제한적으로 실행되면서 다른 쓰레드의 진행부분을 전부 가리게 됩니다. 그리고 어느 정도 시간이 지나면 넣지도 않은 수치가(= <그림 4>에서는 10) 인위적으로 대입되는 모습 또한 포착되었습니다.

 


  1.2 실패한 경우[2]


  다음의 <그림 5>는 Monitor Thread를 잘 제어해서 <그림 4>의 문제를 없앴을 때 나머지 쓰레드들의 동작모습을 보이고 있습니다.


 

 

<그림 5>


  [num]은 Producer1 쓰레드가 큐에 삽입한 값이고, queue[x]=xx [rp]=xx [wp]=xx 는 Consumer 쓰레드가 출력한 값입니다.

  <그림 5>에서 최초의 queue[3]= 11에서 시작되어 출력이 지속되는데, 12 다음 13이 나와야 하지만, 그림 이전에 삽입한 여러 수치가 있는데 그 수치들이 + 100되지 않고 기존의 Producer1에 의해 들어간 13이란 수치에 +100이 되어 출력된 상태를 보여주고 있습니다. 140도 마찬가지이고 150은 이전에 입력한 수치가 운 좋게 제대로 나온 경우입니다.


 


  1.3 성공한 경우


  다음의 <그림 6>은 성공한 경우를 보이고 있습니다. Producer1은 delay없이 자율적으로 실행되게 했으며 관찰의 용이성을 위해 Consumer thread 에는 sleep()함수로 delay를 주었습니다.

 

 

 <그림 6>