[Common-dev] Re: [Filesystem-dev] CR-Client: Fix httpfsys so it works with AutoUpgrade.

[Common-dev] Re: [Filesystem-dev] CR-Client: Fix httpfsys so it works with AutoUpgrade.

Liam Murray liamm at real.com
Fri Sep 24 12:11:19 PDT 2004


Looks good.

Liam

At 10:50 AM 9/24/2004, Greg Wright wrote:
>New diff. Differences from first diff:
>
>o  No semaphore work, just reverted back to original impl and
>    let the threads to extra wake-ups.
>o Added IsHandled() to ThreadMessageSink and use it in HXTaskManger.
>o Make Task use the thread it is created on as the response thread.
>   Creator can override with SetResponseThread().
>o Fix AddHandler to correctly return FAIL, OK and UNEXPECTED.
>
>a couple files attached because it is easier to read a couple
>methods that way.
>
>
>--greg.
>
>
>Index: hxtaskmanager.cpp
>===================================================================
>RCS file: /cvsroot/common/util/hxtaskmanager.cpp,v
>retrieving revision 1.1
>diff -u -w -r1.1 hxtaskmanager.cpp
>--- hxtaskmanager.cpp   19 Jul 2004 23:23:03 -0000      1.1
>+++ hxtaskmanager.cpp   24 Sep 2004 17:46:01 -0000
>@@ -57,11 +57,57 @@
>  #define D_THREADTASKMGR D_INFO //XXXLCM
>
>
>+HXTaskManager::Task::Task()
>+    : m_pResponseThread(NULL),
>+      m_pMsgSink(NULL)
>+{
>+    HXThread::MakeThread(m_pResponseThread);
>+    HX_ASSERT(m_pResponseThread);
>+}
>+
>+
>+HXTaskManager::Task::~Task()
>+{
>+    HX_DELETE(m_pResponseThread);
>+    HX_RELEASE(m_pMsgSink);
>+}
>+
>+
>+HXThread* HXTaskManager::Task::GetResponseThread()
>+{
>+    return m_pResponseThread;
>+}
>+
>+void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
>+{
>+    HX_ASSERT(pThread);
>+    HX_DELETE(m_pResponseThread);
>+    m_pResponseThread = pThread;
>+}
>+
>+
>+HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
>+{
>+    return m_pMsgSink;
>+}
>+
>+void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
>+{
>+    HX_ASSERT(pMsgSink);
>+    HX_ASSERT(!m_pMsgSink);
>+    HX_RELEASE(m_pMsgSink);
>+    if( pMsgSink )
>+    {
>+        m_pMsgSink = pMsgSink;
>+        m_pMsgSink->AddRef();
>+    }
>+}
>+
>+
>+
>
>  HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
>  : m_pWorkEvent(0)
>-, m_pParentThread(0)
>-, m_pMsgSink(0)
>  , m_pTaskMutex(0)
>  , m_exit(false)
>  , m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
>@@ -75,7 +121,6 @@
>      DestroyPool();
>      HX_DELETE(m_pWorkEvent);
>      HX_DELETE(m_pTaskMutex);
>-    HX_DELETE(m_pParentThread);
>
>      // clean up any outstanding tasks
>      CHXSimpleList::Iterator end = m_pending.End();
>@@ -86,12 +131,6 @@
>          pTask->Release();
>      }
>      m_pending.RemoveAll();
>-
>-    if(m_pMsgSink)
>-    {
>-        m_pMsgSink->RemoveHandler(m_taskDoneMsg);
>-        HX_RELEASE(m_pMsgSink);
>-    }
>  }
>
>  HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
>@@ -105,13 +144,6 @@
>          goto exit;
>      }
>
>-    // HXThread wrapper for the parent thread (i.e., this thread)
>-    hr = HXThread::MakeThread(m_pParentThread);
>-    if(FAILED(hr))
>-    {
>-        goto exit;
>-    }
>-
>      // mutex for accessing task queue
>      hr = HXMutex::MakeMutex(m_pTaskMutex);
>      if(FAILED(hr))
>@@ -126,13 +158,6 @@
>          goto exit;
>      }
>
>-    hr = HXThreadMessageSink::GetThreadInstance(m_pMsgSink);
>-    if(SUCCEEDED(hr))
>-    {
>-        // note: we need one message per task manager instance
>-        hr = m_pMsgSink->AddHandler(m_taskDoneMsg, this);
>-    }
>-
>  exit:
>
>      return hr;
>@@ -161,27 +186,43 @@
>
>  HX_RESULT HXTaskManager::AddTask(Task* pTask)
>  {
>-    HX_ASSERT(pTask);
>-
>-    HX_RESULT hr = HXR_OK;
>+    HXThread*            pTmp     = NULL;
>+    HX_RESULT            hr       = HXR_FAIL;
>+    HXThreadMessageSink* pMsgSink = NULL;
>
>      HX_ASSERT(m_threads.GetCount() > 0);
>+    HX_ASSERT(pTask);
>
>-    if(HXR_OK == hr)
>+    hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
>+    if(SUCCEEDED(hr) && pMsgSink)
>+    {
>+        if( !pMsgSink->IsHandled(m_taskDoneMsg) )
>+        {
>+            hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
>+            HX_ASSERT(SUCCEEDED(hr));
>+        }
>+
>+        if( SUCCEEDED(hr) )
>+        {
>+            hr = HXThread::MakeThread(pTmp);
>+            if( SUCCEEDED(hr) && pTmp )
>      {
>          // and add to pending task list and tell a task thread to start 
> working
>          HXScopeLock lock(m_pTaskMutex);
>-
>          DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): adding task 
> #%lu\n", m_pending.GetCount()));
>
>          pTask->AddRef();
>+                pTask->SetMessageSink(pMsgSink);
>          m_pending.AddTail(pTask);
>+
>          // XXXLCM use a semaphore instead; more appropriate
>          if( m_pending.GetCount() == 1)
>          {
>              m_pWorkEvent->SignalEvent();
>          }
>      }
>+        }
>+    }
>
>      return hr;
>  }
>@@ -293,6 +334,10 @@
>  void* HXTaskManager::TaskThreadProc()
>  {
>
>+    HXThreadMessageSink* pMsgSink      = NULL;
>+    HXThread*            pResponseThread = NULL;
>+    Task*                pTask         = NULL;
>+
>      for( ; ; )
>      {
>          DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): 
> waiting for work event...\n"));
>@@ -309,14 +354,22 @@
>          }
>
>          // fetch and execute next task
>-        Task* pTask = GetNextTask();
>+        pTask = GetNextTask();
>          if(pTask)
>          {
>              pTask->Execute();
>
>+            pResponseThread = pTask->GetResponseThread();
>+            pMsgSink = pTask->GetMessageSink();
>+            HX_ASSERT(pResponseThread && pMsgSink);
>+
>+            if( pResponseThread && pMsgSink )
>+            {
>              // post response to (response) thread
>              HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
>-            m_pParentThread->PostMessage(&msgResponse, 
>m_pMsgSink->GetSinkHandle());
>+                pResponseThread->PostMessage(&msgResponse, 
>pMsgSink->GetSinkHandle());
>+            }
>+
>          }
>      }
>
>Index: hxthreadmessagesink.cpp
>===================================================================
>RCS file: /cvsroot/common/util/hxthreadmessagesink.cpp,v
>retrieving revision 1.2
>diff -u -w -r1.2 hxthreadmessagesink.cpp
>--- hxthreadmessagesink.cpp     22 Jul 2004 19:23:50 -0000      1.2
>+++ hxthreadmessagesink.cpp     24 Sep 2004 17:46:01 -0000
>@@ -134,6 +134,8 @@
>
>  HXThreadMessageSink::~HXThreadMessageSink()
>  {
>+    HX_ASSERT( m_handlers.IsEmpty() );
>+    m_handlers.RemoveAll();
>  }
>
>  //
>@@ -141,13 +143,33 @@
>  //
>  HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler* 
> pHandler)
>  {
>+    HX_RESULT res = HXR_FAIL;
>+
>      DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n", 
> this, msg));
>      HX_ASSERT(pHandler);
>-    HX_ASSERT(0 == m_handlers.Lookup(reinterpret_cast<void*>(msg)));
>
>+    if( pHandler )
>+    {
>+        void* pTmp = NULL;
>+
>+        if( m_handlers.Lookup(reinterpret_cast<void*>(msg), pTmp ))
>+        {
>+            //We already have a handler. Make sure it is the same
>+            //one. If not, it is an error.
>+            if( pTmp == reinterpret_cast<void*>(pHandler) )
>+            {
>+                res = HXR_UNEXPECTED;
>+            }
>+        }
>+        else
>+        {
>+            //We don't have a handler for this one yet.
>      m_handlers.SetAt(reinterpret_cast<void*>(msg), pHandler);
>+            res = HXR_OK;
>+        }
>+    }
>
>-    return HXR_OK;
>+    return res;
>  }
>
>
>@@ -167,6 +189,16 @@
>      }
>
>      return hr;
>+}
>+
>+BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
>+{
>+    BOOL retVal = FALSE;
>+    if( !m_handlers.IsEmpty() )
>+    {
>+        retVal = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
>+    }
>+    return retVal;
>  }


