[Common-dev] Re: [Filesystem-dev] CR-Client: Fix httpfsys so it works with AutoUpgrade.

[Common-dev] Re: [Filesystem-dev] CR-Client: Fix httpfsys so it works with AutoUpgrade.

Greg Wright gwright at real.com
Fri Sep 24 10:50:13 PDT 2004


New diff. Differences from first diff:

o  No semaphore work, just reverted back to the original impl and
    let the threads do extra wake-ups.
o Added IsHandled() to ThreadMessageSink and use it in HXTaskManager.
o Make Task use the thread it is created on as the response thread.
   Creator can override with SetResponseThread().
o Fix AddHandler to correctly return FAIL, OK and UNEXPECTED.

a couple files attached because it is easier to read a couple
methods that way.


--greg.


Index: hxtaskmanager.cpp
===================================================================
RCS file: /cvsroot/common/util/hxtaskmanager.cpp,v
retrieving revision 1.1
diff -u -w -r1.1 hxtaskmanager.cpp
--- hxtaskmanager.cpp	19 Jul 2004 23:23:03 -0000	1.1
+++ hxtaskmanager.cpp	24 Sep 2004 17:46:01 -0000
@@ -57,11 +57,57 @@
  #define D_THREADTASKMGR D_INFO //XXXLCM


+HXTaskManager::Task::Task()
+    : m_pResponseThread(NULL),
+      m_pMsgSink(NULL)
+{
+    HXThread::MakeThread(m_pResponseThread);
+    HX_ASSERT(m_pResponseThread);
+}
+
+
+HXTaskManager::Task::~Task()
+{
+    HX_DELETE(m_pResponseThread);
+    HX_RELEASE(m_pMsgSink);
+}
+
+
+HXThread* HXTaskManager::Task::GetResponseThread()
+{
+    return m_pResponseThread;
+}
+
+void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
+{
+    HX_ASSERT(pThread);
+    HX_DELETE(m_pResponseThread);
+    m_pResponseThread = pThread;
+}
+
+
+HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
+{
+    return m_pMsgSink;
+}
+
+void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
+{
+    HX_ASSERT(pMsgSink);
+    HX_ASSERT(!m_pMsgSink);
+    HX_RELEASE(m_pMsgSink);
+    if( pMsgSink )
+    {
+        m_pMsgSink = pMsgSink;
+        m_pMsgSink->AddRef();
+    }
+}
+
+
+

  HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
  : m_pWorkEvent(0)
-, m_pParentThread(0)
-, m_pMsgSink(0)
  , m_pTaskMutex(0)
  , m_exit(false)
  , m_taskDoneMsg(taskDoneMsg) /*unique thread message required per instance*/
@@ -75,7 +121,6 @@
      DestroyPool();
      HX_DELETE(m_pWorkEvent);
      HX_DELETE(m_pTaskMutex);
-    HX_DELETE(m_pParentThread);

      // clean up any outstanding tasks
      CHXSimpleList::Iterator end = m_pending.End();
@@ -86,12 +131,6 @@
          pTask->Release();
      }
      m_pending.RemoveAll();
-
-    if(m_pMsgSink)
-    {
-        m_pMsgSink->RemoveHandler(m_taskDoneMsg);
-        HX_RELEASE(m_pMsgSink);
-    }
  }

  HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
@@ -105,13 +144,6 @@
          goto exit;
      }

-    // HXThread wrapper for the parent thread (i.e., this thread)
-    hr = HXThread::MakeThread(m_pParentThread);
-    if(FAILED(hr))
-    {
-        goto exit;
-    }
-
      // mutex for accessing task queue
      hr = HXMutex::MakeMutex(m_pTaskMutex);
      if(FAILED(hr))
@@ -126,13 +158,6 @@
          goto exit;
      }

-    hr = HXThreadMessageSink::GetThreadInstance(m_pMsgSink);
-    if(SUCCEEDED(hr))
-    {
-        // note: we need one message per task manager instance
-        hr = m_pMsgSink->AddHandler(m_taskDoneMsg, this);
-    }
-
  exit:

      return hr;
