[Date Prev][Date Next] [Thread Prev][Thread Next] [Date Index] [Thread Index]

pthreads help needed to diagnose #98866



I have an odd situation with the Common C++ library.  It provides an
abstraction class (Thread), which provides a portable thread API.  It uses
pthreads where available, and who-knows-what-else elsewhere.  The relavant
source files from the POSIX version are attached (thread.h, thread.cpp).

Also attached is threadtest.cc, which demonstrates the problem I'm having.  If
it is compiled with BROKEN defined, each thread allocates storage that is never
freed, as if the thread is joinable.  However, Common C++'s thread class is
detaching the thread before it exits.

WORKS1 and WORKS2 demonstrate two ways I have found to keep the problem from
happening.  WORKS1 has the thread wait on a semaphore before executing.  WORKS2
overrides the Thread::Start method with its own version, using exactly the same
code.  Both of these methods cause the problem to go away.  Where is the bug?

-- 
 - mdz
// Link with shared libraries:
//  g++ -g -D_REENTRANT -D_THREAD_SAFE -D_GNU_SOURCE
//  -I/usr/include/cc++ -o threadtest threadtest.cc -lccstd -lccxx -lz
//  -lpthread

// Link statically:
//  g++ -static -g -D_REENTRANT -D_THREAD_SAFE -D_GNU_SOURCE
//  -I/usr/include/cc++ -g -o threadtest threadtest.o -lccstd -lccxx
//  -lz -lpthread


#include <cc++/thread.h>
#include <iostream>
#include <cstdlib>

// Define BROKEN only for broken behavior when dynamically linked
// Define WORKS1 or WORKS2 for un-broken behavior

#define BROKEN
//#define WORKS1
//#define WORKS2

class ThreadTest: public virtual Thread {
public:
  ThreadTest(Semaphore *sem): Thread(sem) { };
  ~ThreadTest() { Terminate(); };
  void Run() { };
  void Final() { delete this; };
};

int main() {
  system("ps ux | grep threadtest | grep -v grep");
  for(int i = 0; i < 20; ++i) {
#if defined(BROKEN) || defined(WORKS2)

    //
    // Things break for this simplest case, unless we override
    // Thread::Start
    //

    ThreadTest *thread = new ThreadTest(NULL);

#elif defined(WORKS1)

    //
    // Things work if we have the thread wait on a semaphore instead
    // of starting immediately (this has the effect of calling
    // pthread_create() from Thread::Thread instead of Thread::Start)
    //

    Semaphore sem;
    ThreadTest *thread = new ThreadTest(&sem);

#endif

    // For the semaphore case, this posts to the semaphore and lets
    // the thread continue.  For the non-semaphore case, this creates
    // the thread.  See code for Thread::Start below.
    thread->Start();
  }
  sleep(5); // Allow threads to exit
  system("ps ux | grep threadtest | grep -v grep");
  return 0;
}

#ifdef WORKS2
//
// Things work if we override Thread::Start in the shared library with
// our own version.  Exactly the same code.  It also works to link
// with -static.
//
typedef	void	*(*exec_t)(void *);
int Thread::Start(Semaphore *start)
{
	if(_tid)
	{
		if(_start)
		{
			_start->Post();
			return 0;
		}
		else
			return -1;
	}

	_start = start;
	return pthread_create(&_tid, &_attr, exec_t(&execHandler), this);
}
#endif
// Copyright (C) 1999-2001 Open Source Telecom Corporation.
//  
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
// 
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// 
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software 
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
// 
// As a special exception to the GNU General Public License, permission is 
// granted for additional uses of the text contained in its release 
// of Common C++.
// 
// The exception is that, if you link the Common C++ library with other
// files to produce an executable, this does not by itself cause the
// resulting executable to be covered by the GNU General Public License.
// Your use of that executable is in no way restricted on account of
// linking the Common C++ library code into it.
//
// This exception does not however invalidate any other reasons why
// the executable file might be covered by the GNU General Public License.
// 
// This exception applies only to the code released under the 
// name Common C++.  If you copy code from other releases into a copy of
// Common C++, as the General Public License permits, the exception does
// not apply to the code that you add in this way.  To avoid misleading
// anyone as to the status of such modified files, you must delete
// this exception notice from them.
// 
// If you write modifications of your own for Common C++, it is your choice
// whether to permit this exception to apply to your modifications.
// If you do not wish that, delete this exception notice.  
//

#ifndef	__CCXX_THREAD_H__
#define	__CCXX_THREAD_H__
#define	__CCXX_POSIX

#ifndef	_REENTRANT
#define	_REENTRANT
#endif

#ifndef	_THREAD_SAFE
#define	_THREAD_SAFE
#endif

#ifndef __CCXX_CONFIG_H__
#ifdef	PACKAGE
#undef	PACKAGE
#endif
#ifdef	VERSION
#undef	VERSION
#endif
#include <cc++/config.h>
#endif

#ifndef	HAVE_PTHREAD_H
#include <pthread.h>
#include <semaphore.h>
#endif

#ifndef	__CCXX_MACROS_H__
#include <cc++/macros.h>
#else
#ifdef	__CCXX_NAMESPACE_H__
#include <cc++/macros.h>
#endif
#endif

#include <time.h>
#include <signal.h>
#include <setjmp.h>
#include <unistd.h>

#ifdef	__linux__
#define _SIG_THREAD_STOPCONT
#define	_SIG_THREAD_ALARM
#endif

#ifdef	_THR_UNIXWARE
#define	_EFTSAFE
#undef	PTHREAD_MUTEXTYPE_RECURSIVE
#undef	_POSIX_THREAD_PRIORITY_SCHEDULING
#define	sigwait(x, y) _thr_sigwait(x, y)
#endif

typedef	pthread_t	cctid_t;
typedef	unsigned long	timeout_t;
typedef	int		signo_t;

typedef enum {
	THROW_NOTHING, 
	THROW_OBJECT, 
	THROW_EXCEPTION
}		throw_t;

#define TIMEOUT_INF ~((timeout_t) 0)

// use a define so that if the sys/types.h header already defines caddr_t
// as it may on BSD systems, we do not break it by redefining again.

#undef caddr_t
#define	caddr_t		char *

#define	ENTER_CRITICAL	EnterMutex();
#define	LEAVE_CRITICAL	LeaveMutex();
#define	ENTER_DEFERRED	setCancel(THREAD_CANCEL_DEFERRED);
#define LEAVE_DEFERRED 	setCancel(THREAD_CANCEL_IMMEDIATE);

// These macros override common functions with thread-safe versions. In
// particular the common "libc" sleep() has problems since it normally
// uses SIGARLM (as actually defined by "posix").  The pthread_delay and
// usleep found in libpthread are gaurenteed not to use SIGALRM and offer
// higher resolution.  psleep() is defined to call the old process sleep.

#undef	sleep
#define	sleep(x)	ccxx_sleep((x) * 1000)
#define	yield()		ccxx_yield()
#define	psleep(x)	(sleep)(x)

typedef enum
{
	THREAD_CANCEL_INITIAL=0,
	THREAD_CANCEL_DEFERRED=1,
	THREAD_CANCEL_IMMEDIATE,
	THREAD_CANCEL_DISABLED,
	THREAD_CANCEL_DEFAULT=THREAD_CANCEL_DEFERRED,
	THREAD_CANCEL_INVALID
} thread_cancel_t;

typedef enum
{
	THREAD_SUSPEND_ENABLE,
        THREAD_SUSPEND_DISABLE
} thread_suspend_t;

#define	 THREAD_SIGNAL_BLOCKED	false
#define	 THREAD_SIGNAL_UNBLOCK	true
	
#ifdef	_SIG_THREAD_STOPCONT
#define _SIG_THREAD_SUSPEND SIGSTOP
#define _SIG_THREAD_RESUME  SIGCONT
#else
#ifndef	SIGUSR3
#ifdef	SIGWINCH
#define	SIGUSR3	SIGWINCH
#else
#define	SIGUSR3	SIGINT
#endif
#endif
#define	_SIG_THREAD_SUSPEND SIGUSR3
#define _SIG_THREAD_RESUME SIGUSR3
#endif

class Thread;

Thread *getThread(void);

extern "C" {
	void execHandler(Thread *th);
	void sigHandler(int signo);
};

/**
 * The ThreadLock class impliments a thread rwlock for optimal reader performance
 * on systems which have rwlock support, and reverts to a simple mutex for those
 * that do not.
 *
 * @author David Sugar <dyfet@ostel.com>
 * @short Posix rwlock extension for protected access.
 */