>Index: pub/hxtaskmanager.h
>===================================================================
>RCS file: /cvsroot/common/util/pub/hxtaskmanager.h,v
>retrieving revision 1.2
>diff -u -w -r1.2 hxtaskmanager.h
>--- pub/hxtaskmanager.h 20 Jul 2004 00:24:13 -0000      1.2
>+++ pub/hxtaskmanager.h 24 Sep 2004 17:46:01 -0000
>@@ -62,10 +62,11 @@
>  : public HXThreadMessageSink::MessageHandler
>  {
>  public:
>-    class Task
>-    : public HXRefCounted
>+    class Task : public HXRefCounted
>      {
>      public:
>+        Task();
>+        virtual ~Task();
>
>          // Called on task thread
>          virtual void Execute() = 0;
>@@ -77,6 +78,18 @@
>          // itself which may be defined by the derived task.
>          //
>          virtual void OnTaskComplete(HX_RESULT hr) = 0;
>+
>+
>+        HXThread* GetResponseThread();
>+        void      SetResponseThread(HXThread* pThread);
>+
>+        HXThreadMessageSink* GetMessageSink();
>+        void                 SetMessageSink(HXThreadMessageSink* pMsgSink);
>+
>+      protected:
>+        HXThread*            m_pResponseThread;
>+        HXThreadMessageSink* m_pMsgSink;
>+
>      };
>
>  public:
>@@ -106,9 +119,6 @@
>
>  private:
>
>-    // thread that adds task
>-    HXThread*           m_pParentThread;
>-
>      // pool of worker threads that run tasks
>      CHXSet              m_threads;
>      HXEvent*            m_pWorkEvent;
>@@ -120,10 +130,6 @@
>
>      // unique message used by msg window to notify this instance
>      UINT32              m_taskDoneMsg;
>-
>-    // receives task-done notifications on parent thread
>-    HXThreadMessageSink*    m_pMsgSink;
>-
>  };
>
>
>Index: pub/hxthreadmessagesink.h
>===================================================================
>RCS file: /cvsroot/common/util/pub/hxthreadmessagesink.h,v
>retrieving revision 1.1
>diff -u -w -r1.1 hxthreadmessagesink.h
>--- pub/hxthreadmessagesink.h   19 Jul 2004 23:23:03 -0000      1.1
>+++ pub/hxthreadmessagesink.h   24 Sep 2004 17:46:01 -0000
>@@ -75,6 +75,8 @@
>
>      HX_RESULT AddHandler(UINT32 msg, 
> HXThreadMessageSink::MessageHandler* pHandler);
>      HX_RESULT RemoveHandler(UINT32 msg);
>+    BOOL      IsHandled(UINT32 msg);
>+
>      MsgSinkHandle GetSinkHandle() const;
>
>  protected:
>
>
>/* ***** BEGIN LICENSE BLOCK *****
>  * Version: RCSL 1.0/RPSL 1.0
>  *
>  * Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved.
>  *
>  * The contents of this file, and the files included with this file, are
>  * subject to the current version of the RealNetworks Public Source License
>  * Version 1.0 (the "RPSL") available at
>  * http://www.helixcommunity.org/content/rpsl unless you have licensed
>  * the file under the RealNetworks Community Source License Version 1.0
>  * (the "RCSL") available at http://www.helixcommunity.org/content/rcsl,
>  * in which case the RCSL will apply. You may also obtain the license terms
>  * directly from RealNetworks.  You may not use this file except in
>  * compliance with the RPSL or, if you have a valid RCSL with RealNetworks
>  * applicable to this file, the RCSL.  Please see the applicable RPSL or
>  * RCSL for the rights, obligations and limitations governing use of the
>  * contents of the file.
>  *
>  * This file is part of the Helix DNA Technology. RealNetworks is the
>  * developer of the Original Code and owns the copyrights in the portions
>  * it created.
>  *
>  * This file, and the files included with this file, is distributed and made
>  * available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
>  * EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH 
> WARRANTIES,
>  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS
>  * FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
>  *
>  * Technology Compatibility Kit Test Suite(s) Location:
>  *    http://www.helixcommunity.org/content/tck
>  *
>  * Contributor(s):
>  *
>  * ***** END LICENSE BLOCK ***** */
>
>
>
>#include "hxtypes.h"
>#include "hxcom.h"
>#include "hxslist.h"
>#include "hxthread.h"
>#include "hxscope_lock.h"
>
>#include "hxmap.h"
>#include "debug.h"
>#include "hxassert.h"
>#include "hxheap.h"
>
>#include "hxtaskmanager.h"
>
>#ifdef _DEBUG
>#undef HX_THIS_FILE
>static const char HX_THIS_FILE[] = __FILE__;
>#endif
>
>
>#define D_THREADTASKMGR D_INFO //XXXLCM
>
>
>HXTaskManager::Task::Task()
>     : m_pResponseThread(NULL),
>       m_pMsgSink(NULL)
>{
>     HXThread::MakeThread(m_pResponseThread);
>     HX_ASSERT(m_pResponseThread);
>}
>
>
>HXTaskManager::Task::~Task()
>{
>     HX_DELETE(m_pResponseThread);
>     HX_RELEASE(m_pMsgSink);
>}
>
>
>HXThread* HXTaskManager::Task::GetResponseThread()
>{
>     return m_pResponseThread;
>}
>
>void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
>{
>     HX_ASSERT(pThread);
>     HX_DELETE(m_pResponseThread);
>     m_pResponseThread = pThread;
>}
>
>
>HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
>{
>     return m_pMsgSink;
>}
>
>void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
>{
>     HX_ASSERT(pMsgSink);
>     HX_ASSERT(!m_pMsgSink);
>     HX_RELEASE(m_pMsgSink);
>     if( pMsgSink )
>     {
>         m_pMsgSink = pMsgSink;
>         m_pMsgSink->AddRef();
>     }
>}
>
>
>
>
>HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
>: m_pWorkEvent(0)
>, m_pTaskMutex(0)
>, m_exit(false)
>, m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
>{
>}
>
>
>HXTaskManager::~HXTaskManager()
>{
>     // clean up task thread and sync objects
>     DestroyPool();
>     HX_DELETE(m_pWorkEvent);
>     HX_DELETE(m_pTaskMutex);
>
>     // clean up any outstanding tasks
>     CHXSimpleList::Iterator end = m_pending.End();
>     for(CHXSimpleList::Iterator begin = m_pending.Begin(); begin != end; 
> ++begin)
>     {
>         Task* pTask = reinterpret_cast<Task*>(*begin);
>         pTask->OnTaskComplete(HXR_ABORT);
>         pTask->Release();
>     }
>     m_pending.RemoveAll();
>}
>
>HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
>{
>     HX_ASSERT(threadCount > 0);
>
>     // event used to signal task thread to wake up and start working
>     HX_RESULT hr = HXEvent::MakeEvent(m_pWorkEvent, NULL, TRUE /*TRUE == 
> manual reset*/);
>     if(FAILED(hr))
>     {
>         goto exit;
>     }
>
>     // mutex for accessing task queue
>     hr = HXMutex::MakeMutex(m_pTaskMutex);
>     if(FAILED(hr))
>     {
>         goto exit;
>     }
>
>     // create worker threads
>     hr = CreatePool(threadCount, threadPriority);
>     if(FAILED(hr))
>     {
>         goto exit;
>     }
>
>exit:
>
>     return hr;
>}
>
>HX_RESULT HXTaskManager::RemoveTask(Task* pTask)
>{
>     HX_ASSERT(pTask);
>     HXScopeLock lock(m_pTaskMutex);
>
>     LISTPOSITION pos = m_pending.Find(pTask);
>     if(pos)
>     {
>         DPRINTF(D_THREADTASKMGR, 
> ("HXTaskManager::HXTaskManager::CancelTask(): removing task %p\n", pTask));
>         m_pending.RemoveAt(pos);
>         pTask->OnTaskComplete(HXR_ABORT);
>         pTask->Release();
>         return HXR_OK;
>     }
>
>     DPRINTF(D_THREADTASKMGR, 
> ("HXTaskManager::HXTaskManager::CancelTask(): no task %p (already started 
> or complete)\n", pTask));
>     return HXR_FAIL;
>}
>
>
>
>HX_RESULT HXTaskManager::AddTask(Task* pTask)
>{
>     HXThread*            pTmp     = NULL;
>     HX_RESULT            hr       = HXR_FAIL;
>     HXThreadMessageSink* pMsgSink = NULL;
>
>     HX_ASSERT(m_threads.GetCount() > 0);
>     HX_ASSERT(pTask);
>
>     hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
>     if(SUCCEEDED(hr) && pMsgSink)
>     {
>         if( !pMsgSink->IsHandled(m_taskDoneMsg) )
>         {
>             hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
>             HX_ASSERT(SUCCEEDED(hr));
>         }
>
>         if( SUCCEEDED(hr) )
>         {
>             hr = HXThread::MakeThread(pTmp);
>             if( SUCCEEDED(hr) && pTmp )
>             {
>                 // and add to pending task list and tell a task thread to 
> start working
>                 HXScopeLock lock(m_pTaskMutex);
>                 DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): 
> adding task #%lu\n", m_pending.GetCount()));
>
>                 pTask->AddRef();
>                 pTask->SetMessageSink(pMsgSink);
>                 m_pending.AddTail(pTask);
>
>                 // XXXLCM use a semaphore instead; more appropriate
>                 if( m_pending.GetCount() == 1)
>                 {
>                     m_pWorkEvent->SignalEvent();
>                 }
>             }
>         }
>     }
>
>     return hr;
>}
>
>UINT32 HXTaskManager::GetPendingTaskCount() const
>{
>     HXScopeLock lock(m_pTaskMutex);
>     return m_pending.GetCount();
>
>}
>
>
>HX_RESULT HXTaskManager::CreateTaskThread(HXThread*& pThread, UINT32 
>threadPriority)
>{
>     DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreateTaskThread(): 
> creating thread...\n"));
>
>     HX_ASSERT(!pThread);
>     HX_RESULT hr = HXThread::MakeThread(pThread);
>     if(HXR_OK == hr)
>     {
>         hr = pThread->CreateThread(TaskThreadProc_, this, 
> HX_CREATE_SUSPENDED);
>         if(SUCCEEDED(hr))
>         {
>             hr = pThread->SetPriority(threadPriority);
>             if(SUCCEEDED(hr))
>             {
>                 hr = pThread->Resume();
>             }
>         }
>
>         if (FAILED(hr))
>         {
>             HX_ASSERT(false);
>             HX_DELETE(pThread);
>         }
>     }
>     return hr;
>}
>
>HX_RESULT HXTaskManager::CreatePool(UINT32 count, UINT32 threadPriority)
>{
>     DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreatePool(): creating %lu 
> threads...\n", count));
>     HX_RESULT hr = HXR_FAIL;
>     for(UINT32 idx = 0; idx < count; ++idx)
>     {
>         HXThread* pThread = 0;
>         hr = CreateTaskThread(pThread, threadPriority);
>         if(FAILED(hr))
>         {
>             HX_ASSERT(false);
>             DestroyPool();
>             break;
>         }
>         m_threads.Add(pThread);
>     }
>     return hr;
>}
>
>void HXTaskManager::DestroyPool()
>{
>     if(!m_threads.IsEmpty())
>     {
>         // signal task threads to exit
>         m_exit = true;
>         m_pWorkEvent->SignalEvent();
>
>         CHXSet::Iterator end = m_threads.End();
>         for(CHXSet::Iterator begin = m_threads.Begin(); begin != end; 
> ++begin)
>         {
>             // wait for thread to exit gracefully
>             HXThread* pThread = reinterpret_cast<HXThread*>(*begin);
>             pThread->Exit(0);
>             DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): 
> thread [%p] exited\n", pThread));
>             HX_DELETE(pThread);
>
>         }
>         m_threads.RemoveAll();
>         DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): %lu 
> tasks abandoned\n", GetPendingTaskCount()));
>     }
>}
>
>//
>// called on task thread
>//
>HXTaskManager::Task* HXTaskManager::GetNextTask()
>{
>     Task* pTask = 0;
>     HXScopeLock lock(m_pTaskMutex);
>     if(!m_pending.IsEmpty())
>     {
>         pTask = reinterpret_cast<Task*>(m_pending.RemoveHead());
>         if(m_pending.IsEmpty())
>         {
>             // no more tasks
>             DPRINTF(D_THREADTASKMGR, ("HXTaskManager::GetNextTask(): no 
> more tasks...\n"));
>             m_pWorkEvent->ResetEvent();
>         }
>     }
>
>     return pTask;
>}
>
>void* HXTaskManager::TaskThreadProc_(void* pv)
>{
>     HXTaskManager* pThis = reinterpret_cast<HXTaskManager*>(pv);
>     HX_ASSERT(pThis);
>     return pThis->TaskThreadProc();
>}
>void* HXTaskManager::TaskThreadProc()
>{
>
>     HXThreadMessageSink* pMsgSink      = NULL;
>     HXThread*            pResponseThread = NULL;
>     Task*                pTask         = NULL;
>
>     for( ; ; )
>     {
>         DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): 
> waiting for work event...\n"));
>
>         // sleep until there is work to do
>         m_pWorkEvent->Wait(ALLFS /*INFINITE*/);
>
>         DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): got 
> work event...\n"));
>
>         if(m_exit)
>         {
>             DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): 
> exiting...\n"));
>             break;
>         }
>
>         // fetch and execute next task
>         pTask = GetNextTask();
>         if(pTask)
>         {
>             pTask->Execute();
>
>             pResponseThread = pTask->GetResponseThread();
>             pMsgSink = pTask->GetMessageSink();
>             HX_ASSERT(pResponseThread && pMsgSink);
>
>             if( pResponseThread && pMsgSink )
>             {
>                 // post response to (response) thread
>                 HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
>                 pResponseThread->PostMessage(&msgResponse, 
> pMsgSink->GetSinkHandle());
>             }
>
>         }
>     }
>
>     return 0;
>}
>
>
>
>//
>// HXThreadMessageSink::MessageHandler
>//
>// Called on parent thread after the resolver thread posts m_taskDoneMsg
>//
>UINT32 HXTaskManager::HandleMessage(const HXThreadMessage& msg)
>{
>     HX_ASSERT(m_taskDoneMsg == msg.m_ulMessage);
>
>     // get message data
>     Task* pTask = reinterpret_cast<Task*>(msg.m_pParam1);
>     HX_ASSERT(pTask);
>
>     // notify completion of task
>     pTask->OnTaskComplete(HXR_OK);
>     pTask->Release();
>
>     return 0;
>}
>
>
>
>/* ***** BEGIN LICENSE BLOCK *****
>  * Version: RCSL 1.0/RPSL 1.0
>  *
>  * Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved.
>  *
>  * The contents of this file, and the files included with this file, are
>  * subject to the current version of the RealNetworks Public Source License
>  * Version 1.0 (the "RPSL") available at
>  * http://www.helixcommunity.org/content/rpsl unless you have licensed
>  * the file under the RealNetworks Community Source License Version 1.0
>  * (the "RCSL") available at http://www.helixcommunity.org/content/rcsl,
>  * in which case the RCSL will apply. You may also obtain the license terms
>  * directly from RealNetworks.  You may not use this file except in
>  * compliance with the RPSL or, if you have a valid RCSL with RealNetworks
>  * applicable to this file, the RCSL.  Please see the applicable RPSL or
>  * RCSL for the rights, obligations and limitations governing use of the
>  * contents of the file.
>  *
>  * This file is part of the Helix DNA Technology. RealNetworks is the
>  * developer of the Original Code and owns the copyrights in the portions
>  * it created.
>  *
>  * This file, and the files included with this file, is distributed and made
>  * available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
>  * EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH 
> WARRANTIES,
>  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS
>  * FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
>  *
>  * Technology Compatibility Kit Test Suite(s) Location:
>  *    http://www.helixcommunity.org/content/tck
>  *
>  * Contributor(s):
>  *
>  * ***** END LICENSE BLOCK ***** */
>
>#include "hxtypes.h"
>#include "hxthread.h"
>#include "debug.h"
>#include "hxassert.h"
>#include "hxheap.h"
>#include "hxthreadmessagesink.h"
>
>
>#ifdef _DEBUG
>#undef HX_THIS_FILE
>static const char HX_THIS_FILE[] = __FILE__;
>#endif
>
>#if defined(_WINDOWS) //XXXLCM need tls abtraction
>
>static DWORD g_idxSinkSlot = TLS_OUT_OF_INDEXES;
>static INT32 g_sinkCount = 0;
>
>HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
>{
>     // XXXLCM 1) remove windows specific tls 2) clean up tls
>     HX_RESULT hr = HXR_FAIL;
>
>     if(TLS_OUT_OF_INDEXES == g_idxSinkSlot)
>     {
>         // allocate global index so threads can use index to access slots
>         g_idxSinkSlot = TlsAlloc();
>     }
>
>     if( g_idxSinkSlot != TLS_OUT_OF_INDEXES)
>     {
>         pSink = reinterpret_cast<HXThreadMessageSink*> 
> (TlsGetValue(g_idxSinkSlot));
>         if(pSink)
>         {
>             hr = HXR_OK;
>             pSink->AddRef();
>         }
>         else
>         {
>             // create sink
>             hr =  HXThreadMessageSink::Create(pSink);
>             if (SUCCEEDED(hr))
>             {
>                 // put sink in slot
>                 TlsSetValue(g_idxSinkSlot, pSink);
>                 ++g_sinkCount;
>             }
>             else if (0 == g_sinkCount)
>             {
>                 // free index (no sinks)
>                 TlsFree(g_idxSinkSlot);
>                 g_idxSinkSlot = TLS_OUT_OF_INDEXES;
>             }
>         }
>
>     }
>
>     return hr;
>}
>
>void HXThreadMessageSink::FinalRelease()
>{
>     // remove this sink from thread sink slot
>     HXThreadMessageSink* pSink = reinterpret_cast<HXThreadMessageSink*> 
> (TlsGetValue(g_idxSinkSlot));
>     HX_ASSERT(pSink == this);
>     if(pSink)
>     {
>         TlsSetValue(g_idxSinkSlot, 0);
>         HX_ASSERT(g_sinkCount > 0);
>         if (--g_sinkCount == 0)
>         {
>             // free index (no sinks)
>             TlsFree(g_idxSinkSlot);
>             g_idxSinkSlot = TLS_OUT_OF_INDEXES;
>         }
>     }
>
>     // continue final release (delete this)
>     HXRefCounted::FinalRelease();
>}
>#else
>HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
>{
>     HX_ASSERT(false);
>     return HXR_NOTIMPL;
>}
>
>void HXThreadMessageSink::FinalRelease()
>{
>     HX_ASSERT(false);
>}
>
>#endif //_WINDOWS
>
>HXThreadMessageSink::HXThreadMessageSink()
>:m_handle(INVALID_MSGSINK_HANDLE)
>{
>}
>
>HXThreadMessageSink::~HXThreadMessageSink()
>{
>     HX_ASSERT( m_handlers.IsEmpty() );
>     m_handlers.RemoveAll();
>}
>
>//
>// add handler for given message
>//
>HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler* 
>pHandler)
>{
>     HX_RESULT res = HXR_FAIL;
>
>     DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n", 
> this, msg));
>     HX_ASSERT(pHandler);
>
>     if( pHandler )
>     {
>         void* pTmp = NULL;
>
>         if( m_handlers.Lookup(reinterpret_cast<void*>(msg), pTmp ))
>         {
>             //We already have a handler. Make sure it is the same
>             //one. If not, it is an error.
>             if( pTmp == reinterpret_cast<void*>(pHandler) )
>             {
>                 res = HXR_UNEXPECTED;
>             }
>         }
>         else
>         {
>             //We don't have a handler for this one yet.
>             m_handlers.SetAt(reinterpret_cast<void*>(msg), pHandler );
>             res = HXR_OK;
>         }
>     }
>
>     return res;
>}
>
>
>//
>// remove handler for given message
>//
>HX_RESULT HXThreadMessageSink::RemoveHandler(UINT32 msg)
>{
>     DPRINTF(D_INFO, ("HXThreadMessageSink::RemoveHandler()[%p]: msg 
> %lu\n", this, msg));
>
>     HX_RESULT hr = HXR_OK;
>     HX_ASSERT(0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
>     if (!m_handlers.RemoveKey(reinterpret_cast<void*>(msg)))
>     {
>         hr = HXR_FAIL;
>         HX_ASSERT(false);
>     }
>
>     return hr;
>}
>
>BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
>{
>     BOOL retVal = FALSE;
>     if( !m_handlers.IsEmpty() )
>     {
>         retVal = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
>     }
>     return retVal;
>}




More information about the Common-dev mailing list
 

Site Map   |   Terms of Use   |   Privacy Policy   |   Contact Us

Copyright © 1995-2007 RealNetworks, Inc. All rights reserved. RealNetworks and Helix are trademarks of RealNetworks.
All other trademarks or registered trademarks are the property of their respective holders.