Source code for dgenies.bin.local_scheduler

#!/usr/bin/env python3

import os
import time
import psutil
import atexit
from datetime import datetime
from tendo import singleton
import argparse

from dgenies.database import Job, Session
from dgenies.config_reader import AppConfigReader
from dgenies.lib.job_manager import JobManager

# Allow only one instance:
me = singleton.SingleInstance()
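# Note (added remark): tendo's SingleInstance acquires a lock file at start-up; if another
# local_scheduler process already holds it, this one aborts instead of running a second
# scheduler (the exact mechanism, exit vs. exception, depends on the tendo version).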

config_reader = AppConfigReader()

# Add DRMAA lib in env:
if config_reader.drmaa_lib_path is not None and config_reader.batch_system_type != "local":
    os.environ["DRMAA_LIBRARY_PATH"] = config_reader.drmaa_lib_path
    try:
        import drmaa
        from dgenies.lib.drmaasession import DrmaaSession
        DRMAA_SESSION = DrmaaSession()
    except ImportError:
        pass
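# Note (added remark): if the drmaa import fails, DRMAA_SESSION is simply left undefined:
# cluster job monitoring in parse_started_jobs() is then unavailable, and cleaner() below
# checks globals() before trying to close the session.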

NB_RUN = config_reader.local_nb_runs
NB_PREPARE = config_reader.nb_data_prepare

DEBUG = config_reader.debug

LOG_FILE = "stdout"
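
# Status values this scheduler queries or sets (summary inferred from the functions below;
# most transitions themselves are performed by JobManager):
#   preparation: waiting -> preparing / prepare-scheduled -> preparing-cluster -> prepared
#   execution:   prepared -> scheduled / scheduled-cluster -> starting -> started
#   termination: merging, succeed, or fail (set here on dead jobs)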


def _printer(*messages):
    """
    print messages to stdout or to a file (according to LOG_FILE global constant)

    :param messages: messages to print
    """
    if DEBUG:
        if LOG_FILE == "stdout":
            print(*messages)
        else:
            with open(LOG_FILE, "a") as log_f:
                print(*messages, file=log_f)


def start_job(id_job, batch_system_type="local"):
    """
    Start a job (mapping step)

    :param id_job: job id
    :type id_job: str
    :param batch_system_type: local, slurm or sge
    :type batch_system_type: str
    """
    _printer("Start job", id_job)
    with Job.connect():
        job = Job.get(Job.id_job == id_job)
        job.status = "starting"
        job.save()
        job_mng = JobManager(id_job=id_job, email=job.email, tool=job.tool, options=job.options)
        job_mng.set_inputs_from_res_dir()
        job_mng.run_job_in_thread(batch_system_type)


def get_scheduled_local_jobs():
    """
    Get list of jobs ready to be started (for local runs)

    :return: list of jobs
    :rtype: list
    """
    all_jobs = []
    with Job.connect():
        jobs = Job.select().where((Job.batch_type == "local") & ((Job.status == "prepared") |
                                                                 (Job.status == "scheduled"))).\
            order_by(Job.date_created)
        for job in jobs:
            all_jobs.append(job.id_job)
            job.status = "scheduled"
            job.save()
        return all_jobs


def get_scheduled_cluster_jobs():
    """
    Get list of jobs ready to be started (for cluster runs)

    :return: list of jobs
    :rtype: list
    """
    all_jobs = []
    with Job.connect():
        jobs = Job.select().where((Job.batch_type != "local") & ((Job.status == "prepared") |
                                                                 (Job.status == "scheduled"))).\
            order_by(Job.date_created)
        for job in jobs:
            all_jobs.append({"job_id": job.id_job, "batch_type": job.batch_type})
            job.status = "scheduled"
            job.save()
        return all_jobs


def prepare_job(id_job):
    """
    Launch data preparation for a job

    :param id_job: job id
    :type id_job: str
    """
    _printer("Prepare data for job:", id_job)
    with Job.connect():
        job = Job.get(Job.id_job == id_job)
        job.status = "preparing"
        job.save()
        job_mng = JobManager(id_job=id_job, email=job.email, tool=job.tool)
        # job_mng = JobManager(id_job=id_job, email=job.email, tool=job.tool, options=job.options)
        job_mng.set_inputs_from_res_dir()
        job_mng.prepare_data_in_thread()


def get_prep_scheduled_jobs():
    """
    Get list of jobs ready to be prepared (all data is downloaded and parsed)

    :return: list of jobs
    :rtype: list
    """
    with Job.connect():
        jobs = Job.select().where(Job.status == "waiting").order_by(Job.date_created)
        return [(j.id_job, j.batch_type) for j in jobs]


def get_preparing_jobs_nb():
    """
    Get number of jobs in preparation step (for local runs)

    :return: number of jobs
    :rtype: int
    """
    with Job.connect():
        return len(Job.select().where(Job.status == "preparing"))


def get_preparing_jobs_cluster_nb():
    """
    Get number of jobs in preparation step (for cluster runs)

    :return: number of jobs being prepared, number of jobs waiting for preparation
    :rtype: (int, int)
    """
    with Job.connect():
        return len(Job.select().where(Job.status == "preparing-cluster")), \
               len(Job.select().where(Job.status == "prepare-scheduled"))


def parse_started_jobs():
    """
    Parse all started jobs: check all is OK, change job status if needed. Look for dead jobs

    :return: (list of ids of jobs started locally, list of ids of jobs started on the cluster)
    :rtype: (list, list)
    """
    with Job.connect():
        jobs_started = []  # Only local jobs
        cluster_jobs_started = []  # Only cluster jobs
        jobs = Job.select().where((Job.status == "started") | (Job.status == "starting") |
                                  (Job.status == "succeed") | (Job.status == "merging") |
                                  (Job.status == "scheduled-cluster") | (Job.status == "prepare-scheduled") |
                                  (Job.status == "preparing-cluster"))
        for job in jobs:
            pid = job.id_process
            if job.batch_type == "local":
                if job.status != "started" or psutil.pid_exists(pid):
                    jobs_started.append(job.id_job)
                else:
                    print("Job %s (pid: %d) has died!" % (job.id_job, job.id_process))
                    job.status = "fail"
                    job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
                    job.save()
                    # Todo: send mail about the error
            else:
                if job.status in ["started", "scheduled-cluster", "prepare-scheduled", "preparing-cluster"]:
                    s = DRMAA_SESSION.session
                    status = s.jobStatus(str(job.id_process))
                    if status not in [drmaa.JobState.RUNNING, drmaa.JobState.DONE, drmaa.JobState.QUEUED_ACTIVE,
                                      drmaa.JobState.SYSTEM_ON_HOLD, drmaa.JobState.USER_ON_HOLD,
                                      drmaa.JobState.USER_SYSTEM_ON_HOLD]:
                        if job.batch_type == "slurm":
                            os.system("scancel %s" % job.id_process)
                        elif job.batch_type == "sge":
                            os.system("qdel %s" % job.id_process)
                        print("Job %s (id on cluster: %d) has died!" % (job.id_job, job.id_process))
                        job.status = "fail"
                        job.error = "<p>Your job has failed for an unexpected reason. Please contact the support.</p>"
                        job.save()
                        # Todo: send mail about the error
                    else:
                        if job.status == "scheduled-cluster" and status == drmaa.JobState.RUNNING:
                            job.status = "started"
                            job.save()
                            cluster_jobs_started.append(job.id_job)
                        elif job.status == "prepare-scheduled" and status == drmaa.JobState.RUNNING:
                            job.status = "preparing-cluster"
                            job.save()
                        elif job.status == "started":
                            cluster_jobs_started.append(job.id_job)
                else:
                    cluster_jobs_started.append(job.id_job)
        return jobs_started, cluster_jobs_started


def parse_uploads_asks():
    """
    Parse upload requests: allow new uploads when others have finished, remove expired sessions, ...
    """
    with Session.connect():
        now = datetime.now()
        # Get allowed:
        all_sessions = Session.select()
        nb_sessions = len(all_sessions)
        _printer("All sessions:", nb_sessions)
        sessions = Session.select().where(Session.status == "active")
        nb_active_dl = len(sessions)
        _printer("Active_dl:", nb_active_dl)
        for session in sessions:
            if not session.keep_active and (now - session.last_ping).total_seconds() > 50:
                _printer("Delete 1 active session:", session.s_id)
                session.delete_instance()  # We consider the user has left
                nb_active_dl -= 1
        # Get pending:
        sessions = Session.select().where(Session.status == "pending").order_by(Session.date_created)
        _printer("Pending:", len(sessions))
        for session in sessions:
            delay = (now - session.last_ping).total_seconds()
            if delay > 30:
                session.status = "reset"  # Reset position, the user has probably left
                session.save()
                _printer("Reset 1 session:", session.s_id)
            elif nb_active_dl < config_reader.max_concurrent_dl:
                session.status = "active"
                session.save()
                nb_active_dl += 1
                _printer("Enable 1 session:", session.s_id)
        # Remove old sessions:
        for session in all_sessions:
            delay = (now - session.last_ping).total_seconds()
            if delay > 86400:  # Session is more than 1 day old
                _printer("Delete 1 outdated session:", session.s_id)
                session.delete_instance()  # Session has expired


@atexit.register
def cleaner():
    """
    Exit DRMAA session at program exit
    """
    if "DRMAA_SESSION" in globals():
        DRMAA_SESSION.exit()


def move_job_to_cluster(id_job):
    """
    Change a local job to be run on the cluster

    :param id_job: job id
    :type id_job: str
    """
    with Job.connect():
        job = Job.get(Job.id_job == id_job)
        job.batch_type = config_reader.batch_system_type
        job.save()


def parse_args():
    """
    Parse command line arguments and define DEBUG and LOG_FILE constants
    """
    global DEBUG, LOG_FILE
    parser = argparse.ArgumentParser(description="Start local scheduler")
    parser.add_argument('-d', '--debug', type=str, required=False, help="Set to True to enable debug")
    parser.add_argument('-l', '--log-dir', type=str, required=False, help="Folder in which to store logs")
    args = parser.parse_args()

    if args.debug is not None:
        if args.debug.lower() == "true" or args.debug.lower() == "1":
            DEBUG = True
        elif args.debug.lower() == "false" or args.debug.lower() == "0":
            DEBUG = False
        else:
            raise Exception("Invalid value for debug: %s (valid values: True, False)" % args.debug)

    if args.log_dir is not None:
        log_dir = args.log_dir
    else:
        log_dir = config_reader.log_dir

    if DEBUG:
        if log_dir == "stdout":
            LOG_FILE = "stdout"
        else:
            LOG_FILE = os.path.join(log_dir, "local_scheduler.log")
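
# Example invocation (illustrative only, hypothetical paths; -d/--debug and -l/--log-dir are
# the flags defined in parse_args() above):
#   python3 local_scheduler.py -d true -l /var/log/dgenies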


if __name__ == '__main__':
    parse_args()

    while True:
        _printer("Check uploads...")
        parse_uploads_asks()
        _printer("")
        _printer("Checking jobs...")
        scheduled_jobs_local = get_scheduled_local_jobs()
        scheduled_jobs_cluster = get_scheduled_cluster_jobs()
        prep_scheduled_jobs = get_prep_scheduled_jobs()
        _printer("Waiting for preparing:", len(prep_scheduled_jobs))
        nb_preparing_jobs = get_preparing_jobs_nb()
        nb_preparing_jobs_cluster = get_preparing_jobs_cluster_nb()
        _printer("Preparing:", nb_preparing_jobs, "(local)",
                 "".join([str(nb_preparing_jobs_cluster[0]), "[", str(nb_preparing_jobs_cluster[1]), "]"]),
                 "(cluster)")
        _printer("Scheduled:", len(scheduled_jobs_local), "(local),", len(scheduled_jobs_cluster), "(cluster)")
        started_jobs, cluster_started_jobs = parse_started_jobs()
        nb_started = len(started_jobs)
        _printer("Started:", nb_started, "(local),", len(cluster_started_jobs), "(cluster)")
        nj = 0
        local_waiting_jobs = []
        while nj < len(prep_scheduled_jobs):
            job_batch_type = prep_scheduled_jobs[nj][1]
            if nb_preparing_jobs < NB_PREPARE or job_batch_type != "local":
                prepare_job(prep_scheduled_jobs[nj][0])
                if job_batch_type == "local":
                    nb_preparing_jobs += 1
                del prep_scheduled_jobs[nj]
            else:
                if job_batch_type == "local":
                    local_waiting_jobs.append(prep_scheduled_jobs[nj][0])
                nj += 1
        if config_reader.batch_system_type != "local" and len(local_waiting_jobs) > config_reader.max_wait_local:
            for id_job in local_waiting_jobs[config_reader.max_wait_local:]:
                move_job_to_cluster(id_job)
        while len(scheduled_jobs_local) > 0 and nb_started < NB_RUN:
            start_job(scheduled_jobs_local.pop(0))
            nb_started += 1
        if config_reader.batch_system_type != "local" and len(scheduled_jobs_local) > config_reader.max_wait_local:
            for id_job in scheduled_jobs_local[config_reader.max_wait_local:]:
                move_job_to_cluster(id_job)
        for job in scheduled_jobs_cluster:
            start_job(job["job_id"], job["batch_type"])
        # Wait before the next check:
        _printer("Sleeping...")
        time.sleep(15)
        _printer("\n")