class ThreadLock
{
private:
#ifdef HAVE_PTHREAD_RWLOCK
	pthread_rwlock_t _lock;
#else
	pthread_mutex_t _lock;
#endif

public:
	/**
	 * Create a process shared thread lock object.
	 */
	ThreadLock();

	/**
	 * Destroy a process shared thread lock object.
	 */
	~ThreadLock();

	/**
	 * Aquire a read lock for the current object.
	 */
	void ReadLock(void);

	/**
	 * Aquire a write lock for the current object.
	 */
	void WriteLock(void);

	/**
	 * Attempt read lock for current object.
	 *
	 * @return true on success.
	 */
	bool TryReadLock(void);

	/**
	 * Attempt write lock for current object.
	 *
	 * @return true on success.
	 */
	bool TryWriteLock(void);

	/**
	 * Release any held locks.
	 */
	void Unlock(void);
};

/**
 * The Mutex class is used to protect a section of code so that at any
 * given time only a single thread can perform the protected operation.
 * 
 * The Mutex can be used as a base class to protect access in a derived
 * class.  When used in this manner, the ENTER_CRITICAL and LEAVE_CRITICAL
 * macros can be used to specify when code written for the derived class
 * needs to be protected by the default Mutex of the derived class, and
 * hence is presumed to be 'thread safe' from multiple instance execution.
 * One of the most basic Common C++ synchronization object is the Mutex
 * class.  A Mutex only allows one thread to continue execution at a given
 * time over a specific section of code.  Mutex's have a enter and leave
 * method; only one thread can continue from the Enter until the Leave is
 * called.  The next thread waiting can then get through.  Mutex's are also
 * known as "CRITICAL SECTIONS" in win32-speak.
 * 
 * The Mutex is always recursive in that if the same thread invokes
 * the same mutex lock multiple times, it must release it multiple times.
 * This allows a function to call another function which also happens to
 * use the same mutex lock when called directly. This was
 * deemed essential because a mutex might be used to block individual file
 * requests in say, a database, but the same mutex might be needed to block a
 * whole series of database updates that compose a "transaction" for one
 * thread to complete together without having to write alternate non-locking
 * member functions to invoke for each part of a transaction.
 * 
 * Strangely enough, the original pthread draft standard does not directly
 * support recursive mutexes.  In fact this is the most common "NP" extension
 * for most pthread implementations.  Common C++ emulates recursive mutex
 * behavior when the target platform does not directly support it.
 * 
 * In addition to the Mutex, Common C++ supports a rwlock class.  This
 * implements the X/Open recommended "rwlock".  On systems which do not
 * support rwlock's, the behavior is emulated with a Mutex; however, the
 * advantage of a rwlock over a mutex is then entirely lost.  There has been
 * some suggested clever hacks for "emulating" the behavior of a rwlock with
 * a pair of mutexes and a semaphore, and one of these will be adapted for
 * Common C++ in the future for platforms that do not support rwlock's
 * directly.
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short Mutex lock for protected access.
 */
class Mutex 
{
private:
#ifndef	PTHREAD_MUTEXTYPE_RECURSIVE
	volatile int _level;
	volatile Thread *_tid;
#endif

protected:
	/**
	 * Pthread mutex object.  This is protected rather than private
	 * because some mixed mode pthread operations require a mutex as
	 * well as their primary pthread object.  A good example of this
	 * is the Event class, as waiting on a conditional object must be
	 * associated with an accessable mutex.  An alternative would be
	 * to make such classes "friend" classes of the Mutex.
	 */
	pthread_mutex_t	_mutex;

public:
	/**
	 * The mutex is always initialized as a recursive entity.
	 */
	Mutex();

	/**
	 * Destroying the mutex removes any system resources associated
	 * with it.  If a mutex lock is currently in place, it is presumed
	 * to terminate when the Mutex is destroyed.
	 */
	~Mutex()
		{pthread_mutex_destroy(&_mutex);};

	/**
	 * Entering a Mutex locks the mutex for the current thread.  This
	 * also can be done using the ENTER_CRITICAL macro or by using the
	 * ++ operator on a mutex.
	 * 
	 * @see #LeaveMutex
	 */
#ifdef	PTHREAD_MUTEXTYPE_RECURSIVE
	inline void EnterMutex(void)
		{pthread_mutex_lock(&_mutex);};
#else
	void	EnterMutex(void);
#endif

	/**
	 * Tries to lock the mutex for the current thread. Behaves like
	 * #EnterMutex , except that it doesn't block the calling thread
	 * if the mutex is already locked by another thread.
	 *
	 * @return true if locking the mutex was succesful otherwise false
	 *
	 * @see EnterMutex
	 * @see LeaveMutex
	 */
	bool TryEnterMutex(void);

	/**
	 * Leaving a mutex frees that mutex for use by another thread.  If
	 * the mutex has been entered (invoked) multiple times (recursivily)
	 * by the same thread, then it will need to be exited the same number
	 * of instances before it is free for re-use.  This operation can
	 * also be done using the LEAVE_CRITICAL macro or by the -- operator
	 * on a mutex.
	 * 
	 * @see #EnterMutex
	 */
#ifdef	PTHREAD_MUTEXTYPE_RECURSIVE
	inline void LeaveMutex(void)
		{pthread_mutex_unlock(&_mutex);};
#else
	void LeaveMutex(void);
#endif
};

/**
 * The Mutex Counter is a counter variable which can safely be incremented
 * or decremented by multiple threads.  A Mutex is used to protect access
 * to the counter variable (an integer).  An initial value can be specified 
 * for the counter, and it can be manipulated with the ++ and -- operators.
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short Thread protected integer counter.
 */
class MutexCounter : public Mutex
{
private:
	int	counter;

public:
	MutexCounter(int initial = 0);
	
	friend int operator ++(MutexCounter &mc);
	friend int operator --(MutexCounter &mc);
};

/**
 * The AtomicCounter class offers thread-safe manipulation of an integer
 * counter.  These are commonly used for building thread-safe "reference"
 * counters for C++ classes.  The AtomicCounter depends on the platforms
 * support for "atomic" integer operations, and can alternately substitute
 * a "mutex" if no atomic support exists.
 *
 * @author Sean Cavanaugh <sean@dimensionalrift.com>
 * @short atomic counter operation.
 */
class AtomicCounter
{
private:
#ifdef	HAVE_ATOMIC
	atomic_t atomic;
#else
	int counter;
	Mutex lock;
#endif

public:
	/**
	 * Initialize an atomic counter to 0.
	 */
	AtomicCounter();

	/**
	 * Initialize an atomic counter to a known value.
	 *
	 * @param initial value.
	 */
	AtomicCounter(int value);

	int operator++(void);
	int operator--(void);
	int operator+=(int change);
	int operator-=(int change);
	int operator+(int change);
	int operator-(int change);
	int operator=(int value);
	bool operator!(void);
	operator int();
};

/**
 * A semaphore is generally used as a synchronization object between multiple
 * threads or to protect a limited and finite resource such as a memory or
 * thread pool.  The semaphore has a counter which only permits access by
 * one or more threads when the value of the semaphore is non-zero.  Each
 * access reduces the current value of the semaphore by 1.  One or more
 * threads can wait on a semaphore until it is no longer 0, and hence the
 * semaphore can be used as a simple thread synchronization object to enable
 * one thread to pause others until the thread is ready or has provided data
 * for them.  Semaphores are typically used as a
 * counter for protecting or limiting concurrent access to a given
 * resource, such as to permitting at most "x" number of threads to use
 * resource "y", for example.   
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short Semaphore counter for thread synchronization. 
 */
class Semaphore
{
protected:
	sem_t _semaphore;

public:
	/**
	 * The initial value of the semaphore can be specified.  An initial
	 * value is often used When used to lock a finite resource or to 
	 * specify the maximum number of thread instances that can access a 
	 * specified resource.
	 * 
	 * @param resource specify initial resource count or 0 default.
	 */
	Semaphore(size_t resource = 0);

	/**
	 * Destroying a semaphore also removes any system resources
	 * associated with it.  If a semaphore has threads currently waiting
	 * on it, those threads will all continue when a semaphore is
	 * destroyed.
	 */
	~Semaphore()
#ifndef	__linux__
		{sem_destroy(&_semaphore);}
#endif
	;