@@ -161,27 +186,43 @@

  HX_RESULT HXTaskManager::AddTask(Task* pTask)
  {
-    HX_ASSERT(pTask);
-
-    HX_RESULT hr = HXR_OK;
+    HXThread*            pTmp     = NULL;
+    HX_RESULT            hr       = HXR_FAIL;
+    HXThreadMessageSink* pMsgSink = NULL;

      HX_ASSERT(m_threads.GetCount() > 0);
+    HX_ASSERT(pTask);

-    if(HXR_OK == hr)
+    hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
+    if(SUCCEEDED(hr) && pMsgSink)
+    {
+        if( !pMsgSink->IsHandled(m_taskDoneMsg) )
+        {
+            hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
+            HX_ASSERT(SUCCEEDED(hr));
+        }
+
+        if( SUCCEEDED(hr) )
+        {
+            hr = HXThread::MakeThread(pTmp);
+            if( SUCCEEDED(hr) && pTmp )
      {
          // and add to pending task list and tell a task thread to start working
          HXScopeLock lock(m_pTaskMutex);
-
          DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): adding task #%lu\n", 
m_pending.GetCount()));

          pTask->AddRef();
+                pTask->SetMessageSink(pMsgSink);
          m_pending.AddTail(pTask);
+
          // XXXLCM use a semaphore instead; more appropriate
          if( m_pending.GetCount() == 1)
          {
              m_pWorkEvent->SignalEvent();
          }
      }
+        }
+    }

      return hr;
  }
@@ -293,6 +334,10 @@
  void* HXTaskManager::TaskThreadProc()
  {

+    HXThreadMessageSink* pMsgSink      = NULL;
+    HXThread*            pResponseThread = NULL;
+    Task*                pTask         = NULL;
+
      for( ; ; )
      {
          DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): waiting for work event...\n"));
@@ -309,14 +354,22 @@
          }

          // fetch and execute next task
-        Task* pTask = GetNextTask();
+        pTask = GetNextTask();
          if(pTask)
          {
              pTask->Execute();

+            pResponseThread = pTask->GetResponseThread();
+            pMsgSink = pTask->GetMessageSink();
+            HX_ASSERT(pResponseThread && pMsgSink);
+
+            if( pResponseThread && pMsgSink )
+            {
              // post response to (response) thread
              HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
-            m_pParentThread->PostMessage(&msgResponse, m_pMsgSink->GetSinkHandle());
+                pResponseThread->PostMessage(&msgResponse, pMsgSink->GetSinkHandle());
+            }
+
          }
      }

Index: hxthreadmessagesink.cpp
===================================================================
RCS file: /cvsroot/common/util/hxthreadmessagesink.cpp,v
retrieving revision 1.2
diff -u -w -r1.2 hxthreadmessagesink.cpp
--- hxthreadmessagesink.cpp	22 Jul 2004 19:23:50 -0000	1.2
+++ hxthreadmessagesink.cpp	24 Sep 2004 17:46:01 -0000
@@ -134,6 +134,8 @@

  HXThreadMessageSink::~HXThreadMessageSink()
  {
+    HX_ASSERT( m_handlers.IsEmpty() );
+    m_handlers.RemoveAll();
  }

  //
@@ -141,13 +143,33 @@
  //
  HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler* pHandler)
  {
+    HX_RESULT res = HXR_FAIL;
+
      DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n", this, msg));
      HX_ASSERT(pHandler);
-    HX_ASSERT(0 == m_handlers.Lookup(reinterpret_cast<void*>(msg)));

+    if( pHandler )
+    {
+        void* pTmp = NULL;
+
+        if( m_handlers.Lookup(reinterpret_cast<void*>(msg), pTmp ))
+        {
+            //We already have a handler. Make sure it is the same
+            //one. If not, it is an error.
+            if( pTmp == reinterpret_cast<void*>(pHandler) )
+            {
+                res = HXR_UNEXPECTED;
+            }
+        }
+        else
+        {
+            //We don't have a handler for this one yet.
      m_handlers.SetAt(reinterpret_cast<void*>(msg), pHandler);
+            res = HXR_OK;
+        }
+    }

-    return HXR_OK;
+    return res;
  }


@@ -167,6 +189,16 @@
      }

      return hr;
+}
+
+BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
+{
+    BOOL retVal = FALSE;
+    if( !m_handlers.IsEmpty() )
+    {
+        retVal = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
+    }
+    return retVal;
  }


