Skip to content

Commit 8f20905

Browse files
committed
DPL: allow custom orbit when using timer / enumeration
The firstTForbit field is now given by the formula: firstTForbit = orbit-offset-enumeration + orbit-multiplier-enumeration + dh.tfCounter for the enumeration and timer cases. One can use, e.g.: --orbit-offset-enumeration and --orbit-multiplier-enumeration on the command line to affect the two parameters.
1 parent 8533ede commit 8f20905

12 files changed

+83
-31
lines changed

Framework/Core/include/Framework/ConfigParamsHelper.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ struct ConfigParamsHelper {
3636
options_description& options,
3737
boost::program_options::options_description vetos = options_description());
3838

39+
/// Add the ConfigParamSpec @a spec to @a specs if there is no parameter with
40+
/// the same name already.
41+
static void addOptionIfMissing(std::vector<ConfigParamSpec>& specs, ConfigParamSpec spec);
42+
3943
/// populate boost program options for a complete workflow
4044
template <typename ContainerType>
4145
static boost::program_options::options_description

Framework/Core/include/Framework/DataDescriptorMatcher.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ struct ContextRef {
4141
inline bool operator==(ContextRef const& other) const;
4242
};
4343

44+
/// Special positions for variables in context.
45+
enum ContextPos {
46+
STARTTIME_POS = 0, /// The DataProcessingHeader::startTime associated to the timeslice
47+
TFCOUNTER_POS = 14, /// The DataHeader::tfCounter associated to the timeslice
48+
FIRSTTFORBIT_POS = 15 /// The DataHeader::firstTFOrbit associated to the timeslice
49+
};
50+
4451
/// An element of the matching context. Context itself is really a vector of
4552
/// those. It's up to the matcher builder to build the vector in a suitable way.
4653
/// We do not have any float in the value, because AFAICT there is no need for

Framework/Core/include/Framework/ExpirationHandler.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
1010

11-
#ifndef FRAMEWORK_EXPIRATIONHANDLER_H
12-
#define FRAMEWORK_EXPIRATIONHANDLER_H
11+
#ifndef O2_FRAMEWORK_EXPIRATIONHANDLER_H_
12+
#define O2_FRAMEWORK_EXPIRATIONHANDLER_H_
1313

1414
#include "Framework/Lifetime.h"
1515
#include "Framework/RoutingIndices.h"
16+
#include "Framework/DataDescriptorMatcher.h"
1617
#include <cstdint>
1718
#include <functional>
1819

19-
namespace o2
20-
{
21-
namespace framework
20+
namespace o2::framework
2221
{
2322

2423
struct PartRef;
@@ -29,7 +28,7 @@ struct TimesliceSlot;
2928
struct ExpirationHandler {
3029
using Creator = std::function<TimesliceSlot(TimesliceIndex&)>;
3130
using Checker = std::function<bool(uint64_t timestamp)>;
32-
using Handler = std::function<void(ServiceRegistry&, PartRef& expiredInput, uint64_t timestamp)>;
31+
using Handler = std::function<void(ServiceRegistry&, PartRef& expiredInput, uint64_t timestamp, data_matcher::VariableContext& variables)>;
3332

3433
RouteIndex routeIndex;
3534
Lifetime lifetime;
@@ -38,7 +37,6 @@ struct ExpirationHandler {
3837
Handler handler;
3938
};
4039

41-
} // namespace framework
42-
} // namespace o2
40+
} // namespace o2::framework
4341

44-
#endif // FRAMEWORK_EXPIRATIONHANDLER_H
42+
#endif // O2_FRAMEWORK_EXPIRATIONHANDLER_H_

Framework/Core/include/Framework/LifetimeHelpers.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ struct LifetimeHelpers {
7676
/// dataOrigin, dataDescrition and dataSpecification of the given @a route.
7777
/// The payload of each message will contain an incremental number for each
7878
/// message being created.
79-
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const& spec, std::string const& sourceChannel);
79+
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const& spec, std::string const& sourceChannel,
80+
int64_t orbitOffset, int64_t orbitMultiplier);
8081

