#!/usr/bin/python #emacs: -*- mode: python-mode; c-basic-offset: 4; tab-width: 4; indent-tabs-mode: t -*- #ex: set sts=4 ts=4 sw=4 noet: #------------------------- =+- Python script -+= ------------------------- """ @file memory_control.py @date Mon Sep 17 12:31:35 2007 @brief Yaroslav Halchenko CS@UNM, CS@NJIT web: http://www.onerussian.com & PSYCH@RUTGERS e-mail: yoh@onerussian.com ICQ#: 60653192 DESCRIPTION (NOTES): COPYRIGHT: Yaroslav Halchenko 2007 LICENSE: This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. On Debian system see /usr/share/common-licenses/GPL for the full license. """ #-----------------\____________________________________/------------------ __author__ = 'Yaroslav Halhenko' __revision__ = '$Revision: $' __date__ = '$Date: $' __copyright__ = 'Copyright (c) 2007 Yaroslav Halchenko' __license__ = 'GPL' import commands, re, sys, time, os, exceptions, signal mem_increment = 300000 # lets agree to boost vmem requested by 500MB max_wait_period = 12*3600 # maximum waiting period 12h by default policy = 'wait' # possible supported policies: requeue and wait # if amount of available memory is insufficient to add mem_increment then # we can either requeue the job (thus loosing results of the job) or wait (indefinetly) # until this amount of memory becomes available -- policy "wait". 
admin_email = 'ravana-admin@onerussian.com'
sleep_interval = 60         # seconds to sleep between polling iterations
verbosity = 2               # print_debug() prints messages whose level <= verbosity
dry_run = False             # if True, external commands and mails are only logged
errors_to_stop = 10         # once this many errors were reported via print_error(),
                            # quit and send an announcement to admin_email

# Jobs we are waiting on (because they cannot be requeued or policy is wait):
# job id -> {'first_seen': <time.time()>, 'last_seen': <time.time()>}
waiting_jobs = {}

# Number of errors reported through print_error() so far.
errors = 0


def print_debug(msg, level):
    """Print a timestamped message to stdout if `level` <= verbosity.

    The message is preceded by `level` spaces so more detailed messages
    are indented further.
    """
    if level <= verbosity:
        sys.stdout.write("%s %s %s\n" % (time.asctime(), " " * level, msg))


def _send_mail(subject, body, recipient, cc=None):
    """Send `body` to `recipient` through mail(1) without involving a shell.

    The body is fed on stdin and every argument is passed separately, so
    quotes, parentheses and other shell metacharacters in the body (e.g. an
    exception repr) cannot break the command. Nothing is sent in dry-run mode.

    Returns True on success (or in dry-run mode), False otherwise.
    """
    import subprocess
    args = ['mail']
    if cc:
        args.extend(['-c', cc])
    args.extend(['-s', subject, recipient])
    print_debug("Mailing '%s' to %s" % (subject, recipient), 3)
    if dry_run:
        return True
    try:
        proc = subprocess.Popen(args, stdin=subprocess.PIPE)
        proc.communicate(body)
    except OSError:
        return False
    return proc.returncode == 0


def terminate(msg, return_code=0):
    """Log `msg`, email admin_email about the exit, and exit with `return_code`.

    A mail failure is only written to stderr: reporting it through
    print_error() could call terminate() again and recurse.
    """
    print_debug(msg, 0)
    msgbody = "memory_control exit with code %d. Additional information: '%s'" \
              % (return_code, msg)
    if not _send_mail('memory_control@PBS exited', msgbody, admin_email):
        sys.stderr.write("Failed to email %s about the exit\n" % admin_email)
    sys.exit(return_code)


class SigHandler:
    """Signal handler that shuts the daemon down through terminate().

    SIGINT (Ctrl-C) exits with code 0; any other handled signal exits with 1.
    """
    def __call__(self, sn, sf):
        if sn == signal.SIGINT:
            return_code = 0     # Ctrl-C is ok ;-)
        else:
            return_code = 1
        terminate("Signal interruption with " + repr(sn), return_code)


sh = SigHandler()


