Skip navigation links
IT services and product development
Menu
TwoLogs
IT services and product development

Source code for C_EventProxy

Download

The source files for this module are listed below.  You can also download the module C_EventProxy as a zip archive; this archive contains all the source files and documentation.

Description

EventProxy allows you to easily raise events in ATL projects from worker threads to event sinks that are registered on the main thread. All inter-thread marshalling of the connection points is worked out in a set of two base classes.

Information

This source code also uses our CLock class.  You can also download it at our website.

In your ATL project you may have objects that spin off worker threads to do some lengthy processing.  You might also be interested to raise an event every now and then from these worker threads.  You cannot let the worker thread fire it's events using the sinks that are registered with the original thread; to use these sinks, you have to marshall them first to your worker thread.  All inter-thread marshalling issues are worked out for you in in a set of two template classes you have to inherit from; CEventProxySource and CEventProxyDest.

I will use two fictive class names in this explanation;

  • CMyObject: the original COM object that implements the events
  • CMyWorker: the class that operates in the worker thread

Event sinks thus need to be marshalled from CMyObject to CMyWorker in order for CMyWorker to be able to raise events.

After your code is set up (see the section "source modifications" in the source file comment), make a call SetProxyDest from CMyObject to enable all advised event sinks to be marshalled to the given proxy destination (the worker thread).  To stop marshalling, simply pass NULL to SetProxyDest.

When a proxy destination is set, all newly advised event sinks will automatically be marshalled for you.  All Unadvised sinks will also get removed from the proxy destination.  If the already marshalled sinks get destroyed (because e.g. when your worker thread gets restarted), you need to call ReMarshall from the CMyObject thread before the event sinks are available in CMyWorker again.

To fire your events from CMyWorker, simply use the same syntax as you would with CMyObject, and add an extra parameter to the call (the list of marshalled sinks to fire the event to).  Thus, raising an event from CMyWorker could look something like this:

  CLockController oLocker;
  Fire_MyEvent(GetSinks(oLocker), sMyStringParam, nMyNumberParam);
  oLocker.Unlock();

Source modifications

You can marshall-enable CMyObject inheriting it from CEventProxySource.  Marshall-enable CMyWorker by letting it inherit from CEventProxyDest.  The parameters for the templates are:

TBase:
The base class of the source, in this case CMyObject.
TInterface:
The interface you want to use, in this case e.g. IMyObjectEvents.
TpuInterfaceIID:
A reference to the IID of the interface, e.g. &DIID_IMyObjectEvents.
TATLEventProxy:
The ATL-generated proxy you want to use to fire events.  This class can be found in the header file <yourprojectname>CP.h; e.g. CProxy_IMyObjectEvents.
TSinkStoreType:
Mostly you'll want what the ATL wizard fills in for this member as well (which is the default for this parameter).

There are some other modifications you have to perform to your source files as well;

  1. Open the connectionpoint definition file that the ATL wizards have generated for you when you chose your class to implement an event interface.  It is generally named <yourprojectname>CP.h.
  2. In this file, find the wizard-generated proxy for your event interface (e.g. CProxy_IMyObjectEvents).
  3. Copy the whole class, give the copy a new name (e.g. CMarshalledProxy_IMyObjectEvents)
  4. Strip the derivation from IConnectionPointImpl from this class; e.g.
      public IConnectionPointImpl<
        T,
        &DIID__IMyObjectEvents,
        CComDynamicUnkArray
      >
    and also delete the template definition in front of the proxy class;
      template<T>
  5. Let the worker class inherit from this proxy, just like CMyObject derives from the original proxy.
  6. Modify the newly generated proxy class.  Each member function should be modified a little bit;
    • give it the extra 1st parameter:
        CComDynamicUnkArray* puSinks
    • replace the two 'm_vec.' calls with 'puSinks->'
    • delete the following lines:
        T* pT = static_cast<T*>(this);
        pT->Lock();
        pT->Unlock();
      (I may look into the issues this brings at a later time).
  7. Whenever you add a method to your event interface, make sure to update both the original proxy AND the copied proxy!

Files

Each file belonging to this source code module is listed below.

EventProxy.h

/*******************************************************************************

  Version: 2
  Author:  Carl Colijn, TwoLogs
  Contact: c.colijn@twologs.com
  Source:  https://www.twologs.com/sourcecode

  This code is freely distributable, as long as this comment remains intact.
  If you find this source useful, you may use this code in your own projects
  free of charge, but some acknowledgement to the author of this code is always
  appreciated :)
  The source is however distributed 'as is' without waranty and/or support, and
  may not be fit for each and every application.  Use it at your own discretion
  and at your own risk.
  The source already has undergone testing.  This doesn't mean however that all
  bugs are removed from this piece of code.  If you find one of them, please
  contact me about it.  I can however not guarantee when and if the bug will be
  fixed.

  More information about this module can be found in the accompanying HTML file.

*******************************************************************************/

#ifndef INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H
#define INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H

#include <vector>
#include <map>
#include "Lock.h"

// Class predefine to make 'friend' work
template <
  class TSourceBase,
  class TInterface,
  const IID* TpuInterfaceIID,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class CEventProxySource;