8182
/// Create a dummy (empty) message every time a record expires, suing @a spec
8283
/// as content of the payload.

Framework/Core/include/Framework/TimesliceIndex.inc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ inline uint32_t TimesliceIndex::getFirstTFOrbitForSlot(TimesliceSlot slot) const
122122
{
123123
assert(mVariables.size() > slot.index);
124124
// firstTForbit is always at register 15
125-
auto pval = std::get_if<uint32_t>(&mVariables[slot.index].get(15));
125+
auto pval = std::get_if<uint32_t>(&mVariables[slot.index].get(data_matcher::FIRSTTFORBIT_POS));
126126
if (pval == nullptr) {
127127
return -1;
128128
}
@@ -132,7 +132,7 @@ inline uint32_t TimesliceIndex::getFirstTFOrbitForSlot(TimesliceSlot slot) const
132132
inline uint32_t TimesliceIndex::getFirstTFCounterForSlot(TimesliceSlot slot) const
133133
{
134134
assert(mVariables.size() > slot.index);
135-
// firstTForbit is always at register 15
135+
// tfCounter is always at register 14
136136
auto pval = std::get_if<uint32_t>(&mVariables[slot.index].get(14));
137137
if (pval == nullptr) {
138138
return -1;

Framework/Core/src/ConfigParamsHelper.cxx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ void ConfigParamsHelper::populateBoostProgramOptions(
8989
}
9090
}
9191

92+
void ConfigParamsHelper::addOptionIfMissing(std::vector<ConfigParamSpec>& specs, ConfigParamSpec spec)
93+
{
94+
for (auto& old : specs) {
95+
if (old.name == spec.name) {
96+
return;
97+
}
98+
}
99+
specs.push_back(spec);
100+
}
101+
92102
/// populate boost program options making all options of type string
93103
/// this is used for filtering the command line argument
94104
bool ConfigParamsHelper::dpl2BoostOptions(const std::vector<ConfigParamSpec>& spec,

Framework/Core/src/DataDescriptorMatcher.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,9 @@ bool StartTimeValueMatcher::match(header::DataHeader const& dh, DataProcessingHe
106106
}
107107
context.put({ref->index, dph.startTime / mScale});
108108
// We always put in 14 the tfCounter
109-
context.put({14, dh.tfCounter});
109+
context.put({TFCOUNTER_POS, dh.tfCounter});
110110
// We always put in 15 the firstTForbit
111-
context.put({15, dh.firstTForbit});
111+
context.put({FIRSTTFORBIT_POS, dh.firstTForbit});
112112
return true;
113113
} else if (auto v = std::get_if<uint64_t>(&mValue)) {
114114
return (dph.startTime / mScale) == *v;

Framework/Core/src/DataRelayer.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
108108
}
109109
assert(mDistinctRoutesIndex.empty() == false);
110110
auto timestamp = mTimesliceIndex.getTimesliceForSlot(slot);
111+
auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
111112
// We iterate on all the hanlders checking if they need to be expired.
112113
for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
113114
auto& expirator = expirationHandlers[ei];
@@ -137,7 +138,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
137138
if (part.size() == 0) {
138139
part.parts.resize(1);
139140
}
140-
expirator.handler(services, part[0], timestamp.value);
141+
expirator.handler(services, part[0], timestamp.value, variables);
141142
activity.expiredSlots++;
142143

143144
mTimesliceIndex.markAsDirty(slot, true);

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,10 @@ struct ExpirationHandlerHelpers {
177177
throw runtime_error("InputSpec for Timers must be fully qualified");
178178
}
179179
// We copy the matcher to avoid lifetime issues.
180-
return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::enumerate(matcher, sourceChannel); };
180+
return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const& config) {
181+
// Timers do not have any orbit associated to them
182+
return LifetimeHelpers::enumerate(matcher, sourceChannel, 0, 0);
183+
};
181184
}
182185

