condicion de carrera c++ threads

Iniciado por julio1, 3 Abril 2019, 18:35 PM

0 Miembros y 1 Visitante están viendo este tema.

julio1

Estoy mirando el problema del productor/consumidor con threads en c+11. Pues bien, el siguiente código me da un problema con helgrind detecta una condicion de carrera y debuggeando un poco creo que el problema esta en la funcion consumer pero no estoy seguro y no se como resolverlo.
#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
using namespace std;

// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); }

const int num_producers = 5;
const int num_consumers = 10;
const int producer_delay_to_produce = 10;   // in miliseconds
const int consumer_delay_to_consume = 30;   // in miliseconds
const int consumer_max_wait_time = 200;     // in miliseconds - max time that a consumer can wait for a product to be produced.
const int max_production = 10;              // When producers has produced this quantity they will stop to produce
const int max_products = 10;                // Maximum number of products that can be stored

atomic<int> num_producers_working(0);       // When there's no producer working the consumers will stop, and the program will stop.
stack<int> products;                        // The products stack, here we will store our products
mutex xmutex;                               // Our mutex, without this mutex our program will cry

condition_variable is_not_full;             // to indicate that our stack is not full between the thread operations
condition_variable is_not_empty;            // to indicate that our stack is not empty between the thread operations


void produce(int producer_id)
{
       unique_lock<mutex> lock(xmutex);
       int product;

       is_not_full.wait(lock, [] { return products.size() != max_products; });
       product = products.size();
       products.push(product);

       print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
       is_not_empty.notify_one();
}

//      Consume function, consumer_id will consume a product
void consume(int consumer_id)
{
       unique_lock<mutex> lock(xmutex);
       int product;

       if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
               [] { return products.size() > 0; }))
       {
               product = products.top();
               products.pop();

               print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n");
               is_not_full.notify_one();
       }
}

void producer(int id)
{
       ++num_producers_working;
       for(int i = 0; i < max_production; ++i)
       {
               produce(id);
               this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
       }

       print(stringstream() << "Producer " << id << " has exited\n");
       --num_producers_working;
}

void consumer(int id)
{
       // Wait until there is any producer working
       while(num_producers_working == 0) this_thread::yield();

       while(num_producers_working != 0 || products.size() > 0)
       {
               consume(id);
               this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
       }

       print(stringstream() << "Consumer " << id << " has exited\n");
}

int main()
{
       vector<thread> producers_and_consumers;

       // Create producers
       for(int i = 0; i < num_producers; ++i)
               producers_and_consumers.push_back(thread(producer, i));

       // Create consumers
       for(int i = 0; i < num_consumers; ++i)
               producers_and_consumers.push_back(thread(consumer, i));

       // Wait for consumers and producers to finish
       for(auto& t : producers_and_consumers)
               t.join();
}

Loretz

No tengo el valgrind y estoy con el Visual Studio, pero veo dos cosas:

1) [que no tiene nada que ver con tu pregunta, pero...] La función void print(ostream& s) no debería poder invocarse con un argumento temporal (un prvalue), como en
print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
Por alguna extraña razón tu compilador lo permite. Supongo que puedes sobrecargar la función print con una versión que acepte una rvalue reference:
Citarvoid print(ostream&& s) {
   cout << s.rdbuf() << flush;
   s.clear();
}


Y 2) en
Código (cpp) [Seleccionar]
is_not_full.wait(lock, [] { return products.size() != max_products; });
el predicado
Citarproducts.size() != max_products;
no garantiza que el lock no se levante debido a un wakeup espurio, haciendo que se intente ejecutar el push() por dos o más threads simultáneamente.


julio1

#2
Con respecto a la primera estoy de acuerdo contigo, la verdad es que no me había fijado, es curioso porque no estoy en un compilador raro estoy usando g++ en su versión 7, y porque g++ permitirá esto? Si mal no recuerdo al tomar una referencia el parametro de una función se puede llamar como un rvalue si el parámetro es const.
Y yendo a la segunda nunca había oído hablar de un wakeup espurio.

Loretz

Citarwait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs,

