Problema productor-consumidor - Producer–consumer problem

En informática , el problema del productor-consumidor (también conocido como el problema del búfer limitado ) es un ejemplo clásico de un problema de sincronización de múltiples procesos , cuya primera versión fue propuesta por Edsger W. Dijkstra en 1965 en su manuscrito inédito, en cuyo búfer era ilimitado y posteriormente se publicó con un búfer limitado en 1972. En la primera versión del problema, hay dos procesos cíclicos, un productor y un consumidor, que comparten un búfer común de tamaño fijo utilizado como cola . El productor genera datos repetidamente y los escribe en el búfer. El consumidor lee repetidamente los datos en el búfer, eliminándolos en el transcurso de la lectura y usando esos datos de alguna manera. En la primera versión del problema, con un búfer ilimitado, el problema es cómo diseñar el código de productor y consumidor para que, en su intercambio de datos, no se pierdan ni se dupliquen datos, los datos sean leídos por el consumidor en el orden en que se encuentran. está escrito por el productor, y ambos procesos progresan tanto como sea posible. En la formulación posterior del problema, Dijkstra propuso que varios productores y consumidores compartieran una colección finita de amortiguadores. Esto agregó el problema adicional de evitar que los productores intentaran escribir en búferes cuando todos estaban llenos y tratar de evitar que los consumidores lean un búfer cuando todos estaban vacíos.

El primer caso a considerar es aquel en el que hay un solo productor y un solo consumidor, y hay un búfer de tamaño finito. La solución para el productor es irse a dormir o descartar datos si el búfer está lleno. La próxima vez que el consumidor quita un artículo del búfer, notifica al productor, quien comienza a llenar el búfer nuevamente. De la misma forma, el consumidor puede irse a dormir si encuentra el búfer vacío. La próxima vez que el productor coloca datos en el búfer, despierta al consumidor dormido. La solución se puede alcanzar mediante la comunicación entre procesos , normalmente mediante semáforos . Una solución inadecuada podría resultar en un punto muerto en el que ambos procesos esperan ser despertados.

Implementación inadecuada

Para resolver el problema, es tentador proponer la "solución" que se muestra a continuación. En la solución se utilizan dos rutinas de biblioteca sleep y wakeup . Cuando se llama a dormir, la persona que llama se bloquea hasta que otro proceso la despierta utilizando la rutina de activación. La variable global itemCount contiene el número de elementos en el búfer.

int itemCount = 0;

procedure producer() 
{
    while (true) 
    {
        item = produceItem();

        if (itemCount == BUFFER_SIZE) 
        {
            sleep();
        }

        putItemIntoBuffer(item);
        itemCount = itemCount + 1;

        if (itemCount == 1) 
        {
            wakeup(consumer);
        }
    }
}

procedure consumer() 
{
    while (true) 
    {

        if (itemCount == 0) 
        {
            sleep();
        }

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1) 
        {
            wakeup(producer);
        }

        consumeItem(item);
    }
}

El problema con esta solución es que contiene una condición de carrera que puede conducir a un punto muerto . Considere el siguiente escenario:

  1. El consumer acaba de leer la variable itemCount , se dio cuenta que es cero y está a punto de moverse dentro del if bloque.
  2. Justo antes de llamar a dormir, se interrumpe al consumidor y se reanuda el productor.
  3. El productor crea un artículo, lo coloca en el búfer y lo aumenta itemCount .
  4. Debido a que el búfer estaba vacío antes de la última adición, el productor intenta despertar al consumidor.
  5. Desafortunadamente, el consumidor aún no estaba durmiendo y la llamada de atención se perdió. Cuando el consumidor reanuda, se duerme y nunca más se volverá a despertar. Esto se debe a que el productor solo despierta al consumidor cuando itemCount es igual a 1.
  6. El productor ejecutará un bucle hasta que el búfer esté lleno, después de lo cual también entrará en suspensión.

