Source code for C_EventProxy
Download
The source files for this module are listed below. You can also download the module C_EventProxy as a zip archive; this archive contains all the source files and documentation.
Description
EventProxy allows you to easily raise events in ATL projects from worker threads to event sinks that are registered on the main thread. All inter-thread marshalling of the connection points is worked out in a set of two base classes.
Information
This source code also uses our CLock class. You can also download it at our website.
In your ATL project you may have objects that spin off worker threads to do some lengthy processing. You might also be interested to raise an event every now and then from these worker threads. You cannot let the worker thread fire it's events using the sinks that are registered with the original thread; to use these sinks, you have to marshall them first to your worker thread. All inter-thread marshalling issues are worked out for you in in a set of two template classes you have to inherit from; CEventProxySource and CEventProxyDest.
I will use two fictive class names in this explanation;
- CMyObject: the original COM object that implements the events
- CMyWorker: the class that operates in the worker thread
After your code is set up (see the section "source modifications" in the source file comment), make a call SetProxyDest from CMyObject to enable all advised event sinks to be marshalled to the given proxy destination (the worker thread). To stop marshalling, simply pass NULL to SetProxyDest.
When a proxy destination is set, all newly advised event sinks will automatically be marshalled for you. All Unadvised sinks will also get removed from the proxy destination. If the already marshalled sinks get destroyed (because e.g. when your worker thread gets restarted), you need to call ReMarshall from the CMyObject thread before the event sinks are available in CMyWorker again.
To fire your events from CMyWorker, simply use the same syntax as you would with CMyObject, and add an extra parameter to the call (the list of marshalled sinks to fire the event to). Thus, raising an event from CMyWorker could look something like this:
CLockController oLocker; Fire_MyEvent(GetSinks(oLocker), sMyStringParam, nMyNumberParam); oLocker.Unlock();
Source modifications
You can marshall-enable CMyObject inheriting it from CEventProxySource. Marshall-enable CMyWorker by letting it inherit from CEventProxyDest. The parameters for the templates are:
- TBase:
The base class of the source, in this case CMyObject. - TInterface:
The interface you want to use, in this case e.g. IMyObjectEvents. - TpuInterfaceIID:
A reference to the IID of the interface, e.g. &DIID_IMyObjectEvents. - TATLEventProxy:
The ATL-generated proxy you want to use to fire events. This class can be found in the header file <yourprojectname>CP.h; e.g. CProxy_IMyObjectEvents. - TSinkStoreType:
Mostly you'll want what the ATL wizard fills in for this member as well (which is the default for this parameter).
There are some other modifications you have to perform to your source files as well;
- Open the connectionpoint definition file that the ATL wizards have generated for you when you chose your class to implement an event interface. It is generally named <yourprojectname>CP.h.
- In this file, find the wizard-generated proxy for your event interface (e.g. CProxy_IMyObjectEvents).
- Copy the whole class, give the copy a new name (e.g. CMarshalledProxy_IMyObjectEvents)
- Strip the derivation from IConnectionPointImpl from this class; e.g.
public IConnectionPointImpl<T, &DIID__IMyObjectEvents, CComDynamicUnkArray>
and also delete the template definition in front of the proxy class;template<T>
- Let the worker class inherit from this proxy, just like CMyObject derives from the original proxy.
- Modify the newly generated proxy class. Each member function should
be modified a little bit;
- give it the extra 1st parameter:
CComDynamicUnkArray* puSinks
- replace the two 'm_vec.' calls with 'puSinks->'
- delete the following lines:
T* pT = static_cast<T*>(this); pT->Lock(); pT->Unlock();
(I may look into the issues this brings at a later time).
- give it the extra 1st parameter:
- Whenever you add a method to your event interface, make sure to update both the original proxy AND the copied proxy!
Files
Each file belonging to this source code module is listed below.
EventProxy.h
/*******************************************************************************
Version: 2
Author: Carl Colijn, TwoLogs
Contact: c.colijn@twologs.com
Source: http://www.twologs.com/en/resources/sourcecode.asp
This code is freely distributable, as long as this comment remains intact.
If you find this source useful, you may use this code in your own projects
free of charge, but some acknowledgement to the author of this code is always
appreciated :)
The source is however distributed 'as is' without waranty and/or support, and
may not be fit for each and every application. Use it at your own discretion
and at your own risk.
The source already has undergone testing. This doesn't mean however that all
bugs are removed from this piece of code. If you find one of them, please
contact me about it. I can however not guarantee when and if the bug will be
fixed.
More information about this module can be found in the accompanying HTML file.
*******************************************************************************/
#ifndef INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H
#define INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H
#include <vector>
#include <map>
#include "Lock.h"
// Class predefine to make 'friend' work
template <
class TSourceBase,
class TInterface,
const IID* TpuInterfaceIID,
class TATLEventProxy,
class TSinkStoreType = CComDynamicUnkArray
>
class CEventProxySource;
// Marshalled end of the event proxy
template <
class TSourceBase,
class TInterface,
const IID* TpuInterfaceIID,
class TATLEventProxy,
class TSinkStoreType = CComDynamicUnkArray
>
class CEventProxyDest {
typedef CEventProxySource<
TSourceBase,
TInterface,
TpuInterfaceIID,
TATLEventProxy,
TSinkStoreType
> CEventProxySourceInst;
friend CEventProxySourceInst;
public:
// Constructor
CEventProxyDest():
m_bNewSinks(false),
m_poSource(NULL) {
}
// Destructor
virtual ~CEventProxyDest() {
// Aqcuire access first
CLockController oLocker;
oLocker.Lock(m_oAccessLock);
// Make sure there are no unmarshalled sinks left in limbo
Unmarshall();
// Make sure all sinks are dereffed
Clear();
// And remove us from the source end
if (m_poSource != NULL) {
m_poSource->m_poDest = NULL;
}
}
// Clears all marshalled proxy's
void Clear() {
// Aqcuire access first
CLockController oLocker;
oLocker.Lock(m_oAccessLock);
// Unmarshall any waiting sinks first
Unmarshall();
// And remove the sinks
IUnknown** ppoNextSink = m_apoSinks.begin();
IUnknown** ppoLastSink = m_apoSinks.end();
for (; ppoNextSink != ppoLastSink; ++ppoNextSink) {
if (*ppoNextSink != NULL) {
(*ppoNextSink)->Release();
}
}
m_apoSinks.clear();
m_anCookies.clear();
}
// Gets the sink array. Unlock the locker when you are finished
// using the sinks
TSinkStoreType* GetSinks(CLockController& oLocker) {
// Aqcuire access first
oLocker.Lock(m_oAccessLock);
// Unmarshall any waiting sinks first
Unmarshall();
// And return the new list of sinks
return &m_apoSinks;
}
protected:
// Whether there are new sinks waiting
bool m_bNewSinks;
// Single streamed interface
struct SStreamedInterface {
CComPtr<IStream> poStream;
DWORD nCookie;
};
// The temporary stream buffer for marshalled interfaces
typedef std::vector<SStreamedInterface> CStreamedInterfaces;
CStreamedInterfaces m_aoStreamedSinks;
// The connection points
TSinkStoreType m_apoSinks;
// Destination cookies by source cookies for the connection points
typedef std::map<DWORD, DWORD> CDestCookiesBySourceCookie;
CDestCookiesBySourceCookie m_anCookies;
// The update lock
CLock m_oAccessLock;
// The source end of the proxy
CEventProxySourceInst* m_poSource;
private:
// Finalizes the unmarshalling of all remaining marshalled interfaces
// Called from the created thread automatically when you GetSinks
bool Unmarshall() {
// Look if there is anything to marshall
bool bSuccess = true;
if (m_bNewSinks) {
// Yes -> Process all marshalled sinks
int nNumConnects = m_aoStreamedSinks.size();
CStreamedInterfaces::iterator poNextSink = m_aoStreamedSinks.begin();
CStreamedInterfaces::iterator poLastSink = m_aoStreamedSinks.end();
for (; poNextSink != poLastSink; ++poNextSink) {
// Next marshalled sink -> unmarshall it
SStreamedInterface oNextSink = *poNextSink;
TInterface* poMarshalledSink;
HRESULT eResult = CoGetInterfaceAndReleaseStream(
oNextSink.poStream,
*TpuInterfaceIID,
(void**)&poMarshalledSink
);
if (SUCCEEDED(eResult)) {
// Success -> put it in our streamed sink list
DWORD nDestCookie = m_apoSinks.Add(poMarshalledSink);
m_anCookies.insert(CDestCookiesBySourceCookie::value_type(
oNextSink.nCookie,
nDestCookie
));
} else {
bSuccess = false;
}
}
// And remove all old stream pointers
m_aoStreamedSinks.clear();
m_bNewSinks = false;
}
// And return if all sinks could be unmarshalled
return bSuccess;
}
// Removes the given interface
bool Remove(DWORD nCookie) {
// Aqcuire access first
CLockController oLocker;
oLocker.Lock(m_oAccessLock);
// Unmarshall any waiting sinks first
Unmarshall();
// Try to find the translation for the cookie
CDestCookiesBySourceCookie::iterator pnFoundCookie =
m_anCookies.find(nCookie);
bool bFound = pnFoundCookie != m_anCookies.end();
if (bFound) {
// Done -> remove the corresponding sink and cookie
DWORD nDestCookie = pnFoundCookie->second;
IUnknown* poUnk = m_apoSinks.GetUnknown(nDestCookie);
if (poUnk != NULL) {
poUnk->Release();
}
m_apoSinks.Remove(nDestCookie);
m_anCookies.erase(pnFoundCookie);
}
// And return if all went well
return bFound;
}
};
// Source end of the event proxy
template <
class TSourceBase,
class TInterface,
const IID* TpuInterfaceIID,
class TATLEventProxy,
class TSinkStoreType = CComDynamicUnkArray
>
class CEventProxySource:
public TATLEventProxy,
public IConnectionPointContainerImpl<TSourceBase> {
typedef CEventProxyDest<
TSourceBase,
TInterface,
TpuInterfaceIID,
TATLEventProxy,
TSinkStoreType
> CEventProxyDestInst;
friend CEventProxyDestInst;
public:
// Constructor
CEventProxySource():
m_poDest(NULL) {
}
// Destructor
virtual ~CEventProxySource() {
// Look if the destination is still around
if (m_poDest != NULL) {
// Acquire access first
CLockController oLocker;
oLocker.Lock(m_poDest->m_oAccessLock);
// And make the destination know were gone
m_poDest->m_poSource = NULL;
}
}
// Sets the destination for the marshalling
// (provide NULL to disable marshalling)
bool SetProxyDest(CEventProxyDestInst* poDestination) {
// Look if a new destination is set
bool bSuccess = true;
if (poDestination != m_poDest) {
// Yes -> replace the destination
m_poDest = poDestination;
// And look if to marshall any sinks
if (m_poDest != NULL) {
// Yes -> aqcuire access first
CLockController oLocker;
oLocker.Lock(m_poDest->m_oAccessLock);
// Link it back to us
m_poDest->m_poSource = this;
// And marshall all separate sinks
bSuccess = ReMarshall();
}
}
// And return if all went smooth
return bSuccess;
}
// Remarshalls all existing event sinks to the destination
// Call from the parent thread
bool ReMarshall() {
// Look if a destination is set
bool bSuccess = m_poDest != NULL;
if (bSuccess) {
// Yes -> aqcuire access first
CLockController oLocker;
oLocker.Lock(m_poDest->m_oAccessLock);
// Clear any existing sinks first
m_poDest->Clear();
// And marshall all registered sinks
int nNumConnects = m_vec.GetSize();
for (int nConnectNr = 0; nConnectNr < nNumConnects; nConnectNr++) {
// Get the next interface to marshall
IUnknown* poNextSink = m_vec.GetAt(nConnectNr);
if (poNextSink != NULL) {
// Done -> marshall it
if (!Marshall(poNextSink, m_vec.GetCookie(&poNextSink))) {
// Couldn't -> note
bSuccess = false;
}
}
}
}
// And return our status
return bSuccess;
}
// Marshalls the given event sink to the destination
// Call from the parent thread
bool Marshall(IUnknown* poSink, DWORD nCookie) {
// Look if a destination is set
bool bSuccess = m_poDest != NULL;
if (bSuccess) {
// Yes -> look if a new sink is specified
if (poSink != NULL) {
// Yes -> aqcuire access first
CLockController oLocker;
oLocker.Lock(m_poDest->m_oAccessLock);
// Look if this sink is already marshalled
CEventProxyDestInst::CDestCookiesBySourceCookie::iterator
pnFoundCookie = m_poDest->m_anCookies.find(nCookie);
if (pnFoundCookie == m_poDest->m_anCookies.end()) {
// No -> marshall the sink
IStream* poStream = NULL;
bSuccess = SUCCEEDED(CoMarshalInterThreadInterfaceInStream(
*TpuInterfaceIID,
poSink,
&poStream
));
if (bSuccess) {
// Success -> store the sink
CEventProxyDestInst::SStreamedInterface oNewStream;
oNewStream.poStream = poStream;
oNewStream.nCookie = nCookie;
m_poDest->m_aoStreamedSinks.push_back(oNewStream);
// And note that new sinks await
m_poDest->m_bNewSinks = true;
}
}
}
}
// And return if the sink could be marshalled
return bSuccess;
}
// Also marshalls the new sink to the marshall destination
STDMETHOD(Advise)(IUnknown* poUnkSink, DWORD* pnCookie) {
// Use the sink ourselves
HRESULT eResult = IConnectionPointImpl<
TSourceBase,
TpuInterfaceIID,
TSinkStoreType
>::Advise(poUnkSink, pnCookie);
// Also marshall a copy, if needed
if (m_poDest != NULL) {
Marshall(poUnkSink, *pnCookie);
}
// And return the result
return eResult;
}
// Also removes the sink from the marshall destination
STDMETHOD(Unadvise)(DWORD nCookie) {
// Let the marshalled sink be released, if needed
if (m_poDest != NULL) {
m_poDest->Remove(nCookie);
}
// And remove the sink from our list as well
return IConnectionPointImpl<
TSourceBase,
TpuInterfaceIID,
TSinkStoreType
>::Unadvise(nCookie);
}
protected:
// The destination for the marshalling
CEventProxyDestInst* m_poDest;
};
#endif // INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H

Products
Overview
C_EventProxy