20 """Auxiliary functions for batch execution of MIRTK commands using HTCondor."""    26 from xml.etree 
import ElementTree


def submit(name, command=None, args=[], opts={}, script=None, tasks=0, deps=[],
           threads=0, memory=8 * 1024, retries=5, requirements=[], environment={},
           logdir=None, log=None, workdir=None, verbose=1):
    """Submit batch job to HTCondor."""
    if deps:
        raise NotImplementedError("Cannot submit individual HTCondor jobs with dependencies yet, this requires use of DAGMan")
    if logdir or log:
        if not logdir:
            log = os.path.abspath(log)
            logdir = os.path.dirname(log)
        elif not log:
            logdir = os.path.abspath(logdir)
            if tasks > 0:
                log = os.path.join(logdir, name + "_$(Cluster).$(Process).log")
            else:
                log = os.path.join(logdir, name + "_$(Cluster).log")
        if not os.path.isdir(logdir):
            os.makedirs(logdir)
    jobdesc = "universe = vanilla\n"
    if threads > 0:
        jobdesc += "request_cpus = {0}\n".format(threads)
    if memory > 0:
        jobdesc += "request_memory = {0}\n".format(memory)
    if requirements:
        jobdesc += "requirements = " + " && ".join(requirements) + "\n"
    if environment:
        jobdesc += "environment = \""
        for envname, envval in environment.items():
            jobdesc += " {0}='{1}'".format(envname, ':'.join(envval) if isinstance(envval, (list, tuple)) else envval)
        jobdesc += "\"\n"
    if workdir:
        jobdesc += "initialdir = {0}\n".format(os.path.abspath(workdir))
    # Remove the job from the queue on normal completion or unexpected signals;
    # keep it queued for retry after the listed exit codes or SIGKILL/SIGTERM
    jobdesc += "on_exit_remove = (ExitBySignal == False && ExitCode != 6 && ExitCode != 247 && ExitCode != 241) || (ExitBySignal == True && ExitSignal != 9 && ExitSignal != 15)\n"
    if retries > 0:
        jobdesc += "max_retries = {0}\n".format(retries)
    if script:
        if command:
            raise ValueError("Keyword arguments 'command' and 'script' are mutually exclusive")
        if not log:
            raise ValueError("Script submission of batch to HTCondor requires log path for script file!")
        script_path = os.path.join(logdir, name + ".py")
        with open(script_path, "wt") as f:
            f.write(script.format(**opts))
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"'{0}'".format(script_path)
        if tasks > 0:
            jobdesc += " $(Process)"
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue"
        if tasks > 0:
            jobdesc += " {}".format(tasks)
        jobdesc += "\n"
    else:
        # Run the named MIRTK command on the execute host through the mirtk Python wrapper
        jobdesc += "executable = {0}\n".format(sys.executable)
        jobdesc += "arguments = \"-c 'import sys; import socket;"
        jobdesc += " sys.stdout.write(\"\"Host: \"\" + socket.gethostname() + \"\"\\n\\n\"\");"
        jobdesc += " sys.path.insert(0, \"\"{0}\"\");".format(os.path.dirname(mirtk.__file__))
        jobdesc += " import mirtk; mirtk.check_call([\"\"{0}\"\"] + sys.argv[1:])'".format(command if command else name)
        for arg in args:
            arg = str(arg)
            if ' ' in arg:
                arg = "'" + arg + "'"
            jobdesc += ' ' + arg
        for opt in opts:
            arg = opts[opt]
            if not opt.startswith('-'):
                opt = '-' + opt
            jobdesc += ' ' + opt
            if arg is not None:
                if isinstance(arg, (list, tuple)):
                    arg = ' '.join([str(x) for x in arg])
                else:
                    arg = str(arg)
                jobdesc += ' ' + arg
        jobdesc += "\"\n"
        if log:
            jobdesc += "output = {0}\n".format(log)
            jobdesc += "error = {0}\n".format(log)
        jobdesc += "queue\n"
    proc = subprocess.Popen(["condor_submit", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    (out, err) = proc.communicate(input=jobdesc.encode('utf-8'))
    # communicate() returns bytes; decode before pattern matching
    out = out.decode('utf-8')
    err = err.decode('utf-8')
    if proc.returncode != 0:
        raise Exception(err)
    match = re.search(r'[0-9]+ job\(s\) submitted to cluster ([0-9]+)\.', out)
    if not match:
        match = re.search(r'\*\* Proc ([0-9]+)(\.[0-9]+)?:', out)
        if not match:
            raise Exception("Failed to determine job ID from condor_submit output:\n" + out)
    jobid = int(match.group(1))
    if verbose > 0:
        if tasks > 0:
            print("  Submitted batch {} (JobId={}, Tasks={})".format(name, jobid, tasks))
        else:
            print("  Submitted job {} (JobId={})".format(name, jobid))
    return jobid if tasks == 0 else (jobid, tasks)
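
# Usage sketch (illustrative only, not part of the original module): how a caller
# might submit a single MIRTK command and a script-based task array. The command
# name "smooth-image", the file paths, and the resource values are made-up
# placeholders; only the submit() signature above is assumed.
#
#   jobid = submit("smooth-image", command="smooth-image",
#                  args=["t2w.nii.gz", "t2w_smooth.nii.gz"],
#                  opts={"sigma": 2},
#                  threads=1, memory=2 * 1024,
#                  logdir="logs", workdir=".", verbose=1)
#
#   # Script submission: the script body is written to <logdir>/<name>.py after
#   # substituting opts via str.format, and each task receives its HTCondor
#   # $(Process) number as its first command line argument.
#   jobid, tasks = submit("subject-task", script="print('task', {num})",
#                         opts={"num": 10}, tasks=10, logdir="logs")
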
def _parse_condor_xml(s):
    """Parse XML ClassAds returned by condor_q or condor_history with -xml option."""
    # condor_q/condor_history may print more than one <classads> document;
    # merge all of them into a single element tree
    if isinstance(s, bytes):
        s = s.decode('utf-8')
    classads = None
    end = 0
    starttag = "<classads>"
    endtag = "</classads>"
    while True:
        start = s.find(starttag, end)
        if start == -1:
            break
        end = s.find(endtag, start)
        if end == -1:
            raise ValueError("Malformed <classads> XML, could not find matching </classads>!")
        end += len(endtag)
        elem = ElementTree.fromstring(s[start:end])
        if classads is None:
            classads = elem
        else:
            classads.extend(elem)
    return classads
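
# The ClassAds XML produced by "condor_q -xml" / "condor_history -xml" looks
# roughly like the abridged sketch below; the XPath expressions used in wait()
# rely on this layout of one <c> element per job with <a n="..."> attribute entries:
#
#   <classads>
#     <c>
#       <a n="ClusterId"><i>42</i></a>
#       <a n="ProcId"><i>0</i></a>
#       <a n="JobStatus"><i>2</i></a>
#       <a n="ExitCode"><i>0</i></a>
#     </c>
#   </classads>
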
def wait(jobs, max_time=0, max_error=5, interval=60, verbose=0):
    """Wait for the given HTCondor jobs to finish."""
    if not isinstance(jobs, list):
        jobs = [jobs]
    clusters = []
    for job in jobs:
        if isinstance(job, tuple):
            cluid, tasks = job
        else:
            cluid, tasks = job, 1
        if cluid > 0:
            clusters.append((cluid, tasks))
    num_wait = len(clusters)
    num_error = 0
    total_time = 0
    iterations = 0
    while num_wait > 0 and (max_time <= 0 or total_time < max_time or total_time == 0) and num_error < max_error:
        time.sleep(interval)
        total_time += interval
        try:
            num_pending = 0
            num_running = 0
            num_suspended = 0
            num_held = 0
            num_done = 0
            for cluster, tasks in clusters:
                classads = subprocess.check_output(["condor_q", "-xml", str(cluster)], stderr=subprocess.STDOUT)
                classads = _parse_condor_xml(classads)
                for process in range(tasks):
                    classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
                    if classad is None:
                        # job no longer in the queue, i.e., finished or removed
                        num_done += 1
                    else:
                        status = int(classad.find("a[@n='JobStatus']/i").text)
                        # HTCondor JobStatus: 1=Idle, 2=Running, 3=Removed, 4=Completed,
                        # 5=Held, 6=Transferring Output, 7=Suspended
                        if status == 1:
                            num_pending += 1
                        elif status == 2 or status == 6:
                            num_running += 1
                        elif status == 3 or status == 4:
                            num_done += 1
                        elif status == 5:
                            num_held += 1
                        elif status == 7:
                            num_suspended += 1
                        else:
                            raise Exception("Unknown job status: {}".format(status))
            num_wait = num_running + num_pending + num_held + num_suspended
            if verbose > 0 and (num_wait <= 0 or iterations % verbose == 0):
                sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
                sys.stdout.write(" WAIT {p} pending, {r} running, {s} suspended, {h} held, {d} completed\n".format(
                    p=num_pending, r=num_running, s=num_suspended, h=num_held, d=num_done))
                sys.stdout.flush()
        except subprocess.CalledProcessError:
            sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
            sys.stdout.write(" WAIT Failed to retrieve job status, will retry {0} more times!\n".format(max_error - num_error))
            sys.stdout.flush()
            num_error += 1
        iterations += 1
    if num_error >= max_error:
        raise Exception("Exceeded maximum number of retries to query status of jobs!")
    if num_wait > 0 and max_time > 0 and total_time >= max_time:
        raise Exception("Exceeded maximum time waiting for jobs to complete!")
    # Query condor_history for exit codes once the jobs have left the queue;
    # a job may report status 4 (Completed) and still have a non-zero exit code
    num_fail = 0
    if total_time > 0:
        num_jobs = 0
        unknown = {}
        for cluster, tasks in clusters:
            num_jobs += tasks
            unknown[cluster] = [0] * tasks
        num_error = 0
        num_unknown = num_jobs
        while num_unknown > 0 and num_error <= max_error:
            try:
                num_fail = 0
                num_unknown = 0
                for cluster, tasks in clusters:
                    classads = _parse_condor_xml(subprocess.check_output(["condor_history", "-xml", str(cluster)]))
                    for process in range(tasks):
                        classad = classads.find(".c/a[@n='ClusterId'][i='{0}']/../a[@n='ProcId'][i='{1}']/..".format(cluster, process))
                        if classad is None:
                            # job not (yet) in the history, count as unknown and retry
                            num_unknown += 1
                            unknown[cluster][process] += 1
                        else:
                            unknown[cluster][process] = 0
                            status = int(classad.find("a[@n='JobStatus']/i").text)
                            exit_code = int(classad.find("a[@n='ExitCode']/i").text)
                            if status != 4 or exit_code != 0:
                                num_fail += 1
                if num_unknown > 0:
                    for cluster, tasks in clusters:
                        for process in range(tasks):
                            if unknown[cluster][process] > 0:
                                if unknown[cluster][process] > max_error:
                                    raise Exception("Could not retrieve exit code of job {}.{} for the past {} attempts!".format(cluster, process, unknown[cluster][process]))
                    time.sleep(interval)
                if verbose > 0:
                    sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
                    sys.stdout.write(" DONE {0} succeeded, {1} failed".format(num_jobs - num_fail - num_unknown, num_fail))
                    if num_unknown > 0:
                        sys.stdout.write(", {} unknown, will retry".format(num_unknown))
                    sys.stdout.write("\n")
                    sys.stdout.flush()
            except subprocess.CalledProcessError:
                sys.stdout.write("{:%Y-%b-%d %H:%M:%S}".format(datetime.now()))
                sys.stdout.write(" WAIT Failed to retrieve exit codes, will retry {0} more times!\n".format(max_error - num_error))
                sys.stdout.flush()
                num_error += 1
        if num_error >= max_error:
            raise Exception("Exceeded maximum number of retries to query exit codes of jobs!")