Notifications rapides d’une variable-condition

Publié par cpb
août 19 2013

Un client m’a interrogé récemment sur la mise en œuvre d’une notification d’événements entre threads. Je l’ai tout naturellement aiguillé vers l’emploi d’une variable condition pthread_cond_t. Toutefois lorsque les notifications se sont produites par rafales rapides, des problèmes se sont posés, que je n’ai pu résoudre qu’avec l’emploi d’un sémaphore supplémentaire.

Contexte

Un programme industriel reçoit des données en provenance d’un port de communication RS-485. Lorsqu’une trame complète est reçue, des opérations doivent être réalisées nécessitant des émissions de données, des attentes éventuelles, ceci en parallèle sur la réception de la trame suivante. J’ai donc proposé à mon client de concevoir cette fonctionnalité avec deux threads, l’un se chargeant de recevoir les données, puis de notifier le second afin qu’il traite la suite des opérations.

Pour assurer la synchronisation entre les deux, il m’a semblé judicieux d’utiliser une variable-condition.

Variable condition

Une variable-condition est simplement une donnée de type pthread_cond_t partagée entre deux threads. J’ai pour habitude, lorsque je présente cette structure de synchronisation durant une session de formation, de la comparer à une cloche. Un thread peut s’endormir, passivement, en attente sur la cloche et un autre thread peut venir à tout moment le réveiller en donnant un coup de marteau sur la cloche.

Si aucun thread n’est attente au moment du coup de marteau, tant pis, cette notification est perdue.

C’est pour éviter cela, que l’on associe toujours une variable condition avec un mutex pthread_mutex_t. Ainsi, on pourra garantir qu’au moment du coup de marteau, un thread sera systématiquement en attente d’un réveil.

Voyons un exemple d’utilisation. Le thread qui attend les notifications se présente généralement ainsi.

    pthread_cond_t cnd = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

int main(void)
{
    // ... initialisation ...
    pthread_mutex_lock(& mtx);

    while (1) {
        pthread_cond_wait(& cnd, & mtx);
        // notification recue, traitement...
        // ...
    }

    pthread_mutex_unlock(& mtx);
    return EXIT_SUCCESS;
}

Le second thread, qui doit notifier le premier travaille ainsi :

void * fonction_thread (void * arg)
{
	while (1) {
		// Attendre les données externes
		// ....
		// Notifier le thread main()
		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
	 }
}

Contrairement à ce que l’on pourrait croire au premier regard, il n’y a pas d’erreur dans le code ci-dessus, il y a bien deux pthread_mutex_lock() invoqués dans les deux threads. Ce qu’il faut comprendre c’est que pthread_cond_wait() contient plusieurs étapes successives :

  • relâchement du mutex (comme avec pthread_mutex_unlock()) ;
  • mise en sommeil en attente de notification ;
  • reprise du mutex (comme pthread_mutex_lock()).

L’astuce est que les deux premiers points sont atomiquement liés. Ainsi, il n’est pas possible qu’un thread invoque pthread_cond_signal() sans que le premier ne soit véritablement en attente.

NB: On notera que pthread_cond_wait() présente également deux autres particularités inattendues : la possibilité que le thread soit réveillé prématurément sans qu’une notification ne soit survenue, et le fait qu’il s’agisse d’un point d’annulation si pthread_cancel() est invoqué dans un autre thread. Ceci dépasse le cadre de cet article et n’a pas d’importance pour le sujet traité ici.

Premier essai

Dans ce premier exemple, le thread principal crée un thread de notification puis se met en attente sur la variable-condition. Les notifications se produiront régulièrement toutes les secondes. Les deux threads décriront leurs progressions sur la sortie d’erreur.

exemple-condition-01.c:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>

pthread_cond_t  cnd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