Dado que ambos procesos permanecerán inactivos para siempre, nos hemos topado con un punto muerto. Por tanto, esta solución no es satisfactoria.

Un análisis alternativo es que si el lenguaje de programación no define la semántica de accesos concurrentes a variables compartidas (en este caso itemCount ) con uso de sincronización, entonces la solución es insatisfactoria por esa razón, sin necesidad de demostrar explícitamente una condición de carrera.

Usando semáforos

Los semáforos resuelven el problema de las llamadas de atención perdidas. En la siguiente solución, usamos dos semáforos fillCount y emptyCount , para resolver el problema. fillCount es el número de elementos que ya están en el búfer y están disponibles para leer, mientras que emptyCount es el número de espacios disponibles en el búfer donde se pueden escribir los elementos. fillCount se incrementa y emptyCount decrementa cuando se coloca un nuevo elemento en el búfer. Si el productor intenta disminuir emptyCount cuando su valor es cero, el productor se pone a dormir. La próxima vez que se consume un artículo, emptyCount se incrementa y el productor se despierta. El consumidor trabaja de forma análoga.

semaphore fillCount = 0; // items produced
semaphore emptyCount = BUFFER_SIZE; // remaining space

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        down(emptyCount);
        putItemIntoBuffer(item);
        up(fillCount);
    }
}

procedure consumer() 
{
    while (true) 
    {
        down(fillCount);
        item = removeItemFromBuffer();
        up(emptyCount);
        consumeItem(item);
    }
}

La solución anterior funciona bien cuando solo hay un productor y un consumidor. Dado que varios productores comparten el mismo espacio de memoria para el búfer de elementos, o varios consumidores comparten el mismo espacio de memoria, esta solución contiene una condición de carrera grave que podría provocar que dos o más procesos lean o escriban en la misma ranura al mismo tiempo. Para entender cómo es posible esto, imagine cómo putItemIntoBuffer() se puede implementar el procedimiento . Podría contener dos acciones, una que determina la siguiente ranura disponible y la otra que escribe en ella. Si el procedimiento puede ser ejecutado simultáneamente por varios productores, entonces es posible el siguiente escenario:

  1. Decremento de dos productores emptyCount
  2. Uno de los productores determina la siguiente ranura vacía en el búfer
  3. El segundo productor determina el siguiente espacio vacío y obtiene el mismo resultado que el primer productor
  4. Ambos productores escriben en la misma ranura

Para superar este problema, necesitamos una forma de asegurarnos de que solo se esté ejecutando un productor putItemIntoBuffer() a la vez. En otras palabras, necesitamos una forma de ejecutar una sección crítica con exclusión mutua . La solución para múltiples productores y consumidores se muestra a continuación.

mutex buffer_mutex; // similar to "semaphore buffer_mutex = 1", but different (see notes below)
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        down(emptyCount);
        down(buffer_mutex);
        putItemIntoBuffer(item);
        up(buffer_mutex);
        up(fillCount);
    }
}

procedure consumer() 
{
    while (true) 
    {
        down(fillCount);
        down(buffer_mutex);
        item = removeItemFromBuffer();
        up(buffer_mutex);
        up(emptyCount);
        consumeItem(item);
    }
}

Tenga en cuenta que el orden en el que se incrementan o disminuyen los diferentes semáforos es esencial: cambiar el orden puede resultar en un punto muerto. Es importante señalar aquí que aunque mutex parece funcionar como un semáforo con valor de 1 (semáforo binario), hay una diferencia en el hecho de que mutex tiene el concepto de propiedad. La propiedad significa que el mutex solo se puede "incrementar" (establecido en 1) mediante el mismo proceso que lo "disminuyó" (establecido en 0), y todas las demás tareas esperan hasta que el mutex esté disponible para su disminución (lo que significa efectivamente que el recurso está disponible) , que asegura la exclusividad mutua y evita el estancamiento. Por lo tanto, el uso indebido de mutex puede paralizar muchos procesos cuando no se requiere acceso exclusivo, pero se usa mutex en lugar de semáforo.

