1 ##############################################################################
2 # Medical Image Registration ToolKit (MIRTK)
3 #
4 # Copyright 2017 Imperial College London
5 # Copyright 2017 Andreas Schuh
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #     http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 ##############################################################################
20 """Auxiliary functions for batch execution of MIRTK commands using HTCondor."""
import re
import os
import sys
import time
import subprocess
from datetime import datetime
from xml.etree import ElementTree
29 # ----------------------------------------------------------------------------
def _condor_quote(arg, force=False):
    """Quote a value for the "new syntax" of the HTCondor arguments/environment commands.

    Values that contain white space or quote characters, empty values, and any
    value when *force* is True are surrounded by single quotes. Embedded single
    and double quotes are escaped by repeating them, as required inside the
    double-quoted argument/environment string.
    """
    arg = str(arg)
    if not force and arg and not re.search(r"[\s'\"]", arg):
        return arg
    return "'" + arg.replace("'", "''").replace('"', '""') + "'"


def _decode_output(data):
    """Return subprocess pipe output as text (pipes yield bytes on Python 3)."""
    if isinstance(data, bytes):
        return data.decode("utf-8", "replace")
    return data


def _parse_submit_cluster_id(out):
    """Return the cluster ID reported in the text output of condor_submit."""
    # The parentheses in "job(s)" are literal text and must be escaped; an
    # unescaped "(s)" would be a capturing group. "jobs"/"job" are accepted as well.
    match = re.search(r"[0-9]+ job(?:\(s\)|s)? submitted to cluster ([0-9]+)\.", out)
    if not match:
        match = re.search(r"\*\* Proc ([0-9]+)(\.[0-9]+)?:", out)
    if not match:
        raise Exception("Failed to determine job ID from condor_submit output:\n" + out)
    return int(match.group(1))


