Skip to content

Commit dc5317f

Browse files
wiechulashahor02
authored andcommitted
Parallel processing and mergin of pedestal calibration in DPL
o Possibility to run processing of sector selections o Possibility to run parallel processes o Add merger for CalDet objects, which can send data to ccdb-populator
1 parent 71b535b commit dc5317f

File tree

10 files changed

+280
-60
lines changed

10 files changed

+280
-60
lines changed

Detectors/TPC/base/src/TPCBaseLinkDef.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#pragma link C++ class o2::tpc::CalDet < short> + ;
2828
#pragma link C++ class o2::tpc::CalDet < bool> + ;
2929
#pragma link C++ class std::vector < o2::tpc::CalDet < float>> + ;
30+
#pragma link C++ class std::vector < o2::tpc::CalDet < float>*> + ;
3031
#pragma link C++ class std::unordered_map < std::string, o2::tpc::CalDet < float>> + ;
3132
#pragma link C++ class o2::tpc::CDBInterface;
3233
#pragma link C++ class o2::tpc::ContainerFactory;

Detectors/TPC/calibration/include/TPCCalibration/CalibPedestal.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ class CalibPedestal : public CalibRawBase
105105
/// \return noise calibration object
106106
const CalPad& getNoise() const { return mNoise; }
107107

