Skip to content

Commit 9fc5d4f

Browse files
committed
Adjustments in embedding pipeline
* treat each digitizer individually * estimate CPU for each task * make collision-context production an individual task * adjustments to DPL parameters (use large shared mem only when necessary)
1 parent 65c587a commit 9fc5d4f

File tree

1 file changed

+48
-26
lines changed

1 file changed

+48
-26
lines changed

MC/run/PWGHF/create_embedding_workflow.py

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,24 @@
4545
workflow['stages'] = []
4646

4747
taskcounter=0
48-
def createTask(name='', needs=[], tf=-1, cwd='./', lab=[]):
48+
def createTask(name='', needs=[], tf=-1, cwd='./', lab=[], cpu=0, mem=0):
4949
global taskcounter
5050
taskcounter = taskcounter + 1
51-
return { 'name': name, 'cmd':'', 'needs': needs, 'resources': { 'cpu': -1 , 'mem': -1 }, 'timeframe' : tf, 'labels' : lab, 'cwd' : cwd }
51+
return { 'name': name, 'cmd':'', 'needs': needs, 'resources': { 'cpu': cpu , 'mem': mem }, 'timeframe' : tf, 'labels' : lab, 'cwd' : cwd }
5252

53-
def getDPL_global_options():
53+
def getDPL_global_options(bigshm=False):
5454
if args.noIPC!=None:
55-
return "-b --run --no-IPC"
56-
return "-b --run --shm-segment-size ${SHMSIZE:-50000000000} --session " + str(taskcounter)
57-
55+
return "-b --run --rate 1 --no-IPC"
56+
if bigshm:
57+
return "-b --run --rate 1 --shm-segment-size ${SHMSIZE:-50000000000} --session " + str(taskcounter) + ' --driver-client-backend ws://'
58+
else:
59+
return "-b --run --rate 1 --session " + str(taskcounter) + ' --driver-client-backend ws://'
5860

5961
doembedding=True if args.embedding=='True' or args.embedding==True else False
6062

6163
if doembedding:
6264
# ---- background transport task -------
63-
BKGtask=createTask(name='bkgsim', lab=["GEANT"])
65+
BKGtask=createTask(name='bkgsim', lab=["GEANT"], cpu='8')
6466
BKGtask['cmd']='o2-sim -e ' + SIMENGINE + ' -j ' + str(NWORKERS) + ' -n ' + str(NBKGEVENTS) + ' -g pythia8hi ' + str(MODULES) + ' -o bkg --configFile ${O2DPG_ROOT}/MC/config/common/ini/basic.ini'
6567
workflow['stages'].append(BKGtask)
6668

@@ -101,7 +103,7 @@ def getDPL_global_options():
101103
embeddinto= "--embedIntoFile bkg_Kine.root" if doembedding else ""
102104
if doembedding:
103105
signalneeds = signalneeds + [ BKGtask['name'], LinkBKGtask['name'] ]
104-
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"])
106+
SGNtask=createTask(name='sgnsim_'+str(tf), needs=signalneeds, tf=tf, cwd='tf'+str(tf), lab=["GEANT"], cpu='5.')
105107
SGNtask['cmd']='o2-sim -e '+str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' -j ' + str(NWORKERS) + ' -g pythia8 '\
106108
+ ' -o ' + signalprefix + ' ' + embeddinto
107109
workflow['stages'].append(SGNtask)
@@ -122,40 +124,60 @@ def getDPL_global_options():
122124
CONTEXTFILE='collisioncontext.root'
123125

