Pagine    Articoli    Prodotti    Forum    Cerca  
Nickname

Password


Non sei registrato?
Registrati a GPI qui!

Puoi anche attivare un vecchio utente GPI e chiedere una nuova password.
I Team

Mappa Team
I nostri utenti

Mappa Utenti
  C++11: Introduzione alla concorrenza, Producer/Consumer
Pubblicato da Dario Oliveri il 2013-03-13 20:33:58

In questo articolo vedremo come far comunicare tra di loro 2 processi paralleli (threads). In particolare utilizzermo il pattern Producer/Consumer. Ovvero faremo comunicare i threads in maniera indiretta: uno dei due thread si limiterà a "depositare messaggi" in un buffer (o MessageQueue), mentre l'altro thread si limiterà a "prelevare messaggi" da questo buffer.

 

 

L'accesso alla Message Queue, deve essere regolato in qualche modo:

 

-Utilizzando un Mutex (std::mutex).

-Utilizzando un separatore.

 

Delucidazioni sui Mutex:

 

I mutex sono oggetti (generalmente messi a disposizione dal Sistema Operativo), che permettono di accedere ad una risorsa condivisa in maniera ordinata. Il funzionamento è analogo a quello di un casello autostradale, chiunque chieda accesso al mutex deve mettersi in coda e aspettare che chi è davanti finisca il suo turno: attraverso il casello passa soltanto un'automobile alla volta. E' nostro interesse fare in modo che questa "automobile" resti nel casello meno tempo possibile, soprattutto non deve bloccare il casello.

 

L'utilizzo del mutex è semplice: quando vogliamo entrare nel casello, chiamiamo "lock": A questo punto possiamo accedere alla risorsa condivisa. Quando abbiamo finito chiamiamo "unlock" e proseguiamo oltre. Se nel frattempo qualcuno è entrato a sua volta nel casello (chiamando pure lui "lock" sullo stesso mutex), dovrà aspettare la nostra uscita. Se per qualche ragione ci dimentichiamo di chiamare "unlock", chi è in coda dietro di noi ci resterà per sempre (in realtà questo problema si può aggirare in maniera automatica, poi vedremo come).

mutex.lock();  //chiediamo l'accesso alla risorsa condivisa

buffer.access(); // facciamo qualcosa con il buffer

mutex.unlock(); //ricordiamoci di liberare la risorsa per gli altri

 

 

Implementazione:

 

Ok, visto che creare applicazioni multithreaded è molto difficile, dobbiamo metterci fin dall'inizio in condizioni vantaggiose. Come facciamo a sapere cosa succede dentro al nostro programma? Per saperlo l'unica è crearci una Logging Facility, ovvero un sistema che ci permetta di stampare a schermo dei messaggi (un sistema di loggin dovrebbe essere la vostra prima preoccupazione vista la sua importanza) anche in presenza di processi paralleli che vogliono stampare qualcosa a schermo.

 

#include < mutex >
#include < iostream >

/** Semplice sistema di loggin per fare debug e vedere cosa viene stampato.
*   Un mutex globale è utilizzato per scrivere messaggi su console (ricordo
*   che la console è una risorsa e ogni risorsa va protetta da un mutex in
*   presenza di processi paralleli che la usano).*/

#if 0  //E' possibile cambiare 0 con 1 per disattivare i messaggi di debug.
    #define DEBUG_LOG(X) (void(0))
#else
    std::mutex  GlobalMutex;

    #define DEBUG_LOG(x) \
        GlobalMutex.lock(); \
        std::cout << x << std::endl; \
        GlobalMutex.unlock();
#endif

Come vedete non si tratta d'altro che di un "std::cout" contornato dalla famosa coppia "lock" e "unlock" del mutex. Questo perchè la Console è una risorsa condivisa, e le risorse condivise vanno sempre protette da un mutex.

 

Ora che abbiamo un utility di debug possiamo creare la nostra MessageQueue, la soluzione più semplice è utilizzare un buffer circolare di dimensione prefissata.

typedef int  ProducerMessage;  /// per ora accontentiamoci di questo ;)

/*******************************************************************************
*   SharedBuffer, risorsa condivisa tra 2 thread per potersi scambiare messaggi.
*   Non è vietato utilizzarla per più di 2 thread, ma solitamente è più
*   conveniente utilizzare un buffer per ogni canale di comunicazione piuttosto
*   che utilizzare 1 buffer per tutte le comunicazioni.
*******************************************************************************/
struct SharedBuffer{
    std::mutex      casello;
    ProducerMessage messages[8];

    // possiamo inizializzare qui e risparmiarci il costruttore.
    int             separator = 0;
    int             reader = 0;
    int             writer = 0;

    //ritorna true se il messaggio è stato scritto in out_m
    bool readMessage(ProducerMessage & out_m){
        casello.lock(); //entro nel "casello autostradale"

        if(separator == 0){
            //DEBUG_LOG("No message to read!");
            casello.unlock(); //esco dal "casello"
            return false;
        }

        out_m = messages[reader]; //copio messaggio su out_m
        reader++; //aggiorno posizione del reader
        if(reader > 7)
            reader=0;

        separator --;

        casello.unlock(); //esco dal "casello"
        return true;
    }