def print_error(msg, critical=False):
    """Write `msg` to stderr and count it towards errors_to_stop.

    Calls terminate() (which exits) if `critical` is set, or once the number
    of reported errors reaches errors_to_stop.
    """
    global errors   # without this, `errors += 1` raises UnboundLocalError
    sys.stderr.write("%s\n" % msg)
    errors += 1
    if critical:
        terminate("EXITING due to critical error '%s'" % msg, 1)
    if errors >= errors_to_stop:
        terminate("Number of errors %d exceeded allowed number. Exiting" % errors, 1)


def run_command(cmd):
    """Run shell command `cmd` via os.system() (skipped in dry-run mode).

    A non-zero status is reported via print_error(). Returns the status
    returned by os.system() (0 on success or in dry-run mode).
    """
    print_debug("Running '%s'" % cmd, 3)
    if dry_run:
        return 0
    exit_code = os.system(cmd)
    if exit_code:
        print_error("Running command '%s' failed with exit code %d" % (cmd, exit_code))
    return exit_code


def split_output(cmdLine, line_separator='\n', strip=True):
    """Run `cmdLine` and return its output split on `line_separator`.

    Every entry is stripped of surrounding whitespace unless `strip` is False.
    """
    print_debug("Running '%s' for splitting output" % cmdLine, 4)
    entries = commands.getoutput(cmdLine).split(line_separator)
    if strip:
        entries = [entry.strip() for entry in entries]
    return entries


def parse_cmdoutput_dict(cmdLine, separator=' *= *'):
    """Parse the output lines of `cmdLine` into a dictionary.

    Each line is split once on the regex `separator` into key and value;
    lines that do not contain the separator are skipped.
    """
    result = {}
    for line in split_output(cmdLine):
        entry = re.split(separator, line.strip(), maxsplit=1)
        if len(entry) == 1:
            continue
        if len(entry) != 2:
            print_error("Entry %s has wrong number of elements" % repr(entry))
            continue
        result[entry[0]] = entry[1]
    return result


def email_victim(msgbody, job):
    """Email the owner of `job` (CC admin_email) about the status of the job.

    The address is taken from the "Email:" line of `finger <user>` output.
    Returns True if the mail was handed to mail(1) (or in dry-run mode), and
    False if the address is unknown or sending failed. Both failures are
    reported via print_error().
    """
    user_record = parse_cmdoutput_dict('finger %s | grep "^Email:"' % job['user'],
                                       separator=' *: *')
    if 'Email' not in user_record:
        print_error('An Email for user %s is unknown. Please check his ~%s/.plan file'
                    % (job['user'], job['user']))
        return False
    print_debug("Emailing user %s<%s> on behalf of task %s."
                % (job['user'], user_record['Email'], job['id']), 2)
    subject = 'PBS@itanix: memory exhaustion for task %s report' % job['id']
    if not _send_mail(subject, msgbody, user_record['Email'], cc=admin_email):
        print_error("Emailing user %s<%s> on behalf of task %s failed"
                    % (job['user'], user_record['Email'], job['id']))
        return False
    return True


def fillout_dict_fromcmd(cmdLine, fields, dictionary):
    """Copy the entries named in `fields` from parsed `cmdLine` output into
    `dictionary` (not used by this script)."""
    full_dict = parse_cmdoutput_dict(cmdLine)
    for key, value in full_dict.items():
        if key in fields:
            dictionary[key] = value


# Memory size with an optional unit suffix, e.g. "123456kb", "2gb", "1024".
_MEMORY_RE = re.compile(r'^(-?[0-9]+)\s*([kmgt]?b)?$')
# KB per unit suffix.
_KB_PER_UNIT = {'kb': 1, 'mb': 1024, 'gb': 1024 ** 2, 'tb': 1024 ** 3}