124126
simsoption=' --sims ' + ('bkg,'+signalprefix if doembedding else signalprefix)
125-
TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=[SGNtask['name'], LinkGRPFileTask['name']],
126-
tf=tf, cwd=timeframeworkdir, lab=["DIGI"])
127-
TPCDigitask['cmd'] = 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TPC --interactionRate 50000 --tpc-lanes ' + str(NWORKERS) + ' --outcontext ' + str(CONTEXTFILE)
127+
128+
ContextTask=createTask(name='digicontext_'+str(tf), needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf,
129+
cwd=timeframeworkdir, lab=["DIGI"], cpu='8')
130+
ContextTask['cmd'] = 'o2-sim-digitizer-workflow --only-context --interactionRate 50000 ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption
131+
workflow['stages'].append(ContextTask)
132+
133+
TPCDigitask=createTask(name='tpcdigi_'+str(tf), needs=[ContextTask['name'], LinkGRPFileTask['name']],
134+
tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='8', mem='16000')
135+
TPCDigitask['cmd'] = 'o2-sim-digitizer-workflow ' + getDPL_global_options(bigshm=True) + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TPC --interactionRate 50000 --tpc-lanes ' + str(NWORKERS) + ' --incontext ' + str(CONTEXTFILE)
128136
workflow['stages'].append(TPCDigitask)
129137

