
Re: Thread starvation with mutex



The subject may be a bit off, as I am dealing with a barrier, but I
think the problem is closely related...

Please find attached the file 'maxcrea.c'.

In this sample, I try to create as many threads as possible and then
remove them "cleanly" as soon as a creation fails. I use two barriers
to synchronize all the threads.

When I run it with the symbol WITH_MAIN_SLEEPS defined, everything
goes well, but as soon as I remove it, it hangs on my box (P3 700 MHz;
Red Hat 9 + kernel 2.6.2; I get the same result with the standard Red Hat
glibc and with CVS glibc + NPTL). It looks as though, when the main
thread passes the barrier, destroys it and reinitializes it with a
different count before the other threads have been woken, those threads
are never woken. As I don't really understand the NPTL source code yet,
I can't see where the problem is, but I still think this is a bug. Am
I wrong?
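For comparison, a barrier built from a mutex and a condition variable can be given a new party count between rounds without being destroyed. In the minimal sketch below (all `gbarrier_*` names are hypothetical, not part of POSIX or glibc), a released thread leaves as soon as it sees that the generation counter has moved on, so a later change of the count cannot put it back to sleep. It assumes the count is only changed while no thread is blocked in the current round, an assumption that appears to hold for b1 and b2 in maxcrea.c, where each count is changed before main waits on the other barrier.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical resizable barrier built from a mutex and a condition
 * variable (not a POSIX or glibc API). */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    unsigned        count;      /* parties needed to trip the current round */
    unsigned        arrived;    /* parties that have arrived in this round */
    unsigned long   generation; /* incremented every time the barrier trips */
} gbarrier_t;

int gbarrier_init(gbarrier_t *b, unsigned count)
{
    int rc;

    if (count == 0)
        return EINVAL;              /* same rule as pthread_barrier_init() */
    if ((rc = pthread_mutex_init(&b->lock, NULL)) != 0)
        return rc;
    if ((rc = pthread_cond_init(&b->cond, NULL)) != 0) {
        pthread_mutex_destroy(&b->lock);
        return rc;
    }
    b->count = count;
    b->arrived = 0;
    b->generation = 0;
    return 0;
}

/* Returns 1 in exactly one thread per round (the role played by
 * PTHREAD_BARRIER_SERIAL_THREAD), 0 in the others. */
int gbarrier_wait(gbarrier_t *b)
{
    unsigned long gen;
    int serial = 0;

    pthread_mutex_lock(&b->lock);
    gen = b->generation;
    if (++b->arrived == b->count) {
        /* Last arrival: start a new round and release the current one. */
        b->arrived = 0;
        b->generation++;
        pthread_cond_broadcast(&b->cond);
        serial = 1;
    } else {
        /* The loop absorbs spurious wakeups; a released thread exits as soon
         * as it sees the generation change, whatever happens to count. */
        while (gen == b->generation)
            pthread_cond_wait(&b->cond, &b->lock);
    }
    pthread_mutex_unlock(&b->lock);
    return serial;
}

/* Assumption: called only while no thread is blocked in the current round
 * (arrived == 0). Threads released from the previous round that have not
 * yet returned are harmless, since they only compare generation numbers. */
void gbarrier_set_count(gbarrier_t *b, unsigned count)
{
    pthread_mutex_lock(&b->lock);
    assert(b->arrived == 0 && count > 0);
    b->count = count;
    pthread_mutex_unlock(&b->lock);
}

void gbarrier_destroy(gbarrier_t *b)
{
    pthread_cond_destroy(&b->cond);
    pthread_mutex_destroy(&b->lock);
}
```

With such a barrier, each destroy/init pair on b1 and b2 in maxcrea.c could become a single gbarrier_set_count() call.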

Thank you!

Sebastien Decugis.


On Tue 10/02/2004 at 11:50, Jamie Lokier wrote:
> Perez-Gonzalez, Inaky wrote:
> > > > 1. Thread A calls FUTEX_WAKE
> > > > 2. Thread A receives 0 from FUTEX_WAKE
> > > > 3. Thread A atomically unlocks the user space word
> > > >
> > > > Now, if some Thread B comes in between 2 and 3 and tries to
> > > > lock, it will see the user space word locked and go down to
> > > > wait in the kernel. It will sit there forever because
> > > > in (3) the word is unlocked and nobody knows B is there sleeping.
> > > 
> > > After step 2, Thread B sees the user space word is locked and does an
> > > atomic decrement (or whatever) to indicate that there is a waiter, as
> > > usual.
> > > 
> > > In step 3, Thread A tries to unlock by doing an atomic compare and
> > > exchange, and then it sees that the word indicates there is a waiter,
> > > so it loops back to 1.
> > 
> > Ouch, I had forgotten we had the counter, not just a cmp/exchange.
> > Thanks for the correction.
> > 
> > But I still think the deadlock applies (in a slightly more twisted fashion):
> > B, in between 2 and 3, has decremented and gone down to the kernel. 
> > Before it is able to grab the futex spinlock, A is running, sees there is
> > a waiter according to the userspace word, so goes back to one. FUTEX_WAKE
> > returns 0 (because B still hasn't had the chance to queue up), so it 
> > unlocks. Now B reaches the futex and sleeps for ever.
> > 
> > Did I miss anything in this scenario?
> 
> Thread A cannot unlock as long as the userspace word indicates "there
> are waiters".  Therefore it will keep looping, calling FUTEX_WAKE.  It's
> a near-livelock, not a deadlock, and will resolve eventually, but may
> take a long time.
> 
> The correct version (having just looked at Rusty's code :) is:
> 
>    1. Thread A tries cmp/exchange to unlock the word;
>       it fails because there are waiters
>    2. Thread A calls FUTEX_WAKE to pass ownership
>    3. Thread A receives 0 from FUTEX_WAKE
>    4. Thread A atomically increments the word; still finds waiters
>    5. Thread A calls FUTEX_WAKE to signal change
>          -> Thread B will either be woken or FUTEX_WAIT returns -EAGAIN.
> 
> Steps 1-3 are the ownership-passing "fair" wakeup.  Steps 4-5 are the
> fallback "unfair" wakeup, which only occurs during the race window.
> 
> This provides a high likelihood of ownership passing, but does not
> guarantee ownership will be passed.
>      
> -- Jamie
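The lost-wakeup window discussed above is closed by the kernel's value check: FUTEX_WAIT only puts the caller to sleep if the futex word still holds the expected value when the kernel looks at it (a check serialized against FUTEX_WAKE on the same word); otherwise it fails with EAGAIN, as in step 5. The minimal sketch below is not Rusty's counter-based lock. It is the simpler three-state mutex (0 = unlocked, 1 = locked, 2 = locked with possible waiters) described in Ulrich Drepper's paper "Futexes Are Tricky", written with C11 atomics and the raw Linux futex system call. It never passes ownership to the woken thread, so it corresponds to the "unfair" wakeup only. The names `fmutex_*` and `fmutex_demo` are hypothetical, and it assumes Linux with glibc.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <linux/futex.h>
#include <pthread.h>
#include <stdatomic.h>
#include <sys/syscall.h>
#include <unistd.h>

/* 0 = unlocked, 1 = locked with no waiters, 2 = locked, waiters possible. */
typedef atomic_int fmutex_t;

/* Assumption: atomic_int has the same representation as int (true on
 * Linux with GCC/Clang), so it can be handed to the kernel as a futex word. */
_Static_assert(sizeof(atomic_int) == sizeof(int), "futex word must be an int");

static long sys_futex(fmutex_t *word, int op, int val)
{
    return syscall(SYS_futex, (int *)word, op, val, NULL, NULL, 0);
}

void fmutex_lock(fmutex_t *m)
{
    int c = 0;

    /* Fast path: 0 -> 1 without entering the kernel. */
    if (atomic_compare_exchange_strong(m, &c, 1))
        return;
    /* Contended: mark the word as "possible waiters" before sleeping. */
    if (c != 2)
        c = atomic_exchange(m, 2);
    while (c != 0) {
        /* Sleeps only if *m is still 2 when the kernel checks it;
         * otherwise it returns at once (EAGAIN) and we retry. */
        sys_futex(m, FUTEX_WAIT, 2);
        c = atomic_exchange(m, 2);
    }
}

void fmutex_unlock(fmutex_t *m)
{
    /* 1 -> 0 means nobody was waiting; otherwise clear and wake one waiter. */
    if (atomic_fetch_sub(m, 1) != 1) {
        atomic_store(m, 0);
        sys_futex(m, FUTEX_WAKE, 1);
    }
}

/* Hypothetical demo: nthreads threads each increment a shared counter
 * iters times under the futex mutex; returns the final counter value. */
static fmutex_t demo_lock = 0;
static long demo_counter;

static void *demo_worker(void *arg)
{
    long iters = *(long *)arg;

    for (long i = 0; i < iters; i++) {
        fmutex_lock(&demo_lock);
        demo_counter++;
        fmutex_unlock(&demo_lock);
    }
    return NULL;
}

long fmutex_demo(int nthreads, long iters)
{
    pthread_t t[16];
    int created = 0;

    if (nthreads > 16)
        nthreads = 16;
    demo_counter = 0;
    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&t[i], NULL, demo_worker, &iters) != 0)
            break;
        created++;
    }
    for (int i = 0; i < created; i++)
        pthread_join(t[i], NULL);
    return demo_counter;
}
```

Because unlock stores 0 before calling FUTEX_WAKE, a thread that marked the word with 2 but has not yet reached the kernel finds the value changed and retries instead of sleeping, which is exactly the case in the scenario above.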
-- 
Sébastien DECUGIS
Bull S.A.
Tel: 04 76 29 74 93
/* **************************************************************************************
 *  maxcrea.c
 * 
 * This little sample is designed to test the maximal number of posix threads 
 * that can be created (and kept running) inside a single process, with various
 * parameters. Barriers are also used to synchronize the threads.
 * 
 * ***************************************************************************************/
 /* Request POSIX.1-2001 (XSI) interfaces such as pthread_barrier_*();
  * this must be defined before any system header is included. */
 #define _XOPEN_SOURCE 600

 #include <errno.h>
 #include <pthread.h>
 #include <stdarg.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <sys/time.h>
 #include <unistd.h>	/* sleep() */
 
 
 #define DEFAULT_MAXTHREADS	(65536)
 #define DEFAULT_STACKSIZE			(16 * 4096)
 
// #define WITH_MAIN_SLEEPS
 
 typedef enum {ORDER_GO_ON, ORDER_EXIT} order_t;
 
 order_t	control;
 pthread_barrier_t b1, b2;
 pthread_mutex_t m_trace;
 
 static const int verbose = 1;
 static const int debug = 1;
 
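 /********************************* Trace facility *****************************************/
 /* MYDEBUG(fmt, ...) prints "Lnnn : " (the source line) followed by the message,
  * as a single call so that it also works inside an unbraced if/else. */
 #define MYDEBUG(...) trace_line(__LINE__, __VA_ARGS__)

 /* Print one message while holding m_trace, so that the line prefix and the
  * message from one thread are not interleaved with output from another. */
static void vtrace(int line, const char * format, va_list ap)
{
	if (!verbose)
		return;
	pthread_mutex_lock(&m_trace);
	if (line > 0)
		printf("L%03i : ", line);
	vprintf(format, ap);
	pthread_mutex_unlock(&m_trace);
}

void trace(const char * format, ...)
{
	va_list ap;

	va_start(ap, format);
	vtrace(0, format, ap);
	va_end(ap);
}

void trace_line(int line, const char * format, ...)
{
	va_list ap;

	va_start(ap, format);
	vtrace(line, format, ap);
	va_end(ap);
}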
 /*****************************************************************************************/
  
 void * threaded(void * arg)
 {
 	int rc;
 	unsigned long me = (unsigned long) arg;	/* thread index, passed by value in the pointer */

	MYDEBUG ("Thread %lu starting\n", me);
 	while (1)
 	{
 	/* Wait for barrier b1 */
		MYDEBUG ("thread %lu waiting for b1\n", me);
 		rc = pthread_barrier_wait(&b1);
		MYDEBUG ("thread %lu got b1 (rc = %i)\n", me, rc);

 	/* read control value; pthread_barrier_wait() synchronizes memory, so the
 	 * value main wrote before reaching b1 is visible here */
 		if (control == ORDER_EXIT)
 	/* if control says 'exit' then exit */
 		{
			MYDEBUG ("thread %lu exiting\n", me);
 			return NULL;
 		}

 	/* wait for barrier b2 */
		MYDEBUG ("thread %lu waiting for b2\n", me);
 		rc = pthread_barrier_wait(&b2);
		MYDEBUG ("thread %lu got b2 (rc = %i)\n", me, rc);

 	/* loop */
 	}
 }

/**************************** MAIN ********************************/
 int main (int argc, char * argv[])
 {
 	/* N = # of thread created so far */
 	unsigned long N=0;
 	pthread_attr_t  thr_attr;
 	pthread_t * threads;
 	int rc=0;
 	
 	unsigned long nbmaxthreads = DEFAULT_MAXTHREADS;
 	int stacksize = DEFAULT_STACKSIZE;
 	
 	if (sizeof (unsigned long) != sizeof (void *))
 	{
 		printf("Warning threads # may be corrupted...\n");	
 	}
 	
	pthread_mutex_init(&m_trace, NULL);

 	control = ORDER_GO_ON;
 	
 	MYDEBUG ("MaxCrea by Sebastien Decugis: Starting...\n");
 	/* add parameters parsing here to overwrite nbmaxthreads or stacksize... */
 	
 	/* initialization of pthread_attributes (loop on attributes -> joinable/detached, ...?) */
 	if ((rc = pthread_attr_init(&thr_attr)))
 	{
 		trace("ERROR(%i): pthread_attr_init returned %d; exiting...\n", __LINE__, rc);
 		return 1;
 	}
 	MYDEBUG ("pthread_attr_init OK\n");
 	
 	if ((rc = pthread_attr_setstacksize(&thr_attr, stacksize)))
 	{
 		trace("ERROR(%i): pthread_attr_setstacksize(%i) returned %d; exiting...\n",__LINE__,stacksize, rc);
 		pthread_attr_destroy(&thr_attr);
 		return 1;
 	}
 	MYDEBUG ("pthread_attr_setstacksize OK\n");
 	
 	/* initialize thread table */
 	threads = (pthread_t *) malloc(nbmaxthreads * sizeof(pthread_t));
 	if (threads==(pthread_t *)NULL)
 	{
 		trace("ERROR(%i): could not allocate memory for threads handlers, exiting...\n",__LINE__);
 		pthread_attr_destroy(&thr_attr);
 		return 1;
 	}
 	MYDEBUG ("malloc OK\n");
 	
	/* Initialize barrier b2 with value 1 */
 	if ((rc = pthread_barrier_init(&b2, NULL, 1)))
 	{
 		trace("ERROR(%i): pthread_barrier_init (b2, 1) returned %d; exiting...\n",__LINE__, rc);
 		pthread_attr_destroy(&thr_attr);
 		free(threads);
 		return 1;
 	}
 	MYDEBUG ("pthread_barrier_init (b2, 1) OK\n");

    /* Loop */
    while(1)
    {
 	/* |  Initialize barrier b1 with value N+2 */
	  	if ((rc = pthread_barrier_init(&b1, NULL, N + 2)))
	 	{
	 		trace("ERROR(%i): pthread_barrier_init (b1, %lu) returned %d; cannot continue...\n",__LINE__, N+2, rc);
	 		break;
	 	}
	 	MYDEBUG ("pthread_barrier_init (b1, %lu) OK\n", N+2);
	/* |  if N >= maxthreads => exit */
		if (N >= nbmaxthreads)
		{
			printf("%lu threads were created successfully\n", N);
			break;
		}
 	/* |  create new thread */
	  	if ((rc = pthread_create(&threads[N], &thr_attr, &threaded, (void *)N)))
 	/* |  if thread creation is NOK */
	 	{
 	/* |  | exit loop */
	 		trace("ERROR(%i): the thread #%lu could not be created (%i)...\n",__LINE__, N+1, rc);
	 		break;
 	/* |  end if (creation is OK, new thread is waiting on b1) */
	 	}
	 	MYDEBUG ("pthread_create (%lu) OK\n", N);
 	/* |  N = N + 1 */
 		N++;
	 	MYDEBUG ("N = %lu\n", N);
 	/* |  wait for barrier b2 (then all threads will be waiting on b1) */
 		if ((rc=pthread_barrier_wait(&b2)) && (rc != PTHREAD_BARRIER_SERIAL_THREAD))
 		{
 			trace("ERROR(%i): waiting for barrier b2 (%i)\n",__LINE__, rc);
 			break;
 		}
	 	MYDEBUG ("pthread_barrier_wait(b2) OK\n");

#ifdef WITH_MAIN_SLEEPS
	 	MYDEBUG ("Main sleep for 1 sec...\n");
		sleep(1);
#endif
		

 	/* |  destroy barrier b2 */
	  	if ((rc = pthread_barrier_destroy(&b2)))
	 	{
	 		trace("ERROR(%i): pthread_barrier_destroy (b2) returned %d\n You will probably lose some resources\n",__LINE__, rc);
	 		break;
	 	}
	 	MYDEBUG ("pthread_barrier_destroy(b2) OK\n");
 	
 	/* |  initialize barrier b2 with value N+1 */
	  	if ((rc = pthread_barrier_init(&b2, NULL, N + 1)))
	 	{
	 		trace("ERROR(%i): pthread_barrier_init (b2, %lu) returned %d\n You will probably lose some resources\n",__LINE__, N+1, rc);
	 		break;
	 	}
	 	MYDEBUG ("pthread_barrier_init(b2, %lu) OK\n", N+1);

 	/* |  wait for barrier b1 (all threads read control then wait for b2) */
 		if ((rc=pthread_barrier_wait(&b1)) && (rc != PTHREAD_BARRIER_SERIAL_THREAD))
 		{
 			trace("ERROR(%i): waiting for barrier b1 (%i)\n",__LINE__, rc);
 			break;
 		}
	 	MYDEBUG ("pthread_barrier_wait(b1) OK\n");

#ifdef WITH_MAIN_SLEEPS
	 	MYDEBUG ("Main sleep for 1 sec...\n");
		sleep(1);
#endif
		
 	/* |  destroy barrier b1 */
	  	if ((rc = pthread_barrier_destroy(&b1)))
	 	{
	 		trace("ERROR(%i): pthread_barrier_destroy (b1) returned %d\n You will probably lose some resources\n",__LINE__, rc);
	 		break;
	 	}
	 	MYDEBUG ("pthread_barrier_destroy(b1) OK\n");

 	/* end loop */
    }

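 	/* set control = 'exit' */
 	control = ORDER_EXIT;
 	MYDEBUG ("control = EXIT\n");

 	/* When the loop stops at thread creation (N >= nbmaxthreads or a
 	 * pthread_create failure), the N running threads are blocked on b2, and
 	 * b1 was initialized for N + 2 parties, one more than will ever arrive.
 	 * Re-initialize b1 for main plus the N threads, then pass b2 so that
 	 * every thread moves on to b1. This assumes b1 was successfully
 	 * initialized in the last iteration. */
 	pthread_barrier_destroy(&b1);
 	if ((rc = pthread_barrier_init(&b1, NULL, N + 1)))
 	{
 		trace("ERROR(%i): pthread_barrier_init (b1, %lu) returned %d\n",__LINE__, N+1, rc);
 	}
 	if ((rc=pthread_barrier_wait(&b2)) && (rc != PTHREAD_BARRIER_SERIAL_THREAD))
 	{
 		trace("ERROR(%i): waiting for barrier b2 (%i)\n",__LINE__, rc);
 	}

 	/* wait for barrier b1 (all threads should read control and exit now)*/
	if ((rc=pthread_barrier_wait(&b1)) && (rc != PTHREAD_BARRIER_SERIAL_THREAD))
	{
		trace("ERROR(%i): waiting for barrier b1 (%i)\n",__LINE__, rc);
	}
 	MYDEBUG ("pthread_barrier_wait(b1) OK/NOK\n");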
 	
 	
 	/* if threads were joinable, join them. */
 	while (N>0)
 	{
	 	--N;
 		if ((rc = pthread_join(threads[N], NULL)))
 		{
 			trace("Failed to join thread #%lu (%i)\n", N, rc);
 		}
	 	MYDEBUG ("pthread_join(%lu) OK/NOK\n", N);
	}

	pthread_attr_destroy(&thr_attr);
	free(threads);

	MYDEBUG ("MaxCrea: See U Space Cowboy...\n");
	
	pthread_mutex_destroy(&m_trace);
	
	return 0;
 	
 }
