Skip to content

Commit 0c657c4

Browse files
shahor02noferini
authored andcommitted
Add reader-driver workflow to allow TF throttling with readers
The workflows driven by the input from the root files produced by detectors (e.g. by the o2-global-track-cluster-reader), can be preceded by the o2-reader-driver-workflow workflow to have TFs throttling via usual --timeframes-rate-limit <NTF> --timeframes-rate-limit-ipcid <ID> options. The o2-reader-driver-workflow as well as all reader workflows must be provided with the --hbfutils-config <tf_idinfo_file>,upstream option, with <tf_idinfo_file> being the file produced by the o2-tfidinfo-writer-workflow: usually o2_tfidinfo.root file. If this file is in the working directory, then the --hbfutils-config option can be shortened to upstream only. The o2-reader-driver-workflow is not supported for o2simdigitizerworkflow_configuration.ini version of --hbfutils-config, since it is used only for MC, where by construction there is only 1 TF, hence the throttling is meaningless. Option --max-tf <n> of the o2-reader-driver-workflow allows to inject only 1st <n> TFs by the dowsntream readers.
1 parent eb66a94 commit 0c657c4

File tree

7 files changed

+266
-17
lines changed

7 files changed

+266
-17
lines changed

Detectors/GlobalTrackingWorkflow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ o2_add_library(GlobalTrackingWorkflow
3333
src/GlobalFwdTrackWriterSpec.cxx
3434
src/GlobalFwdMatchingAssessmentSpec.cxx
3535
src/MatchedMFTMCHWriterSpec.cxx
36+
src/ReaderDriverSpec.cxx
3637
PUBLIC_LINK_LIBRARIES O2::GlobalTracking
3738
O2::GlobalTrackingWorkflowReaders
3839
O2::GlobalTrackingWorkflowHelpers
@@ -48,6 +49,10 @@ o2_add_library(GlobalTrackingWorkflow
4849
O2::SimulationDataFormat
4950
O2::DetectorsVertexing
5051
O2::StrangenessTracking)
52+
o2_add_executable(driver-workflow
53+
COMPONENT_NAME reader
54+
SOURCES src/reader-driver-workflow.cxx
55+
PUBLIC_LINK_LIBRARIES O2::GlobalTrackingWorkflow )
5156

5257
o2_add_executable(match-workflow
5358
COMPONENT_NAME tpcits

Detectors/GlobalTrackingWorkflow/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,23 @@ o2-cosmics-match-workflow --shm-segment-size 10000000000 --run | tee cosmics.log
2828

2929
One can account contributions of a limited set of track sources (currently by default: ITS, TPC, ITS-TPC, TPC-TOF, ITS-TPC-TOF) by providing optiont `--track-sources`.
3030

31+
## Using TF throttling when reading root files from detectors processing (tracks, clusters etc.)
3132

33+
The workflows driven by the input from the root files produced by detectors (e.g. by the `o2-global-track-cluster-reader`), can be preceded by the
34+
`o2-reader-driver-workflow` which will allow to have TF throttling via usual `--timeframes-rate-limit <NTF>` and `--timeframes-rate-limit-ipcid <ID>`
35+
options.
36+
The `o2-reader-driver-workflow` as well as all reader workflows must be provided with the `--hbfutils-config <tf_idinfo_file>,upstream` option, with <tf_idinfo_file> being the file produced by the
37+
`o2-tfidinfo-writer-workflow` (usually o2_tfidinfo.root file). If this file is in the working directory, then the `--hbfutils-config` option can be shortened to `upstream` only.
38+
39+
The `o2-reader-driver-workflow` is not supported for `o2simdigitizerworkflow_configuration.ini` version of the `--hbfutils-config`, since it is used only for MC,
40+
where by construction there is only 1 TF, hence the throttling is meaningless.
41+
42+
Option `--max-tf <n>` of the `o2-reader-driver-workflow` allows to inject only 1st <n> TFs by the dowsntream readers.
43+
44+
A typical invocation of the throttled workflow is:
45+
46+
```
47+
GLOSET=" --shm-segment-size 24000000000 --timeframes-rate-limit 2 --timeframes-rate-limit-ipcid 0"
48+
HBFSET=" --hbfutils-config upstream,o2_tfidinfo.root "
49+
o2-reader-driver-workflow $GLOSET $HBFSET --max-tf 3 | o2-global-track-cluster-reader $GLOSET $HBFSET --disable-mc --track-types <...> --cluster-types <...> | [<other readers> $GLOSET $HBFSET ] | <consumer workflows > $GLOSET --run
50+
```
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// @file ReaderDriverSpec.h
13+
14+
#ifndef O2_READER_DRIVER_
15+
#define O2_READER_DRIVER_
16+
17+
#include "Framework/DataProcessorSpec.h"
18+
#include <string>
19+
20+
namespace o2
21+
{
22+
namespace globaltracking
23+
{
24+
25+
/// create a processor spec
26+
/// pushes an empty output to provide timing to downstream devices
27+
framework::DataProcessorSpec getReaderDriverSpec(const std::string& metricChannel = "", size_t minSHM = 0);
28+
29+
} // namespace globaltracking
30+
} // namespace o2
31+
32+
#endif
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// @file ReaderDriverSpec.cxx
13+
14+
#include <vector>
15+
#include <cassert>
16+
#include <fairmq/Device.h>
17+
#include "Framework/ControlService.h"
18+
#include "Framework/ConfigParamRegistry.h"
19+
#include "Framework/Task.h"
20+
#include "Framework/Logger.h"
21+
#include "Framework/RateLimiter.h"
22+
#include "Framework/RawDeviceService.h"
23+
#include "GlobalTrackingWorkflow/ReaderDriverSpec.h"
24+
#include "DetectorsRaw/HBFUtilsInitializer.h"
25+
#include "CommonUtils/StringUtils.h"
26+
#include "TFile.h"
27+
#include "TTree.h"
28+
29+
using namespace o2::framework;
30+
31+
namespace o2
32+
{
33+
namespace globaltracking
34+
{
35+
36+
class ReadeDriverSpec : public o2::framework::Task
37+
{
38+
public:
39+
ReadeDriverSpec(size_t minSHM) : mMinSHM(minSHM) {}
40+
~ReadeDriverSpec() override = default;
41+
void init(o2::framework::InitContext& ic) final;
42+
void run(o2::framework::ProcessingContext& pc) final;
43+
44+
protected:
45+
int mTFRateLimit = -999;
46+
int mNTF = 0;
47+
size_t mMinSHM = 0;
48+
};
49+
50+
void ReadeDriverSpec::init(InitContext& ic)
51+
{
52+
mNTF = ic.options().get<int>("max-tf");
53+
}
54+
55+
void ReadeDriverSpec::run(ProcessingContext& pc)
56+
{
57+
if (mTFRateLimit == -999) {
58+
mTFRateLimit = std::stoi(pc.services().get<RawDeviceService>().device()->fConfig->GetValue<std::string>("timeframes-rate-limit"));
59+
}
60+
static RateLimiter limiter;
61+
static int count = 0;
62+
if (!count) {
63+
if (o2::raw::HBFUtilsInitializer::NTFs < 0) {
64+
LOGP(fatal, "Number of TFs to process was not initizalized in the HBFUtilsInitializer");
65+
}
66+
mNTF = (mNTF > 0 && mNTF < o2::raw::HBFUtilsInitializer::NTFs) ? mNTF : o2::raw::HBFUtilsInitializer::NTFs;
67+
} else { // check only for count > 0
68+
limiter.check(pc, mTFRateLimit, mMinSHM);
69+
}
70+
std::vector<char> v{};
71+
pc.outputs().snapshot(Output{"GLO", "READER_DRIVER", 0}, v);
72+
if (++count >= mNTF) {
73+
pc.services().get<ControlService>().endOfStream();
74+
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
75+
}
76+
}
77+
78+
DataProcessorSpec getReaderDriverSpec(const std::string& metricChannel, size_t minSHM)
79+
{
80+
std::vector<OutputSpec> outputSpec;
81+
std::vector<ConfigParamSpec> options;
82+
options.emplace_back(ConfigParamSpec{"max-tf", o2::framework::VariantType::Int, -1, {"max TFs to process (<1 : no limits)"}});
83+
if (!metricChannel.empty()) {
84+
options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, metricChannel, {"Out-of-band channel config for TF throttling"}});
85+
}
86+
return DataProcessorSpec{
87+
o2::raw::HBFUtilsInitializer::ReaderDriverDevice,
88+
Inputs{},
89+
Outputs{{"GLO", "READER_DRIVER", 0, Lifetime::Timeframe}},
90+
AlgorithmSpec{adaptFromTask<ReadeDriverSpec>(minSHM)},
91+
options};
92+
}
93+
94+
} // namespace globaltracking
95+
} // namespace o2
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "CommonUtils/ConfigurableParam.h"
13+
#include "Framework/CompletionPolicy.h"
14+
#include "Framework/ConfigParamSpec.h"
15+
#include "Framework/ChannelSpecHelpers.h"
16+
#include "GlobalTrackingWorkflow/ReaderDriverSpec.h"
17+
#include "DetectorsRaw/HBFUtilsInitializer.h"
18+
#include "Framework/CallbacksPolicy.h"
19+
20+
using namespace o2::framework;
21+
22+
// ------------------------------------------------------------------
23+
void customize(std::vector<o2::framework::CallbacksPolicy>& policies)
24+
{
25+
o2::raw::HBFUtilsInitializer::addNewTimeSliceCallback(policies);
26+
}
27+
28+
// we need to add workflow options before including Framework/runDataProcessing
29+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
30+
{
31+
// option allowing to set parameters
32+
std::vector<o2::framework::ConfigParamSpec> options{
33+
{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}},
34+
{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}},
35+
{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings ..."}}};
36+
o2::raw::HBFUtilsInitializer::addConfigOption(options, "o2_tfidinfo.root,upstream");
37+
std::swap(workflowOptions, options);
38+
}
39+
40+
// ------------------------------------------------------------------
41+
42+
#include "Framework/runDataProcessing.h"
43+
44+
WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
45+
{
46+
// Update the (declared) parameters if changed from the command line
47+
o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
48+
49+
int rateLimitingIPCID = std::stoi(configcontext.options().get<std::string>("timeframes-rate-limit-ipcid"));
50+
std::string chanFmt = configcontext.options().get<std::string>("metric-feedback-channel-format");
51+
std::string metricChannel{};
52+
if (rateLimitingIPCID > -1 && !chanFmt.empty()) {
53+
metricChannel = fmt::format(fmt::runtime(chanFmt), o2::framework::ChannelSpecHelpers::defaultIPCFolder(), rateLimitingIPCID);
54+
}
55+
size_t minSHM = std::stoul(configcontext.options().get<std::string>("timeframes-shm-limit"));
56+
57+
WorkflowSpec specs;
58+
specs.emplace_back(o2::globaltracking::getReaderDriverSpec(metricChannel, minSHM));
59+
60+
// configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
61+
o2::raw::HBFUtilsInitializer hbfIni(configcontext, specs);
62+
63+
return std::move(specs);
64+
}