Index: pub/hxtaskmanager.h
===================================================================
RCS file: /cvsroot/common/util/pub/hxtaskmanager.h,v
retrieving revision 1.2
diff -u -w -r1.2 hxtaskmanager.h
--- pub/hxtaskmanager.h	20 Jul 2004 00:24:13 -0000	1.2
+++ pub/hxtaskmanager.h	24 Sep 2004 17:46:01 -0000
@@ -62,10 +62,11 @@
  : public HXThreadMessageSink::MessageHandler
  {
  public:
-    class Task
-    : public HXRefCounted
+    class Task : public HXRefCounted
      {
      public:
+        Task();
+        virtual ~Task();

          // Called on task thread
          virtual void Execute() = 0;
@@ -77,6 +78,18 @@
          // itself which may be defined by the derived task.
          //
          virtual void OnTaskComplete(HX_RESULT hr) = 0;
+
+
+        HXThread* GetResponseThread();
+        void      SetResponseThread(HXThread* pThread);
+
+        HXThreadMessageSink* GetMessageSink();
+        void                 SetMessageSink(HXThreadMessageSink* pMsgSink);
+
+      protected:
+        HXThread*            m_pResponseThread;
+        HXThreadMessageSink* m_pMsgSink;
+
      };

  public:
@@ -106,9 +119,6 @@

  private:

-    // thread that adds task
-    HXThread*           m_pParentThread;
-
      // pool of worker threads that run tasks
      CHXSet              m_threads;
      HXEvent*            m_pWorkEvent;
@@ -120,10 +130,6 @@

      // unique message used by msg window to notify this instance
      UINT32              m_taskDoneMsg;
-
-    // receives task-done notifications on parent thread
-    HXThreadMessageSink*    m_pMsgSink;
-
  };


Index: pub/hxthreadmessagesink.h
===================================================================
RCS file: /cvsroot/common/util/pub/hxthreadmessagesink.h,v
retrieving revision 1.1
diff -u -w -r1.1 hxthreadmessagesink.h
--- pub/hxthreadmessagesink.h	19 Jul 2004 23:23:03 -0000	1.1
+++ pub/hxthreadmessagesink.h	24 Sep 2004 17:46:01 -0000
@@ -75,6 +75,8 @@

      HX_RESULT AddHandler(UINT32 msg, HXThreadMessageSink::MessageHandler* pHandler);
      HX_RESULT RemoveHandler(UINT32 msg);
+    BOOL      IsHandled(UINT32 msg);
+
      MsgSinkHandle GetSinkHandle() const;

  protected:
-------------- next part --------------
/* ***** BEGIN LICENSE BLOCK ***** 
 * Version: RCSL 1.0/RPSL 1.0 
 *  
 * Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved. 
 *      
 * The contents of this file, and the files included with this file, are 
 * subject to the current version of the RealNetworks Public Source License 
 * Version 1.0 (the "RPSL") available at 
 * http://www.helixcommunity.org/content/rpsl unless you have licensed 
 * the file under the RealNetworks Community Source License Version 1.0 
 * (the "RCSL") available at http://www.helixcommunity.org/content/rcsl, 
 * in which case the RCSL will apply. You may also obtain the license terms 
 * directly from RealNetworks.  You may not use this file except in 
 * compliance with the RPSL or, if you have a valid RCSL with RealNetworks 
 * applicable to this file, the RCSL.  Please see the applicable RPSL or 
 * RCSL for the rights, obligations and limitations governing use of the 
 * contents of the file.  
 *  
 * This file is part of the Helix DNA Technology. RealNetworks is the 
 * developer of the Original Code and owns the copyrights in the portions 
 * it created. 
 *  
 * This file, and the files included with this file, is distributed and made 
 * available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 
 * EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH WARRANTIES, 
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS 
 * FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 
 * 
 * Technology Compatibility Kit Test Suite(s) Location: 
 *    http://www.helixcommunity.org/content/tck 
 * 
 * Contributor(s): 
 *  
 * ***** END LICENSE BLOCK ***** */ 



#include "hxtypes.h"
#include "hxcom.h"
#include "hxslist.h"
#include "hxthread.h"
#include "hxscope_lock.h"

#include "hxmap.h"
#include "debug.h"
#include "hxassert.h"
#include "hxheap.h"

#include "hxtaskmanager.h"

#ifdef _DEBUG
#undef HX_THIS_FILE		
static const char HX_THIS_FILE[] = __FILE__;
#endif									 


#define D_THREADTASKMGR D_INFO //XXXLCM


//
// Builds a task with no message sink and asks HXThread::MakeThread for
// the HXThread object that becomes the default response thread.
//
// The result code of MakeThread is not inspected; a failed allocation is
// only caught by the assert below.
//
HXTaskManager::Task::Task()
: m_pResponseThread(NULL)
, m_pMsgSink(NULL)
{
    HXThread::MakeThread(m_pResponseThread);
    HX_ASSERT(NULL != m_pResponseThread);
}


