The source files for this module are listed below. You can also download the module C_EventProxy as a zip archive; this archive contains all the source files and documentation.
EventProxy allows you to easily raise events in ATL projects from worker threads to event sinks that are registered on the main thread. All inter-thread marshalling of the connection points is worked out in a set of two base classes.
This source code also uses our TLock class. You can also download it at our website.
In your ATL project you may have objects that spin off worker threads to do some lengthy processing. You might also be interested to raise an event every now and then from these worker threads. You cannot let the worker thread fire it's events using the sinks that are registered with the original thread; to use these sinks, you have to marshall them first to your worker thread. All inter-thread marshalling issues are worked out for you in in a set of two template classes you have to inherit from; TEventProxySource and TEventProxyDest.
I will use two fictive class names in this explanation;
Event sinks thus need to be marshalled from TMyObject to TMyWorker in order for TMyWorker to be able to raise events.
After your code is set up (see the section "source modifications" in the source file comment), make a call SetProxyDest from TMyObject to enable all advised event sinks to be marshalled to the given proxy destination (the worker thread). To stop marshalling, simply pass NULL to SetProxyDest.
When a proxy destination is set, all newly advised event sinks will automatically be marshalled for you. All Unadvised sinks will also get removed from the proxy destination. If the already marshalled sinks get destroyed (because e.g. when your worker thread gets restarted), you need to call ReMarshall from the TMyObject thread before the event sinks are available in TMyWorker again.
To fire your events from TMyWorker, simply use the same syntax as you would with TMyObject, and add an extra parameter to the call (the list of marshalled sinks to fire the event to). Thus, raising an event from TMyWorker could look something like this:
TLocker locker; Fire_MyEvent(GetSinks(locker), myStringParam, myNumberParam); locker.Unlock();
You can marshall-enable TMyObject inheriting it from TEventProxySource. Marshall-enable TMyWorker by letting it inherit from TEventProxyDest. The parameters for the templates are:
There are some other modifications you have to perform to your source files as well;
public IConnectionPointImpl< T, &DIID__IMyObjectEvents, CComDynamicUnkArray >and also delete the template definition in front of the proxy class;
template<T>
CComDynamicUnkArray* sinkPtrs
T* pT = static_cast<T*>(this); pT->Lock(); pT->Unlock();(I may look into the issues this brings at a later time).
Each file belonging to this source code module is listed below.
/*******************************************************************************
Version: 2
Author: Carl Colijn, TwoLogs
Contact: c.colijn@twologs.com
Source: https://www.twologs.com/sourcecode
This code is freely distributable, as long as this comment remains intact.
If you find this source useful, you may use this code in your own projects
free of charge, but some acknowledgement to the author of this code is always
appreciated :)
The source is however distributed 'as is' without waranty and/or support, and
may not be fit for each and every application. Use it at your own discretion
and at your own risk.
The source already has undergone testing. This doesn't mean however that all
bugs are removed from this piece of code. If you find one of them, please
contact me about it. I can however not guarantee when and if the bug will be
fixed.
More information about this module can be found in the accompanying HTML file.
*******************************************************************************/
#ifndef INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H
#define INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H
#include <vector>
#include <map>
#include "Lock.h"
// Class predefine to make 'friend' work
template <
class TSourceBase,
class TInterface,
const IID* interfaceIIDPtr,
class TATLEventProxy,
class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxySource;
// Marshalled end of the event proxy
template <
class TSourceBase,
class TInterface,
const IID* interfaceIIDPtr,
class TATLEventProxy,
class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxyDest {
typedef TEventProxySource<
TSourceBase,
TInterface,
interfaceIIDPtr,
TATLEventProxy,
TSinkStoreType
> TEventProxySourceInst;
friend TEventProxySourceInst;
public:
// Constructor
TEventProxyDest():
m_newSinksWaiting(false),
m_proxySourcePtr(NULL) {
}
// Destructor
virtual ~TEventProxyDest() {
// Aqcuire access first
TLockController locker;
locker.Lock(m_accessLock);
// Make sure there are no unmarshalled sinks left in limbo
Unmarshall();
// Make sure all sinks are dereffed
Clear();
// And remove us from the source end
if (m_proxySourcePtr != NULL) {
m_proxySourcePtr->m_proxyDestPtr = NULL;
}
}
// Clears all marshalled proxy's
void Clear() {
// Aqcuire access first
TLockController locker;
locker.Lock(m_accessLock);
// Unmarshall any waiting sinks first
Unmarshall();
// And remove the sinks
IUnknown** nextSinkPtrPtr = m_sinkPtrs.begin();
IUnknown** lastSinkPtrPtr = m_sinkPtrs.end();
for (; nextSinkPtrPtr != lastSinkPtrPtr; ++nextSinkPtrPtr) {
if (*nextSinkPtrPtr != NULL) {
(*nextSinkPtrPtr)->Release();
}
}
m_sinkPtrs.clear();
m_cookies.clear();
}
// Gets the sink array. Unlock the locker when you are finished
// using the sinks
TSinkStoreType* GetSinks(TLockController& locker) {
// Aqcuire access first
locker.Lock(m_accessLock);
// Unmarshall any waiting sinks first
Unmarshall();
// And return the new list of sinks
return &m_sinkPtrs;
}
protected:
// Whether there are new sinks waiting
bool m_newSinksWaiting;
// Single streamed interface
struct TStreamedInterface {
CComPtr<IStream> streamPtr;
DWORD cookie;
};
// The temporary stream buffer for marshalled interfaces
typedef std::vector<TStreamedInterface> TStreamedInterfaces;
TStreamedInterfaces m_streamedSinks;
// The connection points
TSinkStoreType m_sinkPtrs;
// Destination cookies by source cookies for the connection points
typedef std::map<DWORD, DWORD> TDestCookiesBySourceCookie;
TDestCookiesBySourceCookie m_cookies;
// The update lock
TLock m_accessLock;
// The source end of the proxy
TEventProxySourceInst* m_proxySourcePtr;
private:
// Finalizes the unmarshalling of all remaining marshalled interfaces
// Called from the created thread automatically when you GetSinks
bool Unmarshall() {
// Look if there is anything to marshall
bool allOK = true;
if (m_newSinksWaiting) {
// Yes -> Process all marshalled sinks
int numConnects = m_streamedSinks.size();
TStreamedInterfaces::iterator nextSinkPtr = m_streamedSinks.begin();
TStreamedInterfaces::iterator lastSinkPtr = m_streamedSinks.end();
for (; nextSinkPtr != lastSinkPtr; ++nextSinkPtr) {
// Next marshalled sink -> unmarshall it
TStreamedInterface nextSink = *nextSinkPtr;
TInterface* marshalledSinkPtr;
HRESULT result = CoGetInterfaceAndReleaseStream(
nextSink.streamPtr,
*interfaceIIDPtr,
(void**)&marshalledSinkPtr
);
if (SUCCEEDED(result)) {
// Success -> put it in our streamed sink list
DWORD destCookie = m_sinkPtrs.Add(marshalledSinkPtr);
m_cookies.insert(TDestCookiesBySourceCookie::value_type(
nextSink.cookie,
destCookie
));
} else {
allOK = false;
}
}
// And remove all old stream pointers
m_streamedSinks.clear();
m_newSinksWaiting = false;
}
// And return if all sinks could be unmarshalled
return allOK;
}
// Removes the given interface
bool Remove(DWORD cookie) {
// Aqcuire access first
TLockController locker;
locker.Lock(m_accessLock);
// Unmarshall any waiting sinks first
Unmarshall();
// Try to find the translation for the cookie
TDestCookiesBySourceCookie::iterator foundCookiePtr =
m_cookies.find(cookie);
bool cookieFound = foundCookiePtr != m_cookies.end();
if (cookieFound) {
// Done -> remove the corresponding sink and cookie
DWORD destCookie = foundCookiePtr->second;
IUnknown* unkPtr = m_sinkPtrs.GetUnknown(destCookie);
if (unkPtr != NULL) {
unkPtr->Release();
}
m_sinkPtrs.Remove(destCookie);
m_cookies.erase(foundCookiePtr);
}
// And return if all went well
return cookieFound;
}
};
// Source end of the event proxy
template <
class TSourceBase,
class TInterface,
const IID* interfaceIIDPtr,
class TATLEventProxy,
class TSinkStoreType = CComDynamicUnkArray
>
class TEventProxySource:
public TATLEventProxy,
public IConnectionPointContainerImpl<TSourceBase> {
typedef TEventProxyDest<
TSourceBase,
TInterface,
interfaceIIDPtr,
TATLEventProxy,
TSinkStoreType
> TEventProxyDestInst;
friend TEventProxyDestInst;
public:
// Constructor
TEventProxySource():
m_proxyDestPtr(NULL) {
}
// Destructor
virtual ~TEventProxySource() {
// Look if the destination is still around
if (m_proxyDestPtr != NULL) {
// Acquire access first
TLockController locker;
locker.Lock(m_proxyDestPtr->m_accessLock);
// And make the destination know were gone
m_proxyDestPtr->m_proxySourcePtr = NULL;
}
}
// Sets the destination for the marshalling
// (provide NULL to disable marshalling)
bool SetProxyDest(TEventProxyDestInst* destinationPtr) {
// Look if a new destination is set
bool allOK = true;
if (destinationPtr != m_proxyDestPtr) {
// Yes -> replace the destination
m_proxyDestPtr = destinationPtr;
// And look if to marshall any sinks
if (m_proxyDestPtr != NULL) {
// Yes -> aqcuire access first
TLockController locker;
locker.Lock(m_proxyDestPtr->m_accessLock);
// Link it back to us
m_proxyDestPtr->m_proxySourcePtr = this;
// And marshall all separate sinks
allOK = ReMarshall();
}
}
// And return if all went smooth
return allOK;
}
// Remarshalls all existing event sinks to the destination
// Call from the parent thread
bool ReMarshall() {
// Look if a destination is set
bool allOK = m_proxyDestPtr != NULL;
if (allOK) {
// Yes -> aqcuire access first
TLockController locker;
locker.Lock(m_proxyDestPtr->m_accessLock);
// Clear any existing sinks first
m_proxyDestPtr->Clear();
// And marshall all registered sinks
int numConnects = m_vec.GetSize();
for (int connectNr = 0; connectNr < numConnects; connectNr++) {
// Get the next interface to marshall
IUnknown* nextSinkPtr = m_vec.GetAt(connectNr);
if (nextSinkPtr != NULL) {
// Done -> marshall it
if (!Marshall(nextSinkPtr, m_vec.GetCookie(&nextSinkPtr))) {
// Couldn't -> note
allOK = false;
}
}
}
}
// And return our status
return allOK;
}
// Marshalls the given event sink to the destination
// Call from the parent thread
bool Marshall(IUnknown* sinkPtr, DWORD cookie) {
// Look if a destination is set
bool allOK = m_proxyDestPtr != NULL;
if (allOK) {
// Yes -> look if a new sink is specified
if (sinkPtr != NULL) {
// Yes -> aqcuire access first
TLockController locker;
locker.Lock(m_proxyDestPtr->m_accessLock);
// Look if this sink is already marshalled
TEventProxyDestInst::TDestCookiesBySourceCookie::iterator
foundCookiePtr = m_proxyDestPtr->m_cookies.find(cookie);
if (foundCookiePtr == m_proxyDestPtr->m_cookies.end()) {
// No -> marshall the sink
IStream* streamPtr = NULL;
allOK = SUCCEEDED(CoMarshalInterThreadInterfaceInStream(
*interfaceIIDPtr,
sinkPtr,
&streamPtr
));
if (allOK) {
// Success -> store the sink
TEventProxyDestInst::TStreamedInterface newStream;
newStream.streamPtr = streamPtr;
newStream.cookie = cookie;
m_proxyDestPtr->m_streamedSinks.push_back(newStream);
// And note that new sinks await
m_proxyDestPtr->m_newSinksWaiting = true;
}
}
}
}
// And return if the sink could be marshalled
return allOK;
}
// Also marshalls the new sink to the marshall destination
STDMETHOD(Advise)(IUnknown* unkSinkPtr, DWORD* cookiePtr) {
// Use the sink ourselves
HRESULT result = IConnectionPointImpl<
TSourceBase,
interfaceIIDPtr,
TSinkStoreType
>::Advise(unkSinkPtr, cookiePtr);
// Also marshall a copy, if needed
if (m_proxyDestPtr != NULL) {
Marshall(unkSinkPtr, *cookiePtr);
}
// And return the result
return result;
}
// Also removes the sink from the marshall destination
STDMETHOD(Unadvise)(DWORD cookie) {
// Let the marshalled sink be released, if needed
if (m_proxyDestPtr != NULL) {
m_proxyDestPtr->Remove(cookie);
}
// And remove the sink from our list as well
return IConnectionPointImpl<
TSourceBase,
interfaceIIDPtr,
TSinkStoreType
>::Unadvise(cookie);
}
protected:
// The destination for the marshalling
TEventProxyDestInst* m_proxyDestPtr;
};
#endif // INCLUDE_TWOLOGS_COMMON_EVENTPROXY_H