Usando monitores

El siguiente pseudocódigo muestra una solución al problema productor-consumidor usando monitores . Dado que la exclusión mutua está implícita en los monitores, no es necesario ningún esfuerzo adicional para proteger la sección crítica. En otras palabras, la solución que se muestra a continuación funciona con cualquier número de productores y consumidores sin ninguna modificación. También es digno de mención que es menos probable que un programador escriba código que sufra condiciones de carrera cuando usa monitores que cuando usa semáforos.

monitor ProducerConsumer 
{
    int itemCount = 0;
    condition full;
    condition empty;

    procedure add(item) 
    {
        if (itemCount == BUFFER_SIZE) 
        {
            wait(full);
        }

        putItemIntoBuffer(item);
        itemCount = itemCount + 1;

        if (itemCount == 1)
        {
            notify(empty);
        }
    }

    procedure remove() 
    {
        if (itemCount == 0) 
        {
            wait(empty);
        }

        item = removeItemFromBuffer();
        itemCount = itemCount - 1;

        if (itemCount == BUFFER_SIZE - 1)
        {
            notify(full);
        }

        return item;
    }
}

procedure producer() 
{
    while (true) 
    {
        item = produceItem();
        ProducerConsumer.add(item);
    }
}

procedure consumer() 
{
    while (true) 
    {
        item = ProducerConsumer.remove();
        consumeItem(item);
    }
}

Sin semáforos ni monitores

El problema productor-consumidor, particularmente en el caso de un solo productor y un solo consumidor, se relaciona fuertemente con la implementación de un FIFO o un canal . El patrón productor-consumidor puede proporcionar una comunicación de datos altamente eficiente sin depender de semáforos, mutex o monitores para la transferencia de datos . El uso de esas primitivas puede resultar caro en términos de rendimiento, en comparación con la operación atómica básica de lectura / escritura. Los canales y FIFO son populares simplemente porque evitan la necesidad de una sincronización atómica de un extremo a otro. A continuación se muestra un ejemplo básico codificado en C. Tenga en cuenta que:

  • Se evita el acceso atómico de lectura, modificación y escritura a las variables compartidas, ya que cada una de las dos Count variables se actualiza solo mediante un único hilo. Además, estas variables admiten un número ilimitado de operaciones de incremento; la relación permanece correcta cuando sus valores se envuelven en un desbordamiento de enteros.
  • Este ejemplo no pone los subprocesos en suspensión, lo que puede ser aceptable según el contexto del sistema. Se schedulerYield() inserta como un intento de mejorar el rendimiento y puede omitirse. Las bibliotecas de subprocesos normalmente requieren semáforos o variables de condición para controlar la suspensión / activación de los subprocesos. En un entorno multiprocesador, la suspensión / activación de subprocesos se produciría con mucha menos frecuencia que el paso de tokens de datos, por lo que evitar las operaciones atómicas en el paso de datos es beneficioso.
  • Este ejemplo no funciona para varios productores y / o consumidores porque existe una condición de carrera al verificar el estado. Por ejemplo, si solo hay un token en el búfer de almacenamiento y dos consumidores encuentran que el búfer no está vacío, ambos consumirán el mismo token y posiblemente aumentarán el contador de los tokens consumidos por encima del contador de los tokens producidos.
  • Este ejemplo, tal como está escrito, requiere que UINT_MAX + 1 sea ​​divisible por BUFFER_SIZE ; si no es divisible uniformemente, [Count % BUFFER_SIZE] produce el índice de búfer incorrecto después de que Count pasa de UINT_MAX nuevo a cero. Una solución alternativa que evita esta limitación emplea dos Idx variables adicionales para rastrear el índice de búfer actual para la cabeza (productor) y la cola (consumidor). Estas Idx variables serían utilizado en lugar de [Count % BUFFER_SIZE] , y cada uno de ellos tendrían que ser incrementado al mismo tiempo que el respectivo Count se incrementa la variable, como sigue: Idx = (Idx + 1) % BUFFER_SIZE .
  • Las dos Count variables deben ser lo suficientemente pequeñas para admitir acciones de lectura y escritura atómicas. De lo contrario, existe una condición de carrera en la que el otro subproceso lee un valor parcialmente actualizado y, por lo tanto, un valor incorrecto.