//
// Tears the task down: the response thread object is owned by the task
// and deleted here; the message sink is ref-counted and only released.
//
HXTaskManager::Task::~Task()
{
    // owned outright
    HX_DELETE(m_pResponseThread);

    // shared; drop the reference taken in SetMessageSink()
    HX_RELEASE(m_pMsgSink);
}


//
// Returns the thread object that completion messages are posted to.
// The pointer stays owned by the task; it may be NULL if none is set.
//
HXThread* HXTaskManager::Task::GetResponseThread()
{
    HXThread* const pCurrent = m_pResponseThread;
    return pCurrent;
}

//
// Replaces the response thread for this task.
//
// pThread: thread object that completion messages should be posted to.
//          The task takes ownership: the previously held thread object is
//          deleted here and the new one is deleted in ~Task().
//
// Passing the thread that is already set is a no-op. Without this guard
// the object would be deleted and the dangling pointer stored again.
//
void HXTaskManager::Task::SetResponseThread(HXThread* pThread )
{
    HX_ASSERT(pThread);

    if( pThread != m_pResponseThread )
    {
        HX_DELETE(m_pResponseThread);
        m_pResponseThread = pThread;
    }
}


//
// Returns the message sink associated with this task (NULL until
// SetMessageSink() has been called). No reference is added for the caller.
//
HXThreadMessageSink* HXTaskManager::Task::GetMessageSink()
{
    HXThreadMessageSink* const pCurrent = m_pMsgSink;
    return pCurrent;
}

//
// Associates a message sink with this task.
//
// pMsgSink: sink to hold; the task adds its own reference, which is
//           dropped again in ~Task(). Expected to be non-NULL and to be
//           set only once (both are asserted).
//
// The new sink is AddRef'd before the old one is released so that passing
// the sink that is already held cannot drop its last reference in between.
//
void HXTaskManager::Task::SetMessageSink(HXThreadMessageSink* pMsgSink)
{
    HX_ASSERT(pMsgSink);
    HX_ASSERT(!m_pMsgSink);

    if( pMsgSink )
    {
        pMsgSink->AddRef();
    }

    HX_RELEASE(m_pMsgSink);
    m_pMsgSink = pMsgSink;
}




//
// Constructs an idle task manager; Init() must create the event, mutex
// and worker threads before tasks can be added.
//
// taskDoneMsg: thread message id used for task-done notifications. A
//              unique id is required per HXTaskManager instance.
//
HXTaskManager::HXTaskManager(UINT32 taskDoneMsg)
    : m_pWorkEvent(0),
      m_pTaskMutex(0),
      m_exit(false),
      m_taskDoneMsg(taskDoneMsg)
{
}


//
// Shuts down the worker pool, frees the sync objects and aborts every
// task that is still queued.
//
// NOTE(review): the handler registered on the message sink in AddTask()
// is not removed here - confirm nothing can dispatch to this instance
// after destruction.
//
HXTaskManager::~HXTaskManager()
{
    // stop the workers first so nothing touches the queue any more
    DestroyPool();
    HX_DELETE(m_pWorkEvent);
    HX_DELETE(m_pTaskMutex);

    // every task still pending is reported as aborted and un-referenced
    CHXSimpleList::Iterator it   = m_pending.Begin();
    CHXSimpleList::Iterator last = m_pending.End();
    for( ; it != last; ++it)
    {
        Task* pAbandoned = reinterpret_cast<Task*>(*it);
        pAbandoned->OnTaskComplete(HXR_ABORT);
        pAbandoned->Release();
    }
    m_pending.RemoveAll();
}

//
// Creates the work event, the queue mutex and the pool of worker threads.
//
// threadCount:    number of worker threads to create (asserted > 0).
// threadPriority: priority handed to each worker thread.
//
// Returns the result of the first step that fails, otherwise the result
// of CreatePool(). Later steps are skipped once a step has failed.
//
HX_RESULT HXTaskManager::Init(UINT32 threadCount, UINT32 threadPriority)
{
    HX_ASSERT(threadCount > 0);

    // manual-reset event used to wake the task threads
    HX_RESULT hr = HXEvent::MakeEvent(m_pWorkEvent, NULL, TRUE /*TRUE == manual reset*/);

    if(SUCCEEDED(hr))
    {
        // mutex guarding the pending task queue
        hr = HXMutex::MakeMutex(m_pTaskMutex);
    }

    if(SUCCEEDED(hr))
    {
        // worker threads
        hr = CreatePool(threadCount, threadPriority);
    }

    return hr;
}