	/**
	 * Wait is used to keep a thread held until the semaphore counter
	 * is greater than 0.  If the current thread is held, then another
	 * thread must increment the semaphore.  Once the thread is accepted, 
	 * the semaphore is automatically decremented, and the thread 
	 * continues execution.
	 * 
	 * The pthread semaphore object does not support a timed "wait", and
	 * hence to maintain consistancy, neither the posix nor win32 source
	 * trees support "timed" semaphore objects.
	 * 
	 * @see #Post
	 */
	void Wait(void)
#ifndef	__linux__
		{sem_wait(&_semaphore);}
#endif
	;

	/**
	 * TryWait is a non-blocking variant of Wait. If the semaphore counter
	 * is greater than 0, then the thread is accepted and the semaphore
	 * counter is decreased. If the semaphore counter is 0 TryWait returns
	 * immediately with false.
	 *
	 * @return true if thread is accepted otherwise false
	 *
	 * @see #Wait
	 * @see #Post
	 */
	bool TryWait(void)
#ifndef	__linux__
		{ return ( sem_trywait(&_semaphore) == 0 ) ? true : false; }
#endif
	;
	
	/**
	 * Posting to a semaphore increments its current value and releases
	 * the first thread waiting for the semaphore if it is currently at
	 * 0.  Interestingly, there is no support to increment a semaphore by
	 * any value greater than 1 to release multiple waiting threads in
	 * either pthread or the win32 API.  Hence, if one wants to release
	 * a semaphore to enable multiple threads to execute, one must perform
	 * multiple post operations.
	 * 
	 * @see #Wait
	 */
	void Post(void)
#ifndef	__linux__
		{sem_post(&_semaphore);}
#endif
	;

	/**
	 * Get the current value of a semaphore.
	 *
	 * @return current value.
	 */
	int getValue(void);
};

/**
 * The Event class implements a feature originally found in the WIN32 API;
 * event notification.  A target thread waits on a resetable Event, and one
 * or more other threads can then signal the waiting thread to resume 
 * execution.  A timeout can be used to specify a wait duration in 
 * milliseconds.  The Event class must be reset before it can be used again 
 * as a trigger.  These event objects
 * use a trigger/reset mechanism and are related to low level conditional
 * variables.
 * 
 * @author: David Sugar <dyfet@ostel.com>
 * @short Thread synchornization on event notification.
 */
class Event : public Mutex
{
protected:
	pthread_cond_t _cond;
	bool _signaled;
	int _count;

public:
	Event();

	~Event()
		{pthread_cond_destroy(&_cond);};
	
	/**
	 * Once signaled, the Event class must be "reset" before responding
	 * to a new signal.
	 * 
	 * @see #Signal
	 */
	void Reset(void)
		{_signaled = false;};

	/**
	 * Signal the event for the waiting thread.
	 */
	void Signal(void);
	/**
	 * Wait either for the event to be signaled by another thread or
	 * for the specified timeout duration.
	 * 
	 * @see #Signal
	 * @return true if signaled, false if timed out.
	 * @param timer timeout in milliseconds to wait for a signal.
	 */
	bool Wait(timeout_t timer = 0);
};

/**
 * The buffer class represents an IPC service that is built upon a buffer
 * of fixed capacity that can be used to transfer objects between one or
 * more producer and consumer threads.  Producer threads post objects
 * into the buffer, and consumer threads wait for and receive objects from
 * the buffer.  Semaphores are used to to block the buffer from overflowing
 * and indicate when there is data available, and mutexes are used to protect
 * multiple consumers and producer threads from stepping over each other.
 * 
 * The buffer class is an abstract class in that the actual data being
 * buffered is not directly specified within the buffer class itself.  The
 * buffer class should be used as a base class for a class that actually
 * impliments buffering and which may be aware of the data types actually
 * are being buffered.  A template class could be created based on buffer
 * for this purpose.  Another possibility is to create a class derived
 * from both Thread and Buffer which can be used to implement message passing
 * threads.
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short Producer/Consumer buffer for use between threads.
 */
class Buffer
{
private:
	Mutex lock_head, lock_tail;
	Semaphore size_head, size_tail;
	size_t _size;
	size_t _used;

protected:
	/**
	 * Invoke derived class buffer peeking method.
	 * @return size of object found.
	 * @param buf pointer to copy contents of head of buffer to.
	 */
	virtual int OnPeek(void *buf) = 0;
	/**
	 * Invoke derived class object request from buffer.
	 * @return size of object returned.
	 * @param buf pointer to hold object returned from the buffer.
	 */
	virtual int OnWait(void *buf) = 0;
	/**
	 * Invoke derived class posting of object to buffer.
	 * @return size of object posted.
	 * @param buf pointer to object being posted to the buffer.
	 */
	virtual int OnPost(void *buf) = 0;

public:
	/**
	 * Create a buffer object of known capacity.
	 * @param capcity is the integer capacity of the buffer.
	 */
	Buffer(size_t capacity);
	/**
	 * In derived functions, may be used to free the actual memory
	 * used to hold buffered data.
	 */
	virtual ~Buffer()
		{return;};

	/**
	 * Return the capacity of the buffer as specified at creation.
	 * @return size of buffer.
	 */
	inline size_t getSize(void)
		{return _size;};
	
	/**
	 * Return the current capacity in use for the buffer.  Free space
	 * is technically getSize() - getUsed().
	 * @return integer used capacity of the buffer.
	 * @see #getSize
	 */
	inline size_t getUsed(void)
		{return _used;};

	/**
	 * Let one or more threads wait for an object to become available
	 * in the buffer.  The waiting thread(s) will wait forever if no
	 * object is ever placed into the buffer.
	 * 
	 * @return size of object passed by buffer in bytes.
	 * @param buf pointer to store object retrieved from the buffer.
	 */
	int Wait(void *buf);

	/**
	 * Post an object into the buffer and enable a waiting thread to
	 * receive it.
	 * 
	 * @return size of object posted in bytes.
	 * @param buf pointer to object to store in the buffer.
	 */
	int Post(void *buf);

	/**
	 * Peek at the current content (first object) in the buffer.
	 * 
	 * @return size of object in the buffer.
	 * @param buf pointer to store object found in the buffer.
	 */
	int Peek(void *buf);

	/**
	 * New virtual to test if buffer is a valid object.
	 * @return true if object is valid.
	 */
	virtual bool isValid(void)
		{return true;};
};

/**
 * A buffer class that holds a known capacity of fixed sized objects defined
 * during creation.
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short producer/consumer buffer for fixed size objects.
 */
class FixedBuffer : public Buffer
{
private:
	char *buf, *head, *tail;
	size_t objsize;

protected:
	/**
	 * Return the first object in the buffer.
	 * @return predefined size of this buffers objects.
	 * @param buf pointer to copy contents of head of buffer to.
	 */
	int OnPeek(void *buf);

	/**
	 * Wait for and return a fixed object in the buffer.
	 * @return predefined size of this buffers objects.
	 * @param buf pointer to hold object returned from the buffer.
	 */
	int OnWait(void *buf);

	/**
	 * Post an object of the appropriate size into the buffer.
	 * @return predefined size of this buffers objects.
	 * @param buf pointer to data to copy into the buffer.
	 */
	int OnPost(void *buf);	

public:
	/**
	 * Create a buffer of known capacity for objects of a specified
	 * size.
	 * 
	 * @param capacity of the buffer.
	 * @param objsize for each object held in the buffer.
	 */
	FixedBuffer(size_t capacity, size_t objsize);

	/**
	 * Create a copy of an existing fixed size buffer and duplicate
	 * it's contents.
	 * 
	 * @param fb existing FixedBuffer object.
	 */
	FixedBuffer(const FixedBuffer &fb);

	/**
	 * Destroy the fixed buffer and free the memory used to store objects.
	 */
	~FixedBuffer();

	FixedBuffer &operator=(const FixedBuffer &fb);

	bool isValid(void);
};