def memoryKB(entry):
    """Convert a reported memory size into KB (an int).

    Accepts an optional, case-insensitive b/kb/mb/gb/tb suffix. If no suffix
    is given, mb is assumed (that is how `diagnose` reports memory). Returns
    None, after reporting through print_error(), if the format is unknown.
    """
    text = entry.strip()
    match = _MEMORY_RE.match(text.lower())
    if match is None:
        print_error("Unknown format of memory size in '%s'" % text)
        return None
    value = int(match.group(1))
    unit = match.group(2) or 'mb'
    if unit == 'b':
        return value // 1024
    return value * _KB_PER_UNIT[unit]


def place_job_into_waiting(job):
    """Start tracking `job` in waiting_jobs, with first/last seen set to now."""
    now = time.time()
    waiting_jobs[job['id']] = {'first_seen': now, 'last_seen': now}


def remove_job_from_waiting(job):
    """Stop tracking `job` in waiting_jobs (no-op if it is not tracked)."""
    waiting_jobs.pop(job['id'], None)   # free to go


#
#
qstat_entries = [ 'exec_host', 'Rerunable', 'resources_used.vmem', 'Resource_List.mem' ]


def _parse_qstat_line(line):
    """Turn one stripped line of plain `qstat` output into a job dict.

    The line must have exactly 6 whitespace-separated fields (id, name, user,
    time used, state, queue). Otherwise it is reported and None is returned.
    """
    fields = re.split('[ \t]+', line)
    if len(fields) != 6:
        print_error('Not recognized line "%s"' % line)
        return None
    job = {}
    (job['id'], job['name'], job['user'], job['time_used'],
     job['state'], job['queue']) = fields
    return job


def _node_free_memory(exec_host):
    """Return the free memory (KB) of `exec_host` as reported by `diagnose -n`.

    Reads the 4th whitespace-separated field of the first node line, which is
    expected to look like "<free>:<total>". Returns None, after reporting the
    problem, when that field is missing or unparsable.
    """
    diag_lines = split_output('diagnose -n %s | tail -n +4 | head -1' % exec_host)
    fields = re.split('[ \t]+', diag_lines[0].strip())
    if len(fields) < 4 or fields[3].count(':') != 1:
        print_error("Cannot parse memory of node %s from diagnose output '%s'"
                    % (exec_host, diag_lines[0]))
        return None
    return memoryKB(fields[3].split(':')[0])