//
// Cancels a task that is still waiting in the pending queue.
//
// pTask: task to cancel.
//
// Returns HXR_OK when the task was found; it is then removed, notified
// with HXR_ABORT and released, all while the queue mutex is held.
// Returns HXR_FAIL when the task is not in the queue.
//
HX_RESULT HXTaskManager::RemoveTask(Task* pTask)
{
    HX_ASSERT(pTask);
    HXScopeLock lock(m_pTaskMutex);

    LISTPOSITION where = m_pending.Find(pTask);
    if(!where)
    {
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::HXTaskManager::CancelTask(): no task %p (already started or complete)\n", pTask));
        return HXR_FAIL;
    }

    DPRINTF(D_THREADTASKMGR, ("HXTaskManager::HXTaskManager::CancelTask(): removing task %p\n", pTask));
    m_pending.RemoveAt(where);
    pTask->OnTaskComplete(HXR_ABORT);
    pTask->Release();
    return HXR_OK;
}



//
// Queues a task for execution on one of the worker threads.
//
// The calling thread's message sink is looked up (and this manager is
// registered on it as the handler for m_taskDoneMsg if no handler is
// registered yet), the sink is attached to the task, and the task is
// appended to the pending queue with an extra reference.
//
// pTask: task to run; must not be NULL.
//
// Returns HXR_FAIL for a NULL task, otherwise the result of obtaining the
// sink / registering the handler. The task is only queued on success.
//
// Changes from the previous version:
//  - A scratch HXThread used to be created here on every call and was
//    neither used nor deleted; the response thread is already created by
//    the Task constructor, so that allocation has been removed.
//  - A NULL task is rejected instead of being dereferenced.
//
// NOTE(review): GetThreadInstance() hands back a referenced sink and that
// reference is not released here. It may be what keeps the sink (and the
// handler registration) alive between tasks - confirm before changing.
//
HX_RESULT HXTaskManager::AddTask(Task* pTask)
{
    HX_RESULT            hr       = HXR_FAIL;
    HXThreadMessageSink* pMsgSink = NULL;
    
    HX_ASSERT(m_threads.GetCount() > 0);
    HX_ASSERT(pTask);

    if( !pTask )
    {
        return hr;
    }

    hr = HXThreadMessageSink::GetThreadInstance(pMsgSink);
    if(SUCCEEDED(hr) && pMsgSink)
    {
        // one handler registration per sink is enough
        if( !pMsgSink->IsHandled(m_taskDoneMsg) )
        {
            hr = pMsgSink->AddHandler(m_taskDoneMsg, this);
            HX_ASSERT(SUCCEEDED(hr));
        }

        if( SUCCEEDED(hr) )
        {
            // add to pending task list and tell a task thread to start working
            HXScopeLock lock(m_pTaskMutex);
            DPRINTF(D_THREADTASKMGR, ("HXTaskManager::AddTask(): adding task #%lu\n", m_pending.GetCount()));

            pTask->AddRef();
            pTask->SetMessageSink(pMsgSink);
            m_pending.AddTail(pTask);
        
            // XXXLCM use a semaphore instead; more appropriate
            // The event only needs signalling on the empty -> non-empty
            // transition; GetNextTask() resets it when the queue drains.
            if( m_pending.GetCount() == 1)
            {
                m_pWorkEvent->SignalEvent();
            }
        }
    }

    return hr;
}

//
// Returns the number of tasks currently waiting in the pending queue.
// The queue mutex is held while the count is read.
//
UINT32 HXTaskManager::GetPendingTaskCount() const
{
    HXScopeLock lock(m_pTaskMutex);
    const UINT32 queued = m_pending.GetCount();
    return queued;
}


//
// Creates one worker thread running TaskThreadProc_ for this manager.
//
// pThread:        out; must be NULL on entry. Receives the new thread
//                 object on success and is deleted again if any step
//                 after MakeThread fails.
// threadPriority: priority applied before the thread is resumed.
//
// The thread is created suspended, given its priority and then resumed.
// Returns the result of the first failing step, or of Resume().
//
HX_RESULT HXTaskManager::CreateTaskThread(HXThread*& pThread, UINT32 threadPriority)
{
    DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreateTaskThread(): creating thread...\n"));

    HX_ASSERT(!pThread);
    HX_RESULT hr = HXThread::MakeThread(pThread);
    if(HXR_OK != hr)
    {
        return hr;
    }

    hr = pThread->CreateThread(TaskThreadProc_, this, HX_CREATE_SUSPENDED);
    if(SUCCEEDED(hr))
    {
        hr = pThread->SetPriority(threadPriority);
    }
    if(SUCCEEDED(hr))
    {
        hr = pThread->Resume();
    }

    if(FAILED(hr))
    {
        // do not hand back a half-initialised thread object
        HX_ASSERT(false);
        HX_DELETE(pThread);
    }

    return hr;
}

