Que bueno que la gente se meta con problemas de concurrente!
Cuidado! Yo no tengo ni idea de helgrind, no lo había oido en mi vida, pero la depuración en concurrente es algo impracticable! Necesitas un cálculo para razonar sobre la corrección de un programa, con invariantes, etc...
Conviene tener princpios sólidos sobre los que montar tus soluciones concurrentes.
Un buen libro: "Concurrent Programming. Gregory R. Andrews".
No vas a aprender nada de C++, pero si mucho de programaci'on concurrente!
Algunas observaciones a tu programa:
Aqui un ejemplo de salida. Tu puedes experimientar variando , el numero de p/c, la velocidad relativa...
Con independencia de la velocidad relativa, dentro de un mismo grupo no está determinado el orden de acceso al monitor. Así, es posible que el productor del mensaje 9 tenga acceso al monitor antes que el productor del mensaje 8. Eso sí, entre dos consumidores el primero en acceder extraerá el mnesaje 9, y el segundo el 8, por ser FIFO.
Cita de: julio1 en 4 Abril 2019, 00:00 AM
He solucionado el problema de la condición de carrera, helgrind ya no se queja, la culpa la tenía el método size en la línea while(num_producers_working != 0 || products.size() > 0)...
Cuidado! Yo no tengo ni idea de helgrind, no lo había oido en mi vida, pero la depuración en concurrente es algo impracticable! Necesitas un cálculo para razonar sobre la corrección de un programa, con invariantes, etc...
Conviene tener princpios sólidos sobre los que montar tus soluciones concurrentes.
Un buen libro: "Concurrent Programming. Gregory R. Andrews".
No vas a aprender nada de C++, pero si mucho de programaci'on concurrente!
Algunas observaciones a tu programa:
- Productores consumidores se ejecutan indefinidamente. De otro modo, puedes llegar a bloqueos, (imagina 1 productor y 1000 consumidores, a 1 items cada uno... Es obvio que los consumidores se quedarán esperando indefinidamente
- El buffer, o es un escalar, o un buffer limitado. Nada se gana con los stacks... Interesa FIFO, no LIFO
- Nada se gana leyendo los manuales de la libreria <thread> o <condition variable> si no se conoce en que consisten los monitores
Código (cpp) [Seleccionar]
#include <cassert>
#include <iostream>
#include <vector> // as bounded buffer.
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <atomic>
#include <cassert>
#define MAX_THREADS 1000
#define SIZE_BUFFER 5
#define TIMEOUT 200
using namespace std;
// Patemeters overwritten on input.
static int P = 5; // Producers
static int C = 10; // Consumers
static int P_DELAY=1000 ; // Prod-Delay
static int C_DELAY=500;
// MONITOR VARIABLES
static mutex xmutex; // Our mutex, monitor methods in exclusion
static condition_variable if_full; // to indicate that our stack is
// not full between the thread operations
static condition_variable if_empty; // to indicate that our stack is
// not empty between the thread operations
static int front = 0;
static int rear = 0;
static vector<int> buf(SIZE_BUFFER);
static int holes = SIZE_BUFFER;
static int items = 0;
// Monitor invariants:
// inv. items = (front "-" rear)
// inv. holes = SIZE_BUFFER - (front "-" rear)
// inv. 0 <= front < SIZE_BUFFER, 0 <= rear < SIZE_BUFFER
static atomic<int> m(0); // simulating a fresh message;
// MONITOR METHODS
void init()
{
front=rear=items=0;
holes = SIZE_BUFFER;
}
void deposit(const int id,const int msg)
{
unique_lock<mutex> lock(xmutex);
// await holes > 0
if_full.wait(lock, [] { return holes > 0; });
buf[rear] = msg; rear = ((rear+1) % SIZE_BUFFER);
cout << "p[" << id << "]\tdeposit\t:\t" << msg << endl;
holes--; items++;
if_empty.notify_all();
}
// Consume function, consumer_id will consume a product
int fetch(const int id)
{
unique_lock<mutex> lock(xmutex);
int mess;
// await items > 0
if_empty.wait(lock, [] { return items > 0; });
mess = buf[front]; front = ((front+1) % SIZE_BUFFER);
cout << "c[" << id << "]\t\t\t\t\tfetch\t:\t" << mess << endl;
holes++;items--;
if_full.notify_all();
return mess;
}
// Real code for threads.
void producer(int id)
{
for (; 1; )
{
deposit(id, ++m);
this_thread::sleep_for(chrono::milliseconds(P_DELAY));
}
return;
}
void consumer(int id)
{
int msg;
for (; 1; )
{
msg = fetch(id);
this_thread::sleep_for(chrono::milliseconds(C_DELAY));
}
return ;
}
void simulation()
{
vector<thread> threads;
init();
// Create producers
for (int p = 0; p < P; ++p)
threads.push_back(thread(producer, p));
// Create consumers
for (int c = 0; c < C; ++c)
threads.push_back(thread(consumer, c));
// Wait for consumers and producers to finish
for (auto& t : threads)
t.join();
}
// IO staff.
int main(int argc, char *args[])
{
cout << "C P P_DELAY C_DELAY " << endl;
for( ; cin >> C >> P >> P_DELAY >> C_DELAY; )
{
assert((C<MAX_THREADS) && (P<MAX_THREADS) );
assert((P_DELAY > 0) && (C_DELAY > 0) );
simulation();
}
return 0;
}
Aqui un ejemplo de salida. Tu puedes experimientar variando , el numero de p/c, la velocidad relativa...
Con independencia de la velocidad relativa, dentro de un mismo grupo no está determinado el orden de acceso al monitor. Así, es posible que el productor del mensaje 9 tenga acceso al monitor antes que el productor del mensaje 8. Eso sí, entre dos consumidores el primero en acceder extraerá el mnesaje 9, y el segundo el 8, por ser FIFO.
Código [Seleccionar]
g++ conc.cpp -o main -lpthread && ./main
C P P_DELAY C_DELAY
5 5 500 500
p[1] deposit : 1
p[2] deposit : 2
p[0] deposit : 3
p[3] deposit : 4
c[0] fetch : 1
p[4] deposit : 5
c[1] fetch : 2
c[3] fetch : 3
c[2] fetch : 4
c[4] fetch : 5
p[1] deposit : 6
p[3] deposit : 7
p[0] deposit : 9
p[2] deposit : 8
c[0] fetch : 6
p[4] deposit : 10
c[1] fetch : 7
c[3] fetch : 9
c[2] fetch : 8
c[4] fetch : 10
p[1] deposit : 11
p[3] deposit : 12
p[0] deposit : 13
p[2] deposit : 14
c[0] fetch : 11
p[4] deposit : 15
c[1] fetch : 12
c[3] fetch : 13
c[2] fetch : 14
c[4] fetch : 15
p[1] deposit : 16
p[0] deposit : 18
p[3] deposit : 17
p[2] deposit : 19
c[0] fetch : 16
p[4] deposit : 20
c[1] fetch : 18
c[3] fetch : 17
c[2] fetch : 19
c[4] fetch : 20
p[0] deposit : 21
p[1] deposit : 22
p[3] deposit : 23
p[2] deposit : 24
p[4] deposit : 25
c[0] fetch : 21
c[1] fetch : 22
c[3] fetch : 23
c[2] fetch : 24
c[4] fetch : 25
p[0] deposit : 26
c[0] fetch : 26
p[3] deposit : 29
c[4] fetch : 29
p[4] deposit : 30
c[2] fetch : 30
p[1] deposit : 27
c[1] fetch : 27
p[2] deposit : 28
c[3] fetch : 28
p[0] deposit : 31
p[3] deposit : 32
c[4] fetch : 31
c[0] fetch : 32
p[4] deposit : 33
c[2] fetch : 33
p[1] deposit : 34
c[1] fetch : 34
p[2] deposit : 35
c[3] fetch : 35
p[4] deposit : 37
c[0] fetch : 37
p[3] deposit : 38
c[2] fetch : 38
p[2] deposit : 40
p[0] deposit : 36
p[1] deposit : 39
c[4] fetch : 40
c[1] fetch : 36
c[3] fetch : 39
p[4] deposit : 41
c[0] fetch : 41
p[3] deposit : 42
c[2] fetch : 42
p[2] deposit : 43
p[0] deposit : 44
p[1] deposit : 45
c[4] fetch : 43
c[1] fetch : 44
c[3] fetch : 45
...
C-c C-c