void * thread_notification (void * arg)
{
	struct timeval tv;
	while (1) {
		sleep(1);

		pthread_mutex_lock(& mtx);
		gettimeofday(& tv, NULL);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		fprintf(stderr, "Notification envoyée à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);
	}
	return NULL;
}

int main (void)
{
	pthread_t thr;
	struct timeval tv;

	pthread_mutex_lock(& mtx);
	if (pthread_create(& thr, NULL, thread_notification, NULL) != 0)
		exit(EXIT_FAILURE);
	while (1) {
		pthread_cond_wait(& cnd, & mtx);
		gettimeofday(& tv, NULL);
		fprintf(stderr, "Notification reçue à %ld.%06ld\n", tv.tv_sec, tv.tv_usec);
	}
	pthread_mutex_unlock(& mtx);
	return EXIT_SUCCESS;	
}

L’exécution semble correcte.

$ ./exemple-condition-01 
Notification envoyée à: 1376546517.395413
Notification reçue à 1376546517.395442
Notification envoyée à: 1376546518.395572
Notification reçue à 1376546518.395599
Notification envoyée à: 1376546519.395732
Notification reçue à 1376546519.395761
Notification envoyée à: 1376546520.395891
Notification reçue à 1376546520.395920
Notification envoyée à: 1376546521.396049
Notification reçue à 1376546521.396077
Notification envoyée à: 1376546522.396267
Notification reçue à 1376546522.396295
Notification envoyée à: 1376546523.396430
Notification reçue à 1376546523.396460
Notification envoyée à: 1376546524.396594
Notification reçue à 1376546524.396624
^C
$

Rafales de notifications

Vérifions ce qui se produit si plusieurs notifications sont envoyées rapidement. Pour cela nous allons remplacer, dans le thread, la séquence

		pthread_mutex_lock(& mtx);
		gettimeofday(& tv, NULL);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		fprintf(stderr, "Notification envoyée à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);

par

		pthread_mutex_lock(& mtx);
		gettimeofday(& tv, NULL);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);

		fprintf(stderr, "Trois notifications envoyées à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);

L’exécution est beaucoup moins bonne qu’auparavant.

$ ./exemple-condition-02
Trois notifications envoyées à: 1376546967.599002
Notification reçue à 1376546967.599041
Trois notifications envoyées à: 1376546968.599216
Notification reçue à 1376546968.599246
Trois notifications envoyées à: 1376546969.599377
Notification reçue à 1376546969.599406
Trois notifications envoyées à: 1376546970.599540
Notification reçue à 1376546970.599567
^C
$

Bien que nous envoyions trois notifications à chaque fois, une seule d’entre elles est reçue par le thread principal.

Que se passe-t-il ?

  • Le thread main est endormi sur l’appel pthread_cond_wait(). Il a relâché le mutex.
  • Le thread de notification prend le mutex.
  • Le thread de notification invoque pthread_cond_signal(). Cela réveille le thread principal, qui essaye de reprendre le mutex. Celui-ci n’étant pas libre, le thread main se rendort en attente du mutex.
  • Le thread de notification lâche le mutex. Cet appel va réveiller le thread principal. Néanmoins ce dernier ne récupère pas instantanément le mutex, il faudra pour cela qu’il soit sélectionné par l’ordonnanceur et puisse s’exécuter.
  • L’ordonnanceur n’ayant aucune raison de préempter le thread de notification au profit du thread principal, ce dernier reste en attente (dans l’état « Prêt ») alors que le premier peut continuer son exécution et reprendre à nouveau le mutex.
  • Le thread de notification peut appeler une seconde fois pthread_cond_signal(). Puis la même séquence d’opérations se reproduit une seconde fois, et ce n’est que lorsque le thread de notification s’endort explicitement avec sleep() que le thread principal est activé et peut sortir du pthread_cond_wait().

Trois notifications ont été envoyées. Une seule a été détectée. Ce n’est pas un comportement très fiable !

Amélioration

J’avais déjà exploré un problème assez similaire en octobre 2011 dans l’article Prise de mutex et priorités.

Nous avions vu alors que pour résoudre le problème de reprise de mutex, il était possible de s’appuyer sur l’appel système sched_yield() qui représente un appel direct au scheduler en se plaçant volontairement dans une situation d’ordonnancement défavorable.

Nous allons donc tenter d’améliorer notre système en ajoutant deux choses.

  • Une priorité temps réel pour le thread principal plus élevée que la priorité temps partagé du thread de notification ;
  • Des appels systématiques sched_yield() après le pthread_mutex_unlock() qui suit le pthread_cond_signal().
exemple-condition-03.c

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>

pthread_cond_t  cnd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

void * thread_notification (void * arg)
{
	struct timeval tv;
	while (1) {
		sleep(1);

		pthread_mutex_lock(& mtx);
		gettimeofday(& tv, NULL);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		sched_yield();

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		sched_yield();

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		sched_yield();

		fprintf(stderr, "Trois notifications envoyées à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);

	}
	return NULL;	
}

int main (void)
{
	pthread_t thr;
	struct timeval tv;
	struct sched_param param;

	pthread_mutex_lock(& mtx);
	if (pthread_create(& thr, NULL, thread_notification, NULL) != 0)
		exit(EXIT_FAILURE);

	param.sched_priority = 10;
	if (pthread_setschedparam(pthread_self(), SCHED_FIFO, & param) != 0) {
		perror("pthread_setschedparam");
		exit(EXIT_FAILURE);
	}

	while (1) {
		pthread_cond_wait(& cnd, & mtx);
		gettimeofday(& tv, NULL);
		fprintf(stderr, "Notification reçue à %ld.%06ld\n", tv.tv_sec, tv.tv_usec);
	}
	pthread_mutex_unlock(& mtx);
	return EXIT_SUCCESS;	
}

Le premier inconvénient de ce programme, c’est la nécessité de l’exécuter avec les droits root afin qu’il puisse prendre un ordonnancement temps réel.

De plus, l’exécution n’est pas vraiment plus concluante que le précédent.

$ sudo ./exemple-condition-03 
Trois notifications envoyées à: 1376554376.792935
Notification reçue à 1376554376.792975
Trois notifications envoyées à: 1376554377.793192
Notification reçue à 1376554377.793307
Trois notifications envoyées à: 1376554378.793395
Notification reçue à 1376554378.793428
Trois notifications envoyées à: 1376554379.793623
Notification reçue à 1376554379.793654
Trois notifications envoyées à: 1376554380.793855
Notification reçue à 1376554380.793886
Trois notifications envoyées à: 1376554381.794084
Notification reçue à 1376554381.794117
Trois notifications envoyées à: 1376554382.794310
Notification reçue à 1376554382.794350
Trois notifications envoyées à: 1376554383.794489
Notification reçue à 1376554383.794519
Trois notifications envoyées à: 1376554384.794717
Notification reçue à 1376554384.794750

Aucune des triples notifications n’a été reçue en plus d’un exemplaire.

Pourtant, si on laisse le programme tourner un moment, surtout si la charge système augmente un peu, voici ce que l’on constate.

Trois notifications envoyées à: 1376554386.795122
Notification reçue à 1376554387.795484
Notification reçue à 1376554387.795570
Notification reçue à 1376554387.795632
Trois notifications envoyées à: 1376554387.795452
Trois notifications envoyées à: 1376554388.795782
Notification reçue à 1376554388.795809
Notification reçue à 1376554389.795982
Trois notifications envoyées à: 1376554389.795960
Notification reçue à 1376554390.796222
Trois notifications envoyées à: 1376554390.796203
Notification reçue à 1376554391.796478
Notification reçue à 1376554391.796548
Trois notifications envoyées à: 1376554391.796461
Notification reçue à 1376554392.796768
Trois notifications envoyées à: 1376554392.796748
Notification reçue à 1376554393.797030
Trois notifications envoyées à: 1376554393.797011
Notification reçue à 1376554394.797294
Trois notifications envoyées à: 1376554394.797275

Dans certaines circonstances, il arrive que les notifications se comportent comme nous l’attendions. Pourtant cela ne se produit que de temps à autres, de manière a priori imprévisible.

Ce genre de comportement apparement peu déterministe doit nous faire penser à des problèmes d’exécutions parallèles, plus particulièrement sur les systèmes multicœurs (ou multiprocesseurs).

L’exécution précédente se déroulait sur un petit processeur à deux cœurs. Essayons de forcer l’exécution du programme sur un seul CPU.

$ sudo taskset -c 0 ./exemple-condition-03
Notification reçue à 1376555394.075277
Notification reçue à 1376555394.075430
Notification reçue à 1376555394.075463
Trois notifications envoyées à: 1376555394.075235
Notification reçue à 1376555395.075686
Notification reçue à 1376555395.075776
Notification reçue à 1376555395.075832
Trois notifications envoyées à: 1376555395.075653
Notification reçue à 1376555396.076056
Notification reçue à 1376555396.076171
Notification reçue à 1376555396.076204
Trois notifications envoyées à: 1376555396.076025
Notification reçue à 1376555397.076408
Notification reçue à 1376555397.076497
Notification reçue à 1376555397.076527
Trois notifications envoyées à: 1376555397.076370
Notification reçue à 1376555398.076710
Notification reçue à 1376555398.076795
Notification reçue à 1376555398.076853
Trois notifications envoyées à: 1376555398.076676
Notification reçue à 1376555399.077044
Notification reçue à 1376555399.077135
Notification reçue à 1376555399.077166
Trois notifications envoyées à: 1376555399.077010

Le thread principal étant plus prioritaire, c’est lui qui affiche d’abord ses messages (« Notification reçue ») avant que le thread de notification affiche le sien, néanmoins nous pouvons vérifier à l’aide des horodatages que l’exécution est cohérente.

Dans la situation où notre système fonctionne sur un processeur unicœur, la solution sched_yield() et priorité temps réel est suffisante pour assurer le bon déroulement du programme. Chaque fois que le thread de notification invoque sched_yield(), l’ordonnanceur laisse l’exécution au thread principal qui peut obtenir le mutex et traiter la notification.

Toutefois ceci ne fonctionne donc pas sur un processeur multicœur si les deux threads tournent sur deux CPU différents (cas habituel lorsque le système n’est pas trop chargé) : lorsque le thread de notification appelle sched_yield(), l’ordonnanceur voit qu’il est seul sur son CPU et le réactive immédiatement ce qui lui permet de reverrouiller le mutex avant que le thread principal ait eu le temps de se réveiller.

Cette solution n’est donc pas satisfaisante. Il nous faudrait un moyen de s’assurer que le thread principal a bien traité toutes les notifications antérieures avant de lui en renvoyer une nouvelle. Pour cela nous pouvons essayer d’utiliser un sémaphore.

Emploi d’un sémaphore

Un sémaphore est un objet de synchronisation doté d’un compteur que l’on peut incrémenter (avec la fonction sem_post()) ou décrémenter (avec sem_wait()). Le compteur doit toujours rester positif ou nul. La décrémentation du compteur est une opération potentiellement bloquante tant que le compteur est nul.

Contrairement aux objets de synchronisation comme les mutex, l’incrémentation et la décrémentation d’un sémaphore ne se produisent pas nécessairement dans la même tâche. Il est parfaitement usuel de voir un thread invoquer sem_wait() pour attendre une certaine circonstance qui lui sera indiquée par un autre thread invoquant sem_post().

Notre première approche de l’utilisation d’un sémaphore sera donc en remplacement du sched_yield() pour attendre que le thread principal ait eu le temps de traiter la notification avant de lui en renvoyer une nouvelle.

Voici un exemple de code.

exemple-condition-04.c

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>

pthread_cond_t  cnd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
sem_t           sem;

void * thread_notification (void * arg)
{
	struct timeval tv;
	while (1) {
		sleep(1);

		pthread_mutex_lock(& mtx);
		gettimeofday(& tv, NULL);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		sem_wait(& sem);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		sem_wait(& sem);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		pthread_mutex_unlock(& mtx);
		sem_wait(& sem);

		fprintf(stderr, "Trois notifications envoyées à: %ld.%06ld\n", tv.tv_sec, tv.tv_usec);

	}
	return NULL;
}

int main (void)
{
	pthread_t thr;
	struct timeval tv;

	sem_init(& sem, 0, 0);

	pthread_mutex_lock(& mtx);
	if (pthread_create(& thr, NULL, thread_notification, NULL) != 0)
		exit(EXIT_FAILURE);

	while (1) {
		pthread_cond_wait(& cnd, & mtx);
		gettimeofday(& tv, NULL);
		sem_post(& sem);
		fprintf(stderr, "Notification reçue à %ld.%06ld\n", tv.tv_sec, tv.tv_usec);
	}
	pthread_mutex_unlock(& mtx);
	return EXIT_SUCCESS;	
}

Ainsi, le thread devra, après avoir envoyé une notification, attendre que le thread principal incrémente le sémaphore (initialement nul) pour pouvoir continuer. Vérifions le fonctionnement.

$ ./exemple-condition-04 
Notification reçue à 1376605365.617369
Notification reçue à 1376605365.617557
Notification reçue à 1376605365.617594
Trois notifications envoyées à: 1376605365.617332
Notification reçue à 1376605366.617822
Notification reçue à 1376605366.617897
Notification reçue à 1376605366.617988
Trois notifications envoyées à: 1376605366.617791
Notification reçue à 1376605367.618231
Notification reçue à 1376605367.618374
Notification reçue à 1376605367.618429
Trois notifications envoyées à: 1376605367.618200
Notification reçue à 1376605368.618599
Notification reçue à 1376605368.618679
Notification reçue à 1376605368.618763
Trois notifications envoyées à: 1376605368.618572
Notification reçue à 1376605369.618967
Notification reçue à 1376605369.619217
Notification reçue à 1376605369.619293
Trois notifications envoyées à: 1376605369.618938
Notification reçue à 1376605370.619516
Notification reçue à 1376605370.619593
Notification reçue à 1376605370.619682
Trois notifications envoyées à: 1376605370.619487
^C
$

Cette méthode fonctionne très bien dans notre cas. Aussi bien sur un système multicœur que sur un processeur unicœur.

Notifications parallèles

Toutefois, en regardant le code proposé à mon client j’ai été pris d’un doute. Dans l’exemple ci-dessus, un seul thread envoyait des notifications au thread main, mais que se passera-t-il si plusieurs threads essayent d’en envoyer simultanément ?

Supposons que deux threads, appelons-les A et B, entament simultanément la portion de code suivante :

  pthread_mutex_lock(& mtx);
  pthread_cond_signal(& cnd);
  pthread_mutex_unlock(& amp; mtx);
  sem_wait(& sem);

Voici ce qui peut se produire :

  • Le thread A prend le mutex.
  • (Sur un autre CPU) le thread B réclame le mutex. Ce dernier étant déjà verrouillé, B reste bloqué.
  • A invoque pthread_cond_signal() qui réveille le thread main. Celui-ci tente d’obtenir le mutex (dans pthread_cond_wait), qui est verrouillé. Le thread main se rendort.
  • A relâche le mutex et se bloque en attente sur le sémaphore. La libération du mutex réveille B et main. L’ordonnanceur choisit… B (par malchance).
  • B obtient le mutex, et envoie la seconde notification sur la variable condition sans que le thread main n’ait traité la première.
  • B relâche le mutex et se bloque sur le sémaphore
  • Le thread principal obtient enfin le mutex, fait un traitement et incrémente une seule fois le sémaphore alors que deux threads sont bloqués en attente dessus.

Non seulement notre programme rate des notifications, mais en outre des threads peuvent se retrouver définitivement bloqués par erreur !

Pour vérifier le comportement, j’ai modifié quelque peu le programme précédent. Nous n’affichons plus les dates de notification, mais la valeur de deux compteurs. L’un est incrémenté à chaque envoi de notification et le second à chaque réception. Le thread principal affiche régulièrement ces deux compteurs. En outre, nous lançons cinq threads en parallèle.

exemple-condition-05.c

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>

pthread_cond_t  cnd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
sem_t           sem;
int compteur_notifications_envoyees = 0;
int compteur_notifications_recues   = 0;

void * thread_notification (void * arg)
{
	while (1) {
		sleep(1);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		compteur_notifications_envoyees ++;
		pthread_mutex_unlock(& mtx);
		sem_wait(& sem);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		compteur_notifications_envoyees ++;
		pthread_mutex_unlock(& mtx);
		sem_wait(& sem);

		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		compteur_notifications_envoyees ++;
		pthread_mutex_unlock(& mtx);
		sem_wait(& sem);
	}
	return NULL;	
}

#define NB_THREADS 5

int main (void)
{
	pthread_t thr [NB_THREADS];
	int i;

	sem_init(& sem, 0, 0);

	pthread_mutex_lock(& mtx);
	for (i = 0; i < NB_THREADS; i ++) 
		if (pthread_create(& (thr[i]), NULL, thread_notification, NULL) != 0)
			exit(EXIT_FAILURE);

	while (1) {
		pthread_cond_wait(& cnd, & mtx);
		compteur_notifications_recues ++;
		sem_post(& sem);
		fprintf(stderr, "Notifications envoyees : %d, recues : %d\n",
		                compteur_notifications_envoyees,
		                compteur_notifications_recues);
	}
	pthread_mutex_unlock(& mtx);
	return EXIT_SUCCESS;	
}

L’exécution confirme qu’un problème se pose car les compteurs sont très vite décalés.

$ ./exemple-condition-05
Notifications envoyees : 1, recues : 1
Notifications envoyees : 3, recues : 2
Notifications envoyees : 7, recues : 3
Notifications envoyees : 8, recues : 4
Notifications envoyees : 9, recues : 5
Notifications envoyees : 10, recues : 6
Notifications envoyees : 11, recues : 7
Notifications envoyees : 12, recues : 8
Notifications envoyees : 13, recues : 9
Notifications envoyees : 14, recues : 10
Notifications envoyees : 15, recues : 11
Notifications envoyees : 16, recues : 12
Notifications envoyees : 17, recues : 13
Notifications envoyees : 18, recues : 14
Notifications envoyees : 19, recues : 15
Notifications envoyees : 20, recues : 16
Notifications envoyees : 21, recues : 17
Notifications envoyees : 22, recues : 18
Notifications envoyees : 23, recues : 19
Notifications envoyees : 24, recues : 20
^C
$

Dès que les compteurs sont décalés de 4 incrémentations, nous pouvons imaginer que 4 threads sur les 5 sont bloqués sur le sémaphore.

Comment pouvons-nous donc procéder pour être sûr que toutes les notifications envoyés par les threads soient reçues par le thread main ?

Il existe une solution très simple (et très proche du programme ci-dessus). Je vous encourage à y réfléchir quelques instants avant de  poursuivre votre lecture.

Solution

Le problème qui se pose, nous l’avons bien vu précédemment est que deux threads peuvent entrer simultanément dans la portion de code

  pthread_mutex_lock(& mtx);
  pthread_cond_signal(& cnd);
  pthread_mutex_unlock(& mtx);

Pour garantir l’unicité d’exécution d’une portion de programme, nous pouvons justement faire appel à un sémaphore. Au même sémaphore que celui que nous utilisions précédemment.

Il suffit simplement de déplacer l’appel sem_wait() avant la portion de code ci-dessus. Et symétriquement déplacer sem_post() avant pthread_cond_wait().

Le sémaphore aura pour rôles de garantir

  1. qu’un seul thread pourra exécuter la fonction de notification à un moment donné
  2. qu’avant d’exécuter la notification, nous sommes sûrs que le thread principal est en attente

Voici donc le programme correct qui résoud notre problème.

exemple-condition-06.c

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>

pthread_cond_t  cnd = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
sem_t           sem;
int compteur_notifications_envoyees = 0;
int compteur_notifications_recues   = 0;

void * thread_notification (void * arg)
{
	while (1) {
		sleep(1);

		sem_wait(& sem);
		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		compteur_notifications_envoyees ++;
		pthread_mutex_unlock(& mtx);

		sem_wait(& sem);
		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		compteur_notifications_envoyees ++;
		pthread_mutex_unlock(& mtx);

		sem_wait(& sem);
		pthread_mutex_lock(& mtx);
		pthread_cond_signal(& cnd);
		compteur_notifications_envoyees ++;
		pthread_mutex_unlock(& mtx);
	}
	return NULL;	
}

#define NB_THREADS 5

int main (void)
{
	pthread_t thr [NB_THREADS];
	int i;

	sem_init(& sem, 0, 0);

	pthread_mutex_lock(& mtx);
	for (i = 0; i < NB_THREADS; i ++) 
		if (pthread_create(& (thr[i]), NULL, thread_notification, NULL) != 0)
			exit(EXIT_FAILURE);

	while (1) {
		sem_post(& sem);
		pthread_cond_wait(& cnd, & mtx);
		compteur_notifications_recues ++;
		fprintf(stderr, "Notifications envoyees : %d, recues : %d\n",
		                compteur_notifications_envoyees,
		                compteur_notifications_recues);
	}
	pthread_mutex_unlock(& mtx);
	return EXIT_SUCCESS;	
}

NB : J’ai placé le sem_wait() avant le pthread_mutex_lock(). En effet, j’essaye généralement d’éviter les imbrications de mécanismes de synchronisation, de crainte du fameux dead lock si difficile à débuger. Je pense toutefois que le sem_wait() pourrait sans problème se trouver entre le pthread_mutex_lock() et le pthread_cond_signal().

Vérifions le fonctionnement :

$ ./exemple-condition-06
Notifications envoyees : 1, recues : 1
Notifications envoyees : 2, recues : 2
Notifications envoyees : 3, recues : 3
Notifications envoyees : 4, recues : 4
Notifications envoyees : 5, recues : 5
Notifications envoyees : 6, recues : 6
Notifications envoyees : 7, recues : 7
Notifications envoyees : 8, recues : 8
Notifications envoyees : 9, recues : 9
Notifications envoyees : 10, recues : 10
Notifications envoyees : 11, recues : 11
Notifications envoyees : 12, recues : 12
Notifications envoyees : 13, recues : 13
Notifications envoyees : 14, recues : 14
Notifications envoyees : 15, recues : 15
Notifications envoyees : 16, recues : 16
Notifications envoyees : 17, recues : 17
Notifications envoyees : 18, recues : 18
Notifications envoyees : 19, recues : 19
Notifications envoyees : 20, recues : 20
Notifications envoyees : 21, recues : 21
Notifications envoyees : 22, recues : 22
Notifications envoyees : 23, recues : 23
Notifications envoyees : 24, recues : 24
Notifications envoyees : 25, recues : 25
Notifications envoyees : 26, recues : 26
Notifications envoyees : 27, recues : 27
Notifications envoyees : 28, recues : 28
Notifications envoyees : 29, recues : 29
Notifications envoyees : 30, recues : 30
Notifications envoyees : 31, recues : 31
Notifications envoyees : 32, recues : 32
Notifications envoyees : 33, recues : 33
Notifications envoyees : 34, recues : 34
Notifications envoyees : 35, recues : 35
Notifications envoyees : 36, recues : 36
Notifications envoyees : 37, recues : 37
Notifications envoyees : 38, recues : 38
Notifications envoyees : 39, recues : 39
Notifications envoyees : 40, recues : 40
Notifications envoyees : 41, recues : 41
Notifications envoyees : 42, recues : 42
Notifications envoyees : 43, recues : 43
Notifications envoyees : 44, recues : 44
Notifications envoyees : 45, recues : 45
Notifications envoyees : 46, recues : 46
Notifications envoyees : 47, recues : 47
Notifications envoyees : 48, recues : 48
Notifications envoyees : 49, recues : 49
Notifications envoyees : 50, recues : 50
Notifications envoyees : 51, recues : 51
Notifications envoyees : 52, recues : 52
Notifications envoyees : 53, recues : 53
Notifications envoyees : 54, recues : 54
Notifications envoyees : 55, recues : 55
Notifications envoyees : 56, recues : 56
Notifications envoyees : 57, recues : 57
Notifications envoyees : 58, recues : 58
Notifications envoyees : 59, recues : 59
Notifications envoyees : 60, recues : 60
Notifications envoyees : 61, recues : 61
Notifications envoyees : 62, recues : 62
Notifications envoyees : 63, recues : 63
Notifications envoyees : 64, recues : 64
Notifications envoyees : 65, recues : 65
Notifications envoyees : 66, recues : 66
Notifications envoyees : 67, recues : 67
Notifications envoyees : 68, recues : 68
Notifications envoyees : 69, recues : 69
Notifications envoyees : 70, recues : 70
Notifications envoyees : 71, recues : 71
Notifications envoyees : 72, recues : 72
Notifications envoyees : 73, recues : 73
Notifications envoyees : 74, recues : 74
Notifications envoyees : 75, recues : 75
^C
$

Conclusion

Nous avons réussi à obtenir un schéma robuste et performant pour traiter dans un thread des notifications asynchrones déclenchées par d’autres threads. On voit que la solution n’était pas si évidente que nous pouvions le penser au début.

Il faut comprendre que l’API des Pthreads, conçue dans les années 1990, souffre de quelques défauts lorsqu’on l’emploie avec des processeurs multicœurs, qui n’apparaissaient pas à l’époque sur des systèmes unicœurs.

Nous pouvons également noter que la solution reposant sur des priorités temps réel ne fonctionne pas bien, et qu’il est parfois nécessaire d’ajouter un objet de synchronisation supplémentaire pour venir à bout des problèmes posés par la concurrence d’accès.

L’ensemble des fichiers sources et Makefile se trouvent dans cette archive.

Remarques, commentaires, etc. sont les bienvenus !

4 Réponses

  1. JeanFrancois dit :

    Pourquoi ne pas avoir utilisé une file de message mq_receive/mq_send pour synchroniser vos tâches : la(les) tâche(s) qui lise(nt) la ligne série écrit(vent) dans la file, la tâche qui traite les données les reçoit par la file ?

    • cpb dit :

      Oui, ce serait probablement une bonne méthode. Il faudrait vérifier les performances (temps de réveil de la tâche en attente) par rapport aux variables-condition, mais ça ne devrait pas être très différent. J’essayerai ça prochainement.

      Pour assouplir le système et permettre à plusieurs tâches de poster des notifications alors que la tâche de réception est déjà occupée, on peut allonger la file de message (qui par défaut ne contient que 10 messages seulement) en utilisant mq_getattr()/mq_setattr().

      En outre il est possible de donner une priorité aux messages, donc d’avoir des notifications (très prioritaires) prises en compte avant d’autres (moins prioritaires) postées auparavant.

      Notons quand même que le comportement n’est pas tout à fait identique puisque la tâche envoyant une notification n’est pas obligée d’attendre que celle de réception soit en attente.

      • JeanFrancois dit :

        Oui on peut imaginer que les performances sont les mêmes dans cet exemple où l’on posterais seulement un entier dans la file dans chaque message. Peut-être que l’on peut gagner sur le nombre d’appel systèmes _dans_le_pire_cas_ avec 1 seul appel système mq_send au lieu de quatre mutex_lock/cond_signal/mutex_unlock/sem_wait dans l’exemple.

        Dans nos applications, on n’utilise jamais le mécanisme de priorité de messages dans les files car nous trouvons que ça peut compliquer le design de la tâche de réception. S’il y a besoin de deux niveaux de traitement (rapide/moins rapide), on crée deux files et deux tâches de priorité différentes qui reçoivent sur chacune des deux files. La tâche de plus haute priorité traitera les messages qui doivent être traités rapidement.

        Pour retrouver le même comportement que dans l’exemple de l’article, c’est à dire qu’une tâche doit attendre que la notification soit lue pour être débloquée, il existe l’appel rt_task_reply dans Xenomai. Ceci dit, je n’aime pas ce type d’appel car l’intérêt des files de message est de découpler totalement le traitement de la notification, qui peuvent donc s’exécuter avec des priorités différentes sur des coeurs différents.

        Avec rt_task_reply ou comme dans votre exemple, la durée du traitement peut donc bloquer l’appelant assez longtemps. Il faut donc trouver un moyen de recouvrer ce temps dans la tâche de notification ce qui n’est pas évident si l’on veut garder toutes ses tâches les plus simples possibles.

  2. Loïc dit :

    Cet article semble manquer un élément clef lié au variable condition: le thread qui attend la notification doit tester la véracité de la condition qui a déclanché la notification de la part d’un autre thread.

    Il me semble que vous pourriez obtenir ainsi des solutions plus simples/performantes aux problèmes approchés dans cet article ( en outre d’obtenir une solution POSIX conforme ).

URL de trackback pour cette page