volatile unsigned int produceCount = 0, consumeCount = 0;
TokenType sharedBuffer[BUFFER_SIZE];

void producer(void) {
	while (1) {
		while (produceCount - consumeCount == BUFFER_SIZE) {
			schedulerYield(); /* sharedBuffer is full */
		}
		/* Write to sharedBuffer _before_ incrementing produceCount */
		sharedBuffer[produceCount % BUFFER_SIZE] = produceToken();
		/* Memory barrier required here to ensure update of the sharedBuffer is 
		visible to other threads before the update of produceCount */
		++produceCount;
	}
}

void consumer(void) {
	while (1) {
		while (produceCount - consumeCount == 0) {
			schedulerYield(); /* sharedBuffer is empty */
		}
		consumeToken(&sharedBuffer[consumeCount % BUFFER_SIZE]);
		++consumeCount;
	}
}

La solución anterior emplea contadores que, cuando se usan con frecuencia, pueden sobrecargarse y alcanzar su valor máximo UINT_MAX . La idea esbozada en la cuarta viñeta, originalmente sugerida por Leslie Lamport , explica cómo los contadores pueden reemplazarse por contadores de rango finito. Específicamente, se pueden reemplazar con contadores de rango finito con el valor máximo N, la capacidad del búfer.

Cuatro décadas después de la presentación del problema productor-consumidor, Aguilera, Gafni y Lamport demostraron que el problema se puede resolver para que los procesos accedan solo a contadores de rango fijo (es decir, un rango que es independiente del tamaño del búfer) mientras determina si el búfer está vacío o lleno. La motivación de esta medida de eficiencia es acelerar las interacciones entre un procesador y los dispositivos que interactúan a través de canales FIFO. Propusieron una solución en la que se leen contadores de valor máximo para determinar si es seguro acceder al búfer. Sin embargo, su solución aún emplea contadores ilimitados que crecen infinitamente, solo que no se accede a esos contadores durante la fase de verificación descrita.

Más tarde, Abraham y Amram propusieron una solución más simple, presentada a continuación en pseudocódigo, que posee la propiedad de rango fijo discutida. La solución emplea contadores de valor máximo N. Sin embargo, para determinar si el búfer está vacío o lleno, los procesos acceden solo a registros de escritor único de rango finito . Cada uno de los procesos posee un solo escritor de 12 valores. El proceso productor escribe Flag_p y el proceso consumidor escribe Flag_c , ambos son matrices de 3 campos. Flag_p[2] y Flag_c[2] puede almacenar "lleno", "vacío" o "seguro", lo que indica correspondientemente si el búfer está lleno, vacío o ni lleno ni vacío.

La idea detrás del algoritmo es la siguiente. Los procesos cuentan el número de artículos entregados y retirados módulo N + 1 a través de registros CountDelivered y CountRemoved . Cuando un proceso entrega o elimina un artículo, compara esos contadores y, por lo tanto, determina con éxito el estado del búfer y almacena estos datos en Flag_p[2] o Flag_c[2] . En una fase de verificación, el proceso de ejecución lee Flag_p y Flag_c , e intenta estimar qué valor entre Flag_p[2] y Flag_c[2] refleja el estado actual del búfer. Dos técnicas de sincronización ayudan a lograr este objetivo.

  1. Después de la entrega de un artículo, el productor escribe en Flag_p[0] el valor que lee de Flag_c[0] , y después de la eliminación de un elemento, las escrituras de consumo hasta Flag_c[1] el valor: 1-Flag_p[0] . Por lo tanto, la condición Flag_p[0] == Flag_c[0] sugiere que el productor verificó recientemente el estado del búfer, mientras que Flag_p[0] != Flag_c[0] sugiere lo contrario.
  2. Una operación de entrega (eliminación) finaliza escribiendo en Flag_p[1] ( Flag_c[1] ) el valor almacenado en Flag_p[0] ( Flag_c[0] ). Por lo tanto, la condición Flag_p[0] == Flag_p[1] sugiere que el productor terminó su última operación de entrega. Asimismo, Condition Flag_c[0] == Flag_c[1] sugiere que la última eliminación del consumidor ya fue cancelada.