    //ritorna true se il messaggio è stato scritto dentro il buffer
    bool writeMessage(ProducerMessage in){
        casello.lock(); //entro nel casello

        if(separator==7){
            //DEBUG_LOG("no empty space left");
            casello.unlock();
            return false;
        }

        messages[writer]=in; //scrivo il messaggio dentro al buffer
        writer++;
        if(writer > 7)
            writer=0;

        separator++;

        casello.unlock();
        return true; //esco dal casello
    }
};

 

 

Come vedete ci sono solo 2 metodi pubblici in questa classe e sono entrambi protetti dal mutex: Si dice allora che SharedBuffer è Thread-safe (E' garantito che soltanto un thread alla volta possa accedere alla stessa risorsa, in questo caso la risorsa è un oggetto SharedBuffer).

 

Secondo voi anche DEBUG_LOG(x) è thread safe oppure no? perchè? (aiutino: se DEBUG_LOG(x) venisse chiamato da più threads alla volta, ci garantisce che soltanto uno di quei threads possa stampare "x" su console in un dato momento?)

 

Producer:

 

Adesso che abbiamo la MessageQueue e DEBUG_LOG(x), possiamo creare il Producer. Ci basta un producer semplice che scriva un po di messaggi casuali e poi termini l'esecuzione generando il "messaggio n° 5" perchè vogliamo che "5" faccia terminare il programma.

#include  //serve per "rand()".
class Producer{

    SharedBuffer * buffer;
public:

    /** Costruttore */
    Producer(SharedBuffer * abuffer):
        buffer(abuffer){   }

    void run(){
        for(int i=0; i<16; i++){
            int r = std::rand()%2; //numero casuale ritorna 0 oppure 1.
            ProducerMessage mess = r;
            switch(r){
                case 0:
                    DEBUG_LOG("producer -> 0"); break;
                case 1:
                    DEBUG_LOG("producer -> 1"); break;
            }
            //Attesa attiva che si liberi il buffer dei messaggi. In un
            //codice reale si può imporre un limite alla durata dell'attesa
            while(!buffer->writeMessage(mess)){}
        }

        // Questo messaggio chiude il consumer.
        DEBUG_LOG("producer -> 5");
        while(!buffer->writeMessage(5)){}

    }
};

 

 

 

 

 E ora il codice del consumer:

class Consumer{

    SharedBuffer * buffer;
public:

    /** Costruttore*/
    Consumer(SharedBuffer * abuffer):
        buffer(abuffer){  }

    void run(){
        ProducerMessage mess;
        while(mess!=5){
            if(buffer->readMessage(mess)){
              switch(mess){
                case 0:
                    DEBUG_LOG("consumed 0!"); break;
                case 1:
                    DEBUG_LOG("consumed 1!"); break;
              }
            }
        }
        DEBUG_LOG("consumed Quit");
    }

};

 

 

Questo è il main:

#include 
int main(){
    SharedBuffer buffer;   //creo la risorsa condivisa
    Producer B(&buffer);   //passo l'indirizzo della risorsa al producer
    Consumer C(&buffer);   //passo l'indirizzo della risorsa al consumer
    std::thread consumerThread(&Consumer::run,&C); //faccio partire i threads
    std::thread producerThread(&Producer::run,&B);
    consumerThread.join(); //aspetto il termine dei threads.
    producerThread.join();
    DEBUG_LOG("THE END");
    return 0;
}

 

 

Un esempio di output:

producer -> 1
producer -> 1
producer -> 0
producer -> 0
producer -> 1
producer -> 0
producer -> 0
producer -> 0
consumed 1!
producer -> 0
consumed 1!
producer -> 0
consumed 0!
producer -> 1
consumed 0!
producer -> 1
consumed 1!
producer -> 1
consumed 0!
consumed 0!
consumed 0!
consumed 0!
consumed 0!
consumed 1!
consumed 1!
consumed 1!
producer -> 1
consumed 1!
producer -> 1
producer -> 1
consumed 1!
producer -> 5
consumed 1!
consumed Quit
THE END

Ovviamente l'output cambia di volta in volta, l'unica cosa che viene garantita è che i messaggi escono nello stesso ordine in cui sono entrati (First In First Out). Potete scaricare il codice sorgente da pastebin:

QUI

 

 

Come esercizio vi invito profondamente a riflettere sul codice di questo articolo, c'è da qualche parte un pezzo di codice o una funzione che accede a variabili "potenzialmente condivise" e che potrebbe quindi causare problemi? Dove si verifica questo potenziale problema? Come risolvereste?  (Forum C++)

 

Si accettano suggerimenti, correzioni e soprattutto suggerimenti su dove tagliare per accorciare l'articolo :).

 

Campagne crowfunding

Just One Line
Siamo presenti su

     
Copyright ©2016 - Manifesto - Privacy - Termini di Servizio - Community - Collaboratori - Contattaci