// Marshalled end of the event proxy
template <
  class TSourceBase,
  class TInterface,
  const IID* TpuInterfaceIID,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class CEventProxyDest {
  typedef CEventProxySource<
    TSourceBase,
    TInterface,
    TpuInterfaceIID,
    TATLEventProxy,
    TSinkStoreType
  > CEventProxySourceInst;
  friend CEventProxySourceInst;

public:
  // Constructor
  CEventProxyDest():
   m_bNewSinks(false),
   m_poSource(NULL) {
  }

  // Destructor
  virtual ~CEventProxyDest() {
    // Aqcuire access first
    CLockController oLocker;
    oLocker.Lock(m_oAccessLock);

    // Make sure there are no unmarshalled sinks left in limbo
    Unmarshall();

    // Make sure all sinks are dereffed
    Clear();

    // And remove us from the source end
    if (m_poSource != NULL) {
      m_poSource->m_poDest = NULL;
    }
  }

  // Clears all marshalled proxy's
  void Clear() {
    // Aqcuire access first
    CLockController oLocker;
    oLocker.Lock(m_oAccessLock);

    // Unmarshall any waiting sinks first
    Unmarshall();

    // And remove the sinks
    IUnknown** ppoNextSink = m_apoSinks.begin();
    IUnknown** ppoLastSink = m_apoSinks.end();
    for (; ppoNextSink != ppoLastSink; ++ppoNextSink) {
      if (*ppoNextSink != NULL) {
        (*ppoNextSink)->Release();
      }
    }
    m_apoSinks.clear();
    m_anCookies.clear();
  }

  // Gets the sink array.  Unlock the locker when you are finished
  // using the sinks
  TSinkStoreType* GetSinks(CLockController& oLocker) {
    // Aqcuire access first
    oLocker.Lock(m_oAccessLock);

    // Unmarshall any waiting sinks first
    Unmarshall();

    // And return the new list of sinks
    return &m_apoSinks;
  }

protected:
  // Whether there are new sinks waiting
  bool m_bNewSinks;

  // Single streamed interface
  struct SStreamedInterface {
    CComPtr<IStream> poStream;
    DWORD nCookie;
  };

  // The temporary stream buffer for marshalled interfaces
  typedef std::vector<SStreamedInterface> CStreamedInterfaces;
  CStreamedInterfaces m_aoStreamedSinks;

  // The connection points
  TSinkStoreType m_apoSinks;

  // Destination cookies by source cookies for the connection points
  typedef std::map<DWORD, DWORD> CDestCookiesBySourceCookie;
  CDestCookiesBySourceCookie m_anCookies;

  // The update lock
  CLock m_oAccessLock;

  // The source end of the proxy
  CEventProxySourceInst* m_poSource;

private:
  // Finalizes the unmarshalling of all remaining marshalled interfaces
  // Called from the created thread automatically when you GetSinks
  bool Unmarshall() {
    // Look if there is anything to marshall
    bool bSuccess = true;
    if (m_bNewSinks) {
      // Yes -> Process all marshalled sinks
      int nNumConnects = m_aoStreamedSinks.size();
      CStreamedInterfaces::iterator poNextSink = m_aoStreamedSinks.begin();
      CStreamedInterfaces::iterator poLastSink = m_aoStreamedSinks.end();
      for (; poNextSink != poLastSink; ++poNextSink) {
        // Next marshalled sink -> unmarshall it
        SStreamedInterface oNextSink = *poNextSink;
        TInterface* poMarshalledSink;
        HRESULT eResult = CoGetInterfaceAndReleaseStream(
          oNextSink.poStream,
          *TpuInterfaceIID,
          (void**)&poMarshalledSink
        );
        if (SUCCEEDED(eResult)) {
          // Success -> put it in our streamed sink list
          DWORD nDestCookie = m_apoSinks.Add(poMarshalledSink);
          m_anCookies.insert(CDestCookiesBySourceCookie::value_type(
            oNextSink.nCookie,
            nDestCookie
          ));
        } else {
          bSuccess = false;
        }
      }

      // And remove all old stream pointers
      m_aoStreamedSinks.clear();
      m_bNewSinks = false;
    }

    // And return if all sinks could be unmarshalled
    return bSuccess;
  }

  // Removes the given interface
  bool Remove(DWORD nCookie) {
    // Aqcuire access first
    CLockController oLocker;
    oLocker.Lock(m_oAccessLock);

    // Unmarshall any waiting sinks first
    Unmarshall();

    // Try to find the translation for the cookie
    CDestCookiesBySourceCookie::iterator pnFoundCookie =
      m_anCookies.find(nCookie);
    bool bFound = pnFoundCookie != m_anCookies.end();
    if (bFound) {
      // Done -> remove the corresponding sink and cookie
      DWORD nDestCookie = pnFoundCookie->second;
      IUnknown* poUnk = m_apoSinks.GetUnknown(nDestCookie);
      if (poUnk != NULL) {
        poUnk->Release();
      }
      m_apoSinks.Remove(nDestCookie);
      m_anCookies.erase(pnFoundCookie);
    }

    // And return if all went well
    return bFound;
  }
};


