Skip to content

Commit 3880176

Browse files
committed
DPL: attempt at supporting cloned processors
This introduces a --clone <template>:<new-name>[, ...] option which allows duplicating and renaming a given dataprocessor <template>. There is still a few caveats: * It cannot have outputs. * The dpl-config.json needs to mention the <new-name> in order to configure it, options of <template> will not be used. * The workflow.json must include an entry for <new-name>.
1 parent 8aeb68e commit 3880176

File tree

3 files changed

+47
-0
lines changed

3 files changed

+47
-0
lines changed

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ class ConfigContext;
102102
/// Helper used to customize a workflow pipelining options
103103
void overridePipeline(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
104104

105+
/// Helper used to customize a workflow via a template data processor
106+
void overrideCloning(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);
107+
105108
// This comes from the framework itself. This way we avoid code duplication.
106109
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
107110
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
@@ -127,6 +130,7 @@ int main(int argc, char** argv)
127130
UserCustomizationsHelper::userDefinedCustomization(workflowOptions, 0);
128131
workflowOptions.push_back(ConfigParamSpec{"readers", VariantType::Int64, 1ll, {"number of parallel readers to use"}});
129132
workflowOptions.push_back(ConfigParamSpec{"pipeline", VariantType::String, "", {"override default pipeline size"}});
133+
workflowOptions.push_back(ConfigParamSpec{"clone", VariantType::String, "", {"clone processors from a template"}});
130134

131135
// options for AOD rate limiting
132136
workflowOptions.push_back(ConfigParamSpec{"aod-memory-rate-limit", VariantType::Int64, 0LL, {"Rate limit AOD processing based on memory"}});
@@ -173,6 +177,7 @@ int main(int argc, char** argv)
173177
ConfigContext configContext(workflowOptionsRegistry, argc, argv);
174178
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
175179
overridePipeline(configContext, specs);
180+
overrideCloning(configContext, specs);
176181
for (auto& spec : specs) {
177182
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices, 0);
178183
}

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
511511
std::vector<LogicalForwardInfo>& forwardedInputsInfo)
512512
{
513513
assert(!workflow.empty());
514+
514515
// This is the state. Oif is the iterator I use for the searches.
515516
std::list<LogicalOutputInfo> availableOutputsInfo;
516517
auto const& constOutputs = outputs; // const version of the outputs

Framework/Core/src/runDataProcessing.cxx

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,47 @@ bool isOutputToPipe()
13831383
return ((s.st_mode & S_IFIFO) != 0);
13841384
}
13851385

1386+
void overrideCloning(ConfigContext& ctx, WorkflowSpec& workflow)
1387+
{
1388+
struct CloningSpec {
1389+
std::string templateMatcher;
1390+
std::string cloneName;
1391+
};
1392+
auto s = ctx.options().get<std::string>("clone");
1393+
std::vector<CloningSpec> specs;
1394+
std::string delimiter = ",";
1395+
1396+
size_t pos = 0;
1397+
while (s.empty() == false) {
1398+
auto newPos = s.find(delimiter);
1399+
auto token = s.substr(0, newPos);
1400+
auto split = token.find(":");
1401+
if (split == std::string::npos) {
1402+
throw std::runtime_error("bad clone definition. Syntax <template-processor>:<clone-name>");
1403+
}
1404+
auto key = token.substr(0, split);
1405+
token.erase(0, split + 1);
1406+
auto value = token;
1407+
specs.push_back({key, value});
1408+
s.erase(0, newPos + (newPos == std::string::npos ? 0 : 1));
1409+
}
1410+
if (s.empty() == false && specs.empty() == true) {
1411+
throw std::runtime_error("bad pipeline definition. Syntax <processor>:<pipeline>");
1412+
}
1413+
1414+
std::vector<DataProcessorSpec> extraSpecs;
1415+
for (auto& spec : specs) {
1416+
for (auto& processor : workflow) {
1417+
if (processor.name == spec.templateMatcher) {
1418+
auto clone = processor;
1419+
clone.name = spec.cloneName;
1420+
extraSpecs.push_back(clone);
1421+
}
1422+
}
1423+
}
1424+
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
1425+
}
1426+
13861427
void overridePipeline(ConfigContext& ctx, WorkflowSpec& workflow)
13871428
{
13881429
struct PipelineSpec {

0 commit comments

Comments
 (0)