/**
 * Every thread of execution in an application is created by deriving
 * a unique class from the Thread class and by implementing the Run
 * method.  The base Thread class supports encapsulation of the generic
 * threading methods implemented on various target operating systems.  
 * This includes the ability to start and stop threads in a synchronized
 * and controllable manner, the ability to specify thread execution priority,
 * and thread specific "system call" wrappers, such as for sleep and yield.
 * A thread exception is thrown if the thread cannot be created.
 * Threading was the first part of Common C++ I wrote, back when it was still
 * the APE library.  My goal for Common C++ threading has been to make
 * threading as natural and easy to use in C++ application development as
 * threading is in Java.  With this said, one does not need to use threading
 * at all to take advantage of Common C++.  However, all Common C++ classes
 * are designed at least to be thread-aware/thread-safe as appropriate and
 * necessary.
 * 
 * Common C++ threading is currently built either from the Posix "pthread"
 * library or using the win32 SDK.  In that the Posix "pthread" draft
 * has gone through many revisions, and many system implementations are
 * only marginally compliant, and even then usually in different ways, I
 * wrote a large series of autoconf macros found in ost_pthread.m4 which
 * handle the task of identifying which pthread features and capabilities
 * your target platform supports.  In the process I learned much about what
 * autoconf can and cannot do for you..
 * 
 * Currently the GNU Portable Thread library (GNU pth) is not directly
 * supported in Common C++.  While GNU "Pth" doesn't offer direct
 * native threading support or benefit from SMP hardware, many of the design
 * advantages of threading can be gained from it's use, and the  Pth pthread
 * "emulation" library should be usable with Common C++.  In the future,
 * Common C++ will directly support Pth, as well as OS/2 and BeOS native
 * threading API's.
 * 
 * Common C++ itself defines a fairly "neutral" threading model that is
 * not tied to any specific API such as pthread, win32, etc.  This neutral
 * thread model is contained in a series of classes which handle threading
 * and synchronization and which may be used together to build reliable
 * threaded applications.
 * 
 * Common C++ defines application specific threads as objects which are
 * derived from the Common C++ "Thread" base class.  At minimum the "Run"
 * method must be implemented, and this method essentially is the "thread",
 * for it is executed within the execution context of the thread, and when
 * the Run method terminates the thread is assumed to have terminated.
 * 
 * Common C++ allows one to specify the running priority of a newly created
 * thread relative to the "parent" thread which is the thread that is
 * executing when the constructor is called.  Since most newer C++
 * implementations do not allow one to call virtual constructors or virtual
 * methods from constructors, the thread must be "started" after the
 * constructor returns.  This is done either by defining a "starting"
 * semaphore object that one or more newly created thread objects can wait
 * upon, or by invoking an explicit "Start" member function.
 * 
 * Threads can be "suspended" and "resumed".  As this behavior is not defined
 * in the Posix "pthread" specification, it is often emulated through
 * signals.  Typically SIGUSR1 will be used for this purpose in Common C++
 * applications, depending in the target platform.  On Linux, since threads
 * are indeed processes, SIGSTP and SIGCONT can be used.  On solaris, the
 * Solaris thread library supports suspend and resume directly.
 * 
 * Threads can be canceled.  Not all platforms support the concept of
 * externally cancelable threads.  On those platforms and API
 * implementations that do not, threads are typically canceled through the
 * action of a signal handler.
 * 
 * As noted earlier, threads are considered running until the "Run" method
 * returns, or until a cancellation request is made.  Common C++ threads can
 * control how they respond to cancellation, using setCancellation().
 * Cancellation requests can be ignored, set to occur only when a
 * cancellation "point" has been reached in the code, or occur immediately.
 * Threads can also exit by returning from Run() or by invoking the Exit()
 * method.
 * 
 * Generally it is a good practice to initialize any resources the thread may
 * require within the constructor of your derived thread class, and to purge
 * or restore any allocated resources in the destructor.  In most cases, the
 * destructor will be executed after the thread has terminated, and hence
 * will execute within the context of the thread that requested a join rather
 * than in the context of the thread that is being terminated.  Most
 * destructors in derived thread classes should first call Terminate() to
 * make sure the thread has stopped running before releasing resources.
 * 
 * A Common C++ thread is normally canceled by deleting the thread object.
 * The process of deletion invokes the thread's destructor, and the
 * destructor will then perform a "join" against the thread using the
 * Terminate() function.  This behavior is not always desirable since the
 * thread may block itself from cancellation and block the current "delete"
 * operation from completing.  One can alternately invoke Terminate()
 * directly before deleting a thread object.
 * 
 * When a given Common C++ thread exits on it's own through it's Run()
 * method, a "Final" method will be called.  This Final method will be called
 * while the thread is "detached".  If a thread object is constructed through
 * a "new" operator, it's final method can be used to "self delete" when
 * done, and allows an independent thread to construct and remove itself
 * autonomously.
 * 
 * A special global function, getThread(), is provided to identify the thread
 * object that represents the current execution context you are running
 * under.  This is sometimes needed to deliver signals to the correct thread.
 * Since all thread manipulation should be done through the Common C++ (base) 
 * thread class itself, this provides the same functionality as things like
 * "pthread_self" for Common C++.
 * 
 * Common C++ threads are often aggregated into other classes to provide
 * services that are "managed" from or operate within the context of a
 * thread, even within the Common C++ framework itself.  A good example of
 * this is the TCPSession class, which essentially is a combination of a TCP
 * client connection and a separate thread the user can define by deriving a
 * class with a Run() method to handle the connected service.  This
 * aggregation logically connects the successful allocation of a given
 * resource with the construction of a thread to manage and perform 
 * operations for said resource.
 * 
 * Threads are also used in "service pools".  In Common C++, a service pool
 * is one or more threads that are used to manage a set of resources.  While
 * Common C++ does not provide a direct "pool" class, it does provide a model
 * for their implementation, usually by constructing an array of thread
 * "service" objects, each of which can then be assigned the next new
 * instance of a given resource in turn or algorithmically.
 * 
 * Threads have signal handlers associated with them.  Several signal types
 * are "predefined" and have special meaning.  All signal handlers are
 * defined as virtual member functions of the Thread class which are called
 * when a specific signal is received for a given thread.  The "SIGPIPE"
 * event is defined as a "Disconnect" event since it's normally associated
 * with a socket disconnecting or broken fifo.  The Hangup() method is
 * associated with the SIGHUP signal.  All other signals are handled through
 * the more generic Signal().
 * 
 * Incidently, unlike Posix, the win32 API has no concept of signals, and
 * certainly no means to define or deliver signals on a per-thread basis.
 * For this reason, no signal handling is supported or emulated in the win32
 * implementation of Common C++ at this time.
 * 
 * In addition to TCPStream, there is a TCPSession class which combines a
 * thread with a TCPStream object.  The assumption made by TCPSession is that
 * one will service each TCP connection with a separate thread, and this
 * makes sense for systems where extended connections may be maintained and
 * complex protocols are being used over TCP.
 * 
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short base class used to derive all threads of execution.
 */
class Thread
{
private:
	friend class Slog;

	static Thread *_main;

#ifndef	_SIG_THREAD_ALARM
	static Thread *_timer;
	static Mutex _arm;
#endif
	
	Thread *_parent;
	pthread_t _tid;
	pthread_attr_t _attr;
	thread_cancel_t	_cancel;
	jmp_buf	_env;
	time_t	_alarm;
	Semaphore *_start;
	int _msgpos;
	char _msgbuf[128];
	throw_t _throw;

	friend void execHandler(Thread *th);
	friend void sigHandler(int signo);
	friend Thread *getThread(void);

protected:
       	/**
	 * All threads execute by deriving the Run method of Thread.
	 * This method is called after Initial to begin normal operation
	 * of the thread.  If the method terminates, then the thread will
	 * also terminate after notifying it's parent and calling it's
	 * Final() method.
	 * 
	 * @see #Initial
	 */
	virtual void Run(void) = 0;

	/**
	 * This method is called for the very first instance of a new thread
	 * being created in a multi-threaded application.  Hence, it is only
	 * called once, and by the derived Thread class that happens to be
	 * created first.
	 */
	virtual void First(void)
		{return;};
	
	/**
	 * A thread that is self terminating, either by invoking Exit() or
	 * leaving it's Run(), will have this method called.  It can be used
	 * to self delete the current object assuming the object was created
	 * with new on the heap rather than stack local, hence one may often
	 * see Final defined as "delete this" in a derived thread class.  A
	 * Final method, while running, cannot be terminated or cancelled by
	 * another thread.
	 * 
	 * @see #Exit
	 * @see #Run
	 */
	virtual void Final(void)
		{return;};

	/**
	 * The initial method is called by a newly created thread when it
	 * starts execution.  This method is ran with deferred cancellation
	 * disabled by default.  The Initial method is given a seperate
	 * handler so that it can create temporary objects on it's own
	 * stack frame, rather than having objects created on Run() that
	 * are only needed by startup and yet continue to consume stack space.
	 * 
	 * @see #Run
	 */
	virtual void Initial(void)
		{return;};
	