108+
/// return all pad clibrations as vector
109+
const std::vector<const o2::tpc::CalDet<float>*> getCalDets() const { return std::vector<const o2::tpc::CalDet<float>*>{&mPedestal, &mNoise}; }
110+
108111
/// Get the statistics type
109112
StatisticsType getStatisticsType() const { return mStatisticsType; }
110113

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ o2_add_library(TPCWorkflow
2626
src/CalibProcessingHelper.cxx
2727
src/ClusterSharingMapSpec.cxx
2828
src/FileReaderWorkflow.cxx
29+
src/CalDetMergerPublisherSpec.cxx
2930
TARGETVARNAME targetName
3031
PUBLIC_LINK_LIBRARIES O2::Framework O2::DataFormatsTPC
3132
O2::DPLUtils O2::TPCReconstruction

Detectors/TPC/workflow/README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,12 @@ bz= magnetic field
152152
### Pedestal calibration
153153
#### Options
154154
```bash
155-
--direct-file-dump write final calibration to local root file
156-
--max-events maximum number of events to process
157-
--no-calib-output don't send the calibration data via DPL (required in case the calibration write is not attached)
158-
--use-old-subspec Subspecification is built from RDH like
155+
--direct-file-dump write final calibration to local root file
156+
--max-events maximum number of events to process
157+
--no-write-ccdb don't send the calibration data via DPL (required in case the calibration write is not attached)
158+
--use-old-subspec use old subspec definition (CruId << 16) | ((LinkId + 1) << (CruEndPoint == 1 ? 8 : 0))
159+
--lanes arg (=1) number of parallel processes
160+
--sectors arg (=0-35) list of TPC sectors, comma separated ranges, e.g. 0-3,7,9-15
159161
```
160162
161163
#### Running with data distribution
@@ -183,7 +185,7 @@ o2-raw-file-reader-workflow --input-conf raw-reader.cfg --nocheck-hbf-per-tf --
183185
```
184186
185187
#### Send data to CCDB
186-
Remove the `--no-calib-output` option and add
188+
Remove the `--no-write-ccdb` option and add
187189
```bash
188190
| o2-calibration-ccdb-populator-workflow
189191
```
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
#ifndef O2_TPC_CalDetMergerPublisherSpec_H
12+
#define O2_TPC_CalDetMergerPublisherSpec_H
13+
14+
/// @file CalDetMergerPublisherSpec.h
15+
/// @brief TPC CalDet merger and CCDB publisher
16+
/// @author Jens Wiechula, Jens.Wiechula@ikf.uni-frankfurt.de
17+
18+
#include "Framework/DataProcessorSpec.h"
19+
20+
namespace o2
21+
{
22+
namespace tpc
23+
{
24+
25+
o2::framework::DataProcessorSpec getCalDetMergerPublisherSpec(bool skipCCDB);
26+
27+
} // namespace tpc
28+
} // namespace o2
29+
30+
#endif

Detectors/TPC/workflow/include/TPCWorkflow/CalibProcessingHelper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class RawReaderCRU;
2828
namespace calib_processing_helper
2929
{
3030

31-
uint64_t processRawData(o2::framework::InputRecord& inputs, std::unique_ptr<RawReaderCRU>& reader, bool useOldSubspec = false);
31+
uint64_t processRawData(o2::framework::InputRecord& inputs, std::unique_ptr<RawReaderCRU>& reader, bool useOldSubspec = false, const std::vector<int>& sectors = {});
3232
} // namespace calib_processing_helper
3333
} // namespace tpc
3434
} // namespace o2

Detectors/TPC/workflow/include/TPCWorkflow/TPCCalibPedestalSpec.h

Lines changed: 28 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717
#include <vector>
1818
#include <string>
1919
#include <chrono>
20+
#include <fmt/format.h>
2021

2122
#include "Framework/Task.h"
2223
#include "Framework/ControlService.h"
2324
#include "Framework/Logger.h"
2425
#include "Framework/ConfigParamRegistry.h"
26+
#include "Framework/DataProcessorSpec.h"
2527

28+
#include "CommonUtils/MemFileHelper.h"
2629
#include "Headers/DataHeader.h"
27-
#include "CCDB/CcdbApi.h"
2830
#include "DetectorsCalibration/Utils.h"
2931

3032
#include "TPCBase/CDBInterface.h"
@@ -38,13 +40,13 @@ using clbUtils = o2::calibration::Utils;
3840

3941
namespace o2
4042
{
41-
namespace calibration
43+
namespace tpc
4244
{
4345

4446
class TPCCalibPedestalDevice : public o2::framework::Task
4547
{
4648
public:
47-
TPCCalibPedestalDevice(bool skipCalib) : mSkipCalib(skipCalib) {}
49+
TPCCalibPedestalDevice(int lane, const std::vector<int>& sectors) : mLane{lane}, mSectors(sectors) {}
4850

4951
void init(o2::framework::InitContext& ic) final
5052
{
@@ -74,7 +76,7 @@ class TPCCalibPedestalDevice : public o2::framework::Task
7476
}
7577

7678
auto& reader = mRawReader.getReaders()[0];
77-
calib_processing_helper::processRawData(pc.inputs(), reader, mUseOldSubspec);
79+
calib_processing_helper::processRawData(pc.inputs(), reader, mUseOldSubspec, mSectors);
7880

7981
mCalibPedestal.incrementNEvents();
8082
LOGP(info, "Number of processed events: {} ({})", mCalibPedestal.getNumberOfProcessedEvents(), mMaxEvents);
@@ -96,51 +98,37 @@ class TPCCalibPedestalDevice : public o2::framework::Task
9698
{
9799
LOGP(info, "endOfStream");
98100
dumpCalibData();
99-
if (!mSkipCalib) {
100-
sendOutput(ec.outputs());
101-
}
101+
sendOutput(ec.outputs());
102102
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
103103
}
104104

105105
private:
106106
CalibPedestal mCalibPedestal;
107107
rawreader::RawReaderCRUManager mRawReader;
108-
uint32_t mMaxEvents{100};
109-
bool mReadyToQuit{false};
110-
bool mCalibDumped{false};
111-
bool mUseOldSubspec{false};
112-
bool mForceQuit{false};
113-
bool mDirectFileDump{false};
114-
bool mSkipCalib{false};
108+
uint32_t mMaxEvents{100}; ///< maximum number of events to process
109+
int mLane{0}; ///< lane number of processor
110+
std::vector<int> mSectors{}; ///< sectors to process in this instance
111+
bool mReadyToQuit{false}; ///< if processor is ready to quit
112+
bool mCalibDumped{false}; ///< if calibration object already dumped
113+
bool mUseOldSubspec{false}; ///< use the old subspec definition
114+
bool mForceQuit{false}; ///< for quit after processing finished
115+
bool mDirectFileDump{false}; ///< directly dump the calibration data to file
115116

116117
//____________________________________________________________________________
117118
void sendOutput(DataAllocator& output)
118119
{
119-
CDBStorage::MetaData_t md;
120-
121-
// perhaps should be changed to time of the run
122-
const auto now = std::chrono::system_clock::now();
123-
long timeStart = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
124-
long timeEnd = 99999999999999;
125120

126121
std::array<const CalDet<float>*, 2> data = {&mCalibPedestal.getPedestal(), &mCalibPedestal.getNoise()};
127122
std::array<CDBType, 2> dataType = {CDBType::CalPedestal, CDBType::CalNoise};
128123

129124
for (size_t i = 0; i < data.size(); ++i) {
130125
auto cal = data[i];
131-
o2::ccdb::CcdbObjectInfo w;
132-
auto image = o2::ccdb::CcdbApi::createObjectImage(cal, &w);
133-
134-
w.setPath(CDBTypeMap.at(dataType[i]));
135-
w.setStartValidityTimestamp(timeStart);
136-
w.setEndValidityTimestamp(timeEnd);
137-
138-
LOG(INFO) << "Sending object " << w.getPath() << "/" << w.getFileName() << " of size " << image->size()
139-
<< " bytes, valid for " << w.getStartValidityTimestamp() << " : " << w.getEndValidityTimestamp();
126+
auto image = o2::utils::MemFileHelper::createFileImage(cal, typeid(*cal), cal->getName(), "data");
127+
int type = int(dataType[i]);
140128

141129
header::DataHeader::SubSpecificationType subSpec{(header::DataHeader::SubSpecificationType)i};
142-
output.snapshot(Output{clbUtils::gDataOriginCLB, clbUtils::gDataDescriptionCLBPayload, subSpec}, *image.get());
143-
output.snapshot(Output{clbUtils::gDataOriginCLB, clbUtils::gDataDescriptionCLBInfo, subSpec}, w);
130+
output.snapshot(Output{clbUtils::gDataOriginCLB, "TPCCLBPART", subSpec}, *image.get());
131+
output.snapshot(Output{clbUtils::gDataOriginCLB, "TPCCLBPARTINFO", subSpec}, type);
144132
}
145133
}
146134

@@ -150,32 +138,24 @@ class TPCCalibPedestalDevice : public o2::framework::Task
150138
if (mDirectFileDump && !mCalibDumped) {
151139
LOGP(info, "Dumping output");
152140
mCalibPedestal.analyse();
153-
mCalibPedestal.dumpToFile("pedestals.root");
141+
mCalibPedestal.dumpToFile(fmt::format("pedestals_{:02}.root", mLane));
154142
mCalibDumped = true;
155143
}
156144
}
157145
};
158146

159-
} // namespace calibration
160-
161-
namespace framework
147+
DataProcessorSpec getTPCCalibPedestalSpec(const std::string inputSpec, int ilane = 0, std::vector<int> sectors = {})
162148
{
149+
std::vector<o2::framework::OutputSpec> outputs;
150+
outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCLB, "TPCCLBPART"});
151+
outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCLB, "TPCCLBPARTINFO"});
163152

164-
DataProcessorSpec getTPCCalibPedestalSpec(const std::string inputSpec, bool skipCalib)
165-
{
166-
using device = o2::calibration::TPCCalibPedestalDevice;
167-
168-
std::vector<OutputSpec> outputs;
169-
if (!skipCalib) {
170-
outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCLB, clbUtils::gDataDescriptionCLBPayload});
171-
outputs.emplace_back(ConcreteDataTypeMatcher{clbUtils::gDataOriginCLB, clbUtils::gDataDescriptionCLBInfo});
172-
}
173-
153+
const auto id = fmt::format("calib-tpc-pedestal-{:02}", ilane);
174154
return DataProcessorSpec{
175-
"calib-tpc-pedestal",
155+
id.data(),
176156
select(inputSpec.data()),
177157
outputs,
178-
AlgorithmSpec{adaptFromTask<device>(skipCalib)},
158+
AlgorithmSpec{adaptFromTask<TPCCalibPedestalDevice>(ilane, sectors)},
179159
Options{
180160
{"max-events", VariantType::Int, 100, {"maximum number of events to process"}},
181161
{"use-old-subspec", VariantType::Bool, false, {"use old subsecifiation definition"}},
@@ -185,7 +165,7 @@ DataProcessorSpec getTPCCalibPedestalSpec(const std::string inputSpec, bool skip
185165
}; // end DataProcessorSpec
186166
}
187167

188-
} // namespace framework
168+
} // namespace tpc
189169
} // namespace o2
190170

191171
#endif

0 commit comments

Comments
 (0)