// Source end of the event proxy
template <
  class TSourceBase,
  class TInterface,
  const IID* TpuInterfaceIID,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class CEventProxySource:
  public TATLEventProxy,
  public IConnectionPointContainerImpl<TSourceBase> {
  typedef CEventProxyDest<
    TSourceBase,
    TInterface,
    TpuInterfaceIID,
    TATLEventProxy,
    TSinkStoreType
  > CEventProxyDestInst;
  friend CEventProxyDestInst;

public:
  // Constructor
  CEventProxySource():
   m_poDest(NULL) {
  }

  // Destructor
  virtual ~CEventProxySource() {
    // Look if the destination is still around
    if (m_poDest != NULL) {
      // Acquire access first
      CLockController oLocker;
      oLocker.Lock(m_poDest->m_oAccessLock);

      // And make the destination know were gone
      m_poDest->m_poSource = NULL;
    }
  }

  // Sets the destination for the marshalling
  // (provide NULL to disable marshalling)
  bool SetProxyDest(CEventProxyDestInst* poDestination) {
    // Look if a new destination is set
    bool bSuccess = true;
    if (poDestination != m_poDest) {
      // Yes -> replace the destination
      m_poDest = poDestination;

      // And look if to marshall any sinks
      if (m_poDest != NULL) {
        // Yes -> aqcuire access first
        CLockController oLocker;
        oLocker.Lock(m_poDest->m_oAccessLock);

        // Link it back to us
        m_poDest->m_poSource = this;

        // And marshall all separate sinks
        bSuccess = ReMarshall();
      }
    }

    // And return if all went smooth
    return bSuccess;
  }

  // Remarshalls all existing event sinks to the destination
  // Call from the parent thread
  bool ReMarshall() {
    // Look if a destination is set
    bool bSuccess = m_poDest != NULL;
    if (bSuccess) {
      // Yes -> aqcuire access first
      CLockController oLocker;
      oLocker.Lock(m_poDest->m_oAccessLock);

      // Clear any existing sinks first
      m_poDest->Clear();

      // And marshall all registered sinks
      int nNumConnects = m_vec.GetSize();
      for (int nConnectNr = 0; nConnectNr < nNumConnects; nConnectNr++) {
        // Get the next interface to marshall
        IUnknown* poNextSink = m_vec.GetAt(nConnectNr);
        if (poNextSink != NULL) {
          // Done -> marshall it
          if (!Marshall(poNextSink, m_vec.GetCookie(&poNextSink))) {
            // Couldn't -> note
            bSuccess = false;
          }
        }
      }
    }

    // And return our status
    return bSuccess;
  }

  // Marshalls the given event sink to the destination
  // Call from the parent thread
  bool Marshall(IUnknown* poSink, DWORD nCookie) {
    // Look if a destination is set
    bool bSuccess = m_poDest != NULL;
    if (bSuccess) {
      // Yes -> look if a new sink is specified
      if (poSink != NULL) {
        // Yes -> aqcuire access first
        CLockController oLocker;
        oLocker.Lock(m_poDest->m_oAccessLock);

        // Look if this sink is already marshalled
        CEventProxyDestInst::CDestCookiesBySourceCookie::iterator
          pnFoundCookie = m_poDest->m_anCookies.find(nCookie);
        if (pnFoundCookie == m_poDest->m_anCookies.end()) {
          // No -> marshall the sink
          IStream* poStream = NULL;
          bSuccess = SUCCEEDED(CoMarshalInterThreadInterfaceInStream(
            *TpuInterfaceIID,
            poSink,
            &poStream
          ));
          if (bSuccess) {
            // Success -> store the sink
            CEventProxyDestInst::SStreamedInterface oNewStream;
            oNewStream.poStream = poStream;
            oNewStream.nCookie = nCookie;
            m_poDest->m_aoStreamedSinks.push_back(oNewStream);

            // And note that new sinks await
            m_poDest->m_bNewSinks = true;
          }
        }
      }
    }

    // And return if the sink could be marshalled
    return bSuccess;
  }

  // Also marshalls the new sink to the marshall destination
  STDMETHOD(Advise)(IUnknown* poUnkSink, DWORD* pnCookie) {
    // Use the sink ourselves
    HRESULT eResult = IConnectionPointImpl<
      TSourceBase,
      TpuInterfaceIID,
      TSinkStoreType
    >::Advise(poUnkSink, pnCookie);

    // Also marshall a copy, if needed
    if (m_poDest != NULL) {
      Marshall(poUnkSink, *pnCookie);
    }

    // And return the result
    return eResult;
  }

  // Also removes the sink from the marshall destination
  STDMETHOD(Unadvise)(DWORD nCookie) {
    // Let the marshalled sink be released, if needed
    if (m_poDest != NULL) {
      m_poDest->Remove(nCookie);
    }

    // And remove the sink from our list as well
    return IConnectionPointImpl<
      TSourceBase,
      TpuInterfaceIID,
      TSinkStoreType
    >::Unadvise(nCookie);
  }

protected:
  // The destination for the marshalling
  CEventProxyDestInst* m_poDest;
};

#endif // INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H