condor.py
1 ##############################################################################
2 # Medical Image Registration ToolKit (MIRTK)
3 #
4 # Copyright 2017 Imperial College London
5 # Copyright 2017 Andreas Schuh
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 ##############################################################################
19 
20 """Auxiliary functions for batch execution of MIRTK commands using HTCondor."""
21 
22 import re
23 import os
24 import sys
25 import subprocess
26 from xml.etree import ElementTree
27 
28 
29 # ----------------------------------------------------------------------------
30 def submit(name, command=None, args=[], opts={}, script=None, tasks=0, deps=[],
31  threads=0, memory=8 * 1024, retries=5, requirements=[], environment={},
32  logdir=None, log=None, workdir=None, verbose=1):
33  """Submit batch job to HTCondor."""
34  if deps:
35  raise NotImplementedError("Cannot submit individual HTCondor jobs with dependencies yet, this requires use of DAGMan")
36  if logdir or log:
37  if not logdir:
38  log = os.path.abspath(log)
39  logdir = os.path.dirname(log)
40  elif not log:
41  logdir = os.path.abspath(logdir)
42  if tasks > 0:
43  log = os.path.join(logdir, name + "_$(Cluster).$(Process).log")
44  else:
45  log = os.path.join(logdir, name + "_$(Cluster).log")
46  makedirs(logdir)
47  jobdesc = "universe = vanilla\n"
48  if threads > 0:
49  jobdesc += "request_cpus = {0}\n".format(threads)
50  if memory > 0:
51  jobdesc += "request_memory = {0}\n".format(memory)
52  if requirements:
53  jobdesc += "requirements = " + " && ".join(requirements) + "\n"
54  if environment:
55  jobdesc += "environment = \""
56  for envname, envval in environment.items():
57  jobdesc += " {0}='{1}'".format(envname, ':'.join(envval) if isinstance(envval, (list, tuple)) else envval)
58  jobdesc += "\"\n"
59  if workdir:
60  jobdesc += "initialdir = {0}\n".format(os.path.abspath(workdir))
61  # Note: MIRTK executables return exit code 6 when memory allocation fails, other codes are kill/term signals
62  jobdesc += "on_exit_remove = (ExitBySignal == False && ExitCode != 6 && ExitCode != 247 && ExitCode != 241) || (ExitBySignal == True && ExitSignal != 9 && ExitSignal != 15)\n"
63  if retries > 0:
64  jobdesc += "max_retries = {0}\n".format(retries)
65  if script:
66  if command:
67  raise ValueError("Keyword arguments 'command' and 'script' are mutually exclusive")
68  if not log:
69  raise ValueError("Script submission of batch to HTCondor requires log path for script file!")
70  script_path = os.path.join(logdir, name + ".py")
71  with open(script_path, "wt") as f:
72  f.write(script.format(**opts))
73  jobdesc += "executable = {0}\n".format(sys.executable)
74  jobdesc += "arguments = \"'{0}'".format(script_path)
75  if tasks > 0:
76  jobdesc += " $(Process)"
77  jobdesc += "\"\n"
78  if log:
79  jobdesc += "output = {0}\n".format(log)
80  jobdesc += "error = {0}\n".format(log)
81  jobdesc += "queue"
82  if tasks > 0:
83  jobdesc += " {}".format(tasks)
84  jobdesc += "\n"
85  else:
86  jobdesc += "executable = {0}\n".format(sys.executable)
87  jobdesc += "arguments = \"-c 'import sys; import socket;"
88  jobdesc += " sys.stdout.write(\"\"Host: \"\" + socket.gethostname() + \"\"\\n\\n\"\");"
89  jobdesc += " sys.path.insert(0, \"\"{0}\"\");".format(os.path.dirname(mirtk.__file__))
90  jobdesc += " import mirtk; mirtk.check_call([\"\"{0}\"\"] + sys.argv[1:])'".format(command if command else name)
91  for arg in args:
92  arg = str(arg)
93  if ' ' in arg:
94  arg = "'" + arg + "'"
95  jobdesc += ' ' + arg
96  for opt in opts:
97  arg = opts[opt]
98  if opt[0] != '-':
99  opt = '-' + opt
100  jobdesc += ' ' + opt
101  if arg is not None:
102  if isinstance(arg, (list, tuple)):
103  arg = ' '.join([str(x) for x in arg])
104  else:
105  arg = str(arg)
106  jobdesc += ' ' + arg
107  jobdesc += "\"\n"
108  if log:
109  jobdesc += "output = {0}\n".format(log)
110  jobdesc += "error = {0}\n".format(log)
111  jobdesc += "queue\n"
112  proc = subprocess.Popen(["condor_submit", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
113  (out, err) = proc.communicate(input=jobdesc.encode('utf-8'))
114  if proc.returncode != 0:
115  raise Exception(err)
116  match = re.search('[0-9]+ job(s) submitted to cluster ([0-9]+)\.', out)
117  if not match:
118  match = re.search('\*\* Proc ([0-9]+)(\.[0-9]+)?:', out)
119  if not match:
120  raise Exception("Failed to determine job ID from condor_submit output:\n" + out)
121  jobid = int(match.group(1))
122  if verbose > 0:
123  if tasks > 0:
124  print(" Submitted batch {} (JobId={}, Tasks={})".format(name, jobid, tasks))
125  else:
126  print(" Submitted job {} (JobId={})".format(name, jobid))
127  return jobid if tasks == 0 else (jobid, tasks)
128 
129 
130 # ----------------------------------------------------------------------------
131 def _parse_condor_xml(s):
132  """Parse XML ClassAds returned by condor_q or condor_history with -xml option."""
133  # Note: condor_history may return multiple (only one non-empty) <classads> tags
134  end = 0
135  classads = None
136  starttag = "<classads>"
137  endtag = "</classads>"
138  while True:
139  start = s.find(starttag, end)
140  if start == -1:
141  break
142  end = s.find(endtag, start)
143  if end == -1:
144  raise ValueError("Malformed <classads> XML, could not find matching </classads>!")
145  end += len(endtag)
146  elem = ElementTree.fromstring(s[start:end])
147  if classads is None:
148  classads = elem
149  else:
150  classads.extend(elem)
151  return classads
152 
153 
154 # ----------------------------------------------------------------------------
155 def wait(jobs, max_time=0, max_error=5, interval=60, verbose=0):
156  if not isinstance(jobs, list):
157  jobs = [jobs]
158  clusters = []
159  for job in jobs:
160  if isinstance(job, tuple):
161  cluid = job[0]
162  tasks = job[1]
163  else:
164  cluid = job
165  tasks = 1
166  if cluid > 0:
167  clusters.append((cluid, tasks))
168  num_wait = len(clusters)
169  num_error = 0
170  total_time = 0
171  iterations = 0
172  while num_wait > 0 and (max_time <= 0 or total_time < max_time or total_time == 0) and num_error < max_error:
173  time.sleep(interval)
174  total_time += interval
175  try:
176  num_pending = 0
177  num_running = 0
178  num_suspended = 0
179  num_held = 0
180  num_done = 0
181  for cluster, tasks in clusters:
182  classads = subprocess.check_output(["condor_q", "-xml", str(cluster)], stderr=subprocess.STDOUT)
183  classads = _parse_condor_xml(classads)
184  for process in range(tasks):
185  classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
186  if classad is None:
187  num_done += 1
188  else:
189  status = int(classad.find("a[@n='JobStatus']/i").text)
190  # 1) Idle
191  # 2) Running
192  # 3) Removed
193  # 4) Completed (also when failed, check ExitCode afterwards using condor_history)
194  # 5) Held
195  # 6) Transferring Output
196  # 7) Suspended
197  if status == 1:
198  num_pending += 1
199  elif status == 2 or status == 6:
200  num_running += 1
201  elif status == 3:
202  num_done += 1
203  elif status == 4:
204  num_done += 1
205  elif status == 5:
206  num_held += 1
207  elif status == 7:
208  num_suspended += 1
209  else:
210  raise Exception("Unknown job status: {}".format(status))
211  num_wait = num_running + num_pending + num_held + num_suspended
212  if verbose > 0 and (num_wait <= 0 or iterations % verbose == 0):
213  sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
214  sys.stdout.write(" WAIT {p} pending, {r} running, {s} suspended, {h} held, {d} completed\n".format(
215  p=num_pending, r=num_running, s=num_suspended, h=num_held, d=num_done
216  ))
217  sys.stdout.flush()
218  num_error = 0
219  except subprocess.CalledProcessError:
220  sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
221  sys.stdout.write(" WAIT Failed to retrieve job status, will retry {0} more times!\n".format(max_error - num_error))
222  sys.stdout.flush()
223  num_error += 1
224  iterations += 1
225  if num_error >= max_error:
226  raise Exception("Exceeded maximum number of retries to query status of jobs!")
227  if num_wait > 0 and max_time > 0 and total_time >= max_time:
228  raise Exception("Exceeded maximum time waiting for jobs to complete!")
229  num_fail = 0
230  if total_time > 0:
231  time.sleep(10)
232  num_jobs = 0
233  unknown = {}
234  for cluster, tasks in clusters:
235  num_jobs += tasks
236  unknown[cluster] = [0] * tasks
237  num_error = 0
238  num_unknown = num_jobs
239  while num_unknown > 0 and num_error <= max_error:
240  try:
241  num_fail = 0
242  for cluster, tasks in clusters:
243  classads = _parse_condor_xml(subprocess.check_output(["condor_history", "-xml", str(cluster)]))
244  for process in range(tasks):
245  classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
246  if classad is None:
247  unknown[cluster][process] += 1
248  else:
249  unknown[cluster][process] = 0
250  status = int(classad.find("a[@n='JobStatus']/i").text)
251  exit_code = int(classad.find("a[@n='ExitCode']/i").text)
252  if status != 4 or exit_code != 0:
253  num_fail += 1
254  num_unknown = 0
255  for cluster, tasks in clusters:
256  for process in range(tasks):
257  if unknown[cluster][process] > 0:
258  if unknown[cluster][process] > max_error:
259  raise Exception("Could not retrieve exit code of job {}.{} for the past {} attempts!".format(cluster, process, unknown[cluster][process]))
260  num_unknown += 1
261  if verbose > 0:
262  sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
263  sys.stdout.write(" DONE {0} succeeded, {1} failed".format(num_jobs - num_fail - num_unknown, num_fail))
264  if num_unknown > 0:
265  sys.stdout.write(", {} unknown, will retry".format(num_unknown))
266  sys.stdout.write("\n")
267  sys.stdout.flush()
268  except subprocess.CalledProcessError:
269  sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
270  sys.stdout.write(" WAIT Failed to retrieve exit codes, will retry {0} more times!\n".format(max_error - num_error))
271  sys.stdout.flush()
272  num_error += 1
273  if num_error >= max_error:
274  raise Exception("Exceeded maximum number of retries to query exit codes of jobs!")
275  return num_fail == 0