Skip to content

Commit 79fd4f4

Browse files
committed
DPL: handle deprecation of FairMQDevice::ReceiveAsync API
1 parent 1b55223 commit 79fd4f4

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "Framework/InputRecord.h"
2020
#include "ScopedExit.h"
2121
#include <fairmq/FairMQParts.h>
22+
#include <fairmq/FairMQSocket.h>
2223
#include <options/FairMQProgOptions.h>
2324
#include <Monitoring/Monitoring.h>
2425
#include <TMessage.h>
@@ -38,6 +39,27 @@ namespace o2
3839
namespace framework
3940
{
4041

42+
/// Handle the fact that FairMQ deprecated ReceiveAsync and changed the behavior of receive.
43+
namespace
44+
{
45+
struct FairMQDeviceLegacyWrapper {
46+
// If both APIs are available, this has precendence because of dummy.
47+
// Only the old API before the deprecation had FairMQSocket::TrySend().
48+
template <typename T>
49+
static auto ReceiveAsync(T* device, FairMQParts& parts, std::string const& channel, int dummy) -> typename std::enable_if<(sizeof(decltype(std::declval<FairMQSocket>().TrySend(parts.At(0)))) > 0), int>::type
50+
{
51+
return device->ReceiveAsync(parts, channel);
52+
}
53+
54+
// Otherwise if we are here it means that TrySend() is not there anymore
55+
template <typename T>
56+
static auto ReceiveAsync(T* device, FairMQParts& parts, std::string const& channel, long dummy) -> int
57+
{
58+
return device->Receive(parts, channel, 0, 0);
59+
}
60+
};
61+
}
62+
4163
DataProcessingDevice::DataProcessingDevice(DeviceSpec const& spec, ServiceRegistry& registry)
4264
: mSpec{ spec },
4365
mInit{ spec.algorithm.onInit },
@@ -103,7 +125,7 @@ bool DataProcessingDevice::ConditionalRun()
103125
bool active = false;
104126
for (auto& channel : mSpec.inputChannels) {
105127
FairMQParts parts;
106-
auto result = this->ReceiveAsync(parts, channel.name);
128+
auto result = FairMQDeviceLegacyWrapper::ReceiveAsync(this, parts, channel.name, 0);
107129
if (result > 0) {
108130
this->handleData(parts);
109131
active |= this->tryDispatchComputation();

0 commit comments

Comments
 (0)