[Common-dev] Re: [Filesystem-dev] CR-Client: Fix httpfsys so it works with AutoUpgrade.
Liam Murray liamm at real.comLooks good.
Liam
At 10:50 AM 9/24/2004, Greg Wright wrote:
>New diff. Differences from first diff:
>
>o No semaphore work, just reverted back to original impl and
> let the threads to extra wake-ups.
>o Added IsHandled() to ThreadMessageSink and use it in HXTaskManger.
>o Make Task use the thread it is created on as the response thread.
> Creator can override with SetResponseThread().
>o Fix AddHandler to correctly return FAIL, OK and UNEXPECTED.
>
>a couple files attached because it is easier to read a couple
>methods that way.
>
>
>--greg.
>
>
>Index: hxtaskmanager.cpp
>===================================================================
>RCS file: /cvsroot/common/util/hxtaskmanager.cpp,v
>retrieving revision 1.1
>diff -u -w -r1.1 hxtaskmanager.cpp
>--- hxtaskmanager.cpp 19 Jul 2004 23:23:03 -0000 1.1
>+++ hxtaskmanager.cpp 24 Sep 2004 17:46:01 -0000
>@@ -57,11 +57,57 @@
> #define D_THREADTASKMGR D_INFO //XXXLCM
>
>
>+HXTaskManager::Task::Task()
>+ : m_pResponseThread(NULL),
>+ m_pMsgSink(NULL)
>+{
>+ HXThread::MakeThread(m_pResponseThread);
>+ HX_ASSERT(m_pResponseThread);
>+}
>+
>+
>+HXTaskManager::Task::~Task()
>+{
>+ HX_DELETE(m_pResponseThread);
>+ HX_RELEASE(m_pMsgSink);
>+}
>+
>+
>+HXThread* HXTaskManager::Task::GetResponseThread()
>+{
>+ return m_pResponseThread;
>+}
>+
>+void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
>+{
>+ HX_ASSERT(pThread);
>+ HX_DELETE(m_pResponseThread);
>+ m_pResponseThread = pThread;
>+}
>+
>+
>+HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
>+{
>+ return m_pMsgSink;
>+}
>+
>+void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
>+{
>+ HX_ASSERT(pMsgSink);
>+ HX_ASSERT(!m_pMsgSink);
>+ HX_RELEASE(m_pMsgSink);
>+ if( pMsgSink )
>+ {
>+ m_pMsgSink = pMsgSink;
>+ m_pMsgSink->AddRef();
>+ }
>+}
>+
>+
>+
>
> HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
> : m_pWorkEvent(0)
>-, m_pParentThread(0)
>-, m_pMsgSink(0)
> , m_pTaskMutex(0)
> , m_exit(false)
> , m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
>@@ -75,7 +121,6 @@
> DestroyPool();
> HX_DELETE(m_pWorkEvent);
> HX_DELETE(m_pTaskMutex);
>- HX_DELETE(m_pParentThread);
>
> // clean up any outstanding tasks
> CHXSimpleList::Iterator end = m_pending.End();
>@@ -86,12 +131,6 @@
> pTask->Release();
> }
> m_pending.RemoveAll();
>-
>- if(m_pMsgSink)
>- {
>- m_pMsgSink->RemoveHandler(m_taskDoneMsg);
>- HX_RELEASE(m_pMsgSink);
>- }
> }
>
> HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
>@@ -105,13 +144,6 @@
> goto exit;
> }
>
>- // HXThread wrapper for the parent thread (i.e., this thread)
>- hr = HXThread::MakeThread(m_pParentThread);
>- if(FAILED(hr))
>- {
>- goto exit;
>- }
>-
> // mutex for accessing task queue
> hr = HXMutex::MakeMutex(m_pTaskMutex);
> if(FAILED(hr))
>@@ -126,13 +158,6 @@
> goto exit;
> }
>
>- hr = HXThreadMessageSink::GetThreadInstance(m_pMsgSink);
>- if(SUCCEEDED(hr))
>- {
>- // note: we need one message per task manager instance
>- hr = m_pMsgSink->AddHandler(m_taskDoneMsg, this);
>- }
>-
> exit:
>
> return hr;
>@@ -161,27 +186,43 @@
>
> HX_RESULT HXTaskManager::AddTask(Task* pTask)
> {
>- HX_ASSERT(pTask);
>-
>- HX_RESULT hr = HXR_OK;
>+ HXThread* pTmp = NULL;
>+ HX_RESULT hr = HXR_FAIL;
>+ HXThreadMessageSink* pMsgSink = NULL;
>
> HX_ASSERT(m_threads.GetCount() > 0);
>+ HX_ASSERT(pTask);
>
>- if(HXR_OK == hr)
>+ hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
>+ if(SUCCEEDED(hr) && pMsgSink)
>+ {
>+ if( !pMsgSink->IsHandled(m_taskDoneMsg) )
>+ {
>+ hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
>+ HX_ASSERT(SUCCEEDED(hr));
>+ }
>+
>+ if( SUCCEEDED(hr) )
>+ {
>+ hr = HXThread::MakeThread(pTmp);
>+ if( SUCCEEDED(hr) && pTmp )
> {
> // and add to pending task list and tell a task thread to start
> working
> HXScopeLock lock(m_pTaskMutex);
>-
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): adding task
> #%lu\n", m_pending.GetCount()));
>
> pTask->AddRef();
>+ pTask->SetMessageSink(pMsgSink);
> m_pending.AddTail(pTask);
>+
> // XXXLCM use a semaphore instead; more appropriate
> if( m_pending.GetCount() == 1)
> {
> m_pWorkEvent->SignalEvent();
> }
> }
>+ }
>+ }
>
> return hr;
> }
>@@ -293,6 +334,10 @@
> void* HXTaskManager::TaskThreadProc()
> {
>
>+ HXThreadMessageSink* pMsgSink = NULL;
>+ HXThread* pResponseThread = NULL;
>+ Task* pTask = NULL;
>+
> for( ; ; )
> {
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc():
> waiting for work event...\n"));
>@@ -309,14 +354,22 @@
> }
>
> // fetch and execute next task
>- Task* pTask = GetNextTask();
>+ pTask = GetNextTask();
> if(pTask)
> {
> pTask->Execute();
>
>+ pResponseThread = pTask->GetResponseThread();
>+ pMsgSink = pTask->GetMessageSink();
>+ HX_ASSERT(pResponseThread && pMsgSink);
>+
>+ if( pResponseThread && pMsgSink )
>+ {
> // post response to (response) thread
> HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
>- m_pParentThread->PostMessage(&msgResponse,
>m_pMsgSink->GetSinkHandle());
>+ pResponseThread->PostMessage(&msgResponse,
>pMsgSink->GetSinkHandle());
>+ }
>+
> }
> }
>
>Index: hxthreadmessagesink.cpp
>===================================================================
>RCS file: /cvsroot/common/util/hxthreadmessagesink.cpp,v
>retrieving revision 1.2
>diff -u -w -r1.2 hxthreadmessagesink.cpp
>--- hxthreadmessagesink.cpp 22 Jul 2004 19:23:50 -0000 1.2
>+++ hxthreadmessagesink.cpp 24 Sep 2004 17:46:01 -0000
>@@ -134,6 +134,8 @@
>
> HXThreadMessageSink::~HXThreadMessageSink()
> {
>+ HX_ASSERT( m_handlers.IsEmpty() );
>+ m_handlers.RemoveAll();
> }
>
> //
>@@ -141,13 +143,33 @@
> //
> HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler*
> pHandler)
> {
>+ HX_RESULT res = HXR_FAIL;
>+
> DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n",
> this, msg));
> HX_ASSERT(pHandler);
>- HX_ASSERT(0 == m_handlers.Lookup(reinterpret_cast<void*>(msg)));
>
>+ if( pHandler )
>+ {
>+ void* pTmp = NULL;
>+
>+ if( m_handlers.Lookup(reinterpret_cast<void*>(msg), pTmp ))
>+ {
>+ //We already have a handler. Make sure it is the same
>+ //one. If not, it is an error.
>+ if( pTmp == reinterpret_cast<void*>(pHandler) )
>+ {
>+ res = HXR_UNEXPECTED;
>+ }
>+ }
>+ else
>+ {
>+ //We don't have a handler for this one yet.
> m_handlers.SetAt(reinterpret_cast<void*>(msg), pHandler);
>+ res = HXR_OK;
>+ }
>+ }
>
>- return HXR_OK;
>+ return res;
> }
>
>
>@@ -167,6 +189,16 @@
> }
>
> return hr;
>+}
>+
>+BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
>+{
>+ BOOL retVal = FALSE;
>+ if( !m_handlers.IsEmpty() )
>+ {
>+ retVal = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
>+ }
>+ return retVal;
> }
>Index: pub/hxtaskmanager.h
>===================================================================
>RCS file: /cvsroot/common/util/pub/hxtaskmanager.h,v
>retrieving revision 1.2
>diff -u -w -r1.2 hxtaskmanager.h
>--- pub/hxtaskmanager.h 20 Jul 2004 00:24:13 -0000 1.2
>+++ pub/hxtaskmanager.h 24 Sep 2004 17:46:01 -0000
>@@ -62,10 +62,11 @@
> : public HXThreadMessageSink::MessageHandler
> {
> public:
>- class Task
>- : public HXRefCounted
>+ class Task : public HXRefCounted
> {
> public:
>+ Task();
>+ virtual ~Task();
>
> // Called on task thread
> virtual void Execute() = 0;
>@@ -77,6 +78,18 @@
> // itself which may be defined by the derived task.
> //
> virtual void OnTaskComplete(HX_RESULT hr) = 0;
>+
>+
>+ HXThread* GetResponseThread();
>+ void SetResponseThread(HXThread* pThread);
>+
>+ HXThreadMessageSink* GetMessageSink();
>+ void SetMessageSink(HXThreadMessageSink* pMsgSink);
>+
>+ protected:
>+ HXThread* m_pResponseThread;
>+ HXThreadMessageSink* m_pMsgSink;
>+
> };
>
> public:
>@@ -106,9 +119,6 @@
>
> private:
>
>- // thread that adds task
>- HXThread* m_pParentThread;
>-
> // pool of worker threads that run tasks
> CHXSet m_threads;
> HXEvent* m_pWorkEvent;
>@@ -120,10 +130,6 @@
>
> // unique message used by msg window to notify this instance
> UINT32 m_taskDoneMsg;
>-
>- // receives task-done notifications on parent thread
>- HXThreadMessageSink* m_pMsgSink;
>-
> };
>
>
>Index: pub/hxthreadmessagesink.h
>===================================================================
>RCS file: /cvsroot/common/util/pub/hxthreadmessagesink.h,v
>retrieving revision 1.1
>diff -u -w -r1.1 hxthreadmessagesink.h
>--- pub/hxthreadmessagesink.h 19 Jul 2004 23:23:03 -0000 1.1
>+++ pub/hxthreadmessagesink.h 24 Sep 2004 17:46:01 -0000
>@@ -75,6 +75,8 @@
>
> HX_RESULT AddHandler(UINT32 msg,
> HXThreadMessageSink::MessageHandler* pHandler);
> HX_RESULT RemoveHandler(UINT32 msg);
>+ BOOL IsHandled(UINT32 msg);
>+
> MsgSinkHandle GetSinkHandle() const;
>
> protected:
>
>
>/* ***** BEGIN LICENSE BLOCK *****
> * Version: RCSL 1.0/RPSL 1.0
> *
> * Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved.
> *
> * The contents of this file, and the files included with this file, are
> * subject to the current version of the RealNetworks Public Source License
> * Version 1.0 (the "RPSL") available at
> * http://www.helixcommunity.org/content/rpsl unless you have licensed
> * the file under the RealNetworks Community Source License Version 1.0
> * (the "RCSL") available at http://www.helixcommunity.org/content/rcsl,
> * in which case the RCSL will apply. You may also obtain the license terms
> * directly from RealNetworks. You may not use this file except in
> * compliance with the RPSL or, if you have a valid RCSL with RealNetworks
> * applicable to this file, the RCSL. Please see the applicable RPSL or
> * RCSL for the rights, obligations and limitations governing use of the
> * contents of the file.
> *
> * This file is part of the Helix DNA Technology. RealNetworks is the
> * developer of the Original Code and owns the copyrights in the portions
> * it created.
> *
> * This file, and the files included with this file, is distributed and made
> * available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
> * EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH
> WARRANTIES,
> * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS
> * FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
> *
> * Technology Compatibility Kit Test Suite(s) Location:
> * http://www.helixcommunity.org/content/tck
> *
> * Contributor(s):
> *
> * ***** END LICENSE BLOCK ***** */
>
>
>
>#include "hxtypes.h"
>#include "hxcom.h"
>#include "hxslist.h"
>#include "hxthread.h"
>#include "hxscope_lock.h"
>
>#include "hxmap.h"
>#include "debug.h"
>#include "hxassert.h"
>#include "hxheap.h"
>
>#include "hxtaskmanager.h"
>
>#ifdef _DEBUG
>#undef HX_THIS_FILE
>static const char HX_THIS_FILE[] = __FILE__;
>#endif
>
>
>#define D_THREADTASKMGR D_INFO //XXXLCM
>
>
>HXTaskManager::Task::Task()
> : m_pResponseThread(NULL),
> m_pMsgSink(NULL)
>{
> HXThread::MakeThread(m_pResponseThread);
> HX_ASSERT(m_pResponseThread);
>}
>
>
>HXTaskManager::Task::~Task()
>{
> HX_DELETE(m_pResponseThread);
> HX_RELEASE(m_pMsgSink);
>}
>
>
>HXThread* HXTaskManager::Task::GetResponseThread()
>{
> return m_pResponseThread;
>}
>
>void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
>{
> HX_ASSERT(pThread);
> HX_DELETE(m_pResponseThread);
> m_pResponseThread = pThread;
>}
>
>
>HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
>{
> return m_pMsgSink;
>}
>
>void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
>{
> HX_ASSERT(pMsgSink);
> HX_ASSERT(!m_pMsgSink);
> HX_RELEASE(m_pMsgSink);
> if( pMsgSink )
> {
> m_pMsgSink = pMsgSink;
> m_pMsgSink->AddRef();
> }
>}
>
>
>
>
>HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
>: m_pWorkEvent(0)
>, m_pTaskMutex(0)
>, m_exit(false)
>, m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
>{
>}
>
>
>HXTaskManager::~HXTaskManager()
>{
> // clean up task thread and sync objects
> DestroyPool();
> HX_DELETE(m_pWorkEvent);
> HX_DELETE(m_pTaskMutex);
>
> // clean up any outstanding tasks
> CHXSimpleList::Iterator end = m_pending.End();
> for(CHXSimpleList::Iterator begin = m_pending.Begin(); begin != end;
> ++begin)
> {
> Task* pTask = reinterpret_cast<Task*>(*begin);
> pTask->OnTaskComplete(HXR_ABORT);
> pTask->Release();
> }
> m_pending.RemoveAll();
>}
>
>HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
>{
> HX_ASSERT(threadCount > 0);
>
> // event used to signal task thread to wake up and start working
> HX_RESULT hr = HXEvent::MakeEvent(m_pWorkEvent, NULL, TRUE /*TRUE ==
> manual reset*/);
> if(FAILED(hr))
> {
> goto exit;
> }
>
> // mutex for accessing task queue
> hr = HXMutex::MakeMutex(m_pTaskMutex);
> if(FAILED(hr))
> {
> goto exit;
> }
>
> // create worker threads
> hr = CreatePool(threadCount, threadPriority);
> if(FAILED(hr))
> {
> goto exit;
> }
>
>exit:
>
> return hr;
>}
>
>HX_RESULT HXTaskManager::RemoveTask(Task* pTask)
>{
> HX_ASSERT(pTask);
> HXScopeLock lock(m_pTaskMutex);
>
> LISTPOSITION pos = m_pending.Find(pTask);
> if(pos)
> {
> DPRINTF(D_THREADTASKMGR,
> ("HXTaskManager::HXTaskManager::CancelTask(): removing task %p\n", pTask));
> m_pending.RemoveAt(pos);
> pTask->OnTaskComplete(HXR_ABORT);
> pTask->Release();
> return HXR_OK;
> }
>
> DPRINTF(D_THREADTASKMGR,
> ("HXTaskManager::HXTaskManager::CancelTask(): no task %p (already started
> or complete)\n", pTask));
> return HXR_FAIL;
>}
>
>
>
>HX_RESULT HXTaskManager::AddTask(Task* pTask)
>{
> HXThread* pTmp = NULL;
> HX_RESULT hr = HXR_FAIL;
> HXThreadMessageSink* pMsgSink = NULL;
>
> HX_ASSERT(m_threads.GetCount() > 0);
> HX_ASSERT(pTask);
>
> hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
> if(SUCCEEDED(hr) && pMsgSink)
> {
> if( !pMsgSink->IsHandled(m_taskDoneMsg) )
> {
> hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
> HX_ASSERT(SUCCEEDED(hr));
> }
>
> if( SUCCEEDED(hr) )
> {
> hr = HXThread::MakeThread(pTmp);
> if( SUCCEEDED(hr) && pTmp )
> {
> // and add to pending task list and tell a task thread to
> start working
> HXScopeLock lock(m_pTaskMutex);
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask():
> adding task #%lu\n", m_pending.GetCount()));
>
> pTask->AddRef();
> pTask->SetMessageSink(pMsgSink);
> m_pending.AddTail(pTask);
>
> // XXXLCM use a semaphore instead; more appropriate
> if( m_pending.GetCount() == 1)
> {
> m_pWorkEvent->SignalEvent();
> }
> }
> }
> }
>
> return hr;
>}
>
>UINT32 HXTaskManager::GetPendingTaskCount() const
>{
> HXScopeLock lock(m_pTaskMutex);
> return m_pending.GetCount();
>
>}
>
>
>HX_RESULT HXTaskManager::CreateTaskThread(HXThread*& pThread, UINT32
>threadPriority)
>{
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreateTaskThread():
> creating thread...\n"));
>
> HX_ASSERT(!pThread);
> HX_RESULT hr = HXThread::MakeThread(pThread);
> if(HXR_OK == hr)
> {
> hr = pThread->CreateThread(TaskThreadProc_, this,
> HX_CREATE_SUSPENDED);
> if(SUCCEEDED(hr))
> {
> hr = pThread->SetPriority(threadPriority);
> if(SUCCEEDED(hr))
> {
> hr = pThread->Resume();
> }
> }
>
> if (FAILED(hr))
> {
> HX_ASSERT(false);
> HX_DELETE(pThread);
> }
> }
> return hr;
>}
>
>HX_RESULT HXTaskManager::CreatePool(UINT32 count, UINT32 threadPriority)
>{
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreatePool(): creating %lu
> threads...\n", count));
> HX_RESULT hr = HXR_FAIL;
> for(UINT32 idx = 0; idx < count; ++idx)
> {
> HXThread* pThread = 0;
> hr = CreateTaskThread(pThread, threadPriority);
> if(FAILED(hr))
> {
> HX_ASSERT(false);
> DestroyPool();
> break;
> }
> m_threads.Add(pThread);
> }
> return hr;
>}
>
>void HXTaskManager::DestroyPool()
>{
> if(!m_threads.IsEmpty())
> {
> // signal task threads to exit
> m_exit = true;
> m_pWorkEvent->SignalEvent();
>
> CHXSet::Iterator end = m_threads.End();
> for(CHXSet::Iterator begin = m_threads.Begin(); begin != end;
> ++begin)
> {
> // wait for thread to exit gracefully
> HXThread* pThread = reinterpret_cast<HXThread*>(*begin);
> pThread->Exit(0);
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool():
> thread [%p] exited\n", pThread));
> HX_DELETE(pThread);
>
> }
> m_threads.RemoveAll();
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): %lu
> tasks abandoned\n", GetPendingTaskCount()));
> }
>}
>
>//
>// called on task thread
>//
>HXTaskManager::Task* HXTaskManager::GetNextTask()
>{
> Task* pTask = 0;
> HXScopeLock lock(m_pTaskMutex);
> if(!m_pending.IsEmpty())
> {
> pTask = reinterpret_cast<Task*>(m_pending.RemoveHead());
> if(m_pending.IsEmpty())
> {
> // no more tasks
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::GetNextTask(): no
> more tasks...\n"));
> m_pWorkEvent->ResetEvent();
> }
> }
>
> return pTask;
>}
>
>void* HXTaskManager::TaskThreadProc_(void* pv)
>{
> HXTaskManager* pThis = reinterpret_cast<HXTaskManager*>(pv);
> HX_ASSERT(pThis);
> return pThis->TaskThreadProc();
>}
>void* HXTaskManager::TaskThreadProc()
>{
>
> HXThreadMessageSink* pMsgSink = NULL;
> HXThread* pResponseThread = NULL;
> Task* pTask = NULL;
>
> for( ; ; )
> {
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc():
> waiting for work event...\n"));
>
> // sleep until there is work to do
> m_pWorkEvent->Wait(ALLFS /*INFINITE*/);
>
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): got
> work event...\n"));
>
> if(m_exit)
> {
> DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc():
> exiting...\n"));
> break;
> }
>
> // fetch and execute next task
> pTask = GetNextTask();
> if(pTask)
> {
> pTask->Execute();
>
> pResponseThread = pTask->GetResponseThread();
> pMsgSink = pTask->GetMessageSink();
> HX_ASSERT(pResponseThread && pMsgSink);
>
> if( pResponseThread && pMsgSink )
> {
> // post response to (response) thread
> HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
> pResponseThread->PostMessage(&msgResponse,
> pMsgSink->GetSinkHandle());
> }
>
> }
> }
>
> return 0;
>}
>
>
>
>//
>// HXThreadMessageSink::MessageHandler
>//
>// Called on parent thread after the resolver thread posts m_taskDoneMsg
>//
>UINT32 HXTaskManager::HandleMessage(const HXThreadMessage& msg)
>{
> HX_ASSERT(m_taskDoneMsg == msg.m_ulMessage);
>
> // get message data
> Task* pTask = reinterpret_cast<Task*>(msg.m_pParam1);
> HX_ASSERT(pTask);
>
> // notify completion of task
> pTask->OnTaskComplete(HXR_OK);
> pTask->Release();
>
> return 0;
>}
>
>
>
>/* ***** BEGIN LICENSE BLOCK *****
> * Version: RCSL 1.0/RPSL 1.0
> *
> * Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved.
> *
> * The contents of this file, and the files included with this file, are
> * subject to the current version of the RealNetworks Public Source License
> * Version 1.0 (the "RPSL") available at
> * http://www.helixcommunity.org/content/rpsl unless you have licensed
> * the file under the RealNetworks Community Source License Version 1.0
> * (the "RCSL") available at http://www.helixcommunity.org/content/rcsl,
> * in which case the RCSL will apply. You may also obtain the license terms
> * directly from RealNetworks. You may not use this file except in
> * compliance with the RPSL or, if you have a valid RCSL with RealNetworks
> * applicable to this file, the RCSL. Please see the applicable RPSL or
> * RCSL for the rights, obligations and limitations governing use of the
> * contents of the file.
> *
> * This file is part of the Helix DNA Technology. RealNetworks is the
> * developer of the Original Code and owns the copyrights in the portions
> * it created.
> *
> * This file, and the files included with this file, is distributed and made
> * available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
> * EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH
> WARRANTIES,
> * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS
> * FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
> *
> * Technology Compatibility Kit Test Suite(s) Location:
> * http://www.helixcommunity.org/content/tck
> *
> * Contributor(s):
> *
> * ***** END LICENSE BLOCK ***** */
>
>#include "hxtypes.h"
>#include "hxthread.h"
>#include "debug.h"
>#include "hxassert.h"
>#include "hxheap.h"
>#include "hxthreadmessagesink.h"
>
>
>#ifdef _DEBUG
>#undef HX_THIS_FILE
>static const char HX_THIS_FILE[] = __FILE__;
>#endif
>
>#if defined(_WINDOWS) //XXXLCM need tls abtraction
>
>static DWORD g_idxSinkSlot = TLS_OUT_OF_INDEXES;
>static INT32 g_sinkCount = 0;
>
>HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
>{
> // XXXLCM 1) remove windows specific tls 2) clean up tls
> HX_RESULT hr = HXR_FAIL;
>
> if(TLS_OUT_OF_INDEXES == g_idxSinkSlot)
> {
> // allocate global index so threads can use index to access slots
> g_idxSinkSlot = TlsAlloc();
> }
>
> if( g_idxSinkSlot != TLS_OUT_OF_INDEXES)
> {
> pSink = reinterpret_cast<HXThreadMessageSink*>
> (TlsGetValue(g_idxSinkSlot));
> if(pSink)
> {
> hr = HXR_OK;
> pSink->AddRef();
> }
> else
> {
> // create sink
> hr = HXThreadMessageSink::Create(pSink);
> if (SUCCEEDED(hr))
> {
> // put sink in slot
> TlsSetValue(g_idxSinkSlot, pSink);
> ++g_sinkCount;
> }
> else if (0 == g_sinkCount)
> {
> // free index (no sinks)
> TlsFree(g_idxSinkSlot);
> g_idxSinkSlot = TLS_OUT_OF_INDEXES;
> }
> }
>
> }
>
> return hr;
>}
>
>void HXThreadMessageSink::FinalRelease()
>{
> // remove this sink from thread sink slot
> HXThreadMessageSink* pSink = reinterpret_cast<HXThreadMessageSink*>
> (TlsGetValue(g_idxSinkSlot));
> HX_ASSERT(pSink == this);
> if(pSink)
> {
> TlsSetValue(g_idxSinkSlot, 0);
> HX_ASSERT(g_sinkCount > 0);
> if (--g_sinkCount == 0)
> {
> // free index (no sinks)
> TlsFree(g_idxSinkSlot);
> g_idxSinkSlot = TLS_OUT_OF_INDEXES;
> }
> }
>
> // continue final release (delete this)
> HXRefCounted::FinalRelease();
>}
>#else
>HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
>{
> HX_ASSERT(false);
> return HXR_NOTIMPL;
>}
>
>void HXThreadMessageSink::FinalRelease()
>{
> HX_ASSERT(false);
>}
>
>#endif //_WINDOWS
>
>HXThreadMessageSink::HXThreadMessageSink()
>:m_handle(INVALID_MSGSINK_HANDLE)
>{
>}
>
>HXThreadMessageSink::~HXThreadMessageSink()
>{
> HX_ASSERT( m_handlers.IsEmpty() );
> m_handlers.RemoveAll();
>}
>
>//
>// add handler for given message
>//
>HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler*
>pHandler)
>{
> HX_RESULT res = HXR_FAIL;
>
> DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n",
> this, msg));
> HX_ASSERT(pHandler);
>
> if( pHandler )
> {
> void* pTmp = NULL;
>
> if( m_handlers.Lookup(reinterpret_cast<void*>(msg), pTmp ))
> {
> //We already have a handler. Make sure it is the same
> //one. If not, it is an error.
> if( pTmp == reinterpret_cast<void*>(pHandler) )
> {
> res = HXR_UNEXPECTED;
> }
> }
> else
> {
> //We don't have a handler for this one yet.
> m_handlers.SetAt(reinterpret_cast<void*>(msg), pHandler );
> res = HXR_OK;
> }
> }
>
> return res;
>}
>
>
>//
>// remove handler for given message
>//
>HX_RESULT HXThreadMessageSink::RemoveHandler(UINT32 msg)
>{
> DPRINTF(D_INFO, ("HXThreadMessageSink::RemoveHandler()[%p]: msg
> %lu\n", this, msg));
>
> HX_RESULT hr = HXR_OK;
> HX_ASSERT(0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
> if (!m_handlers.RemoveKey(reinterpret_cast<void*>(msg)))
> {
> hr = HXR_FAIL;
> HX_ASSERT(false);
> }
>
> return hr;
>}
>
>BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
>{
> BOOL retVal = FALSE;
> if( !m_handlers.IsEmpty() )
> {
> retVal = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
> }
> return retVal;
>}