Detectors/Raw/include/DetectorsRaw/HBFUtilsInitializer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ struct HBFUtilsInitializer {
5151
static constexpr char HBFConfOpt[] = "hbfutils-config";
5252
static constexpr char HBFTFInfoOpt[] = "tf-info-source";
5353
static constexpr char HBFUSrc[] = "hbfutils";
54+
static constexpr char ReaderDriverDevice[] = "reader-driver";
55+
static constexpr char UpstreamOpt[] = "upstream";
56+
57+
static int NTFs;
5458

5559
HBFUtilsInitializer(const o2::framework::ConfigContext& configcontext, o2::framework::WorkflowSpec& wf);
5660
static HBFOpt getOptType(const std::string& optString);

Detectors/Raw/src/HBFUtilsInitializer.cxx

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,45 +38,74 @@ namespace o2f = o2::framework;
3838
/// In case the configcontext has relevant option, the HBFUtils will be beforehand updated from the file indicated by this option.
3939
/// (only those fields of HBFUtils which were not modified before, e.g. by ConfigurableParam::updateFromString)
4040

41+
int HBFUtilsInitializer::NTFs = 0;
42+
4143
//_________________________________________________________
4244
HBFUtilsInitializer::HBFUtilsInitializer(const o2f::ConfigContext& configcontext, o2f::WorkflowSpec& wf)
4345
{
44-
auto updateHBFUtils = [&configcontext]() -> std::string {
46+
bool upstream = false; // timing info will be provided from upstream readers-driver, just subscribe to it
47+
48+
auto updateHBFUtils = [&configcontext, &upstream]() -> std::string {
4549
static bool done = false;
4650
static std::string confTFInfo{};
4751
if (!done) {
4852
bool helpasked = configcontext.helpOnCommandLine(); // if help is asked, don't take for granted that the ini file is there, don't produce an error if it is not!
4953
auto conf = configcontext.options().isSet(HBFConfOpt) ? configcontext.options().get<std::string>(HBFConfOpt) : "";
54+
HBFOpt opt = HBFOpt::NONE;
55+
upstream = false;
5056
if (!conf.empty()) {
51-
auto opt = getOptType(conf);
52-
if ((opt == HBFOpt::INI || opt == HBFOpt::JSON) && (!(helpasked && !o2::conf::ConfigurableParam::configFileExists(conf)))) {
53-
o2::conf::ConfigurableParam::updateFromFile(conf, "HBFUtils", true); // update only those values which were not touched yet (provenance == kCODE)
54-
const auto& hbfu = o2::raw::HBFUtils::Instance();
55-
hbfu.checkConsistency();
56-
confTFInfo = HBFUSrc;
57-
} else if (opt == HBFOpt::HBFUTILS) {
58-
const auto& hbfu = o2::raw::HBFUtils::Instance();
59-
hbfu.checkConsistency();
60-
confTFInfo = HBFUSrc;
61-
} else if (opt == HBFOpt::ROOT) {
62-
confTFInfo = conf;
57+
auto vopts = o2::utils::Str::tokenize(conf, ',');
58+
for (const auto& optStr : vopts) {
59+
if (optStr == UpstreamOpt) {
60+
upstream = true;
61+
continue;
62+
}
63+
if (!confTFInfo.empty()) {
64+
throw std::runtime_error(fmt::format("too many options in {} {}", HBFConfOpt, conf));
65+
}
66+
opt = getOptType(optStr);
67+
if ((opt == HBFOpt::INI || opt == HBFOpt::JSON) && (!(helpasked && !o2::conf::ConfigurableParam::configFileExists(confTFInfo)))) {
68+
o2::conf::ConfigurableParam::updateFromFile(optStr, "HBFUtils", true); // update only those values which were not touched yet (provenance == kCODE)
69+
const auto& hbfu = o2::raw::HBFUtils::Instance();
70+
hbfu.checkConsistency();
71+
confTFInfo = HBFUSrc;
72+
} else if (opt == HBFOpt::HBFUTILS) {
73+
const auto& hbfu = o2::raw::HBFUtils::Instance();
74+
hbfu.checkConsistency();
75+
confTFInfo = HBFUSrc;
76+
} else if (opt == HBFOpt::ROOT) {
77+
confTFInfo = optStr;
78+
}
6379
}
6480
}
6581
done = true;
82+
if (opt != HBFOpt::NONE) {
83+
if (upstream) {
84+
if (opt != HBFOpt::ROOT) {
85+
throw std::runtime_error(fmt::format("invalid option {}: upstream can be used only with root file providing TFIDInfo", conf));
86+
}
87+
}
88+
} else if (upstream) {
89+
confTFInfo = "o2_tfidinfo.root";
90+
LOGP(debug, "--hbfutils-config input type is not provided but upstream keyword is found: assume {}", confTFInfo);
91+
}
6692
}
6793
return confTFInfo;
6894
};
6995

7096
if (configcontext.options().hasOption("disable-root-input") && configcontext.options().get<bool>("disable-root-input")) {
7197
return; // we apply HBFUtilsInitializer only in case of root readers
7298
}
73-
7499
const auto& hbfu = o2::raw::HBFUtils::Instance();
75100
for (auto& spec : wf) {
76101
if (spec.inputs.empty()) {
77102
auto conf = updateHBFUtils();
78-
o2f::ConfigParamsHelper::addOptionIfMissing(spec.options, o2f::ConfigParamSpec{HBFTFInfoOpt, o2f::VariantType::String, conf, {"root file with per-TF info"}});
79-
o2f::ConfigParamsHelper::addOptionIfMissing(spec.options, o2f::ConfigParamSpec{DelayOpt, o2f::VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
103+
if (!upstream || spec.name == ReaderDriverDevice) {
104+
o2f::ConfigParamsHelper::addOptionIfMissing(spec.options, o2f::ConfigParamSpec{HBFTFInfoOpt, o2f::VariantType::String, conf, {"root file with per-TF info"}});
105+
o2f::ConfigParamsHelper::addOptionIfMissing(spec.options, o2f::ConfigParamSpec{DelayOpt, o2f::VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
106+
} else { // subsribe to upstream timing info from readers-driver
107+
spec.inputs.emplace_back(o2f::InputSpec{"driverInfo", "GLO", "READER_DRIVER", 0, o2f::Lifetime::Timeframe});
108+
}
80109
}
81110
}
82111
}
@@ -114,6 +143,7 @@ std::vector<o2::dataformats::TFIDInfo> HBFUtilsInitializer::readTFIDInfoVector(c
114143
throw std::runtime_error(fmt::format("Failed to read tfidinfo vector from {}", fname));
115144
}
116145
std::vector<o2::dataformats::TFIDInfo> v(*vptr);
146+
NTFs = v.size();
117147
return v;
118148
}
119149

@@ -174,5 +204,5 @@ void HBFUtilsInitializer::addNewTimeSliceCallback(std::vector<o2::framework::Cal
174204

175205
void HBFUtilsInitializer::addConfigOption(std::vector<o2f::ConfigParamSpec>& opts, const std::string& defOpt)
176206
{
177-
o2f::ConfigParamsHelper::addOptionIfMissing(opts, o2f::ConfigParamSpec{HBFConfOpt, o2f::VariantType::String, defOpt, {R"(ConfigurableParam ini file or "hbfutils" for HBFUtils, root file with per-TF info or "none")"}});
207+
o2f::ConfigParamsHelper::addOptionIfMissing(opts, o2f::ConfigParamSpec{HBFConfOpt, o2f::VariantType::String, defOpt, {R"(ConfigurableParam ini file or "hbfutils" for HBFUtils, root file with per-TF info (augmented with ,upstream if reader-driver is used) or "none")"}});
178208
}

0 commit comments

Comments
 (0)