def submit(name, command=None, args=None, opts=None, script=None, tasks=0, deps=None,
           threads=0, memory=8 * 1024, retries=5, requirements=None, environment=None,
           logdir=None, log=None, workdir=None, verbose=1):
    """Submit batch job to HTCondor.

    The job description is piped to ``condor_submit -``.

    Args:
        name: Job name. Used to name the log and script files and, when
            *command* is not given, as the name of the MIRTK command to run.
        command: Name of the MIRTK command executed via ``mirtk.check_call``.
            Mutually exclusive with *script*.
        args: Positional arguments appended to the command line. Arguments with
            white space or quotes are quoted for HTCondor.
        opts: Command mode: mapping from option name (a leading '-' is added
            when missing) to its value. ``None`` gives a flag; a list or tuple
            gives space-separated values. Script mode: keyword arguments used
            to format *script*.
        script: Python script template. It is written to ``<logdir>/<name>.py``
            and run with ``sys.executable``.
        tasks: Number of tasks queued for a *script* job. Each task gets its
            HTCondor process number as command-line argument.
        deps: Job dependencies; not supported yet, must be empty.
        threads: Value of ``request_cpus`` (omitted when not positive).
        memory: Value of ``request_memory`` (omitted when not positive).
        retries: Value of ``max_retries`` (omitted when not positive).
        requirements: ClassAd expressions joined with ``&&`` as ``requirements``.
        environment: Mapping of environment variable names to values. List or
            tuple values are joined with ':'.
        logdir: Directory for log (and script) files. Defaults to the directory
            of *log*.
        log: Log file for standard output and error. Defaults to
            ``<logdir>/<name>_$(Cluster)[.$(Process)].log`` when only *logdir*
            is given.
        workdir: Initial working directory of the job.
        verbose: Print the submitted job ID when positive.

    Returns:
        The HTCondor cluster ID when *tasks* is zero, otherwise the tuple
        ``(cluster_id, tasks)`` as accepted by :func:`wait`.

    Raises:
        NotImplementedError: If *deps* is not empty.
        ValueError: If both *command* and *script* are given, or if *script*
            is given without *log* or *logdir*.
        Exception: If condor_submit fails or its output cannot be parsed.
    """
    if deps:
        raise NotImplementedError("Cannot submit individual HTCondor jobs with dependencies yet, this requires use of DAGMan")
    if script and command:
        raise ValueError("Keyword arguments 'command' and 'script' are mutually exclusive")
    # None defaults avoid sharing mutable default objects between calls
    if args is None:
        args = []
    if opts is None:
        opts = {}
    if logdir or log:
        if not logdir:
            log = os.path.abspath(log)
            logdir = os.path.dirname(log)
        elif not log:
            logdir = os.path.abspath(logdir)
            if tasks > 0:
                log = os.path.join(logdir, name + "_$(Cluster).$(Process).log")
            else:
                log = os.path.join(logdir, name + "_$(Cluster).log")
        makedirs(logdir)
    elif script:
        raise ValueError("Script submission of batch to HTCondor requires log path for script file!")

    lines = ["universe = vanilla"]
    if threads > 0:
        lines.append("request_cpus = {0}".format(threads))
    if memory > 0:
        lines.append("request_memory = {0}".format(memory))
    if requirements:
        lines.append("requirements = " + " && ".join(requirements))
    if environment:
        env = ""
        for envname, envval in environment.items():
            if isinstance(envval, (list, tuple)):
                envval = ':'.join(str(x) for x in envval)
            env += " {0}={1}".format(envname, _condor_quote(envval, force=True))
        lines.append('environment = "{0}"'.format(env))
    if workdir:
        lines.append("initialdir = {0}".format(os.path.abspath(workdir)))
    # Note: MIRTK executables return exit code 6 when memory allocation fails, other codes are kill/term signals
    lines.append("on_exit_remove = (ExitBySignal == False && ExitCode != 6 && ExitCode != 247 && ExitCode != 241) || (ExitBySignal == True && ExitSignal != 9 && ExitSignal != 15)")
    if retries > 0:
        lines.append("max_retries = {0}".format(retries))
    lines.append("executable = {0}".format(sys.executable))

    if script:
        script_path = os.path.join(logdir, name + ".py")
        with open(script_path, "wt") as f:
            f.write(script.format(**opts))
        arguments = _condor_quote(script_path, force=True)
        if tasks > 0:
            arguments += " $(Process)"
        queue = "queue {0}".format(tasks) if tasks > 0 else "queue"
    else:
        # Python code run with -c; "" is an escaped double quote inside the
        # double-quoted HTCondor arguments string.
        # NOTE(review): the sys.path entry is the directory containing
        # mirtk.__file__; if mirtk is a package this is the package directory
        # itself rather than its parent -- confirm this is intended.
        code = (
            "import sys; import socket;"
            " sys.stdout.write(\"\"Host: \"\" + socket.gethostname() + \"\"\\n\\n\"\");"
            " sys.path.insert(0, \"\"{0}\"\");"
            " import mirtk; mirtk.check_call([\"\"{1}\"\"] + sys.argv[1:])"
        ).format(os.path.dirname(mirtk.__file__), command if command else name)
        argv = ["-c", "'" + code + "'"]
        argv.extend(_condor_quote(arg) for arg in args)
        for opt, value in opts.items():
            if not opt.startswith('-'):
                opt = '-' + opt
            argv.append(opt)
            if value is not None:
                if isinstance(value, (list, tuple)):
                    value = ' '.join(str(x) for x in value)
                argv.append(str(value))
        arguments = " ".join(argv)
        # NOTE(review): tasks is not used to queue multiple jobs in command mode,
        # although the return value still reports (jobid, tasks) -- confirm.
        queue = "queue"
    lines.append('arguments = "{0}"'.format(arguments))
    if log:
        lines.append("output = {0}".format(log))
        lines.append("error = {0}".format(log))
    lines.append(queue)
    jobdesc = "\n".join(lines) + "\n"

    proc = subprocess.Popen(["condor_submit", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    out, err = proc.communicate(input=jobdesc.encode('utf-8'))
    if proc.returncode != 0:
        raise Exception(_decode_output(err))
    jobid = _parse_submit_cluster_id(_decode_output(out))
    if verbose > 0:
        if tasks > 0:
            print(" Submitted batch {} (JobId={}, Tasks={})".format(name, jobid, tasks))
        else:
            print(" Submitted job {} (JobId={})".format(name, jobid))
    return jobid if tasks == 0 else (jobid, tasks)
130 # ----------------------------------------------------------------------------
131 def _parse_condor_xml(s):
132  """Parse XML ClassAds returned by condor_q or condor_history with -xml option."""
133  # Note: condor_history may return multiple (only one non-empty) <classads> tags
134  end = 0
135  classads = None
136  starttag = "<classads>"
137  endtag = "</classads>"
138  while True:
139  start = s.find(starttag, end)
140  if start == -1:
141  break
142  end = s.find(endtag, start)
143  if end == -1:
144  raise ValueError("Malformed <classads> XML, could not find matching </classads>!")
145  end += len(endtag)
146  elem = ElementTree.fromstring(s[start:end])
147  if classads is None:
148  classads = elem
149  else:
150  classads.extend(elem)
151  return classads
154 # ----------------------------------------------------------------------------
def _condor_timestamp():
    """Return the current local date and time used as prefix of wait() messages."""
    return "{:%Y-%b-%d %H:%M:%S}".format(datetime.now())


def _query_condor_classads(command, stderr=None):
    """Run a condor_q/condor_history command with -xml output and parse its ClassAds.

    Raises subprocess.CalledProcessError when the command fails.
    """
    out = subprocess.check_output(command, stderr=stderr)
    if isinstance(out, bytes):
        # check_output returns bytes on Python 3; the XML parser searches str tags
        out = out.decode("utf-8", "replace")
    return _parse_condor_xml(out)


def _find_job_classad(classads, cluster, process):
    """Return the ClassAd of job cluster.process, or None if it is not listed."""
    if classads is None:
        return None
    return classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))