	/**
	 * Since getParent() and getThread() only refer to an object of the
	 * Thread "base" type, this virtual method can be replaced in a
	 * derived class with something that returns data specific to the
	 * derived class that can still be accessed through the pointer
	 * returned by getParent() and getThread().
	 *
	 * @return pointer to derived class specific data.
	 */
	virtual void *getExtended(void)
		{return NULL;};

	/**
	 * When a thread terminates, it now sends a notification message
	 * to the parent thread which created it.  The actual use of this
	 * notification is left to be defined in a derived class.
	 * 
	 * @param th the thread that has terminated.
	 */
	virtual void Notify(Thread *th)
		{return;};

	/**
	 * In the Posix version of Common C++, this can be used to send a
	 * signal into the parent thread of the current object.
	 * 
	 * @param signo a posix signal id.
	 */
	inline void SignalParent(signo_t signo)
		{_parent->SignalThread(signo);};

	/**
	 * In the Posix version of Common C++, this can be used to send a
	 * signal into the nain application thread.
	 * 
	 * @param signo a posix signal id.
	 */
	inline void SignalMain(signo_t signo)
		{_main->SignalThread(signo);};

	/**
	 * A derivable method to call when a SIGALRM is being delivered
	 * to a specific thread.
	 */
	virtual void OnTimer(void)
		{return;};

	/**
	 * A derived method to handle hangup events being delivered
	 * to a specific thread.
	 */
	virtual void OnHangup(void)
		{return;};

	/**
	 * A derived method to call when a SIGABRT is being delivered
	 * to a specific thread.
	 */
	virtual void OnException(void)
		{return;};

	/**
	 * A derived method to call when a SIGPIPE is being delivered
	 * to a specific thread.
	 */
	virtual void OnDisconnect(void)
		{return;};

	/**
	 * A derived method to handle asynchronous I/O requests delivered
	 * to the specified thread.
	 */
	virtual void OnPolling(void)
		{return;};

	/**
	 * A derivable method to call for delivering a signal event to
	 * a specified thread.
	 *
	 * @param signo posix signal id.
	 */
	virtual void OnSignal(int signo)
		{return;};

	/**
	 * A thread-safe sleep call.  On most Posix systems, "sleep()"
	 * is implimented with SIGALRM making it unusable from multipe
	 * threads.  Pthread libraries often define an alternate "sleep"
	 * handler such as usleep(), nanosleep(), or nap(), that is thread
	 * safe, and also offers a higher timer resolution.
	 * 
	 * @param msec timeout in milliseconds.
	 */
	inline void Sleep(timeout_t msec)
		{ccxx_sleep(msec);};

	/**
	 * Used to properly exit from a Thread derived Run() or Initial()
	 * method.  Terminates execution of the current thread and calls
	 * the derived classes Final() method.
	 */
	inline void Exit(void)
		{longjmp(_env, 1);};
	       
	/**
	 * Used to specify a timeout event that can be delivered to the
	 * current thread via SIGALRM.  When the timer expires, the OnTimer() 
	 * method is called for the thread.  At present, only one thread
	 * timer can be active at any given time.  On some operating
	 * systems (including Linux) a timer can be active on each thread.
	 * 
	 * @param timer timeout in milliseconds.
	 */
	void setTimer(timeout_t timer);
	/**
	 * Gets the time remaining for the current threads timer before
	 * it expires.
	 * 
	 * @return time remaining before timer expires in milliseconds.
	 */
	timeout_t getTimer(void);
	/**
	 * Terminates the timer before the timeout period has expired.
	 * This prevents the timer from sending it's SIGALRM and makes
	 * the timer available to other threads.
	 */
	void endTimer(void);
	/**
	 * Used to wait on a Posix signal from another thread.  This can be
	 * used as a crude rondevious/synchronization method between threads.
	 * 
	 * @param signo a posix signal id.
	 */
	void WaitSignal(signo_t signo);
	/**
	 * Yeilds the current thread's CPU time slice to allow another thread to
	 * begin immediate execution.
	 */
	void Yield(void);
	/**
	 * test a cancellation point for deferred thread cancellation.
	 */
	void testCancel(void);
	/**
	 * Sets thread cancellation mode.  Threads can either be set immune to
	 * termination (THREAD_CANCEL_DISABLED), can be set to terminate when
	 * reaching specific "thread cancellation points" (THREAD_CANCEL_DEFERRED)
	 * or immediately when Terminate is requested (THREAD_CANCEL_IMMEDIATE).
	 * 
	 * @param mode for cancellation of the current thread.
	 */
	void setCancel(thread_cancel_t mode);
	/**
	 * Sets the thread's ability to be suspended from execution.  The
	 * thread may either have suspend enabled (THREAD_SUSPEND_ENABLE) or
	 * disabled (THREAD_SUSPEND_DISABLE).
	 * 
	 * @param mode for suspend.
	 */
	void setSuspend(thread_suspend_t mode);
	/**
	 * Used to enable or disable a signal within the current thread.
	 *
	 * @param signo posix signal id.
	 * @param active set to true to enable.
	 */
	void setSignal(int signo, bool mode);
	/**
	 * Used by another thread to terminate the current thread.  Termination
	 * actually occurs based on the current setCancel() mode.  When the
	 * current thread does terminate, control is returned to the requesting
	 * thread.  Terminate() should always be called at the start of any
	 * destructor of a class derived from Thread to assure the remaining
	 * part of the destructor is called without the thread still executing.
	 */
	void Terminate(void);

	/**
	 * clear parent thread relationship.
	 */
	inline void clrParent(void)
		{_parent = NULL;};
public:
	/**
	 * This is actually a special constructor that is used to create a
	 * thread "object" for the current execution context when that context
	 * is not created via an instance of a derived Thread object itself.
	 * This constructor does not support First.
	 * 
	 * @param bool used if the main "thread" of the application.
	 */
	Thread(bool flag);
	/**
	 * When a thread object is contructed, a new thread of execution
	 * context is created.  This constructor allows basic properties
	 * of that context (thread priority, stack space, etc) to be defined.
	 * The starting condition is also specified for whether the thread
	 * is to wait on a semaphore before begining execution or wait until
	 * it's start method is called.
	 * 
	 * @param start semaphore to wait before executing thread.
	 * @param pri thread base priority relative to it's parent.
	 * @param stack space as needed in some implementations.
	 */
	Thread(Semaphore *start = NULL, int pri = 0, size_t stack = 0);
	/**
	 * A thread of execution can also be specified by cloning an existing
	 * thread.  The existing thread's properties (cancel mode, priority,
	 * etc), are also duplicated.
	 * 
	 * @param th currently executing thread object to clone.
	 */
	Thread(const Thread &th);
	/**
	 * The thread destructor should clear up any resources that have
	 * been allocated by the thread.  The desctructor of a derived
	 * thread should begin with Terminate() and is presumed to then
	 * execute within the context of the thread causing terminaton.
	 */
	virtual ~Thread()
		{Terminate();};
	
	/**
	 * When a new thread is created, it does not begin immediate
	 * execution.  This is because the derived class virtual tables
	 * are not properly loaded at the time the C++ object is created
	 * within the constructor itself, at least in some compiler/system 
	 * combinations.  The thread can either be told to wait for an
	 * external semaphore, or it can be started directly after the
	 * constructor completes by calling the Start() method.
	 * 
	 * @return error code if execution fails.
	 * @param start optional starting semaphore to alternately use.
	 */
	int Start(Semaphore *start = NULL);

	/**
	 * Gets the pointer to the Thread class which created the current
	 * thread object.
	 * 
	 * @return a Thread *, or "(Thread *)this" if no parent.
	 */
	inline Thread *getParent(void)
		{return _parent;};
		
	/**
	 * Delivers a Posix signal to the current thread.
	 * 
	 * @param signo a posix signal id.
	 */
	inline void SignalThread(int signo)
		{pthread_kill(_tid, signo);};

	/**
	 * Suspends execution of the selected thread.  Pthreads do not
	 * normally support suspendable threads, so the behavior is
	 * simulated with signals.  On systems such as Linux that
	 * define threads as processes, SIGSTOP and SIGCONT may be used.
	 */
#ifdef _THR_SUNOS5
	inline void Suspend(void)
		{thr_suspend((thread_t)_tid);};
#else
	inline void Suspend(void)
		{pthread_kill(_tid, _SIG_THREAD_SUSPEND);};
#endif