130-
# The TRD digi task has a dependency on TPC only because of the digitization context (and because they both use CPU efficiently)
131-
# TODO: activate only if TRD present
132-
TRDDigitask=createTask(name='trddigi_'+str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"])
138+
TRDDigitask=createTask(name='trddigi_'+str(tf), needs=[ContextTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='8', mem='8000')
133139
TRDDigitask['cmd'] = 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet TRD --interactionRate 50000 --configKeyValues \"TRDSimParams.digithreads=' + str(NWORKERS) + '\" --incontext ' + str(CONTEXTFILE)
134140
workflow['stages'].append(TRDDigitask)
135141

136-
RESTDigitask=createTask(name='restdigi_'+str(tf), needs=[TPCDigitask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"])
137-
RESTDigitask['cmd'] = 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --skipDet TRD,TPC --interactionRate 50000 --incontext ' + str(CONTEXTFILE)
138-
workflow['stages'].append(RESTDigitask)
142+
# RESTDigitask=createTask(name='restdigi_'+str(tf), needs=[ContextTask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI"], cpu='medium', mem='8000')
143+
# RESTDigitask['cmd'] = 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --skipDet TRD,TPC --interactionRate 50000 --incontext ' + str(CONTEXTFILE)
144+
# workflow['stages'].append(RESTDigitask)
145+
146+
# we split the digitizers for improved load balancing --> the precise list needs to be made consistent with geometry and active sensors
147+
sensorlist = [ "ITS", "TOF", "FT0", "FV0", "FDD", "MCH", "MID", "MFT", "HMP", "EMC", "PHS", "CPV" ]
148+
# these are digitizers which are single threaded
149+
def createRestDigiTask(name):
150+
t = createTask(name=name, needs=[ContextTask['name']], tf=tf, cwd=timeframeworkdir, lab=["DIGI","SMALLDIGI"], cpu='1')
151+
t['cmd'] = 'o2-sim-digitizer-workflow ' + getDPL_global_options() + ' -n ' + str(args.ns) + simsoption + ' --onlyDet ' + str(det) + ' --interactionRate 50000 --incontext ' + str(CONTEXTFILE)
152+
workflow['stages'].append(t)
153+
return t
154+
155+
det_to_digitask={}
156+
157+
for det in sensorlist:
158+
name=str(det).lower() + "digi_"+str(tf)
159+
t=createRestDigiTask(name)
160+
det_to_digitask[det]=t
139161

140162
# -----------
141163
# reco
142164
# -----------
143165

144166
# TODO: check value for MaxTimeBin; A large value had to be set tmp in order to avoid crashes bases on "exceeding timeframe limit"
145-
TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
146-
TPCRECOtask['cmd'] = 'o2-tpc-reco-workflow ' + getDPL_global_options() + ' --tpc-digit-reader "--infile tpcdigits.root" --input-type digits --output-type clusters,tracks,send-clusters-per-sector --configKeyValues "GPU_global.continuousMaxTimeBin=100000;GPU_proc.ompThreads='+str(NWORKERS)+'"'
167+
TPCRECOtask=createTask(name='tpcreco_'+str(tf), needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='3', mem='16000')
168+
TPCRECOtask['cmd'] = 'o2-tpc-reco-workflow ' + getDPL_global_options(bigshm=True) + ' --tpc-digit-reader "--infile tpcdigits.root" --input-type digits --output-type clusters,tracks,send-clusters-per-sector --configKeyValues "GPU_global.continuousMaxTimeBin=100000;GPU_proc.ompThreads='+str(NWORKERS)+'"'
147169
workflow['stages'].append(TPCRECOtask)
148170

149-
ITSRECOtask=createTask(name='itsreco_'+str(tf), needs=[RESTDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
171+
ITSRECOtask=createTask(name='itsreco_'+str(tf), needs=[det_to_digitask["ITS"]['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='1', mem='2000')
150172
ITSRECOtask['cmd'] = 'o2-its-reco-workflow --trackerCA --tracking-mode async ' + getDPL_global_options()
151173
workflow['stages'].append(ITSRECOtask)
152174

153-
FT0RECOtask=createTask(name='ft0reco_'+str(tf), needs=[RESTDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
175+
FT0RECOtask=createTask(name='ft0reco_'+str(tf), needs=[det_to_digitask["FT0"]['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
154176
FT0RECOtask['cmd'] = 'o2-ft0-reco-workflow ' + getDPL_global_options()
155177
workflow['stages'].append(FT0RECOtask)
156178

157-
ITSTPCMATCHtask=createTask(name='itstpcMatch_'+str(tf), needs=[TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
158-
ITSTPCMATCHtask['cmd']= 'o2-tpcits-match-workflow ' + getDPL_global_options() + ' --tpc-track-reader \"tpctracks.root\" --tpc-native-cluster-reader \"--infile tpc-native-clusters.root\"'
179+
ITSTPCMATCHtask=createTask(name='itstpcMatch_'+str(tf), needs=[TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], mem='8000', cpu='3')
180+
ITSTPCMATCHtask['cmd']= 'o2-tpcits-match-workflow ' + getDPL_global_options(bigshm=True) + ' --tpc-track-reader \"tpctracks.root\" --tpc-native-cluster-reader \"--infile tpc-native-clusters.root\"'
159181
workflow['stages'].append(ITSTPCMATCHtask)
160182

161183
# this can be combined with TRD digitization if benefical
@@ -164,14 +186,14 @@ def getDPL_global_options():
164186
workflow['stages'].append(TRDTRAPtask)
165187

166188
TRDTRACKINGtask = createTask(name='trdreco_'+str(tf), needs=[TRDTRAPtask['name'], ITSTPCMATCHtask['name'], TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
167-
TRDTRACKINGtask['cmd'] = 'o2-trd-global-tracking'
189+
TRDTRACKINGtask['cmd'] = 'echo "would do TRD tracking"' # 'o2-trd-global-tracking'
168190
workflow['stages'].append(TRDTRACKINGtask)
169191

170-
TOFRECOtask = createTask(name='tofmatch_'+str(tf), needs=[ITSTPCMATCHtask['name'], RESTDigitask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
192+
TOFRECOtask = createTask(name='tofmatch_'+str(tf), needs=[ITSTPCMATCHtask['name'], det_to_digitask["TOF"]['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
171193
TOFRECOtask['cmd'] = 'o2-tof-reco-workflow ' + getDPL_global_options()
172194
workflow['stages'].append(TOFRECOtask)
173195

174-
PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=[ITSTPCMATCHtask['name'], FT0RECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"])
196+
PVFINDERtask = createTask(name='pvfinder_'+str(tf), needs=[ITSTPCMATCHtask['name'], FT0RECOtask['name']], tf=tf, cwd=timeframeworkdir, lab=["RECO"], cpu='4')
175197
PVFINDERtask['cmd'] = 'o2-primary-vertexing-workflow ' + getDPL_global_options()
176198
workflow['stages'].append(PVFINDERtask)
177199

0 commit comments

Comments
 (0)