TwoLogs
IT services and product development

Source code for C_EventProxy

Download

The source files for this module are listed below.  You can also download the module C_EventProxy as a zip archive; this archive contains all the source files and documentation.

Description

EventProxy allows you to easily raise events in ATL projects from worker threads to event sinks that are registered on the main thread. All inter-thread marshalling of the connection points is worked out in a set of two base classes.

Information

This source code also uses our TLock class.  You can also download it at our website.

In your ATL project you may have objects that spin off worker threads to do some lengthy processing.  You may also want to raise an event every now and then from these worker threads.  You cannot let the worker thread fire its events using the sinks that are registered with the original thread; to use these sinks, you first have to marshall them to your worker thread.  All inter-thread marshalling issues are worked out for you in a set of two template classes you have to inherit from: TEventProxySource and TEventProxyDest.

I will use two fictitious class names in this explanation:

  • TMyObject: the original COM object that implements the events
  • TMyWorker: the class that operates in the worker thread

Event sinks thus need to be marshalled from TMyObject to TMyWorker in order for TMyWorker to be able to raise events.

After your code is set up (see the section "Source modifications" below), call SetProxyDest from TMyObject to have all advised event sinks marshalled to the given proxy destination (the worker thread).  To stop marshalling, simply pass NULL to SetProxyDest.

When a proxy destination is set, all newly advised event sinks are automatically marshalled for you, and all unadvised sinks are removed from the proxy destination.  If the already marshalled sinks get destroyed (e.g. because your worker thread gets restarted), you need to call ReMarshall from the TMyObject thread before the event sinks are available in TMyWorker again.
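
The hand-off described above can be pictured with a small portable model.  This is only a sketch under stated assumptions, not the code from EventProxy.h: a std::string stands in for the marshalled IStream, std::mutex stands in for TLock, and TProxyDestModel with its members Queue, Remove, LiveSinkCount and Drain are hypothetical names.  It shows the general pattern of the source thread queueing sinks by cookie while the worker side drains the queue the next time it asks for the sinks.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Portable model only: a std::string stands in for the marshalled IStream,
// and all names in this class are hypothetical.
class TProxyDestModel {
public:
  // Source thread: hand a sink over to the worker (models Marshall)
  void Queue(unsigned long sourceCookie, const std::string& sink) {
    std::lock_guard<std::mutex> guard(m_accessLock);
    if (m_slotBySourceCookie.count(sourceCookie) == 0) {
      m_pending.push_back(std::make_pair(sourceCookie, sink));
    }
  }

  // Source thread: forget a sink by its source cookie (models Remove)
  bool Remove(unsigned long sourceCookie) {
    std::lock_guard<std::mutex> guard(m_accessLock);
    Drain();
    std::map<unsigned long, std::size_t>::iterator found =
      m_slotBySourceCookie.find(sourceCookie);
    if (found == m_slotBySourceCookie.end()) {
      return false;
    }
    m_sinks[found->second].clear();  // Empty slot, like a NULL entry
    m_slotBySourceCookie.erase(found);
    return true;
  }

  // Worker thread: number of usable sinks after draining the queue
  std::size_t LiveSinkCount() {
    std::lock_guard<std::mutex> guard(m_accessLock);
    Drain();
    std::size_t count = 0;
    for (std::size_t i = 0; i < m_sinks.size(); ++i) {
      if (!m_sinks[i].empty()) {
        ++count;
      }
    }
    return count;
  }

private:
  // Moves every queued sink into the live list (models Unmarshall);
  // the caller must already hold m_accessLock
  void Drain() {
    for (std::size_t i = 0; i < m_pending.size(); ++i) {
      m_sinks.push_back(m_pending[i].second);
      m_slotBySourceCookie[m_pending[i].first] = m_sinks.size() - 1;
    }
    m_pending.clear();
  }

  std::mutex m_accessLock;
  std::vector<std::pair<unsigned long, std::string> > m_pending;
  std::vector<std::string> m_sinks;
  std::map<unsigned long, std::size_t> m_slotBySourceCookie;
};
```

In this model, removing a sink only needs the cookie that the source side handed out; the destination keeps its own translation from that cookie to the slot it actually uses.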

To fire your events from TMyWorker, simply use the same syntax as you would with TMyObject, and add an extra parameter to the call (the list of marshalled sinks to fire the event to).  Thus, raising an event from TMyWorker could look something like this:

  TLockController locker;
  Fire_MyEvent(GetSinks(locker), myStringParam, myNumberParam);
  locker.Unlock();
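
The locker keeps the sink list protected for as long as the event is being fired.  The sketch below models that idea in portable C++.  It rests on an assumption: Lock.h is not listed on this page, so the model supposes that the lock controller releases its lock in its destructor, which is what the way EventProxy.h uses it without explicit Unlock calls suggests.  TLockControllerModel, TWorkerModel and FireWithScopedLock are hypothetical names, and std::mutex stands in for TLock.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Hypothetical model of a lock controller: takes a lock on request and gives
// it back on Unlock() or when it goes out of scope, whichever comes first.
class TLockControllerModel {
public:
  TLockControllerModel() : m_lockPtr(NULL) {}
  ~TLockControllerModel() { Unlock(); }

  void Lock(std::mutex& lock) {
    Unlock();
    lock.lock();
    m_lockPtr = &lock;
  }

  void Unlock() {
    if (m_lockPtr != NULL) {
      m_lockPtr->unlock();
      m_lockPtr = NULL;
    }
  }

  bool IsLocked() const { return m_lockPtr != NULL; }

private:
  std::mutex* m_lockPtr;
  TLockControllerModel(const TLockControllerModel&);             // Not copyable
  TLockControllerModel& operator=(const TLockControllerModel&);  // Not copyable
};

// Hypothetical model of the worker side: GetSinks locks through the caller's
// controller, so the sink list stays protected until the caller unlocks.
class TWorkerModel {
public:
  TWorkerModel() : m_sinkCount(0) {}

  int* GetSinks(TLockControllerModel& locker) {
    locker.Lock(m_accessLock);
    return &m_sinkCount;
  }

private:
  std::mutex m_accessLock;
  int m_sinkCount;
};

// Fires inside a nested scope so the lock is released even on an early return
bool FireWithScopedLock(TWorkerModel& worker) {
  {
    TLockControllerModel locker;
    int* sinksPtr = worker.GetSinks(locker);
    if (*sinksPtr == 0) {
      return false;  // No sinks; the locker unlocks in its destructor
    }
    // A call like Fire_MyEvent(sinksPtr, ...) would go here
  }
  return true;
}
```

Under that assumption, the explicit Unlock call in the example above simply releases the sinks earlier than the end of the enclosing scope would.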

Source modifications

You can marshall-enable TMyObject by letting it inherit from TEventProxySource.  Marshall-enable TMyWorker by letting it inherit from TEventProxyDest.  The parameters for the templates are:

TSourceBase:
The base class of the source, in this case TMyObject.
TInterface:
The interface you want to use, in this case e.g. IMyObjectEvents.
interfaceIIDPtr:
A pointer to the IID of the interface, e.g. &DIID_IMyObjectEvents.
TATLEventProxy:
The ATL-generated proxy you want to use to fire events.  This class can be found in the header file <yourprojectname>CP.h; e.g. TProxy_IMyObjectEvents.
TSinkStoreType:
Mostly you'll want what the ATL wizard fills in for this parameter as well (CComDynamicUnkArray, which is the default for this parameter).
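
As a rough illustration of how these parameters line up, the sketch below declares both classes against empty stand-ins so that it compiles without ATL.  Everything marked as a stand-in (IID, CComDynamicUnkArray, IMyObjectEvents, DIID_IMyObjectEvents, the two proxy classes and the empty template shells) is a hypothetical placeholder for the real ATL, wizard-generated and EventProxy.h definitions; the typedef names TMyProxySource and TMyProxyDest are hypothetical as well.  Because SetProxyDest expects a TEventProxyDest with exactly the same template arguments as the source, both ends are spelled out with identical arguments.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

// --- Hypothetical stand-ins so the sketch compiles without ATL ---
struct IID {};
struct CComDynamicUnkArray {};
struct IMyObjectEvents {};
extern const IID DIID_IMyObjectEvents;
const IID DIID_IMyObjectEvents = {};
template <class T> class TProxy_IMyObjectEvents {};  // Wizard-generated proxy
class CMarshalledProxy_IMyObjectEvents {};           // Your modified copy

// Empty shells with the same parameter lists as in EventProxy.h
template <
  class TSourceBase,
  class TInterface,
  const IID* interfaceIIDPtr,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxySource: public TATLEventProxy {};

template <
  class TSourceBase,
  class TInterface,
  const IID* interfaceIIDPtr,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxyDest {};

// Both ends use the very same template arguments
class TMyObject;
typedef TEventProxySource<
  TMyObject,
  IMyObjectEvents,
  &DIID_IMyObjectEvents,
  TProxy_IMyObjectEvents<TMyObject>
> TMyProxySource;
typedef TEventProxyDest<
  TMyObject,
  IMyObjectEvents,
  &DIID_IMyObjectEvents,
  TProxy_IMyObjectEvents<TMyObject>
> TMyProxyDest;

// The COM object lives on the main thread
class TMyObject: public TMyProxySource {};

// The worker also inherits from the copied proxy to get the Fire_ methods
class TMyWorker:
  public TMyProxyDest,
  public CMarshalledProxy_IMyObjectEvents {};
```

In a real project TMyObject would of course keep its other ATL base classes; they are left out here to keep the sketch focused on the template arguments.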

There are some other modifications you have to make to your source files as well:

  1. Open the connection point definition file that the ATL wizards have generated for you when you chose your class to implement an event interface.  It is generally named <yourprojectname>CP.h.
  2. In this file, find the wizard-generated proxy for your event interface (e.g. TProxy_IMyObjectEvents).
  3. Copy the whole class and give the copy a new name (e.g. CMarshalledProxy_IMyObjectEvents).
  4. Strip the derivation from IConnectionPointImpl from this class; e.g.
      public IConnectionPointImpl<
        T,
        &DIID_IMyObjectEvents,
        CComDynamicUnkArray
      >
    and also delete the template definition in front of the proxy class;
      template <class T>
  5. Let the worker class inherit from this proxy, just like TMyObject derives from the original proxy.
  6. Modify the newly copied proxy class.  Each member function needs a few small changes:
    • give it the extra 1st parameter:
        CComDynamicUnkArray* sinkPtrs
    • replace the two 'm_vec.' calls with 'sinkPtrs->'
    • delete the following lines:
        T* pT = static_cast<T*>(this);
        pT->Lock();
        pT->Unlock();
      (I may look into the issues this brings at a later time).
  7. Whenever you add a method to your event interface, make sure to update both the original proxy AND the copied proxy!
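
To give an idea of where steps 4 and 6 lead, here is a portable sketch of a copied proxy after the changes.  It is not wizard output: TMockSink and TMockSinkArray are hypothetical stand-ins for the real sink interface and for CComDynamicUnkArray (only GetSize and GetAt are modelled), and the method returns the number of sinks reached where the generated code would normally deal with an HRESULT and an IDispatch call.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for an event sink
struct TMockSink {
  int callCount;
  TMockSink() : callCount(0) {}
  void Invoke(const std::string&, long) { ++callCount; }
};

// Hypothetical stand-in for CComDynamicUnkArray; only GetSize and GetAt
class TMockSinkArray {
public:
  void Add(TMockSink* sinkPtr) { m_items.push_back(sinkPtr); }
  int GetSize() const { return static_cast<int>(m_items.size()); }
  TMockSink* GetAt(int index) const { return m_items[index]; }

private:
  std::vector<TMockSink*> m_items;
};

// The copied proxy: no template header, no IConnectionPointImpl base,
// and the sink list arrives as the extra first parameter
class CMarshalledProxy_IMyObjectEvents {
public:
  int Fire_MyEvent(
    TMockSinkArray* sinkPtrs,
    const std::string& myStringParam,
    long myNumberParam
  ) {
    int numFired = 0;
    int numConnections = sinkPtrs->GetSize();  // Was: m_vec.GetSize()
    for (int connectNr = 0; connectNr < numConnections; ++connectNr) {
      // The pT->Lock() and pT->Unlock() lines around this call are gone
      TMockSink* sinkPtr = sinkPtrs->GetAt(connectNr);  // Was: m_vec.GetAt()
      if (sinkPtr != NULL) {
        sinkPtr->Invoke(myStringParam, myNumberParam);
        ++numFired;
      }
    }
    return numFired;
  }
};
```

Empty slots are skipped in the same way the original proxy skips them, so a sink list with gaps left by unadvised sinks can still be fired safely.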

Files

Each file belonging to this source code module is listed below.

EventProxy.h

/*******************************************************************************

  Version: 2
  Author:  Carl Colijn, TwoLogs
  Contact: c.colijn@twologs.com
  Source:  https://www.twologs.com/sourcecode

  This code is freely distributable, as long as this comment remains intact.
  If you find this source useful, you may use this code in your own projects
  free of charge, but some acknowledgement to the author of this code is always
  appreciated :)
  The source is however distributed 'as is' without waranty and/or support, and
  may not be fit for each and every application.  Use it at your own discretion
  and at your own risk.
  The source already has undergone testing.  This doesn't mean however that all
  bugs are removed from this piece of code.  If you find one of them, please
  contact me about it.  I can however not guarantee when and if the bug will be
  fixed.

  More information about this module can be found in the accompanying HTML file.

*******************************************************************************/

#ifndef INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H
#define INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H

#include <vector>
#include <map>
#include "Lock.h"

// Class predefine to make 'friend' work
template <
  class TSourceBase,
  class TInterface,
  const IID* interfaceIIDPtr,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxySource;

// Marshalled end of the event proxy
template <
  class TSourceBase,
  class TInterface,
  const IID* interfaceIIDPtr,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxyDest {
  typedef TEventProxySource<
    TSourceBase,
    TInterface,
    interfaceIIDPtr,
    TATLEventProxy,
    TSinkStoreType
  > TEventProxySourceInst;
  friend TEventProxySourceInst;

public:
  // Constructor
  TEventProxyDest():
   m_newSinksWaiting(false),
   m_proxySourcePtr(NULL) {
  }

  // Destructor
  virtual ~TEventProxyDest() {
    // Acquire access first
    TLockController locker;
    locker.Lock(m_accessLock);

    // Make sure there are no unmarshalled sinks left in limbo
    Unmarshall();

    // Make sure all sinks are dereffed
    Clear();

    // And remove us from the source end
    if (m_proxySourcePtr != NULL) {
      m_proxySourcePtr->m_proxyDestPtr = NULL;
    }
  }

  // Clears all marshalled proxies
  void Clear() {
    // Acquire access first
    TLockController locker;
    locker.Lock(m_accessLock);

    // Unmarshall any waiting sinks first
    Unmarshall();

    // And remove the sinks
    IUnknown** nextSinkPtrPtr = m_sinkPtrs.begin();
    IUnknown** lastSinkPtrPtr = m_sinkPtrs.end();
    for (; nextSinkPtrPtr != lastSinkPtrPtr; ++nextSinkPtrPtr) {
      if (*nextSinkPtrPtr != NULL) {
        (*nextSinkPtrPtr)->Release();
      }
    }
    m_sinkPtrs.clear();
    m_cookies.clear();
  }

  // Gets the sink array.  Unlock the locker when you are finished
  // using the sinks
  TSinkStoreType* GetSinks(TLockController& locker) {
    // Acquire access first
    locker.Lock(m_accessLock);

    // Unmarshall any waiting sinks first
    Unmarshall();

    // And return the new list of sinks
    return &m_sinkPtrs;
  }

protected:
  // Whether there are new sinks waiting
  bool m_newSinksWaiting;

  // Single streamed interface
  struct TStreamedInterface {
    CComPtr<IStream> streamPtr;
    DWORD cookie;
  };

  // The temporary stream buffer for marshalled interfaces
  typedef std::vector<TStreamedInterface> TStreamedInterfaces;
  TStreamedInterfaces m_streamedSinks;

  // The connection points
  TSinkStoreType m_sinkPtrs;

  // Destination cookies by source cookies for the connection points
  typedef std::map<DWORD, DWORD> TDestCookiesBySourceCookie;
  TDestCookiesBySourceCookie m_cookies;

  // The update lock
  TLock m_accessLock;

  // The source end of the proxy
  TEventProxySourceInst* m_proxySourcePtr;

private:
  // Finalizes the unmarshalling of all remaining marshalled interfaces
  // Called automatically from the worker thread when you call GetSinks
  bool Unmarshall() {
    // Look if there is anything to unmarshall
    bool allOK = true;
    if (m_newSinksWaiting) {
      // Yes -> Process all marshalled sinks
      typename TStreamedInterfaces::iterator nextSinkPtr =
        m_streamedSinks.begin();
      typename TStreamedInterfaces::iterator lastSinkPtr =
        m_streamedSinks.end();
      for (; nextSinkPtr != lastSinkPtr; ++nextSinkPtr) {
        // Next marshalled sink -> unmarshall it
        TStreamedInterface nextSink = *nextSinkPtr;
        TInterface* marshalledSinkPtr;
        HRESULT result = CoGetInterfaceAndReleaseStream(
          nextSink.streamPtr,
          *interfaceIIDPtr,
          (void**)&marshalledSinkPtr
        );
        if (SUCCEEDED(result)) {
          // Success -> put it in our streamed sink list
          DWORD destCookie = m_sinkPtrs.Add(marshalledSinkPtr);
          m_cookies.insert(TDestCookiesBySourceCookie::value_type(
            nextSink.cookie,
            destCookie
          ));
        } else {
          allOK = false;
        }
      }

      // And remove all old stream pointers
      m_streamedSinks.clear();
      m_newSinksWaiting = false;
    }

    // And return if all sinks could be unmarshalled
    return allOK;
  }

  // Removes the given interface
  bool Remove(DWORD cookie) {
    // Acquire access first
    TLockController locker;
    locker.Lock(m_accessLock);

    // Unmarshall any waiting sinks first
    Unmarshall();

    // Try to find the translation for the cookie
    TDestCookiesBySourceCookie::iterator foundCookiePtr =
      m_cookies.find(cookie);
    bool cookieFound = foundCookiePtr != m_cookies.end();
    if (cookieFound) {
      // Done -> remove the corresponding sink and cookie
      DWORD destCookie = foundCookiePtr->second;
      IUnknown* unkPtr = m_sinkPtrs.GetUnknown(destCookie);
      if (unkPtr != NULL) {
        unkPtr->Release();
      }
      m_sinkPtrs.Remove(destCookie);
      m_cookies.erase(foundCookiePtr);
    }

    // And return if all went well
    return cookieFound;
  }
};


// Source end of the event proxy
template <
  class TSourceBase,
  class TInterface,
  const IID* interfaceIIDPtr,
  class TATLEventProxy,
  class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxySource:
  public TATLEventProxy,
  public IConnectionPointContainerImpl<TSourceBase> {
  typedef TEventProxyDest<
    TSourceBase,
    TInterface,
    interfaceIIDPtr,
    TATLEventProxy,
    TSinkStoreType
  > TEventProxyDestInst;
  friend TEventProxyDestInst;

public:
  // Constructor
  TEventProxySource():
   m_proxyDestPtr(NULL) {
  }

  // Destructor
  virtual ~TEventProxySource() {
    // Look if the destination is still around
    if (m_proxyDestPtr != NULL) {
      // Acquire access first
      TLockController locker;
      locker.Lock(m_proxyDestPtr->m_accessLock);

      // And let the destination know we're gone
      m_proxyDestPtr->m_proxySourcePtr = NULL;
    }
  }

  // Sets the destination for the marshalling
  // (provide NULL to disable marshalling)
  bool SetProxyDest(TEventProxyDestInst* destinationPtr) {
    // Look if a new destination is set
    bool allOK = true;
    if (destinationPtr != m_proxyDestPtr) {
      // Yes -> replace the destination
      m_proxyDestPtr = destinationPtr;

      // And look if there are any sinks to marshall
      if (m_proxyDestPtr != NULL) {
        // Yes -> acquire access first
        TLockController locker;
        locker.Lock(m_proxyDestPtr->m_accessLock);

        // Link it back to us
        m_proxyDestPtr->m_proxySourcePtr = this;

        // And marshall all separate sinks
        allOK = ReMarshall();
      }
    }

    // And return if all went smooth
    return allOK;
  }

  // Remarshalls all existing event sinks to the destination
  // Call from the parent thread
  bool ReMarshall() {
    // Look if a destination is set
    bool allOK = m_proxyDestPtr != NULL;
    if (allOK) {
      // Yes -> acquire access first
      TLockController locker;
      locker.Lock(m_proxyDestPtr->m_accessLock);

      // Clear any existing sinks first
      m_proxyDestPtr->Clear();

      // And marshall all registered sinks
      // (m_vec lives in a dependent base class, so reach it through this->)
      int numConnects = this->m_vec.GetSize();
      for (int connectNr = 0; connectNr < numConnects; connectNr++) {
        // Get the next interface to marshall
        IUnknown* nextSinkPtr = this->m_vec.GetAt(connectNr);
        if (nextSinkPtr != NULL) {
          // Done -> marshall it
          if (!Marshall(nextSinkPtr, this->m_vec.GetCookie(&nextSinkPtr))) {
            // Couldn't -> note
            allOK = false;
          }
        }
      }
    }

    // And return our status
    return allOK;
  }

  // Marshalls the given event sink to the destination
  // Call from the parent thread
  bool Marshall(IUnknown* sinkPtr, DWORD cookie) {
    // Look if a destination is set
    bool allOK = m_proxyDestPtr != NULL;
    if (allOK) {
      // Yes -> look if a new sink is specified
      if (sinkPtr != NULL) {
        // Yes -> acquire access first
        TLockController locker;
        locker.Lock(m_proxyDestPtr->m_accessLock);

        // Look if this sink is already marshalled
        typename TEventProxyDestInst::TDestCookiesBySourceCookie::iterator
          foundCookiePtr = m_proxyDestPtr->m_cookies.find(cookie);
        if (foundCookiePtr == m_proxyDestPtr->m_cookies.end()) {
          // No -> marshall the sink
          IStream* streamPtr = NULL;
          allOK = SUCCEEDED(CoMarshalInterThreadInterfaceInStream(
            *interfaceIIDPtr,
            sinkPtr,
            &streamPtr
          ));
          if (allOK) {
            // Success -> store the sink
            typename TEventProxyDestInst::TStreamedInterface newStream;
            newStream.streamPtr = streamPtr;
            newStream.cookie = cookie;
            m_proxyDestPtr->m_streamedSinks.push_back(newStream);

            // And note that new sinks await
            m_proxyDestPtr->m_newSinksWaiting = true;
          }
        }
      }
    }

    // And return if the sink could be marshalled
    return allOK;
  }

  // Also marshalls the new sink to the marshall destination
  STDMETHOD(Advise)(IUnknown* unkSinkPtr, DWORD* cookiePtr) {
    // Use the sink ourselves
    HRESULT result = IConnectionPointImpl<
      TSourceBase,
      interfaceIIDPtr,
      TSinkStoreType
    >::Advise(unkSinkPtr, cookiePtr);

    // Also marshall a copy, if the sink was accepted
    if (SUCCEEDED(result) && m_proxyDestPtr != NULL) {
      Marshall(unkSinkPtr, *cookiePtr);
    }

    // And return the result
    return result;
  }

  // Also removes the sink from the marshall destination
  STDMETHOD(Unadvise)(DWORD cookie) {
    // Let the marshalled sink be released, if needed
    if (m_proxyDestPtr != NULL) {
      m_proxyDestPtr->Remove(cookie);
    }

    // And remove the sink from our list as well
    return IConnectionPointImpl<
      TSourceBase,
      interfaceIIDPtr,
      TSinkStoreType
    >::Unadvise(cookie);
  }

protected:
  // The destination for the marshalling
  TEventProxyDestInst* m_proxyDestPtr;
};

#endif // INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H