183186
static RouteConfigurator::ExpirationConfigurator expiringEnumerationConfigurator(InputSpec const& spec, std::string const& sourceChannel)
@@ -187,8 +190,10 @@ struct ExpirationHandlerHelpers {
187190
throw runtime_error("InputSpec for Enumeration must be fully qualified");
188191
}
189192
// We copy the matcher to avoid lifetime issues.
190-
return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
191-
return LifetimeHelpers::enumerate(matcher, sourceChannel);
193+
return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const& config) {
194+
size_t orbitOffset = config.get<int64_t>("orbit-offset-enumeration");
195+
size_t orbitMultiplier = config.get<int64_t>("orbit-multiplier-enumeration");
196+
return LifetimeHelpers::enumerate(matcher, sourceChannel, orbitOffset, orbitMultiplier);
192197
};
193198
}
194199

@@ -222,17 +227,17 @@ struct ExpirationHandlerHelpers {
222227
{
223228
try {
224229
ConcreteDataMatcher concrete = DataSpecUtils::asConcreteDataMatcher(spec);
225-
return [concrete, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
230+
return [concrete, sourceChannel](DeviceState&, ConfigParamRegistry const& config) {
226231
return LifetimeHelpers::dummy(concrete, sourceChannel);
227232
};
228233
} catch (...) {
229234
ConcreteDataTypeMatcher dataType = DataSpecUtils::asConcreteDataTypeMatcher(spec);
230235
ConcreteDataMatcher concrete{dataType.origin, dataType.description, 0xdeadbeef};
231-
return [concrete, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
236+
return [concrete, sourceChannel](DeviceState&, ConfigParamRegistry const& config) {
232237
return LifetimeHelpers::dummy(concrete, sourceChannel);
233238
};
239+
// We copy the matcher to avoid lifetime issues.
234240
}
235-
// We copy the matcher to avoid lifetime issues.
236241
}
237242
};
238243

Framework/Core/src/LifetimeHelpers.cxx

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "Framework/DataProcessingHeader.h"
1212
#include "Framework/InputSpec.h"
1313
#include "Framework/LifetimeHelpers.h"
14+
#include "Framework/DataDescriptorMatcher.h"
1415
#include "Framework/Logger.h"
1516
#include "Framework/RawDeviceService.h"
1617
#include "Framework/ServiceRegistry.h"
@@ -143,7 +144,7 @@ ExpirationHandler::Checker LifetimeHelpers::expireTimed(std::chrono::microsecond
143144
/// expires via this mechanism).
144145
ExpirationHandler::Handler LifetimeHelpers::doNothing()
145146
{
146-
return [](ServiceRegistry&, PartRef& ref, uint64_t) -> void { return; };
147+
return [](ServiceRegistry&, PartRef& ref, uint64_t, data_matcher::VariableContext&) -> void { return; };
147148
}
148149

149150
// We simply put everything in a stringstream and read it afterwards.
@@ -188,7 +189,7 @@ ExpirationHandler::Handler
188189
if (matcher == nullptr) {
189190
throw runtime_error("InputSpec for Conditions must be fully qualified");
190191
}
191-
return [spec, matcher, sourceChannel, serverUrl = prefix, overrideTimestampMilliseconds](ServiceRegistry& services, PartRef& ref, uint64_t timestamp) -> void {
192+
return [spec, matcher, sourceChannel, serverUrl = prefix, overrideTimestampMilliseconds](ServiceRegistry& services, PartRef& ref, uint64_t timestamp, data_matcher::VariableContext&) -> void {
192193
// We should invoke the handler only once.
193194
assert(!ref.header);
194195
assert(!ref.payload);
@@ -265,7 +266,7 @@ ExpirationHandler::Handler
265266
/// FIXME: provide a way to customise the histogram from the configuration.
266267
ExpirationHandler::Handler LifetimeHelpers::fetchFromQARegistry()
267268
{
268-
return [](ServiceRegistry&, PartRef& ref, uint64_t) -> void {
269+
return [](ServiceRegistry&, PartRef& ref, uint64_t, data_matcher::VariableContext&) -> void {
269270
throw runtime_error("fetchFromQARegistry: Not yet implemented");
270271
return;
271272
};
@@ -276,18 +277,19 @@ ExpirationHandler::Handler LifetimeHelpers::fetchFromQARegistry()
276277
/// FIXME: provide a way to customise the histogram from the configuration.
277278
ExpirationHandler::Handler LifetimeHelpers::fetchFromObjectRegistry()
278279
{
279-
return [](ServiceRegistry&, PartRef& ref, uint64_t) -> void {
280+
return [](ServiceRegistry&, PartRef& ref, uint64_t, data_matcher::VariableContext&) -> void {
280281
throw runtime_error("fetchFromObjectRegistry: Not yet implemented");
281282
return;
282283
};
283284
}
284285

285286
/// Enumerate entries on every invokation.
286-
ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const& matcher, std::string const& sourceChannel)
287+
ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const& matcher, std::string const& sourceChannel,
288+
int64_t orbitOffset, int64_t orbitMultiplier)
287289
{
288290
using counter_t = int64_t;
289291
auto counter = std::make_shared<counter_t>(0);
290-
auto f = [matcher, counter, sourceChannel](ServiceRegistry& services, PartRef& ref, uint64_t timestamp) -> void {
292+
return [matcher, counter, sourceChannel, orbitOffset, orbitMultiplier](ServiceRegistry& services, PartRef& ref, uint64_t timestamp, data_matcher::VariableContext& variables) -> void {
291293
// We should invoke the handler only once.
292294
assert(!ref.header);
293295
assert(!ref.payload);
@@ -299,6 +301,10 @@ ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const&
299301
dh.subSpecification = matcher.subSpec;
300302
dh.payloadSize = sizeof(counter_t);
301303
dh.payloadSerializationMethod = gSerializationMethodNone;
304+
dh.tfCounter = timestamp;
305+
dh.firstTForbit = timestamp * orbitMultiplier + orbitOffset;
306+
variables.put({data_matcher::FIRSTTFORBIT_POS, dh.firstTForbit});
307+
variables.put({data_matcher::TFCOUNTER_POS, dh.tfCounter});
302308

303309
DataProcessingHeader dph{timestamp, 1};
304310

@@ -312,15 +318,14 @@ ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const&
312318
ref.payload = std::move(payload);
313319
(*counter)++;
314320
};
315-
return f;
316321
}
317322

318323
/// Create a dummy message with the provided ConcreteDataMatcher
319324
ExpirationHandler::Handler LifetimeHelpers::dummy(ConcreteDataMatcher const& matcher, std::string const& sourceChannel)
320325
{
321326
using counter_t = int64_t;
322327
auto counter = std::make_shared<counter_t>(0);
323-
auto f = [matcher, counter, sourceChannel](ServiceRegistry& services, PartRef& ref, uint64_t timestamp) -> void {
328+
auto f = [matcher, counter, sourceChannel](ServiceRegistry& services, PartRef& ref, uint64_t timestamp, data_matcher::VariableContext& variables) -> void {
324329
// We should invoke the handler only once.
325330
assert(!ref.header);
326331
assert(!ref.payload);
@@ -332,7 +337,23 @@ ExpirationHandler::Handler LifetimeHelpers::dummy(ConcreteDataMatcher const& mat
332337
dh.subSpecification = matcher.subSpec;
333338
dh.payloadSize = 0;
334339
dh.payloadSerializationMethod = gSerializationMethodNone;
335-
dh.tfCounter = timestamp;
340+
341+
{
342+
auto pval = std::get_if<uint32_t>(&variables.get(data_matcher::FIRSTTFORBIT_POS));
343+
if (pval == nullptr) {
344+
dh.firstTForbit = -1;
345+
} else {
346+
dh.firstTForbit = *pval;
347+
}
348+
}
349+
{
350+
auto pval = std::get_if<uint32_t>(&variables.get(data_matcher::TFCOUNTER_POS));
351+
if (pval == nullptr) {
352+
dh.tfCounter = timestamp;
353+
} else {
354+
dh.tfCounter = *pval;
355+
}
356+
}
336357

337358
DataProcessingHeader dph{timestamp, 1};
338359

0 commit comments

Comments
 (0)