	/**
	 * Resumes execution of the selected thread.
	 */
#ifdef	_THR_SUNOS5
	inline void Resume(void)
		{thr_continue((thread_t)_tid);};
#else
	inline void Resume(void)
		{pthread_kill(_tid, _SIG_THREAD_RESUME);};
#endif

	/**
	 * Used to retrieve the cancellation mode in effect for the
	 * selected thread.
	 * 
	 * @return cancellation mode constant.
	 */
	inline int getCancel(void)
		{return _cancel;};

	/**
	 * Verifies if the thread is still running or has already been
	 * terminated but not yet deleted.
	 * 
	 * @return true if the thread is still executing.
	 */
	bool isRunning(void);

	/**
	 * Tests to see if the current execution context is the same as
	 * the specified thread object.
	 * 
	 * @return true if the current context is this object.
	 */
	bool isThread(void);

	/**
	 * Get exception mode of the current thread.
	 *
	 * @return exception mode.
	 */
	friend throw_t getException(void);

	/**
	 * Set exception mode of the current thread.
	 *
	 * @return exception mode.
	 */
	friend void setException(throw_t mode);

	/**
	 * Thread safe sleep call replacement.  This is mapped into sleep().
	 * 
	 * @param msec timeout in millisecond time range.
	 */
	friend void ccxx_sleep(timeout_t msec);

	/**
	 * Suspend the execution of the specified thread.
	 * 
	 * @param th specified thread.
	 */
	friend void suspend(Thread &th)
		{pthread_kill(th._tid, _SIG_THREAD_SUSPEND);};

	/**
	 * Resume execution of the specified thread.
	 * 
	 * @param th specified thread.
	 */
	friend void resume(Thread &th)
		{pthread_kill(th._tid, _SIG_THREAD_RESUME);};

	/**
	 * Signal the semaphore that the specified thread is waiting for
	 * before beginning execution.
	 * 
	 * @param th specified thread.
	 */
	friend inline void operator++(Thread &th)
		{th._start->Post();};

	friend inline void operator--(Thread &th)
		{th._start->Wait();};

	/**
	 * Start execution of a specified thread.
	 */
	friend inline int start(Thread &th, Semaphore *start)
		{return th.Start(start);};

	/**
	 * Install a signal handler for use by threads and
	 * the OnSignal() event notification handler.
	 *
	 * @param signo posix signal id.
	 */

	friend void siginstall(int signo);
};

/**
 * This class allows the creation of a thread context unique "pointer"
 * that can be set and retrieved and can be used to create thread specific
 * data areas for implementing "thread safe" library routines.
 * 
 *  Finally, Common C++ supports a
 * thread-safe "AtomicCounter" class.  This can often be used for reference
 * counting without having to protect the counter with a separate Mutex
 * counter.  This lends to lighter-weight code.
 * 
 * 
 * @author David Sugar <dyfet@ostel.com>
 * @short container for thread specific data storage.
 */
class ThreadKey
{
private:
	pthread_key_t key;

public:
	/**
	 * Create a unique thread specific container.
	 */
	ThreadKey();
	/**
	 * Destroy a thread specific container and any contents reserved.
	 */
	~ThreadKey();
	/**
	 * Get the value of the pointer for the thread specific data
	 * container.  A unique pointer can be set for each execution
	 * context.
	 * 
	 * @return a unique void * for each execution context.
	 */
	void *getKey(void);
	/**
	 * Set the value of the pointer for the current thread specific
	 * execution context.  This can be used to store thread context
	 * specific data.
	 * 
	 * @param ptr to thread context specific data.
	 */
	void setKey(void *);
};

/**
 * Timer ports are used to provide synchronized timing events when managed
 * under a "service thread" such as SocketService.  This is made into a
 * stand-alone base class since other derived libraries (such as the
 * serial handlers) may also use the pooled "service thread" model
 * and hence also require this code for managing timing.
 *
 * @author David Sugar <dyfet@ostel.com>
 * @short synchronized millisecond timing for service threads.
 */
class TimerPort
{
	struct timeval timer;
	bool active;

protected:
	/**
	 * Create a timer, mark it as inactive, and set the initial
	 * "start" time to the creation time of the timer object.  This
	 * allows "incTimer" to initially refer to time delays relative
	 * to the original start time of the object.
	 */
	TimerPort();

public:
	/**
	 * Set a new start time for the object based on when this call is
	 * made and optionally activate the timer for a specified number
	 * of milliseconds.  This can be used to set the starting time
	 * of a realtime session.
	 *
	 * @param timeout delay in milliseconds from "now"
	 */
	void setTimer(timeout_t timeout = 0);

	/**
	 * Set a timeout based on the current time reference value either
	 * from object creation or the last setTimer().  This reference
	 * can be used to time synchronize realtime data over specified
	 * intervals and force expiration when a new frame should be
	 * released in a synchronized manner.  
	 *
	 * @param timeout delay in milliseconds from reference.
	 */
	void incTimer(timeout_t timeout);

	/**
	 * This is used to "disable" the service thread from expiring
	 * the timer object.  It does not effect the reference time from
	 * either creation or a setTimer().
	 */
	void endTimer(void);

	/**
	 * This is used by service threads to determine how much time
	 * remains before the timer expires based on a timeout specified
	 * in setTimer() or incTimer().  It can also be called after
	 * setting a timeout with incTimer() to see if the current timeout
	 * has already expired and hence that the application is already
	 * delayed and should skip frame(s).
	 *
	 * return time remaining in milliseconds, or -1 if inactive.
	 */
	timeout_t getTimer(void);
};

inline void *getKey(ThreadKey &tk)
	{return tk.getKey();};

inline void setKey(ThreadKey &tk, void *ptr)
	{tk.setKey(ptr);};

inline void operator ++(Mutex &m)
	{m.EnterMutex();};
		
inline void operator --(Mutex &m)
	{m.LeaveMutex();};

inline void operator ++(Semaphore &s)
	{s.Post();};

inline void operator --(Semaphore &s)
	{s.Wait();};

inline void operator ++(Event &s)
	{s.Signal();};

inline void operator --(Event &s)
	{s.Wait();};

/*
 * on some systems, signal(signum, handler) is a macro
 */
#undef signal
inline void signal(Thread &th, int signo)
	{th.SignalThread(signo);};

inline void signal(Event &ev)
	{ev.Signal();};

inline void signal(Semaphore &sem)
	{sem.Post();};

inline void wait(Semaphore &sem)
	{sem.Wait();};

inline void wait(Event &ev, timeout_t timer)
	{ev.Wait(timer);};

inline void reset(Event &ev)
	{ev.Reset();};

inline int get(Buffer &b, void *o)
	{return b.Wait(o);};

inline int put(Buffer &b, void *o)
	{return b.Post(o);};

inline int peek(Buffer &b, void *o)
	{return b.Peek(o);};

int operator++(MutexCounter &mc);
int operator--(MutexCounter &mc);

struct	timespec *gettimeout(struct timespec *spec, timeout_t timeout);	
void	ccxx_sleep(timeout_t msec);
void	ccxx_yield(void);
void	wait(signo_t signo);
/**
 *   
 * This function provides a simple and portable means to
 * fork/detach a process into a daemon.
 *
 * @author David Sugar <dyfet@ostel.com>
 * @short This function provides a simple and portable means to fork/detach a process into a daemon.
 */
void	pdetach(void);

#ifdef	__CCXX_NAMESPACE_H__
#undef	__CCXX_NAMESPACE_H__
#include <cc++/namespace.h>
#endif

#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
#if defined(HAVE_SYS_STREAM_H)
#if defined(__linux__)
#define __CCXX_USE_POLL 1
#endif
#else
#define	__CCXX_USE_POLL 1
#endif
#endif

#ifdef __CCXX_USE_POLL

/**
 * The poller class is used to help manage pollfd structs for use in the
 * updated serial and socket "port" code.
 *
 * @author Gianni Mariani <gianni@mariani.ws>
 * @short pollfd assistance class for port classes.
 */
class Poller 
{
private:
	int nufds;
	pollfd *ufds;

public:
	Poller();

	~Poller();

	/**
	 * reserve a specified number of poll descriptors.  If additional
	 * descriptors are needed, they are allocated.
	 *
	 * @return new array of descriptors.
	 * @param number of desctiptors to reserve
	 */
	pollfd *getList(int cnt);