Por lo tanto, en la fase de verificación, si el productor lo encuentra Flag_c[0] != Flag_p[0] & Flag_c[0] == Flag_c[1] , actúa de acuerdo con el valor de Flag_c[2] , y en caso contrario, de acuerdo con el valor almacenado en Flag_p[2] . De manera análoga, si el consumidor lo encuentra Flag_p[0] == Flag_c[0] & Flag_p[0] == Flag_p[1] , actúa de acuerdo con el valor de Flag_p[2] , y de lo contrario, de acuerdo con el valor almacenado en Flag_c[2] . En el código siguiente, las variables en mayúsculas indican registros compartidos, escritos por uno de los procesos y leídos por ambos procesos. Las variables no capitalizadas son variables locales en las que los procesos copian los valores leídos de los registros compartidos.

countDelivered = 0;
countRemoved = 0;
Flag_p[0] = 0; Flag_p[1] = 0; Flag_p[2] = "empty";
Flag_c[0] = 0; Flag_c[1] = 0; Flag_c[2] = "empty"; 

procedure producer() 
{
    while (true) {
        item = produceItem();
        
        /* check phase: busy wait until the buffer is not full */       	
        repeat {
            flag_c = Flag_c;
    	      if (flag_c[0] != Flag_p[0] & flag_c[0] == flag_c[1]) ans = flag_c[2];
    	      else ans = Flag_p[2];
        } until (ans != "full")
    
         /* item delivery phase */
         putItemIntoBuffer(item);
         CountDeliverd = countDelivered+1 % N+1;
         flag_c = Flag_c;
         Flag_p[0] = flag_c[0];
         removed = CountRemoved;
         if (CountDelivered  removed == N) { Flag_p[1] = flag_c[0]; Flag_p[2] = "full"; }
         if (CountDelivered  removed == 0) { Flag_p[1] = flag_c[0]; Flag_p[2] = "empty"; }
         if (0 < CountDelivered  removed < N) { Flag_p[1] = flag_c[0]; Flag_p[2] = "safe"; }
     }
}

procedure consumer() 
{
    while (true) {
        /* check phase: busy wait until the buffer is not empty */       	
        repeat {
            flag_p = Flag_p;
    	      if (flag_p[0] == Flag_c[0] & flag_p[1] == flag_p[0]) ans = flag_p[2]);
    	      else ans = Flag_c[2];
         } until (ans != "empty")
    
         /* item removal phase */
         Item = removeItemFromBuffer();
         countRemoved = countRemoved+1 % N+1;
         flag_p = Flag_p;
         Flag_c[0] = 1-flag_p[0];
         delivered = CountDelivered;
         if (delivered  CountRemoved == N) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] = "full"; }
         if (delivered  CountRemoved == 0) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] = "empty"; }
         if (0 < delivered  CountRemoved < N) { Flag_c[1] = 1-flag_p[0]; Flag_c[2] ="safe"; }
     }
}

La exactitud del código se basa en la suposición de que los procesos pueden leer una matriz completa o escribir en varios campos de una matriz en una sola acción atómica. Dado que esta suposición no es realista, en la práctica, uno debe reemplazar Flag_p y Flag_c con (log (12) -bit) enteros que codifican los valores de esas matrices. Flag_p y Flag_c se presentan aquí como matrices solo para la legibilidad del código.

Ver también

Referencias

Otras lecturas