//
// Creates `count` worker threads and records them in m_threads.
//
// If any thread cannot be created, the threads created so far are
// destroyed via DestroyPool() and the failure code is returned.
// With count == 0 no thread is created and HXR_FAIL is returned.
//
HX_RESULT HXTaskManager::CreatePool(UINT32 count, UINT32 threadPriority)
{
    DPRINTF(D_THREADTASKMGR, ("HXTaskManager::CreatePool(): creating %lu threads...\n", count));

    HX_RESULT hr      = HXR_FAIL;
    UINT32    created = 0;

    while(created < count)
    {
        HXThread* pWorker = 0;
        hr = CreateTaskThread(pWorker, threadPriority);
        if(FAILED(hr))
        {
            HX_ASSERT(false);
            DestroyPool();
            break;
        }

        m_threads.Add(pWorker);
        ++created;
    }

    return hr;
}

void HXTaskManager::DestroyPool()
{
    if(!m_threads.IsEmpty())
    {
        // signal task threads to exit
        m_exit = true;
        m_pWorkEvent->SignalEvent();

        CHXSet::Iterator end = m_threads.End();
        for(CHXSet::Iterator begin = m_threads.Begin(); begin != end; ++begin)
        {
            // wait for thread to exit gracefully
            HXThread* pThread = reinterpret_cast<HXThread*>(*begin);
	    pThread->Exit(0);
            DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): thread [%p] exited\n", pThread));
	    HX_DELETE(pThread);
     
        }
        m_threads.RemoveAll();
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::DestroyPool(): %lu tasks abandoned\n", GetPendingTaskCount()));
    }
}

//
// called on task thread
//
// Pops the task at the head of the pending queue under the queue mutex.
// Returns NULL when the queue is empty. When the pop empties the queue,
// the work event is reset.
HXTaskManager::Task* HXTaskManager::GetNextTask()
{
    HXScopeLock lock(m_pTaskMutex);

    if(m_pending.IsEmpty())
    {
        return NULL;
    }

    Task* pNext = reinterpret_cast<Task*>(m_pending.RemoveHead());
    if(m_pending.IsEmpty())
    {
        // no more tasks
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::GetNextTask(): no more tasks...\n"));
        m_pWorkEvent->ResetEvent();
    }

    return pNext;
}

void* HXTaskManager::TaskThreadProc_(void* pv)
{
    HXTaskManager* pThis = reinterpret_cast<HXTaskManager*>(pv);
    HX_ASSERT(pThis);
    return pThis->TaskThreadProc();  
}
void* HXTaskManager::TaskThreadProc()
{

    HXThreadMessageSink* pMsgSink      = NULL;
    HXThread*            pResponseThread = NULL;
    Task*                pTask         = NULL;

    for( ; ; )
    {
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): waiting for work event...\n"));

        // sleep until there is work to do
        m_pWorkEvent->Wait(ALLFS /*INFINITE*/);
        
        DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): got work event...\n"));

        if(m_exit)
        {
            DPRINTF(D_THREADTASKMGR, ("HXTaskManager::TaskThreadProc(): exiting...\n"));
            break;
        }
        
        // fetch and execute next task
        pTask = GetNextTask();
        if(pTask)
        {
            pTask->Execute();

            pResponseThread = pTask->GetResponseThread();
            pMsgSink = pTask->GetMessageSink();
            HX_ASSERT(pResponseThread && pMsgSink);

            if( pResponseThread && pMsgSink )
            {
                // post response to (response) thread
                HXThreadMessage msgResponse(m_taskDoneMsg, pTask, 0);
                pResponseThread->PostMessage(&msgResponse, pMsgSink->GetSinkHandle());
            }
            
        }        
    }
    
    return 0;
}



