2727#include < typeinfo>
2828#include < type_traits>
2929#include < string>
30- #include < FairMQChannel.h>
31- #include < FairMQMessage.h>
32- #include < FairMQParts.h>
3330#include < TMessage.h>
3431#include " CommonUtils/ShmManager.h"
3532#include " CommonUtils/ShmAllocator.h"
3835#include < unistd.h>
3936#include < cassert>
4037
38+ class FairMQParts ;
39+ class FairMQChannel ;
40+
4141namespace o2 {
4242namespace Base {
4343
@@ -214,78 +214,36 @@ inline std::string demangle(const char* name)
214214 return (status == 0 ) ? res.get () : name;
215215}
216216
217- template <typename Container>
218- void attachShmMessage (Container const & hits, FairMQChannel& channel, FairMQParts& parts, bool * busy_ptr)
219- {
220- struct shmcontext {
221- int id;
222- void * object_ptr;
223- bool * busy_ptr;
224- };
225-
226- auto & instance = o2::utils::ShmManager::Instance ();
227- shmcontext info{ instance.getShmID (), (void *)&hits, busy_ptr };
228- LOG (DEBUG) << " -- SHM SEND --" ;
229- LOG (INFO) << " -- SENDING -- " << hits.size () << " HITS " ;
230- LOG (INFO) << " -- OBJ PTR -- " << info.object_ptr << " " ;
231- assert (instance.isPointerOk (info.object_ptr ));
232-
233- std::unique_ptr<FairMQMessage> message (channel.NewSimpleMessage (info));
234- parts.AddPart (std::move (message));
235- }
217+ void attachShmMessage (void * hitsptr, FairMQChannel& channel, FairMQParts& parts, bool * busy_ptr);
218+ void * decodeShmCore (FairMQParts& dataparts, int index, bool *& busy);
236219
237220template <typename T>
238221T decodeShmMessage (FairMQParts& dataparts, int index, bool *& busy)
239222{
240- auto rawmessage = std::move (dataparts.At (index));
241- struct shmcontext {
242- int id;
243- void * object_ptr;
244- bool * busy_ptr;
245- };
246-
247- shmcontext* info = (shmcontext*)rawmessage->GetData ();
248- LOG (DEBUG) << " GOT SHMID " << info->id ;
249-
250- busy = info->busy_ptr ;
251- return reinterpret_cast <T>(info->object_ptr );
223+ return reinterpret_cast <T>(decodeShmCore (dataparts, index, busy));
252224}
253225
226+ // this goes into the source
227+ void attachMessageBufferToParts (FairMQParts& parts, FairMQChannel& channel,
228+ void * data, size_t size, void (*func_ptr)(void * data, void * hint), void* hint);
229+
254230template <typename Container>
255231void attachTMessage (Container const & hits, FairMQChannel& channel, FairMQParts& parts)
256232{
257233 TMessage* tmsg = new TMessage ();
258234 tmsg->WriteObjectAny ((void *)&hits, TClass::GetClass (typeid (hits)));
259- auto free_tmessage = [](void * data, void * hint) { delete static_cast <TMessage*>(hint); };
260-
261- std::unique_ptr<FairMQMessage> message (channel.NewMessage (tmsg->Buffer (), tmsg->BufferSize (), free_tmessage, tmsg));
262- parts.AddPart (std::move (message));
235+ attachMessageBufferToParts (parts, channel, tmsg->Buffer (), tmsg->BufferSize (),
236+ [](void * data, void * hint) { delete static_cast <TMessage*>(hint); }, tmsg);
263237}
264238
239+ void * decodeTMessageCore (FairMQParts& dataparts, int index);
265240template <typename T>
266241T decodeTMessage (FairMQParts& dataparts, int index)
267242{
268- // sanity check
269- if (index >= dataparts.Size ()) {
270- return T (0 );
271- }
272- class TMessageWrapper : public TMessage
273- {
274- public:
275- TMessageWrapper (void * buf, Int_t len) : TMessage(buf, len) { ResetBit (kIsOwner ); }
276- ~TMessageWrapper () override = default ;
277- };
278- auto rawmessage = std::move (dataparts.At (index));
279- auto message = std::make_unique<TMessageWrapper>(rawmessage->GetData (), rawmessage->GetSize ());
280- return static_cast <T>(message.get ()->ReadObjectAny (message.get ()->GetClass ()));
243+ return static_cast <T>(decodeTMessageCore (dataparts, index));
281244}
282245
283- template <typename T>
284- void attachMetaMessage (T secret, FairMQChannel& channel, FairMQParts& parts)
285- {
286- std::unique_ptr<FairMQMessage> message (channel.NewSimpleMessage (secret));
287- parts.AddPart (std::move (message));
288- }
246+ void attachDetIDHeaderMessage (int id, FairMQChannel& channel, FairMQParts& parts);
289247
290248template <typename T>
291249TBranch* getOrMakeBranch (TTree& tree, const char * brname, T* ptr)
@@ -348,7 +306,7 @@ class DetImpl : public o2::Base::Detector
348306 return ;
349307 }
350308
351- attachMetaMessage (GetDetId (), channel, parts); // the DetId s are universal as they come from o2::detector::DetID
309+ attachDetIDHeaderMessage (GetDetId (), channel, parts); // the DetId s are universal as they come from o2::detector::DetID
352310
353311 while (auto hits = static_cast <Det*>(this )->Det ::getHits (probe++)) {
354312 if (!UseShm<Det>::value || !o2::utils::ShmManager::Instance ().isOperational ()) {
@@ -357,7 +315,7 @@ class DetImpl : public o2::Base::Detector
357315 // this is the shared mem variant
358316 // we will just send the sharedmem ID and the offset inside
359317 *mShmBusy [mCurrentBuffer ] = true ;
360- attachShmMessage (* hits, channel, parts, mShmBusy [mCurrentBuffer ]);
318+ attachShmMessage (( void *) hits, channel, parts, mShmBusy [mCurrentBuffer ]);
361319 }
362320 }
363321 }
0 commit comments