	/**
	 * Retreive the current array of poll descriptors.
	 *
	 * @return array of descriptors.
	 */
	inline	pollfd *getList(void)
		{return ufds;};
};
#endif


#endif
/** EMACS **
 * Local variables:
 * mode: c++
 * c-basic-offset: 8
 * End:
 */
// Copyright (C) 1999-2001 Open Source Telecom Corporation.
//  
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
// 
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// 
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software 
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
// 
// As a special exception to the GNU General Public License, permission is 
// granted for additional uses of the text contained in its release 
// of Common C++.
// 
// The exception is that, if you link the Common C++ library with other
// files to produce an executable, this does not by itself cause the
// resulting executable to be covered by the GNU General Public License.
// Your use of that executable is in no way restricted on account of
// linking the Common C++ library code into it.
//
// This exception does not however invalidate any other reasons why
// the executable file might be covered by the GNU General Public License.
//
// This exception applies only to the code released under the 
// name Common C++.  If you copy code from other releases into a copy of
// Common C++, as the General Public License permits, the exception does
// not apply to the code that you add in this way.  To avoid misleading
// anyone as to the status of such modified files, you must delete
// this exception notice from them.
// 
// If you write modifications of your own for Common C++, it is your choice
// whether to permit this exception to apply to your modifications.
// If you do not wish that, delete this exception notice.  

#include "config.h"
#include "macros.h"
#include "thread.h"
#include <stdlib.h>

typedef	void	*(*exec_t)(void *);
typedef	RETSIGTYPE (*signalexec_t)(int);

#ifdef	_SIG_THREAD_CANCEL

static	RETSIGTYPE	sigcancel(int)
{
	pthread_exit(NULL);
}

#endif

#ifndef	_SIG_THREAD_STOPCONT
#ifndef _THR_SUNOS5

static	RETSIGTYPE	sigsuspend(int)
{
	sigset_t sigs;

	sigemptyset(&sigs);
	sigaddset(&sigs, SIGUSR3);
#ifdef	HAVE_SIGWAIT2
	sigwait(&sigs, NULL);
#else
	sigwait(&sigs);
#endif
}
#endif
#endif

static	sigset_t *blocked_signals(sigset_t *sig)
{
	sigemptyset(sig);
	sigaddset(sig, SIGINT);
	sigaddset(sig, SIGKILL);
	sigaddset(sig, SIGHUP);
	sigaddset(sig, SIGABRT);
	sigaddset(sig, SIGALRM);
	sigaddset(sig, SIGPIPE);
#ifndef	_SIG_THREAD_STOPCONT
	sigaddset(sig, SIGUSR3);
#endif
	return sig;
}

class MainThread : public Thread
{
protected:
	void Run(void) {return;};
	void OnSignal(int signo) {exit(signo);};

public:
	MainThread() : Thread(true) {};
};

static	ThreadKey _self;
static	MainThread _mainthread;

Thread	*Thread::_main = NULL;

#ifndef	_SIG_THREAD_ALARM
Thread	*Thread::_timer = NULL;
Mutex	Thread::_arm;
#endif

Thread::Thread(bool flag)
{
	_tid = pthread_self();
	_throw = THROW_OBJECT;
	_parent = NULL;
	_msgpos = 0;
	struct sigaction act;

	if(flag == true)
	{
		siginstall(SIGHUP);
		siginstall(SIGALRM);
		siginstall(SIGPIPE);
		siginstall(SIGABRT);

		act.sa_handler = (signalexec_t)&sigHandler;
		sigemptyset(&act.sa_mask);
#ifdef	SA_RESTART
		act.sa_flags = SA_RESTART;
#else
		act.sa_flags = 0;
#endif
#ifdef	SA_INTERRUPT
		act.sa_flags |= SA_INTERRUPT;
#endif
#ifdef	SIGPOLL
		sigaction(SIGPOLL, &act, NULL);
#else
		sigaction(SIGIO, &act, NULL);
#endif		

#ifndef	_SIG_THREAD_STOPCONT
#ifndef	_THR_SUNOS5
		act.sa_handler = sigsuspend;
		sigemptyset(&act.sa_mask);
#ifdef	SA_RESTART
		act.sa_flags = SA_RESTART;
#else
		act.sa_flags = 0;
#endif
		sigaction(SIGUSR3, &act, NULL);
#endif
#endif

#ifdef	_SIG_THREAD_CANCEL
		act.sa_flags = 0;
		act.sa_handler = sigcancel;
		sigemptyset(&act.sa_mask);
		sigaddset(&act.sa_mask, SIGHUP);
		sigaddset(&act.sa_mask, SIGALRM);
		sigaddset(&act.sa_mask, SIGPIPE);

		sigaction(_SIG_THREAD_CANCEL, &act, NULL);
#endif		
		_main = this;
    	}
	_self.setKey(this);
}	

Thread::Thread(Semaphore *start, int pri, size_t stack)
{
	int	rc;

	_msgpos = 0;
	_tid = 0;

	pthread_attr_init(&_attr);
	//pthread_attr_setdetachstate(&_attr, PTHREAD_CREATE_DETACHED);
	pthread_attr_setdetachstate(&_attr, PTHREAD_CREATE_JOINABLE);
	pthread_attr_setinheritsched(&_attr, PTHREAD_INHERIT_SCHED);

#ifdef	PTHREAD_STACK_MIN
	if(pthread_attr_setstacksize(&_attr, stack <= PTHREAD_STACK_MIN ? PTHREAD_STACK_MIN : stack))
	{
		switch(getException())
		{
		case THROW_OBJECT:
			throw(this);
		default:
			return;
		}
	}
#endif

#ifndef	__FreeBSD__
#ifdef	_POSIX_THREAD_PRIORITY_SCHEDULING	
	if(pri)
	{
		struct sched_param sched;
		int policy;

		policy = sched_getscheduler(0);
		if(policy < 0)
		{
			switch(getException())
			{
			case THROW_OBJECT:
				throw(this);
			default:
				return;
			}
		}

		sched_getparam(0, &sched);

		pri = sched.sched_priority - pri;
		if(pri 	> sched_get_priority_max(policy))
			pri = sched_get_priority_max(policy);

		if(pri < sched_get_priority_min(policy))
			pri = sched_get_priority_min(policy);

		sched.sched_priority = pri;
		pthread_attr_setschedparam(&_attr, &sched);
	}	
#endif
#endif

	_parent = getThread();
	_throw = _parent->_throw;
	if(!_parent)
		_parent = this;

	_cancel = THREAD_CANCEL_INITIAL;
	_start = start;

	if(_start)
	{
		rc = pthread_create(&_tid, &_attr, exec_t(&execHandler), this);
		if(rc && getException() == THROW_OBJECT)
			throw(this);
	}
};

Thread::Thread(const Thread &th)
{
	sigset_t mask, newmask;
	int rc;

	_parent = th._parent;
	_cancel = THREAD_CANCEL_INITIAL;
	pthread_sigmask(SIG_BLOCK, blocked_signals(&newmask), &mask);
	rc = pthread_create(&_tid, &_attr, exec_t(&execHandler), this);
	pthread_sigmask(SIG_SETMASK, &mask, NULL);
	if(rc && getException() == THROW_OBJECT)
		throw(this);
}

bool Thread::isThread(void)
{
	return (_tid == pthread_self()) ? true : false;
}

bool Thread::isRunning(void)
{
	return (_tid != 0) ? true : false;
}

int Thread::Start(Semaphore *start)
{
	if(_tid)
	{
		if(_start)
		{
			_start->Post();
			return 0;
		}
		else
			return -1;
	}

	_start = start;
	return pthread_create(&_tid, &_attr, exec_t(&execHandler), this);
}

void Thread::Terminate(void)
{
	if(!_tid)
		return;

	if(pthread_self() != _tid)
	{
		// assure thread has ran before we try to cancel...
		if(_start)
			_start->Post();

		pthread_cancel(_tid);
		pthread_join(_tid, NULL);
	}
	if(_parent)
		_parent->Notify(this);
	_tid = 0;
	pthread_attr_destroy(&_attr);
}