https://en.cppreference.com/w/cpp/thread/condition_variable/wait

julio1

Gracias, lo miro ahora. Lo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?

Loretz

CitarLo que dijiste de la funcion print en el visual studio te da error al compilar o te compila sin problemas?
Me da error al compilar.

julio1

#6
He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)
la solución más simple pasa por meter a size dentro de una sección crítica:
size_t stack_size(const stack<int> &obj ){
   lock_guard<mutex> guardian(xmutex);
   return obj.size();

}

Modificando así la línea anterior ahora: while(num_producers_working != 0 || stack_size(products) > 0)

dijsktra

#7
Que bueno que la gente se meta con problemas de concurrente!

Cita de: julio1 en  4 Abril 2019, 00:00 AM
He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)...

Cuidado! Yo no tengo ni idea de helgrind, no lo había oido en mi vida, pero la depuración en concurrente es algo impracticable! Necesitas un cálculo para razonar sobre la corrección de un programa, con invariantes, etc...
Conviene tener princpios sólidos sobre los que montar tus soluciones concurrentes.
Un buen libro: "Concurrent Programming. Gregory R. Andrews".
No vas a aprender nada de C++, pero si mucho de programaci'on concurrente!

Algunas observaciones a tu programa:


  • Productores consumidores se ejecutan indefinidamente. De otro modo, puedes llegar a bloqueos, (imagina 1 productor y 1000 consumidores, a 1 items cada uno... Es obvio que los consumidores se quedarán esperando indefinidamente
  • El buffer, o es un escalar, o un buffer limitado. Nada se gana con los stacks... Interesa FIFO, no LIFO
  • Nada se gana leyendo los manuales de la libreria <thread> o <condition variable> si no se conoce en que consisten los monitores



Código (cpp) [Seleccionar]

#include <cassert>
#include <iostream>
#include <vector> // as bounded buffer.
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <atomic>
#include <cassert>

#define MAX_THREADS 1000
#define SIZE_BUFFER 5
#define TIMEOUT  200

using namespace std;

// Patemeters overwritten on input.
static int P = 5; // Producers
static int C = 10; // Consumers
static int P_DELAY=1000 ; // Prod-Delay
static int C_DELAY=500;


// MONITOR VARIABLES
static mutex xmutex;                    // Our mutex, monitor methods in exclusion
static condition_variable if_full;             // to indicate that our stack is
// not full  between the thread operations
static condition_variable if_empty;            // to indicate that our stack is
// not empty between the thread operations

static int front = 0;
static int rear = 0;
static vector<int>  buf(SIZE_BUFFER);
static int holes = SIZE_BUFFER;
static int items = 0;
// Monitor invariants:
// inv. items =  (front "-" rear)
// inv. holes = SIZE_BUFFER - (front "-" rear)
// inv. 0 <= front < SIZE_BUFFER, 0 <= rear < SIZE_BUFFER

static atomic<int> m(0); // simulating a fresh message;

// MONITOR METHODS
void init()
{
 front=rear=items=0;
 holes = SIZE_BUFFER;
}

void deposit(const int id,const int msg)
{
unique_lock<mutex> lock(xmutex);

// await holes > 0
if_full.wait(lock, [] { return holes > 0; });
buf[rear] = msg; rear = ((rear+1) % SIZE_BUFFER);
cout << "p[" << id << "]\tdeposit\t:\t" << msg << endl;
holes--; items++;
if_empty.notify_all();
}

//      Consume function, consumer_id will consume a product
int fetch(const int id)
{
unique_lock<mutex> lock(xmutex);
int mess;
// await items > 0
if_empty.wait(lock, [] { return items > 0; });
mess = buf[front]; front = ((front+1) % SIZE_BUFFER);
cout << "c[" << id << "]\t\t\t\t\tfetch\t:\t" << mess << endl;
holes++;items--;
if_full.notify_all();
return mess;
}


// Real code for threads.
void producer(int id)
{
for (; 1; )
{
deposit(id, ++m);
this_thread::sleep_for(chrono::milliseconds(P_DELAY));
}
return;
}

void consumer(int id)
{
int msg;
for (; 1; )
{
msg = fetch(id);
this_thread::sleep_for(chrono::milliseconds(C_DELAY));
}
return ;
}


void simulation()
{
vector<thread> threads;

init();
// Create producers
for (int p = 0; p < P; ++p)
 threads.push_back(thread(producer, p));

// Create consumers
for (int c = 0; c < C; ++c)
 threads.push_back(thread(consumer, c));

// Wait for consumers and producers to finish
for (auto& t : threads)
t.join();
}


// IO staff.
int main(int argc, char *args[])
{
 cout << "C P P_DELAY C_DELAY " << endl;
 for( ; cin >> C >> P >> P_DELAY >> C_DELAY; )
   {
     assert((C<MAX_THREADS) && (P<MAX_THREADS) );
     assert((P_DELAY > 0) && (C_DELAY > 0) );
     simulation();
   }
 return 0;
}



Aqui un ejemplo de salida. Tu puedes experimientar variando , el numero de p/c, la velocidad relativa...

Con independencia de la velocidad relativa, dentro de un mismo grupo no está determinado el orden de acceso al monitor. Así,  es posible que el productor del mensaje 9 tenga acceso al monitor antes que el productor del mensaje 8.  Eso sí, entre dos consumidores el primero en acceder extraerá el mnesaje 9, y el segundo el 8, por ser FIFO.




g++ conc.cpp -o main -lpthread && ./main
C P P_DELAY C_DELAY
5 5 500 500
p[1] deposit : 1
p[2] deposit : 2
p[0] deposit : 3
p[3] deposit : 4
c[0] fetch : 1
p[4] deposit : 5
c[1] fetch : 2
c[3] fetch : 3
c[2] fetch : 4
c[4] fetch : 5
p[1] deposit : 6
p[3] deposit : 7
p[0] deposit : 9
p[2] deposit : 8
c[0] fetch : 6
p[4] deposit : 10
c[1] fetch : 7
c[3] fetch : 9
c[2] fetch : 8
c[4] fetch : 10
p[1] deposit : 11
p[3] deposit : 12
p[0] deposit : 13
p[2] deposit : 14
c[0] fetch : 11
p[4] deposit : 15
c[1] fetch : 12
c[3] fetch : 13
c[2] fetch : 14
c[4] fetch : 15
p[1] deposit : 16
p[0] deposit : 18
p[3] deposit : 17
p[2] deposit : 19
c[0] fetch : 16
p[4] deposit : 20
c[1] fetch : 18
c[3] fetch : 17
c[2] fetch : 19
c[4] fetch : 20
p[0] deposit : 21
p[1] deposit : 22
p[3] deposit : 23
p[2] deposit : 24
p[4] deposit : 25
c[0] fetch : 21
c[1] fetch : 22
c[3] fetch : 23
c[2] fetch : 24
c[4] fetch : 25
p[0] deposit : 26
c[0] fetch : 26
p[3] deposit : 29
c[4] fetch : 29
p[4] deposit : 30
c[2] fetch : 30
p[1] deposit : 27
c[1] fetch : 27
p[2] deposit : 28
c[3] fetch : 28
p[0] deposit : 31
p[3] deposit : 32
c[4] fetch : 31
c[0] fetch : 32
p[4] deposit : 33
c[2] fetch : 33
p[1] deposit : 34
c[1] fetch : 34
p[2] deposit : 35
c[3] fetch : 35
p[4] deposit : 37
c[0] fetch : 37
p[3] deposit : 38
c[2] fetch : 38
p[2] deposit : 40
p[0] deposit : 36
p[1] deposit : 39
c[4] fetch : 40
c[1] fetch : 36
c[3] fetch : 39
p[4] deposit : 41
c[0] fetch : 41
p[3] deposit : 42
c[2] fetch : 42
p[2] deposit : 43
p[0] deposit : 44
p[1] deposit : 45
c[4] fetch : 43
c[1] fetch : 44
c[3] fetch : 45
...
C-c C-c




Si la depuración es el proceso de eliminar fallos en el software, entonces programar debe ser el proceso de ponerlos dentro. (Edsger Dijsktra)