//
// HXThreadMessageSink::MessageHandler
//
// Called on the response thread after a task thread posts m_taskDoneMsg
//
// Handles a task-done message: the first message parameter carries the
// Task pointer posted by TaskThreadProc(). The task is told it completed
// with HXR_OK and the reference taken in AddTask() is released.
// Always returns 0.
UINT32 HXTaskManager::HandleMessage(const HXThreadMessage& msg)
{
    HX_ASSERT(m_taskDoneMsg == msg.m_ulMessage);

    // recover the task from the message payload
    Task* pFinished = reinterpret_cast<Task*>(msg.m_pParam1);
    HX_ASSERT(pFinished);

    // notify completion, then drop the queue's reference
    pFinished->OnTaskComplete(HXR_OK);
    pFinished->Release();

    return 0;
}



-------------- next part --------------
/* ***** BEGIN LICENSE BLOCK ***** 
 * Version: RCSL 1.0/RPSL 1.0 
 *  
 * Portions Copyright (c) 1995-2002 RealNetworks, Inc. All Rights Reserved. 
 *      
 * The contents of this file, and the files included with this file, are 
 * subject to the current version of the RealNetworks Public Source License 
 * Version 1.0 (the "RPSL") available at 
 * http://www.helixcommunity.org/content/rpsl unless you have licensed 
 * the file under the RealNetworks Community Source License Version 1.0 
 * (the "RCSL") available at http://www.helixcommunity.org/content/rcsl, 
 * in which case the RCSL will apply. You may also obtain the license terms 
 * directly from RealNetworks.  You may not use this file except in 
 * compliance with the RPSL or, if you have a valid RCSL with RealNetworks 
 * applicable to this file, the RCSL.  Please see the applicable RPSL or 
 * RCSL for the rights, obligations and limitations governing use of the 
 * contents of the file.  
 *  
 * This file is part of the Helix DNA Technology. RealNetworks is the 
 * developer of the Original Code and owns the copyrights in the portions 
 * it created. 
 *  
 * This file, and the files included with this file, is distributed and made 
 * available on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER 
 * EXPRESS OR IMPLIED, AND REALNETWORKS HEREBY DISCLAIMS ALL SUCH WARRANTIES, 
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, FITNESS 
 * FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. 
 * 
 * Technology Compatibility Kit Test Suite(s) Location: 
 *    http://www.helixcommunity.org/content/tck 
 * 
 * Contributor(s): 
 *  
 * ***** END LICENSE BLOCK ***** */ 

#include "hxtypes.h"
#include "hxthread.h"
#include "debug.h"
#include "hxassert.h"
#include "hxheap.h"
#include "hxthreadmessagesink.h"


#ifdef _DEBUG
#undef HX_THIS_FILE             
static const char HX_THIS_FILE[] = __FILE__;
#endif  

#if defined(_WINDOWS) //XXXLCM need tls abtraction

static DWORD g_idxSinkSlot = TLS_OUT_OF_INDEXES;
static INT32 g_sinkCount = 0;

//
// Returns the message sink stored in the calling thread's TLS slot,
// creating and storing one if the slot is empty.
//
// pSink: out; receives the sink. An already existing sink is AddRef'd
//        before it is returned. Left untouched if no TLS index could be
//        allocated.
//
// Returns HXR_FAIL when no TLS index is available, HXR_OK for an
// existing sink, otherwise the result of HXThreadMessageSink::Create().
// If creation fails while no sinks exist, the TLS index is freed again.
//
// NOTE(review): g_idxSinkSlot / g_sinkCount are plain globals updated
// without any locking visible here - confirm callers serialise access.
//
HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
{
    // XXXLCM 1) remove windows specific tls 2) clean up tls
    if(TLS_OUT_OF_INDEXES == g_idxSinkSlot)
    {
        // allocate global index so threads can use index to access slots
        g_idxSinkSlot = TlsAlloc();
    }

    if(TLS_OUT_OF_INDEXES == g_idxSinkSlot)
    {
        return HXR_FAIL;
    }

    pSink = reinterpret_cast<HXThreadMessageSink*> (TlsGetValue(g_idxSinkSlot));
    if(pSink)
    {
        // this thread already has a sink; hand out another reference
        pSink->AddRef();
        return HXR_OK;
    }

    // create sink
    HX_RESULT hr = HXThreadMessageSink::Create(pSink);
    if (SUCCEEDED(hr))
    {
        // put sink in slot
        TlsSetValue(g_idxSinkSlot, pSink);
        ++g_sinkCount;
    }
    else if (0 == g_sinkCount)
    {
        // free index (no sinks)
        TlsFree(g_idxSinkSlot);
        g_idxSinkSlot = TLS_OUT_OF_INDEXES;
    }

    return hr;
}