extern "C"
{

  void sigHandler(int signo)
  {
	Thread	*th = getThread();

	switch(signo)
	{
	case SIGHUP:
		if(th)
			th->OnHangup();
		break;
	case SIGABRT:
		if(th)
			th->OnException();
		break;
	case SIGPIPE:
		if(th)
			th->OnDisconnect();
		break;
	case SIGALRM:
#ifndef	_SIG_THREAD_ALARM
		if(Thread::_timer)
		{
			Thread::_timer->_alarm = 0;
			Thread::_timer->OnTimer();
		}
		else
#endif 
		    if(th)
			th->OnTimer();
		break;
#ifdef	SIGPOLL
	case SIGPOLL:
#else
	case SIGIO:
#endif
		if(th)
			th->OnPolling();
		break;
	default:
		if(th)
			th->OnSignal(signo);
	}
  }

  void	execHandler(Thread *th)
  {
	sigset_t mask;

	pthread_sigmask(SIG_BLOCK, blocked_signals(&mask), NULL);
	_self.setKey(th);
	th->setCancel(THREAD_CANCEL_INITIAL);
	th->Yield();
	if(th->_start)
	{
		th->_start->Wait();
		th->_start = NULL;
	}

	if(!setjmp(th->_env))
	{
		th->Initial();	
		if(th->getCancel() == THREAD_CANCEL_INITIAL)
			th->setCancel(THREAD_CANCEL_DEFAULT);

		th->Run();
	}

#ifndef	_SIG_THREAD_ALARM
	if(th == Thread::_timer)
		--Thread::_arm;
#endif

	th->setCancel(THREAD_CANCEL_DISABLED);
	th->_parent->Notify(th);
	th->Final();
	pthread_exit(NULL);
  }
}
void	Thread::setTimer(timeout_t timer)
{
	sigset_t sigs;

#ifdef	HAVE_SETITIMER
	struct itimerval itimer;
	
	memset(&itimer, 0, sizeof(itimer));
	itimer.it_value.tv_usec = (timer * 1000) % 1000000;
	itimer.it_value.tv_sec = timer / 1000;
#else
	timer /= 1000;
#endif

#ifndef	_SIG_THREAD_ALARM	
	_arm;
	_timer = this;
#endif
	time(&_alarm);
	sigemptyset(&sigs);
	sigaddset(&sigs, SIGALRM);
	pthread_sigmask(SIG_UNBLOCK, &sigs, NULL);
#ifdef	HAVE_SETITIMER
	setitimer(ITIMER_REAL, &itimer, NULL);
#else
	alarm(timer);
#endif
}	

timeout_t Thread::getTimer(void)
{
#ifdef	HAVE_SETITIMER
	struct itimerval itimer;
#endif

	if(!_alarm)
		return 0;

#ifdef	HAVE_SETITIMER
	getitimer(ITIMER_REAL, &itimer);
	return (timeout_t)(itimer.it_value.tv_sec * 1000 +
		itimer.it_value.tv_usec / 1000);
#else
	time_t now;
	time(&now);
	return (timeout_t)(((now - _alarm) * 1000) + 500);
#endif
}

void Thread::endTimer(void)
{
#ifdef	HAVE_SETITIMER
	static const struct itimerval itimer = {{0, 0},{0,0}};
#endif

	sigset_t sigs;
#ifndef	_SIG_THREAD_ALARM
	if(_timer != this)
		return;
#endif

#ifdef	HAVE_SETITIMER
	setitimer(ITIMER_REAL, &itimer, NULL);
#else
	alarm(0);
#endif
	sigemptyset(&sigs);
	sigaddset(&sigs, SIGALRM);
	pthread_sigmask(SIG_BLOCK, &sigs, NULL);
#ifndef	_SIG_THREAD_ALARM
	--_arm;
	_timer = NULL;
#endif
}

void	Thread::WaitSignal(signo_t signo)
{
	sigset_t	mask;

	sigemptyset(&mask);
	sigaddset(&mask, signo);
#ifndef HAVE_SIGWAIT2
	signo = sigwait(&mask);
#else
	sigwait(&mask, &signo);
#endif
}	

void	Thread::setSuspend(thread_suspend_t mode)
{
	sigset_t mask;

	sigemptyset(&mask);
	sigaddset(&mask, _SIG_THREAD_SUSPEND);

	switch(mode)
	{
	case THREAD_SUSPEND_ENABLE:
		pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
		return;
	case THREAD_SUSPEND_DISABLE:
		pthread_sigmask(SIG_BLOCK, &mask, NULL);
	}
}

#ifdef	_SIG_THREAD_CANCEL

void	Thread::setCancel(thread_cancel_t mode)
{
	sigset_t	mask;

	sigemptyset(&mask);
	sigaddset(&mask, _SIG_THREAD_CANCEL);
	
	switch(mode)
	{
	case THREAD_CANCEL_IMMEDIATE:
		pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
		break;
	case THREAD_CANCEL_INITIAL:
	case THREAD_CANCEL_DISABLED:
	case THREAD_CANCEL_DEFERRED:
		pthread_sigmask(SIG_BLOCK, &mask, NULL);
		break;
	}
	_cancel = mode;
}
#else

void	Thread::setSignal(int signo, bool mode)
{
	sigset_t sigs;

	sigemptyset(&sigs);
	sigaddset(&sigs, signo);

	if(mode)
		pthread_sigmask(SIG_UNBLOCK, &sigs, NULL);
	else
		pthread_sigmask(SIG_BLOCK, &sigs, NULL);
}	

void	Thread::setCancel(thread_cancel_t mode)
{
	switch(mode)
	{
	case THREAD_CANCEL_IMMEDIATE:
		pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
		break;
	case THREAD_CANCEL_DEFERRED:
		pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
		break;
	case THREAD_CANCEL_INITIAL:
	case THREAD_CANCEL_DISABLED:
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
		break;
	default:
		mode = THREAD_CANCEL_INVALID;
	}
	if(mode != THREAD_CANCEL_INVALID)
		_cancel = mode;
}

#endif

void	Thread::Yield(void)
{
#ifdef	_SIG_THREAD_CANCEL
	sigset_t cancel, old;

	sigemptyset(&cancel);
	sigaddset(&cancel, _SIG_THREAD_CANCEL);

	if(_cancel != THREAD_CANCEL_DISABLED && 
	   _cancel != THREAD_CANCEL_INITIAL)
		pthread_sigmask(SIG_UNBLOCK, &cancel, &old);
#else
	pthread_testcancel();
#endif
#ifdef	HAVE_PTHREAD_YIELD
	pthread_yield();
#endif

#ifdef	_SIG_THREAD_CANCEL
	if(_cancel != THREAD_CANCEL_DISABLED)
		pthread_sigmask(SIG_SETMASK, &old, NULL);
#endif
}

void Thread::testCancel(void)
{
#ifdef  _SIG_THREAD_CANCEL
        sigset_t cancel, old;
 
        sigemptyset(&cancel);
        sigaddset(&cancel, _SIG_THREAD_CANCEL);
 
        if(_cancel != THREAD_CANCEL_DISABLED &&
           _cancel != THREAD_CANCEL_INITIAL)
                pthread_sigmask(SIG_UNBLOCK, &cancel, &old);
#else
        pthread_testcancel();
#endif
 
#ifdef  _SIG_THREAD_CANCEL
        if(_cancel != THREAD_CANCEL_DISABLED)
                pthread_sigmask(SIG_SETMASK, &old, NULL);
#endif
} 


void siginstall(int signo)
{
	struct sigaction act;
	
	act.sa_handler = (signalexec_t)&sigHandler;
	sigemptyset(&act.sa_mask);

#ifdef	SA_INTERRUPT
	act.sa_flags = SA_INTERRUPT;
#else
	act.sa_flags = 0;
#endif
	sigaction(signo, &act, NULL);
}

Thread *getThread(void)
{
	Thread *thread;

	if(!Thread::_main)
		return &_mainthread;

	thread = (Thread *)_self.getKey();

	if(!thread)
	{
		thread = (Thread *)new MainThread;
		_self.setKey(thread);
	}
	return thread;
}

#ifdef	__CCXX_USE_POLL

Poller::Poller()
{
	nufds = 0;
	ufds = NULL;
};


Poller::~Poller()
{
	if(ufds)
		delete[] ufds;
}

pollfd *Poller::getList(int cnt)
{
	if(nufds < cnt)
	{
		if(ufds)
			delete[] ufds;
		ufds = new pollfd[cnt];
		nufds = cnt;
	}
	return ufds;
}

#endif

/** EMACS **
 * Local variables:
 * mode: c++
 * c-basic-offset: 8
 * End:
 */

Reply to: