[Common-dev] Re: [Filesystem-dev] CR-Client: Fix httpfsys so it works with AutoUpgrade.
Greg Wright gwright at real.com
New diff. Differences from first diff:
o No semaphore work, just reverted back to the original impl and
let the threads do extra wake-ups.
o Added IsHandled() to ThreadMessageSink and use it in HXTaskManager.
o Make Task use the thread it is created on as the response thread.
Creator can override with SetResponseThread().
o Fix AddHandler to correctly return FAIL, OK and UNEXPECTED.
A couple of files are attached because it is easier to read a couple of
methods that way.
--greg.
Index: hxtaskmanager.cpp
===================================================================
RCS file: /cvsroot/common/util/hxtaskmanager.cpp,v
retrieving revision 1.1
diff -u -w -r1.1 hxtaskmanager.cpp
--- hxtaskmanager.cpp 19 Jul 2004 23:23:03 -0000 1.1
+++ hxtaskmanager.cpp 24 Sep 2004 17:46:01 -0000
@@ -57,11 +57,57 @@
#define D_THREADTASKMGR D_INFO //XXXLCM
+HXTaskManager::Task::Task()
+ : m_pResponseThread(NULL),
+ m_pMsgSink(NULL)
+{
+ HXThread::MakeThread(m_pResponseThread);
+ HX_ASSERT(m_pResponseThread);
+}
+
+
+HXTaskManager::Task::~Task()
+{
+ HX_DELETE(m_pResponseThread);
+ HX_RELEASE(m_pMsgSink);
+}
+
+
+HXThread* HXTaskManager::Task::GetResponseThread()
+{
+ return m_pResponseThread;
+}
+
+void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
+{
+ HX_ASSERT(pThread);
+ HX_DELETE(m_pResponseThread);
+ m_pResponseThread = pThread;
+}
+
+
+HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
+{
+ return m_pMsgSink;
+}
+
+void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
+{
+ HX_ASSERT(pMsgSink);
+ HX_ASSERT(!m_pMsgSink);
+ HX_RELEASE(m_pMsgSink);
+ if( pMsgSink )
+ {
+ m_pMsgSink = pMsgSink;
+ m_pMsgSink->AddRef();
+ }
+}
+
+
+
HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
: m_pWorkEvent(0)
-, m_pParentThread(0)
-, m_pMsgSink(0)
, m_pTaskMutex(0)
, m_exit(false)
, m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
@@ -75,7 +121,6 @@
DestroyPool();
HX_DELETE(m_pWorkEvent);
HX_DELETE(m_pTaskMutex);
- HX_DELETE(m_pParentThread);
// clean up any outstanding tasks
CHXSimpleList::Iterator end = m_pending.End();
@@ -86,12 +131,6 @@
pTask->Release();
}
m_pending.RemoveAll();
-
- if(m_pMsgSink)
- {
- m_pMsgSink->RemoveHandler(m_taskDoneMsg);
- HX_RELEASE(m_pMsgSink);
- }
}
HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
@@ -105,13 +144,6 @@
goto exit;
}
- // HXThread wrapper for the parent thread (i.e., this thread)
- hr = HXThread::MakeThread(m_pParentThread);
- if(FAILED(hr))
- {
- goto exit;
- }
-
// mutex for accessing task queue
hr = HXMutex::MakeMutex(m_pTaskMutex);
if(FAILED(hr))
@@ -126,13 +158,6 @@
goto exit;
}
- hr = HXThreadMessageSink::GetThreadInstance(m_pMsgSink);
- if(SUCCEEDED(hr))
- {
- // note: we need one message per task manager instance
- hr = m_pMsgSink->AddHandler(m_taskDoneMsg, this);
- }
-
exit:
return hr;
@@ -161,27 +186,43 @@
HX_RESULT HXTaskManager::AddTask(Task* pTask)
{
- HX_ASSERT(pTask);
-
- HX_RESULT hr = HXR_OK;
+ HXThread* pTmp = NULL;
+ HX_RESULT hr = HXR_FAIL;
+ HXThreadMessageSink* pMsgSink = NULL;
HX_ASSERT(m_threads.GetCount() > 0);
+ HX_ASSERT(pTask);
- if(HXR_OK == hr)
+ hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
+ if(SUCCEEDED(hr) && pMsgSink)
+ {
+ if( !pMsgSink->IsHandled(m_taskDoneMsg) )
+ {
+ hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
+ HX_ASSERT(SUCCEEDED(hr));
+ }
+
+ if( SUCCEEDED(hr) )
+ {
+ hr = HXThread::MakeThread(pTmp);
+ if( SUCCEEDED(hr) && pTmp )
{
// and add to pending task list and tell a task thread to start working
HXScopeLock lock(m_pTaskMutex);
-
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): adding task #%lu\n",
m_pending.GetCount()));
pTask->AddRef();
+ pTask->SetMessageSink(pMsgSink);
m_pending.AddTail(pTask);
+
// XXXLCM use a semaphore instead; more appropriate
if( m_pending.GetCount() == 1)
{
m_pWorkEvent->SignalEvent();
}
}
+ }
+ }
return hr;
}
@@ -293,6 +334,10 @@
void* HXTaskManager::TaskThreadProc()
{
+ HXThreadMessageSink* pMsgSink = NULL;
+ HXThread* pResponseThread = NULL;
+ Task* pTask = NULL;
+
for( ; ; )
{
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): waiting for work event...\n"));
@@ -309,14 +354,22 @@
}
// fetch and execute next task
- Task* pTask = GetNextTask();
+ pTask = GetNextTask();
if(pTask)
{
pTask->Execute();
+ pResponseThread = pTask->GetResponseThread();
+ pMsgSink = pTask->GetMessageSink();
+ HX_ASSERT(pResponseThread && pMsgSink);
+
+ if( pResponseThread && pMsgSink )
+ {
// post response to (response) thread
HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
- m_pParentThread->PostMessage(&msgResponse, m_pMsgSink->GetSinkHandle());
+ pResponseThread->PostMessage(&msgResponse, pMsgSink->GetSinkHandle());
+ }
+
}
}
Index: hxthreadmessagesink.cpp
===================================================================
RCS file: /cvsroot/common/util/hxthreadmessagesink.cpp,v
retrieving revision 1.2
diff -u -w -r1.2 hxthreadmessagesink.cpp
--- hxthreadmessagesink.cpp 22 Jul 2004 19:23:50 -0000 1.2
+++ hxthreadmessagesink.cpp 24 Sep 2004 17:46:01 -0000
@@ -134,6 +134,8 @@
HXThreadMessageSink::~HXThreadMessageSink()
{
+ HX_ASSERT( m_handlers.IsEmpty() );
+ m_handlers.RemoveAll();
}
//
@@ -141,13 +143,33 @@
//
HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler* pHandler)
{
+ HX_RESULT res = HXR_FAIL;
+
DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n", this, msg));
HX_ASSERT(pHandler);
- HX_ASSERT(0 == m_handlers.Lookup(reinterpret_cast<void*>(msg)));
+ if( pHandler )
+ {
+ void* pTmp = NULL;
+
+ if( m_handlers.Lookup(reinterpret_cast<void*>(msg), pTmp ))
+ {
+ //We already have a handler. Make sure it is the same
+ //one. If not, it is an error.
+ if( pTmp == reinterpret_cast<void*>(pHandler) )
+ {
+ res = HXR_UNEXPECTED;
+ }
+ }
+ else
+ {
+ //We don't have a handler for this one yet.
m_handlers.SetAt(reinterpret_cast<void*>(msg), pHandler);
+ res = HXR_OK;
+ }
+ }
- return HXR_OK;
+ return res;
}
@@ -167,6 +189,16 @@
}
return hr;
+}
+
+BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
+{
+ BOOL retVal = FALSE;
+ if( !m_handlers.IsEmpty() )
+ {
+ retVal = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
+ }
+ return retVal;
}
Index: pub/hxtaskmanager.h
===================================================================
RCS file: /cvsroot/common/util/pub/hxtaskmanager.h,v
retrieving revision 1.2
diff -u -w -r1.2 hxtaskmanager.h
--- pub/hxtaskmanager.h 20 Jul 2004 00:24:13 -0000 1.2
+++ pub/hxtaskmanager.h 24 Sep 2004 17:46:01 -0000
@@ -62,10 +62,11 @@
: public HXThreadMessageSink::MessageHandler
{
public:
- class Task
- : public HXRefCounted
+ class Task : public HXRefCounted
{
public:
+ Task();
+ virtual ~Task();
// Called on task thread
virtual void Execute() = 0;
@@ -77,6 +78,18 @@
// itself which may be defined by the derived task.
//
virtual void OnTaskComplete(HX_RESULT hr) = 0;
+
+
+ HXThread* GetResponseThread();
+ void SetResponseThread(HXThread* pThread);
+
+ HXThreadMessageSink* GetMessageSink();
+ void SetMessageSink(HXThreadMessageSink* pMsgSink);
+
+ protected:
+ HXThread* m_pResponseThread;
+ HXThreadMessageSink* m_pMsgSink;
+
};
public:
@@ -106,9 +119,6 @@
private:
- // thread that adds task
- HXThread* m_pParentThread;
-
// pool of worker threads that run tasks
CHXSet m_threads;
HXEvent* m_pWorkEvent;
@@ -120,10 +130,6 @@
// unique message used by msg window to notify this instance
UINT32 m_taskDoneMsg;
-
- // receives task-done notifications on parent thread
- HXThreadMessageSink* m_pMsgSink;
-
};
Index: pub/hxthreadmessagesink.h
===================================================================
RCS file: /cvsroot/common/util/pub/hxthreadmessagesink.h,v
retrieving revision 1.1
diff -u -w -r1.1 hxthreadmessagesink.h
--- pub/hxthreadmessagesink.h 19 Jul 2004 23:23:03 -0000 1.1
+++ pub/hxthreadmessagesink.h 24 Sep 2004 17:46:01 -0000
@@ -75,6 +75,8 @@
HX_RESULT AddHandler(UINT32 msg, HXThreadMessageSink::MessageHandler* pHandler);
HX_RESULT RemoveHandler(UINT32 msg);
+ BOOL IsHandled(UINT32 msg);
+
MsgSinkHandle GetSinkHandle() const;
protected:
-------------- next part --------------
/* ***** BEGIN LICENSE BLOCK *****
* Version: RCSL 1.0/RPSL 1.0
*
* Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved.
*
* The contents of this file, and the files included with this file, are
* subject to the current version of the RealNetworks Public Source License
* Version 1.0 (the "RPSL") available at
* http://www.helixcommunity.org/content/rpsl unless you have licensed
* the file under the RealNetworks Community Source License Version 1.0
* (the "RCSL") available at http://www.helixcommunity.org/content/rcsl,
* in which case the RCSL will apply. You may also obtain the license terms
* directly from RealNetworks. You may not use this file except in
* compliance with the RPSL or, if you have a valid RCSL with RealNetworks
* applicable to this file, the RCSL. Please see the applicable RPSL or
* RCSL for the rights, obligations and limitations governing use of the
* contents of the file.
*
* This file is part of the Helix DNA Technology. RealNetworks is the
* developer of the Original Code and owns the copyrights in the portions
* it created.
*
* This file, and the files included with this file, is distributed and made
* available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
*
* Technology Compatibility Kit Test Suite(s) Location:
* http://www.helixcommunity.org/content/tck
*
* Contributor(s):
*
* ***** END LICENSE BLOCK ***** */
#include "hxtypes.h"
#include "hxcom.h"
#include "hxslist.h"
#include "hxthread.h"
#include "hxscope_lock.h"
#include "hxmap.h"
#include "debug.h"
#include "hxassert.h"
#include "hxheap.h"
#include "hxtaskmanager.h"
#ifdef _DEBUG
#undef HX_THIS_FILE
static const char HX_THIS_FILE[] = __FILE__;
#endif
#define D_THREADTASKMGR D_INFO //XXXLCM
//
// Construct a task with no message sink and a freshly made HXThread object
// as its response thread (retrievable through GetResponseThread()).
//
HXTaskManager::Task::Task()
: m_pResponseThread(NULL),
m_pMsgSink(NULL)
{
// the result code is not checked; a failure only shows up through the assert below
HXThread::MakeThread(m_pResponseThread);
HX_ASSERT(m_pResponseThread);
}
//
// Destroy the task: delete the response thread object and drop the
// reference held on the message sink (if any).
//
HXTaskManager::Task::~Task()
{
HX_DELETE(m_pResponseThread);
HX_RELEASE(m_pMsgSink);
}
//
// Return the thread object the completion message is posted to.
// The task keeps ownership; the pointer may be NULL.
//
HXThread* HXTaskManager::Task::GetResponseThread()
{
return m_pResponseThread;
}
//
// Replace the thread object that receives this task's completion message.
//
// The task takes ownership of pThread: the previously held thread object is
// deleted here and pThread is deleted by the destructor or by a later call.
//
// pThread - new response thread; must not be NULL.
//
void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
{
    HX_ASSERT(pThread);

    // Ignore NULL and the pointer we already own:
    //  - deleting m_pResponseThread and then storing the same pointer would
    //    leave the task holding a dangling thread object;
    //  - a NULL response thread makes the task thread skip the completion
    //    post, so the task would never be completed.
    if( pThread && pThread != m_pResponseThread )
    {
        HX_DELETE(m_pResponseThread);
        m_pResponseThread = pThread;
    }
}
//
// Return the message sink assigned through SetMessageSink(), or NULL if
// none has been set. No reference is added for the caller.
//
HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
{
return m_pMsgSink;
}
//
// Attach the message sink whose handle is used when the completion message
// is posted. The task holds its own reference on the sink until it is
// replaced or the task is destroyed.
//
// pMsgSink - sink to attach; expected to be non-NULL and expected to be set
//            only once per task (both are asserted). Passing NULL simply
//            drops any sink currently held.
//
void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
{
    HX_ASSERT(pMsgSink);
    HX_ASSERT(!m_pMsgSink);

    // Take the new reference before dropping the old one so that re-setting
    // the sink we already hold can never release its last reference.
    if( pMsgSink )
    {
        pMsgSink->AddRef();
    }
    HX_RELEASE(m_pMsgSink);
    m_pMsgSink = pMsgSink;
}
//
// Construct an idle task manager; Init() creates the event, mutex and
// worker threads.
//
// taskDoneMsg - message id posted to the response thread when a task has
//               finished executing.
//
HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
: m_pWorkEvent(0)
, m_pTaskMutex(0)
, m_exit(false)
, m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
{
}
//
// Shut down the worker pool, free the sync objects and abort every task
// that is still queued.
//
// NOTE(review): AddTask() registers this object as the m_taskDoneMsg handler
// on the calling thread's sink, and nothing here removes that registration -
// confirm the sink cannot dispatch to this object after destruction.
//
HXTaskManager::~HXTaskManager()
{
    // stop the task threads first, then tear down what they were using
    DestroyPool();
    HX_DELETE(m_pWorkEvent);
    HX_DELETE(m_pTaskMutex);

    // tasks that never ran are completed with HXR_ABORT and released
    const CHXSimpleList::Iterator itEnd = m_pending.End();
    CHXSimpleList::Iterator it = m_pending.Begin();
    while( it != itEnd )
    {
        Task* pAbandoned = reinterpret_cast<Task*>(*it);
        pAbandoned->OnTaskComplete(HXR_ABORT);
        pAbandoned->Release();
        ++it;
    }
    m_pending.RemoveAll();
}
//
// Create the work event, the task-queue mutex and the worker thread pool,
// in that order. The first failing step stops the sequence.
//
// threadCount    - number of worker threads to create (asserted > 0)
// threadPriority - priority applied to every worker thread
//
// Returns the result of the last step attempted.
//
HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
{
    HX_ASSERT(threadCount > 0);

    // manual-reset event that wakes the task threads when work is queued
    HX_RESULT hr = HXEvent::MakeEvent(m_pWorkEvent, NULL, TRUE /*TRUE == manual reset*/);

    if(SUCCEEDED(hr))
    {
        // guards the pending task list
        hr = HXMutex::MakeMutex(m_pTaskMutex);
    }

    if(SUCCEEDED(hr))
    {
        // spin up the worker threads
        hr = CreatePool(threadCount, threadPriority);
    }

    return hr;
}
//
// Cancel a task that is still waiting in the pending list.
//
// On success the task is removed, told it was aborted via
// OnTaskComplete(HXR_ABORT) while the task mutex is held, and released.
//
// Returns HXR_OK if the task was found and removed, HXR_FAIL if it is not
// in the pending list.
//
HX_RESULT HXTaskManager::RemoveTask(Task* pTask)
{
    HX_ASSERT(pTask);
    HXScopeLock lock(m_pTaskMutex);

    LISTPOSITION where = m_pending.Find(pTask);
    if(!where)
    {
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::HXTaskManager::CancelTask(): no task %p (already started or complete)\n", pTask));
        return HXR_FAIL;
    }

    DPRINTF(D_THREADTASKMGR, ("HXTaskManager::HXTaskManager::CancelTask(): removing task %p\n", pTask));
    m_pending.RemoveAt(where);
    pTask->OnTaskComplete(HXR_ABORT);
    pTask->Release();
    return HXR_OK;
}
//
// Queue a task for execution on one of the pool threads.
//
// The calling thread's message sink is looked up, this manager is registered
// on it as the handler for m_taskDoneMsg (unless that message already has a
// handler), and the sink is attached to the task so the task thread knows
// where to post the completion message. The pending list holds its own
// reference on the task.
//
// pTask - task to queue; must not be NULL.
//
// Returns a failure code if pTask is NULL, the thread sink cannot be
// obtained, or the handler cannot be registered; a success code otherwise.
//
// NOTE(review): GetThreadInstance() hands back an AddRef'd sink and that
// reference is not released here - confirm whether this is what keeps the
// per-thread sink (and the handler registration) alive between tasks.
//
HX_RESULT HXTaskManager::AddTask(Task* pTask)
{
    HX_RESULT hr = HXR_FAIL;
    HXThreadMessageSink* pMsgSink = NULL;
    HX_ASSERT(m_threads.GetCount() > 0);
    HX_ASSERT(pTask);

    if( !pTask )
    {
        // nothing to queue; fail before touching the thread sink
        return hr;
    }

    hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
    if(SUCCEEDED(hr) && pMsgSink)
    {
        if( !pMsgSink->IsHandled(m_taskDoneMsg) )
        {
            // note: we need one message per task manager instance
            hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
            HX_ASSERT(SUCCEEDED(hr));
        }

        if( SUCCEEDED(hr) )
        {
            // The task already owns its response thread object (created in
            // the Task constructor), so no extra HXThread is made here.

            // add to pending task list and tell a task thread to start working
            HXScopeLock lock(m_pTaskMutex);
            DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): adding task #%lu\n", m_pending.GetCount()));
            pTask->AddRef();
            pTask->SetMessageSink(pMsgSink);
            m_pending.AddTail(pTask);

            // XXXLCM use a semaphore instead; more appropriate
            if( m_pending.GetCount() == 1)
            {
                // list went from empty to non-empty: wake the task threads
                m_pWorkEvent->SignalEvent();
            }
        }
    }
    return hr;
}
//
// Return the number of tasks waiting in the pending list. The task mutex is
// held while the count is read.
//
UINT32 HXTaskManager::GetPendingTaskCount() const
{
HXScopeLock lock(m_pTaskMutex);
return m_pending.GetCount();
}
//
// Create one worker thread running TaskThreadProc_ on this manager.
//
// The thread is created suspended, given the requested priority and then
// resumed. If any of those steps fails the thread object is deleted.
//
// pThread        - receives the new thread object; asserted NULL on entry
// threadPriority - priority to apply before the thread is resumed
//
// Returns the result of the last step attempted.
//
HX_RESULT HXTaskManager::CreateTaskThread(HXThread*& pThread, UINT32 threadPriority)
{
    DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreateTaskThread(): creating thread...\n"));
    HX_ASSERT(!pThread);

    HX_RESULT hr = HXThread::MakeThread(pThread);
    if(HXR_OK != hr)
    {
        // no thread object was made; nothing to clean up
        return hr;
    }

    hr = pThread->CreateThread(TaskThreadProc_, this, HX_CREATE_SUSPENDED);
    if(SUCCEEDED(hr))
    {
        hr = pThread->SetPriority(threadPriority);
    }
    if(SUCCEEDED(hr))
    {
        hr = pThread->Resume();
    }

    if(FAILED(hr))
    {
        HX_ASSERT(false);
        HX_DELETE(pThread);
    }
    return hr;
}
//
// Create `count` worker threads and add them to m_threads.
//
// If any thread cannot be created, the threads created so far are destroyed
// via DestroyPool() and the failure is returned. With count == 0 no thread
// is created and HXR_FAIL is returned.
//
HX_RESULT HXTaskManager::CreatePool(UINT32 count, UINT32 threadPriority)
{
    DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreatePool(): creating %lu threads...\n", count));

    HX_RESULT hr = HXR_FAIL;
    UINT32 created = 0;
    while(created < count)
    {
        HXThread* pWorker = 0;
        hr = CreateTaskThread(pWorker, threadPriority);
        if(FAILED(hr))
        {
            // give up on the whole pool
            HX_ASSERT(false);
            DestroyPool();
            break;
        }
        m_threads.Add(pWorker);
        ++created;
    }
    return hr;
}
void HXTaskManager::DestroyPool()
{
if(!m_threads.IsEmpty())
{
// signal task threads to exit
m_exit = true;
m_pWorkEvent->SignalEvent();
CHXSet::Iterator end = m_threads.End();
for(CHXSet::Iterator begin = m_threads.Begin(); begin != end; ++begin)
{
// wait for thread to exit gracefully
HXThread* pThread = reinterpret_cast<HXThread*>(*begin);
pThread->Exit(0);
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): thread [%p] exited\n", pThread));
HX_DELETE(pThread);
}
m_threads.RemoveAll();
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): %lu tasks abandoned\n", GetPendingTaskCount()));
}
}
//
// Called on a task thread.
//
// Pop the task at the head of the pending list while holding the task mutex.
// When the pop empties the list, the work event is reset so the task threads
// go back to waiting. Returns 0 when the list is already empty.
//
HXTaskManager::Task* HXTaskManager::GetNextTask()
{
    HXScopeLock lock(m_pTaskMutex);

    if(m_pending.IsEmpty())
    {
        return 0;
    }

    Task* pHead = reinterpret_cast<Task*>(m_pending.RemoveHead());
    if(m_pending.IsEmpty())
    {
        // that was the last one: no more tasks
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::GetNextTask(): no more tasks...\n"));
        m_pWorkEvent->ResetEvent();
    }
    return pHead;
}
void* HXTaskManager::TaskThreadProc_(void* pv)
{
HXTaskManager* pThis = reinterpret_cast<HXTaskManager*>(pv);
HX_ASSERT(pThis);
return pThis->TaskThreadProc();
}
void* HXTaskManager::TaskThreadProc()
{
HXThreadMessageSink* pMsgSink = NULL;
HXThread* pResponseThread = NULL;
Task* pTask = NULL;
for( ; ; )
{
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): waiting for work event...\n"));
// sleep until there is work to do
m_pWorkEvent->Wait(ALLFS /*INFINITE*/);
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): got work event...\n"));
if(m_exit)
{
DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): exiting...\n"));
break;
}
// fetch and execute next task
pTask = GetNextTask();
if(pTask)
{
pTask->Execute();
pResponseThread = pTask->GetResponseThread();
pMsgSink = pTask->GetMessageSink();
HX_ASSERT(pResponseThread && pMsgSink);
if( pResponseThread && pMsgSink )
{
// post response to (response) thread
HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
pResponseThread->PostMessage(&msgResponse, pMsgSink->GetSinkHandle());
}
}
}
return 0;
}
//
// HXThreadMessageSink::MessageHandler
//
// Invoked by the message sink when the m_taskDoneMsg posted by a task thread
// (see TaskThreadProc) is delivered. The first message parameter carries the
// finished task.
//
// Notifies the task with OnTaskComplete(HXR_OK) and releases it.
// Always returns 0.
//
UINT32 HXTaskManager::HandleMessage(const HXThreadMessage& msg)
{
    HX_ASSERT(m_taskDoneMsg == msg.m_ulMessage);

    // the finished task travels in the first message parameter
    Task* const pDone = reinterpret_cast<Task*>(msg.m_pParam1);
    HX_ASSERT(pDone);

    // notify completion of task, then release it
    pDone->OnTaskComplete(HXR_OK);
    pDone->Release();

    return 0;
}
-------------- next part --------------
/* ***** BEGIN LICENSE BLOCK *****
* Version: RCSL 1.0/RPSL 1.0
*
* Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved.
*
* The contents of this file, and the files included with this file, are
* subject to the current version of the RealNetworks Public Source License
* Version 1.0 (the "RPSL") available at
* http://www.helixcommunity.org/content/rpsl unless you have licensed
* the file under the RealNetworks Community Source License Version 1.0
* (the "RCSL") available at http://www.helixcommunity.org/content/rcsl,
* in which case the RCSL will apply. You may also obtain the license terms
* directly from RealNetworks. You may not use this file except in
* compliance with the RPSL or, if you have a valid RCSL with RealNetworks
* applicable to this file, the RCSL. Please see the applicable RPSL or
* RCSL for the rights, obligations and limitations governing use of the
* contents of the file.
*
* This file is part of the Helix DNA Technology. RealNetworks is the
* developer of the Original Code and owns the copyrights in the portions
* it created.
*
* This file, and the files included with this file, is distributed and made
* available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
* EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH WARRANTIES,
* INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
*
* Technology Compatibility Kit Test Suite(s) Location:
* http://www.helixcommunity.org/content/tck
*
* Contributor(s):
*
* ***** END LICENSE BLOCK ***** */
#include "hxtypes.h"
#include "hxthread.h"
#include "debug.h"
#include "hxassert.h"
#include "hxheap.h"
#include "hxthreadmessagesink.h"
#ifdef _DEBUG
#undef HX_THIS_FILE
static const char HX_THIS_FILE[] = __FILE__;
#endif
#if defined(_WINDOWS) //XXXLCM need tls abtraction
// TLS slot index used by every thread to find its sink; stays
// TLS_OUT_OF_INDEXES until GetThreadInstance() allocates it.
static DWORD g_idxSinkSlot = TLS_OUT_OF_INDEXES;
// Number of sinks currently stored in thread slots; the slot index is freed
// again when this drops back to zero.
static INT32 g_sinkCount = 0;
//
// Return the calling thread's message sink, creating it on first use.
//
// An existing sink is AddRef'd before it is returned. A newly created sink
// is stored in the thread's TLS slot and counted in g_sinkCount. If creation
// fails while no sink exists, the TLS index is freed again.
//
// pSink - receives the sink (left untouched when no TLS index is available)
//
// Returns HXR_FAIL when no TLS index could be allocated, HXR_OK for an
// existing sink, otherwise the result of HXThreadMessageSink::Create().
//
HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
{
    // XXXLCM 1) remove windows specific tls 2) clean up tls
    if(TLS_OUT_OF_INDEXES == g_idxSinkSlot)
    {
        // allocate global index so threads can use index to access slots
        g_idxSinkSlot = TlsAlloc();
    }
    if(TLS_OUT_OF_INDEXES == g_idxSinkSlot)
    {
        return HXR_FAIL;
    }

    pSink = reinterpret_cast<HXThreadMessageSink*> (TlsGetValue(g_idxSinkSlot));
    if(pSink)
    {
        // this thread already has a sink: hand out another reference
        pSink->AddRef();
        return HXR_OK;
    }

    // first request on this thread: create sink
    const HX_RESULT hrCreate = HXThreadMessageSink::Create(pSink);
    if (SUCCEEDED(hrCreate))
    {
        // put sink in slot
        TlsSetValue(g_idxSinkSlot, pSink);
        ++g_sinkCount;
    }
    else if (0 == g_sinkCount)
    {
        // free index (no sinks)
        TlsFree(g_idxSinkSlot);
        g_idxSinkSlot = TLS_OUT_OF_INDEXES;
    }
    return hrCreate;
}
void HXThreadMessageSink::FinalRelease()
{
// remove this sink from thread sink slot
HXThreadMessageSink* pSink = reinterpret_cast<HXThreadMessageSink*> (TlsGetValue(g_idxSinkSlot));
HX_ASSERT(pSink == this);
if(pSink)
{
TlsSetValue(g_idxSinkSlot, 0);
HX_ASSERT(g_sinkCount > 0);
if (--g_sinkCount == 0)
{
// free index (no sinks)
TlsFree(g_idxSinkSlot);
g_idxSinkSlot = TLS_OUT_OF_INDEXES;
}
}
// continue final release (delete this)
HXRefCounted::FinalRelease();
}
#else
//
// Non-Windows placeholder: per-thread sinks are not implemented here.
// Asserts and returns HXR_NOTIMPL; pSink is not modified.
//
HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
{
HX_ASSERT(false);
return HXR_NOTIMPL;
}
//
// Non-Windows placeholder: only asserts.
// NOTE(review): unlike the Windows version this does not call
// HXRefCounted::FinalRelease() - confirm that is intended.
//
void HXThreadMessageSink::FinalRelease()
{
HX_ASSERT(false);
}
#endif //_WINDOWS
//
// Construct a sink with no handlers and an invalid sink handle.
//
HXThreadMessageSink::HXThreadMessageSink()
:m_handle(INVALID_MSGSINK_HANDLE)
{
}
//
// Destroy the sink. All handlers are expected to have been removed already
// (asserted); any that remain are dropped from the map.
//
HXThreadMessageSink::~HXThreadMessageSink()
{
HX_ASSERT( m_handlers.IsEmpty() );
m_handlers.RemoveAll();
}
//
// add handler for given message
//
// msg      - message id to handle
// pHandler - handler to register; must not be NULL
//
// Returns:
//   HXR_OK         - no handler was registered for msg; pHandler is now
//   HXR_UNEXPECTED - pHandler itself is already registered for msg
//   HXR_FAIL       - pHandler is NULL, or a different handler already owns msg
//
// An existing registration is never replaced.
//
// NOTE(review): both "already registered" cases return failure codes, so a
// caller re-adding its own handler fails a SUCCEEDED() check - confirm that
// is the intended contract.
//
HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler* pHandler)
{
    DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n", this, msg));
    HX_ASSERT(pHandler);

    if( !pHandler )
    {
        return HXR_FAIL;
    }

    void* const key = reinterpret_cast<void*>(msg);
    void* pExisting = NULL;

    if( !m_handlers.Lookup(key, pExisting) )
    {
        // We don't have a handler for this one yet.
        m_handlers.SetAt(key, pHandler );
        return HXR_OK;
    }

    // We already have a handler; report whether it is this very handler
    // or somebody else's.
    if( pExisting == reinterpret_cast<void*>(pHandler) )
    {
        return HXR_UNEXPECTED;
    }
    return HXR_FAIL;
}
//
// remove handler for given message
//
// A handler is expected to be registered for msg (asserted).
// Returns HXR_OK when the entry was removed, HXR_FAIL when there was
// nothing to remove.
//
HX_RESULT HXThreadMessageSink::RemoveHandler(UINT32 msg)
{
    DPRINTF(D_INFO, ("HXThreadMessageSink::RemoveHandler()[%p]: msg %lu\n", this, msg));

    void* const key = reinterpret_cast<void*>(msg);
    HX_ASSERT(0 != m_handlers.Lookup(key));

    if (m_handlers.RemoveKey(key))
    {
        return HXR_OK;
    }

    // nothing was registered under this message
    HX_ASSERT(false);
    return HXR_FAIL;
}
//
// Report whether a handler is currently registered for the given message.
// Returns TRUE if the handler map has a non-zero entry for msg, else FALSE.
//
BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
{
    if( m_handlers.IsEmpty() )
    {
        return FALSE;
    }

    void* const key = reinterpret_cast<void*>(msg);
    return (0 != m_handlers.Lookup(key)) ? TRUE : FALSE;
}