def _handle_suspended_job(job):
    """Raise the memory request of one suspended job, then resume, park or requeue it.

    `job` is a dict from _parse_qstat_line(). This function adds the keys
    exec_host, used_vmem, resource_mem, rerunable and needed_vmem to it.
    A job whose `qstat -f` or `diagnose` data cannot be read is reported
    and skipped for this iteration.
    """
    # Lets see how much memory it actually used and on which node it is running
    job_dict = parse_cmdoutput_dict("qstat -f %(id)s | tail -n +2 | grep '^ '" % job)
    if 'exec_host' not in job_dict:
        print_error("No exec_host reported for job %(id)s" % job)
        return
    # drop a trailing "/<number>" suffix, e.g. "node1/0" -> "node1"
    job['exec_host'] = re.sub('/[0-9]+$', '', job_dict['exec_host'])

    if 'resources_used.vmem' not in job_dict:
        print_debug("Resuming the job %(id)s since it has no used resources but was suspended"
                    % job, 3)
        run_command("qsig -s resume %(id)s" % job)
        return
    if 'Resource_List.mem' not in job_dict:
        print_error("No Resource_List.mem reported for job %(id)s" % job)
        return

    job['used_vmem'] = memoryKB(job_dict['resources_used.vmem'])
    job['resource_mem'] = memoryKB(job_dict['Resource_List.mem'])
    if job['used_vmem'] is None or job['resource_mem'] is None:
        return      # memoryKB() has already reported the unparsable value
    # The value is the text "True"/"False": bool() of either string is True.
    job['rerunable'] = job_dict.get('Rerunable', '').strip().lower() == 'true'
    job['needed_vmem'] = job['used_vmem'] + mem_increment

    # Lets get status for the node of interest to see whether it has a
    # sufficient amount of memory to qalter the task and resume it
    node_free = _node_free_memory(job['exec_host'])
    if node_free is None:
        return

    email_msg = ""
    cmd = ""
    if job['id'] not in waiting_jobs:
        if job['used_vmem'] > job['resource_mem']:
            # only if we haven't adjusted it yet
            cmd = "qalter -l 'mem=%(needed_vmem)skb' %(id)s; " % job
            email_msg += "\nYour job %(id)s got adjusted amount or memory " % job \
                         + "assigned to it to be %(needed_vmem)skb." % job \
                         + " Old value was %(resource_mem)skb." % job
            print_debug('Adjusted memory for %(id)s to be %(needed_vmem)skb.' % job, 2)
        else:
            # we must have restarted or memory requested for the job
            # was adjusted externally, so lets simply place it in the list
            place_job_into_waiting(job)

    if node_free > mem_increment:
        # sufficient amount of memory -- no problem
        cmd += "qsig -s resume %s;" % job['id']
        email_msg += "\nSince the node %(exec_host)s has RAM available to acommodate your task" % job \
                     + " with adjusted memory -- your task was resumed."
        remove_job_from_waiting(job)
        print_debug('Job %(id)s was resumed.' % job, 1)
    else:
        email_msg_prefix = "\nThe node %(exec_host)s has no sufficient amount of RAM available to acommodate your task at the moment" % job \
                           + "(%dkb available < %dkb increment). " % (node_free, mem_increment)
        if (not job['rerunable']) or policy == 'wait':
            # we need to wait since we can't (or are told not to) rerun the job
            if job['id'] not in waiting_jobs:
                place_job_into_waiting(job)
                email_msg += email_msg_prefix \
                    + " Your task was placed in the waiting state for up to %d hours." % (max_wait_period // 3600) \
                    + " If no sufficient RAM becomes available during that period of time -- your task will be requed and you will get additional notification by email."
                print_debug('Job %(id)s placed into the waiting list.' % job, 1)
            else:
                entry = waiting_jobs[job['id']]
                entry['last_seen'] = time.time()
                if entry['last_seen'] - entry['first_seen'] > max_wait_period:
                    email_msg += "\nThe job %(id)s is to be requeued since it exceeded waiting time and " % job \
                                 + "no additional memory became available"
                    cmd += "qrerun %s;" % job['id']
                    remove_job_from_waiting(job)
                    print_debug('Job %(id)s was requeued.' % job, 1)
        elif policy == "requeue":
            # the job is rerunable and the policy asks to rerun it
            cmd += "qrerun %s;" % job['id']
            email_msg += email_msg_prefix \
                         + " Your task is requeued (rerun) with adjusted amount of memory requested."
            print_debug('Job %(id)s was requeued.' % job, 1)
        else:
            print_error("Unknown policy %s" % policy)

    if email_msg:
        email_msg += "\n\nThis is an email from the deamon -- do not reply directly." \
                     " Direct any replies and concerns to Ravana-Admin Mailing List <%s>." % admin_email
        email_victim(email_msg, job)
    if cmd:
        run_command(cmd)


def main():
    """Install the signal handlers and poll for suspended jobs forever.

    The loop only ends through terminate(), whose exit code is propagated
    to the caller of the script.
    """
    for signum in (signal.SIGINT, signal.SIGABRT, signal.SIGQUIT):
        signal.signal(signum, sh)
    try:
        while True:
            # get list of suspended tasks
            for line in split_output('qstat | tail -n +3 | grep " S "'):
                line = line.strip()
                if line == "":
                    continue
                job = _parse_qstat_line(line)
                if job is not None:
                    _handle_suspended_job(job)
            print_debug("Sleeping for %d sec" % sleep_interval, 6)
            time.sleep(sleep_interval)
    except SystemExit:
        # raised by terminate() (also from within the signal handler):
        # re-raise so the process exits with terminate()'s return code
        raise
    except KeyboardInterrupt:
        terminate("Was terminated from the keyboard", 0)
    except Exception:
        # print the traceback now: terminate() exits, so nothing re-raises later
        import traceback
        traceback.print_exc()
        terminate("Was abrubtly terminated due to exception: " + repr(sys.exc_info()[1]), 1)


if __name__ == '__main__':
    main()