Skip to content

Commit 53abd66

Browse files
committed
DPL: allow parallel WS backed drivers
1 parent 0beed6b commit 53abd66

File tree

5 files changed

+21
-4
lines changed

5 files changed

+21
-4
lines changed

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -961,7 +961,7 @@ void split(const std::string& str, Container& cont)
961961
}
962962
} // namespace
963963

964-
void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
964+
void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, unsigned short driverPort,
965965
std::vector<DataProcessorInfo> const& processorInfos,
966966
std::vector<DeviceSpec> const& deviceSpecs,
967967
std::vector<DeviceExecution>& deviceExecutions,
@@ -1037,6 +1037,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
10371037
// has option --session been specified on the command line?
10381038
bool haveSessionArg = false;
10391039
using FilterFunctionT = std::function<void(decltype(argc), decltype(argv), decltype(od))>;
1040+
bool useDefaultWS = false;
10401041

10411042
// the filter function will forward command line arguments based on the option
10421043
// definition passed to it. All options of the program option definition will be forwarded
@@ -1109,6 +1110,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
11091110
}
11101111

11111112
haveSessionArg = haveSessionArg || varmap.count("session") != 0;
1113+
useDefaultWS = useDefaultWS || (varmap.count("driver-client-backend") != 0) && varmap["driver-client-backend"].as<std::string>() == "ws://";
11121114

11131115
for (const auto varit : varmap) {
11141116
// find the option belonging to key, add if the option has been parsed
@@ -1160,6 +1162,16 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
11601162
tmpArgs.emplace_back(std::string("--session"));
11611163
tmpArgs.emplace_back("dpl_" + uniqueWorkflowId);
11621164
}
1165+
// In case we use only ws://, we need to expand the address
1166+
// with the correct port.
1167+
if (useDefaultWS) {
1168+
auto it = std::find(tmpArgs.begin(), tmpArgs.end(), "--driver-client-backend");
1169+
if ((it != tmpArgs.end()) && (it + 1 != tmpArgs.end())) {
1170+
tmpArgs.erase(it, it + 2);
1171+
}
1172+
tmpArgs.emplace_back(std::string("--driver-client-backend"));
1173+
tmpArgs.emplace_back("ws://0.0.0.0:" + std::to_string(driverPort));
1174+
}
11631175

11641176
if (spec.resourceMonitoringInterval > 0) {
11651177
tmpArgs.emplace_back(std::string("--resources-monitoring"));

Framework/Core/src/DeviceSpecHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ struct DeviceSpecHelpers {
101101
static void prepareArguments(
102102
bool defaultQuiet,
103103
bool defaultStopped,
104+
unsigned short driverPort,
104105
std::vector<DataProcessorInfo> const& processorInfos,
105106
std::vector<DeviceSpec> const& deviceSpecs,
106107
std::vector<DeviceExecution>& deviceExecutions,

Framework/Core/src/runDataProcessing.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10+
#include <stdexcept>
1011
#include "Framework/BoostOptionsRetriever.h"
1112
#include "Framework/ChannelConfigurationPolicy.h"
1213
#include "Framework/ChannelMatching.h"
@@ -1159,6 +1160,9 @@ int runStateMachine(DataProcessorSpecs const& workflow,
11591160
if (result != 0) {
11601161
driverInfo.port++;
11611162
}
1163+
if (driverInfo.port > 64000) {
1164+
throw runtime_error_f("Unable to find a free port for the driver. Last attempt returned %d", result);
1165+
}
11621166
} while (result != 0);
11631167
}
11641168

@@ -1398,6 +1402,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
13981402
DeviceSpecHelpers::reworkShmSegmentSize(dataProcessorInfos);
13991403
DeviceSpecHelpers::prepareArguments(driverControl.defaultQuiet,
14001404
driverControl.defaultStopped,
1405+
driverInfo.port,
14011406
dataProcessorInfos,
14021407
deviceSpecs,
14031408
deviceExecutions, controls,

Framework/Core/test/test_DeviceSpecHelpers.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,13 @@ void check(const std::vector<std::string>& arguments,
7575
workflowOptions,
7676
});
7777
}
78-
DeviceSpecHelpers::prepareArguments(true, true,
78+
DeviceSpecHelpers::prepareArguments(true, true, 8080,
7979
dataProcessorInfos,
8080
deviceSpecs,
8181
deviceExecutions,
8282
deviceControls,
8383
"workflow-id");
8484

85-
8685
for (size_t index = 0; index < deviceSpecs.size(); index++) {
8786
const auto& deviceSpec = deviceSpecs[index];
8887
const auto& deviceExecution = deviceExecutions[index];

Framework/Core/test/test_FrameworkDataFlowToDDS.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ BOOST_AUTO_TEST_CASE(TestDDS)
9292
{"C", "foo", {}, workflowOptions},
9393
{"D", "foo", {}, workflowOptions},
9494
}};
95-
DeviceSpecHelpers::prepareArguments(false, false,
95+
DeviceSpecHelpers::prepareArguments(false, false, 8080,
9696
dataProcessorInfos,
9797
devices, executions, controls,
9898
"workflow-id");

0 commit comments

Comments
 (0)