20 """Auxiliary functions for batch execution of MIRTK commands using HTCondor.""" 26 from xml.etree
import ElementTree
def submit(name, command=None, args=[], opts={}, script=None, tasks=0, deps=[],
           threads=0, memory=8 * 1024, retries=5, requirements=[], environment={},
           logdir=None, log=None, workdir=None, verbose=1):
    """Submit batch job to HTCondor.

    A submit description is assembled as text and piped to ``condor_submit -``.
    Either a MIRTK command (run through ``mirtk.check_call`` in a Python
    one-liner) or a Python script template is submitted.

    Args:
        name: Job name; used for log/script file names and as the command
            name when ``command`` is not given.
        command: MIRTK command to run. Mutually exclusive with ``script``.
        args: Positional command arguments.
        opts: Command options. For a script job, the mapping is instead
            passed to ``script.format(**opts)``.
        script: Python source template; written to ``<logdir>/<name>.py``
            and executed with ``sys.executable``.
        tasks: Number of queued processes; when positive, ``$(Process)`` is
            passed to the script and used in the default log file name.
        deps: Job dependencies. Not supported; any non-empty value raises.
        threads: Written as ``request_cpus`` when positive.
        memory: Written as ``request_memory`` when positive.
        retries: Written as ``max_retries`` when positive.
        requirements: Expressions joined with ``&&`` into ``requirements``.
        environment: Mapping of environment variables; list/tuple values
            are joined with ``:``.
        logdir: Directory for log files (and the script file).
        log: Path of the file receiving both stdout and stderr of the job.
        workdir: Written as ``initialdir`` (made absolute).
        verbose: Print a confirmation line when positive.

    Returns:
        The cluster ID as ``int`` when ``tasks == 0``, else ``(jobid, tasks)``.

    Raises:
        NotImplementedError: If ``deps`` is non-empty.
        ValueError: If both ``command`` and ``script`` are given, or a
            script is submitted without any log path.
        Exception: If ``condor_submit`` fails or its output contains no
            recognizable job ID.
    """
    # The list/dict defaults are shared between calls; they are only read here, never mutated.
    if deps:
        raise NotImplementedError(
            "Cannot submit individual HTCondor jobs with dependencies yet, this requires use of DAGMan")
    if logdir or log:
        if not logdir:
            log = os.path.abspath(log)
            logdir = os.path.dirname(log)
        elif not log:
            logdir = os.path.abspath(logdir)
            if tasks > 0:
                log = os.path.join(logdir, name + "_$(Cluster).$(Process).log")
            else:
                log = os.path.join(logdir, name + "_$(Cluster).log")
        # The script file is written into logdir below, so it must exist.
        if not os.path.isdir(logdir):
            os.makedirs(logdir)
    jobdesc = "universe = vanilla\n"
    if threads > 0:
        jobdesc += "request_cpus = {0}\n".format(threads)
    if memory > 0:
        jobdesc += "request_memory = {0}\n".format(memory)
    if requirements:
        jobdesc += "requirements = " + " && ".join(requirements) + "\n"
    if environment:
        jobdesc += "environment = \""
        for envname, envval in environment.items():
            jobdesc += " {0}='{1}'".format(
                envname, ':'.join(envval) if isinstance(envval, (list, tuple)) else envval)
        jobdesc += "\"\n"
    if workdir:
        jobdesc += "initialdir = {0}\n".format(os.path.abspath(workdir))
    # Keep the job queued (i.e., let it be retried) for the listed exit codes and kill/term signals.
    jobdesc += "on_exit_remove = (ExitBySignal == False && ExitCode != 6 && ExitCode != 247 && ExitCode != 241) || (ExitBySignal == True && ExitSignal != 9 && ExitSignal != 15)\n"
    if retries > 0:
        jobdesc += "max_retries = {0}\n".format(retries)
    if script:
        if command:
            raise ValueError("Keyword arguments 'command' and 'script' are mutually exclusive")
        if not log:
            raise ValueError("Script submission of batch to HTCondor requires log path for script file!")
        script_path = os.path.join(logdir, name + ".py")
        with open(script_path, "wt") as f:
            f.write(script.format(**opts))
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"'{0}'".format(script_path)
        if tasks > 0:
            jobdesc += " $(Process)"
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue"
        if tasks > 0:
            jobdesc += " {}".format(tasks)
        jobdesc += "\n"
    else:
        jobdesc += "executable = {0}\n".format(sys.executable)
        # Doubled double quotes are literal quotes inside the quoted arguments string.
        jobdesc += "arguments = \"-c 'import sys; import socket;"
        jobdesc += " sys.stdout.write(\"\"Host: \"\" + socket.gethostname() + \"\"\\n\\n\"\");"
        jobdesc += " sys.path.insert(0, \"\"{0}\"\");".format(os.path.dirname(mirtk.__file__))
        jobdesc += " import mirtk; mirtk.check_call([\"\"{0}\"\"] + sys.argv[1:])'".format(
            command if command else name)
        # NOTE(review): the formatting of positional arguments and option names
        # was restored from an incomplete listing -- confirm against version control.
        for arg in args:
            arg = str(arg)
            if ' ' in arg:
                arg = "'" + arg + "'"
            jobdesc += ' ' + arg
        for opt in opts:
            arg = opts[opt]
            if opt[0] != '-':
                opt = '-' + opt
            jobdesc += ' ' + opt
            if arg is not None:
                if isinstance(arg, (list, tuple)):
                    arg = ' '.join([str(x) for x in arg])
                else:
                    arg = str(arg)
                jobdesc += ' ' + arg
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue\n"
    proc = subprocess.Popen(["condor_submit", "-"], stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    (out, err) = proc.communicate(input=jobdesc.encode('utf-8'))
    # The pipes deliver bytes; decode so that the text patterns and messages below work on Python 3.
    if isinstance(out, bytes):
        out = out.decode('utf-8', 'replace')
    if isinstance(err, bytes):
        err = err.decode('utf-8', 'replace')
    if proc.returncode != 0:
        raise Exception(err)
    # Parentheses of the literal "job(s)" must be escaped; unescaped they form
    # a capture group, which made group(1) the letter "s" instead of the ID.
    match = re.search(r'[0-9]+ job\(s\) submitted to cluster ([0-9]+)\.', out)
    if not match:
        match = re.search(r'\*\* Proc ([0-9]+)(\.[0-9]+)?:', out)
        if not match:
            raise Exception("Failed to determine job ID from condor_submit output:\n" + out)
    jobid = int(match.group(1))
    if verbose > 0:
        if tasks > 0:
            print(" Submitted batch {} (JobId={}, Tasks={})".format(name, jobid, tasks))
        else:
            print(" Submitted job {} (JobId={})".format(name, jobid))
    return jobid if tasks == 0 else (jobid, tasks)
131 def _parse_condor_xml(s):
132 """Parse XML ClassAds returned by condor_q or condor_history with -xml option.""" 136 starttag =
"<classads>" 137 endtag =
"</classads>" 139 start = s.find(starttag, end)
142 end = s.find(endtag, start)
144 raise ValueError(
"Malformed <classads> XML, could not find matching </classads>!")
146 elem = ElementTree.fromstring(s[start:end])
150 classads.extend(elem)
155 def wait(jobs, max_time=0, max_error=5, interval=60, verbose=0):
156 if not isinstance(jobs, list):
160 if isinstance(job, tuple):
167 clusters.append((cluid, tasks))
168 num_wait = len(clusters)
172 while num_wait > 0
and (max_time <= 0
or total_time < max_time
or total_time == 0)
and num_error < max_error:
174 total_time += interval
181 for cluster, tasks
in clusters:
182 classads = subprocess.check_output([
"condor_q",
"-xml", str(cluster)], stderr=subprocess.STDOUT)
183 classads = _parse_condor_xml(classads)
184 for process
in range(tasks):
185 classad = classads.find(
".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
189 status = int(classad.find(
"a[@n='JobStatus']/i").text)
199 elif status == 2
or status == 6:
210 raise Exception(
"Unknown job status: {}".format(status))
211 num_wait = num_running + num_pending + num_held + num_suspended
212 if verbose > 0
and (num_wait <= 0
or iterations % verbose == 0):
213 sys.stdout.write(
"{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
214 sys.stdout.write(
" WAIT {p} pending, {r} running, {s} suspended, {h} held, {d} completed\n".format(
215 p=num_pending, r=num_running, s=num_suspended, h=num_held, d=num_done
219 except subprocess.CalledProcessError:
220 sys.stdout.write(
"{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
221 sys.stdout.write(
" WAIT Failed to retrieve job status, will retry {0} more times!\n".format(max_error - num_error))
225 if num_error >= max_error:
226 raise Exception(
"Exceeded maximum number of retries to query status of jobs!")
227 if num_wait > 0
and max_time > 0
and total_time >= max_time:
228 raise Exception(
"Exceeded maximum time waiting for jobs to complete!")
234 for cluster, tasks
in clusters:
236 unknown[cluster] = [0] * tasks
238 num_unknown = num_jobs
239 while num_unknown > 0
and num_error <= max_error:
242 for cluster, tasks
in clusters:
243 classads = _parse_condor_xml(subprocess.check_output([
"condor_history",
"-xml", str(cluster)]))
244 for process
in range(tasks):
245 classad = classads.find(
".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
247 unknown[cluster][process] += 1
249 unknown[cluster][process] = 0
250 status = int(classad.find(
"a[@n='JobStatus']/i").text)
251 exit_code = int(classad.find(
"a[@n='ExitCode']/i").text)
252 if status != 4
or exit_code != 0:
255 for cluster, tasks
in clusters:
256 for process
in range(tasks):
257 if unknown[cluster][process] > 0:
258 if unknown[cluster][process] > max_error:
259 raise Exception(
"Could not retrieve exit code of job {}.{} for the past {} attempts!".format(cluster, process, unknown[cluster][process]))
262 sys.stdout.write(
"{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
263 sys.stdout.write(
" DONE {0} succeeded, {1} failed".format(num_jobs - num_fail - num_unknown, num_fail))
265 sys.stdout.write(
", {} unknown, will retry".format(num_unknown))
266 sys.stdout.write(
"\n")
268 except subprocess.CalledProcessError:
269 sys.stdout.write(
"{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
270 sys.stdout.write(
" WAIT Failed to retrieve exit codes, will retry {0} more times!\n".format(max_error - num_error))
273 if num_error >= max_error:
274 raise Exception(
"Exceeded maximum number of retries to query exit codes of jobs!")