void HXThreadMessageSink::FinalRelease()
{
    // remove this sink from thread sink slot 
    HXThreadMessageSink* pSink = reinterpret_cast<HXThreadMessageSink*> (TlsGetValue(g_idxSinkSlot));
    HX_ASSERT(pSink == this);
    if(pSink)
    {
        TlsSetValue(g_idxSinkSlot, 0);
        HX_ASSERT(g_sinkCount > 0);
        if (--g_sinkCount == 0)
        {
            // free index (no sinks)
            TlsFree(g_idxSinkSlot);
            g_idxSinkSlot = TLS_OUT_OF_INDEXES;
        }
    }

    // continue final release (delete this)
    HXRefCounted::FinalRelease();
}
#else
// Non-Windows placeholder: per-thread sinks are not implemented here.
// Asserts and returns HXR_NOTIMPL; pSink is not modified.
HX_RESULT HXThreadMessageSink::GetThreadInstance(HXThreadMessageSink*& pSink)
{
    HX_ASSERT(false);

    const HX_RESULT notImplemented = HXR_NOTIMPL;
    return notImplemented;
}

// Non-Windows placeholder: only asserts. The base-class FinalRelease()
// is not invoked from here.
void HXThreadMessageSink::FinalRelease()
{
    HX_ASSERT(false);
}

#endif //_WINDOWS

// Constructs a sink whose handle is still INVALID_MSGSINK_HANDLE and
// whose handler map is empty.
HXThreadMessageSink::HXThreadMessageSink()
    : m_handle(INVALID_MSGSINK_HANDLE)
{
}

// Destroys the sink. All handlers are expected to have been removed by
// now (asserted); any that remain are dropped from the map.
HXThreadMessageSink::~HXThreadMessageSink()
{
    // handlers should have been unregistered by their owners
    HX_ASSERT( m_handlers.IsEmpty() );

    m_handlers.RemoveAll();
}

// 
// add handler for given message
//
// Registers pHandler as the handler for thread message `msg`.
//
// Returns:
//   HXR_OK          - no handler was registered for msg; pHandler is now.
//   HXR_UNEXPECTED  - pHandler itself is already registered for msg.
//   HXR_FAIL        - pHandler is NULL, or a different handler is already
//                     registered for msg.
// An existing registration is never replaced.
HX_RESULT HXThreadMessageSink::AddHandler(UINT32 msg, MessageHandler* pHandler)
{
    DPRINTF(D_INFO, ("HXThreadMessageSink::AddHandler()[%p]: msg %lu\n", this, msg));
    HX_ASSERT(pHandler);

    if( !pHandler )
    {
        return HXR_FAIL;
    }

    void* const key       = reinterpret_cast<void*>(msg);
    void*       pExisting = NULL;

    if( !m_handlers.Lookup(key, pExisting) )
    {
        // We don't have a handler for this one yet.
        m_handlers.SetAt(key, pHandler );
        return HXR_OK;
    }

    // A handler is already present: the same one is reported as
    // unexpected, a different one as a plain failure.
    if( pExisting == reinterpret_cast<void*>(pHandler) )
    {
        return HXR_UNEXPECTED;
    }

    return HXR_FAIL;
}


//
// remove handler for given message 
//
// Unregisters whatever handler is registered for `msg`.
// A handler is expected to exist (asserted). Returns HXR_OK when the
// entry was removed and HXR_FAIL when there was nothing to remove.
HX_RESULT HXThreadMessageSink::RemoveHandler(UINT32 msg)
{
    DPRINTF(D_INFO, ("HXThreadMessageSink::RemoveHandler()[%p]: msg %lu\n", this, msg));

    HX_ASSERT(0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));

    if (m_handlers.RemoveKey(reinterpret_cast<void*>(msg)))
    {
        return HXR_OK;
    }

    HX_ASSERT(false);
    return HXR_FAIL;
}

//
// Tells whether a handler is currently registered for `msg`.
// Returns FALSE straight away when the handler map is empty; otherwise
// the answer comes from a lookup of msg in the map.
//
BOOL HXThreadMessageSink::IsHandled(UINT32 msg)
{
    if( m_handlers.IsEmpty() )
    {
        return FALSE;
    }

    BOOL bFound = (0 != m_handlers.Lookup(reinterpret_cast<void*>(msg)));
    return bFound;
}













More information about the Common-dev mailing list
 

Site Map   |   Terms of Use   |   Privacy Policy   |   Contact Us

Copyright © 1995-2007 RealNetworks, Inc. All rights reserved. RealNetworks and Helix are trademarks of RealNetworks.
All other trademarks or registered trademarks are the property of their respective holders.