def _classad_int(classad, attr):
    """Return the integer value of ClassAd attribute *attr*, or None if it is not set."""
    node = classad.find("a[@n='{0}']/i".format(attr))
    if node is None or node.text is None:
        return None
    return int(node.text)


def wait(jobs, max_time=0, max_error=5, interval=60, verbose=0):
    """Wait for HTCondor jobs to finish and check whether all of them succeeded.

    Polls ``condor_q`` every *interval* seconds until no job is idle, running,
    held, or suspended anymore. Then queries ``condor_history`` for the final
    status and exit code of each job.

    Args:
        jobs: Cluster ID or ``(cluster_id, tasks)`` tuple as returned by
            :func:`submit`, or a list of these. Entries whose cluster ID is not
            positive are ignored.
        max_time: Maximum number of seconds to wait. A non-positive value means
            no time limit.
        max_error: Maximum number of failed status queries before giving up;
            also bounds how often a job missing from the history is retried.
        interval: Number of seconds between ``condor_q`` polls.
        verbose: When positive, print the job counts every *verbose*-th poll
            and the result of each ``condor_history`` query.

    Returns:
        True if every job completed with exit code 0, False otherwise.

    Raises:
        Exception: If the query retries or *max_time* are exceeded, a job has
            an unknown status, or the exit code of a job cannot be retrieved.
    """
    if not isinstance(jobs, list):
        jobs = [jobs]
    clusters = []
    for job in jobs:
        if isinstance(job, tuple):
            cluid, tasks = job[0], job[1]
        else:
            cluid, tasks = job, 1
        if cluid > 0:
            clusters.append((cluid, tasks))

    # Phase 1: poll condor_q until no job is pending, running, held, or suspended
    num_wait = len(clusters)
    num_error = 0
    total_time = 0
    iterations = 0
    while num_wait > 0 and (max_time <= 0 or total_time < max_time or total_time == 0) and num_error < max_error:
        time.sleep(interval)
        total_time += interval
        try:
            num_pending = num_running = num_suspended = num_held = num_done = 0
            for cluster, tasks in clusters:
                classads = _query_condor_classads(["condor_q", "-xml", str(cluster)], stderr=subprocess.STDOUT)
                for process in range(tasks):
                    classad = _find_job_classad(classads, cluster, process)
                    if classad is None:
                        # no longer listed in the queue
                        num_done += 1
                        continue
                    status = _classad_int(classad, "JobStatus")
                    # 1) Idle
                    # 2) Running
                    # 3) Removed
                    # 4) Completed (also when failed, check ExitCode afterwards using condor_history)
                    # 5) Held
                    # 6) Transferring Output
                    # 7) Suspended
                    if status == 1:
                        num_pending += 1
                    elif status in (2, 6):
                        num_running += 1
                    elif status in (3, 4):
                        num_done += 1
                    elif status == 5:
                        num_held += 1
                    elif status == 7:
                        num_suspended += 1
                    else:
                        raise Exception("Unknown job status: {}".format(status))
            num_wait = num_running + num_pending + num_held + num_suspended
            if verbose > 0 and (num_wait <= 0 or iterations % verbose == 0):
                sys.stdout.write(_condor_timestamp())
                sys.stdout.write(" WAIT {p} pending, {r} running, {s} suspended, {h} held, {d} completed\n".format(
                    p=num_pending, r=num_running, s=num_suspended, h=num_held, d=num_done
                ))
                sys.stdout.flush()
            num_error = 0
        except subprocess.CalledProcessError:
            sys.stdout.write(_condor_timestamp())
            sys.stdout.write(" WAIT Failed to retrieve job status, will retry {0} more times!\n".format(max_error - num_error))
            sys.stdout.flush()
            num_error += 1
        iterations += 1
    if num_error >= max_error:
        raise Exception("Exceeded maximum number of retries to query status of jobs!")
    if num_wait > 0 and max_time > 0 and total_time >= max_time:
        raise Exception("Exceeded maximum time waiting for jobs to complete!")

    # Phase 2: query condor_history for the final status and exit code of each job
    retry_delay = 10
    num_fail = 0
    if total_time > 0:
        time.sleep(retry_delay)
    num_jobs = sum(tasks for _, tasks in clusters)
    unknown = {}
    for cluster, tasks in clusters:
        unknown[cluster] = [0] * tasks
    num_error = 0
    num_unknown = num_jobs
    attempt = 0
    while num_unknown > 0 and num_error <= max_error:
        if attempt > 0:
            # Pause before re-querying; an immediate retry would exhaust the
            # retry limit for jobs missing from the history within seconds.
            time.sleep(retry_delay)
        attempt += 1
        try:
            num_fail = 0
            for cluster, tasks in clusters:
                classads = _query_condor_classads(["condor_history", "-xml", str(cluster)])
                for process in range(tasks):
                    classad = _find_job_classad(classads, cluster, process)
                    if classad is None:
                        unknown[cluster][process] += 1
                        continue
                    unknown[cluster][process] = 0
                    status = _classad_int(classad, "JobStatus")
                    # ExitCode may be absent (presumably for removed jobs or jobs
                    # killed by a signal); count such jobs as failed, do not crash.
                    exit_code = _classad_int(classad, "ExitCode")
                    if status != 4 or exit_code != 0:
                        num_fail += 1
            num_unknown = 0
            for cluster, tasks in clusters:
                for process in range(tasks):
                    misses = unknown[cluster][process]
                    if misses > 0:
                        if misses > max_error:
                            raise Exception("Could not retrieve exit code of job {}.{} for the past {} attempts!".format(cluster, process, misses))
                        num_unknown += 1
            if verbose > 0:
                sys.stdout.write(_condor_timestamp())
                sys.stdout.write(" DONE {0} succeeded, {1} failed".format(num_jobs - num_fail - num_unknown, num_fail))
                if num_unknown > 0:
                    sys.stdout.write(", {} unknown, will retry".format(num_unknown))
                sys.stdout.write("\n")
                sys.stdout.flush()
        except subprocess.CalledProcessError:
            sys.stdout.write(_condor_timestamp())
            sys.stdout.write(" WAIT Failed to retrieve exit codes, will retry {0} more times!\n".format(max_error - num_error))
            sys.stdout.flush()
            num_error += 1
            if num_error >= max_error:
                raise Exception("Exceeded maximum number of retries to query exit codes of jobs!")
    return num_fail == 0