/*****************************************************************************\
 *  agent.c - parallel background communication functions. This is where  
 *    logic could be placed for broadcast communications.
 *
 *  $Id: agent.c 12462 2007-10-08 17:42:47Z jette $
 *****************************************************************************
 *  Copyright (C) 2002-2007 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Morris Jette <jette1@llnl.gov>, et al.
 *  Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
 *  UCRL-CODE-226842.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <http://www.llnl.gov/linux/slurm/>.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission 
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and 
 *  distribute linked combinations including the two. You must obey the GNU 
 *  General Public License in all respects for all of the code used other than 
 *  OpenSSL. If you modify file(s) with this exception, you may extend this 
 *  exception to your version of the file(s), but you are not obligated to do 
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in 
 *  the program, then also delete it here.
 *  
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
 *****************************************************************************
 *  Theory of operation:
 *
 *  The functions below permit slurmctld to initiate parallel tasks as a
 *  detached thread and ensure that the work happens. For example, when a
 *  job's time limit is to be changed, slurmctld needs to notify the slurmd
 *  on every node to which the job was allocated. We don't want to block
 *  slurmctld's primary function (the job update RPC) while this work is
 *  performed, so it just initiates an agent to perform the work. The agent
 *  is passed all details required to perform the work, so it would be
 *  possible to execute the agent as a pthread, a process, or even a daemon
 *  on some other computer.
 *
 *  The main agent thread creates a separate thread for each node to be
 *  communicated with, up to AGENT_THREAD_COUNT active at one time. A
 *  watchdog thread sends SIGUSR1 to any threads that have been active (in
 *  DSH_ACTIVE state) for more than COMMAND_TIMEOUT seconds.
 *  The agent responds to slurmctld via a function call or an RPC as required.
 *  For example, informing slurmctld that some node is not responding.
 *
 *  All the state for each thread is maintained in a thd_t struct, which is 
 *  used by the watchdog thread as well as the communication threads.
\*****************************************************************************/
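
/*
 * Illustrative usage sketch (not part of the original code; the hostlist
 * "hl" and the message type chosen here are hypothetical). A caller in
 * slurmctld fills in an agent_arg_t and queues it, much as
 * _queue_agent_retry() does below; the request is then serviced when an
 * agent slot becomes available:
 *
 *      agent_arg_t *agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
 *      agent_arg_ptr->node_count = hostlist_count(hl);
 *      agent_arg_ptr->retry      = 1;
 *      agent_arg_ptr->hostlist   = hl;
 *      agent_arg_ptr->msg_type   = REQUEST_PING;
 *      agent_arg_ptr->msg_args   = NULL;
 *      agent_queue_request(agent_arg_ptr);
 */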

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <errno.h>
#include <fcntl.h>            /* O_RDWR, used by _mail_proc() */
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <stdlib.h>

#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/node_select.h"
#include "src/common/xsignal.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/uid.h"
#include "src/common/forward.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/ping_nodes.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/state_save.h"
#include "src/slurmctld/srun_comm.h"

#define MAX_RETRIES     10

typedef enum {
      DSH_NEW,        /* Request not yet started */
      DSH_ACTIVE,     /* Request in progress */
      DSH_DONE,       /* Request completed normally */
      DSH_NO_RESP,    /* Request timed out */
      DSH_FAILED      /* Request resulted in error */
} state_t;
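
/* Typical state progression for a single thread, as driven by
 * _thread_per_group_rpc() and the _wdog() watchdog below:
 *    DSH_NEW -> DSH_ACTIVE -> DSH_DONE, or
 *    DSH_NEW -> DSH_ACTIVE -> DSH_NO_RESP (timeout) / DSH_FAILED (error) */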

typedef struct thd_complete {
      bool work_done;   /* true iff all threads are complete */
      int fail_cnt;     /* number of threads that failed */
      int no_resp_cnt;  /* number of threads that did not respond */
      int retry_cnt;    /* number of threads requiring retry */
      int max_delay;    /* maximum seconds for any thread to complete */
      time_t now;       /* current time */
} thd_complete_t;

typedef struct thd {
      pthread_t thread;       /* thread ID */
      pthread_attr_t attr;          /* thread attributes */
      state_t state;                /* thread state */
      time_t start_time;            /* start time */
      time_t end_time;        /* end time or delta time 
                               * upon termination */
      slurm_addr *addr;             /* specific addr to send to;
                                     * nodelist is not used if set */
      char *nodelist;               /* list of nodes to send to */
      List ret_list;                /* list of ret_data_info_t */
} thd_t;

typedef struct agent_info {
      pthread_mutex_t thread_mutex; /* agent specific mutex */
      pthread_cond_t thread_cond;   /* agent specific condition */
      uint32_t thread_count;        /* number of threads records */
      uint32_t threads_active;      /* currently active threads */
      uint16_t retry;               /* if set, keep trying */
      thd_t *thread_struct;         /* thread structures */
      bool get_reply;               /* flag if reply expected */
      slurm_msg_type_t msg_type;    /* RPC to be issued */
      void **msg_args_pptr;         /* RPC data to be used */
} agent_info_t;

typedef struct task_info {
      pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific
                                          * mutex */
      pthread_cond_t *thread_cond_ptr;   /* pointer to agent specific
                                          * condition */
      uint32_t *threads_active_ptr; /* currently active thread ptr */
      thd_t *thread_struct_ptr;     /* thread structures ptr */
      bool get_reply;               /* flag if reply expected */
      slurm_msg_type_t msg_type;    /* RPC to be issued */
      void *msg_args_ptr;           /* ptr to RPC data to be used */
} task_info_t;

typedef struct queued_request {
      agent_arg_t* agent_arg_ptr;   /* The queued request */
      time_t       last_attempt;    /* Time of last xmit attempt */
} queued_request_t;

typedef struct mail_info {
      char *user_name;  /* e-mail recipient */
      char *message;    /* e-mail subject line */
} mail_info_t;

static void _sig_handler(int dummy);
static inline int _comm_err(char *node_name);
static void _list_delete_retry(void *retry_entry);
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr);
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr, 
            int no_resp_cnt, int retry_cnt);
static void _purge_agent_args(agent_arg_t *agent_arg_ptr);
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr, 
                    int count, int *spot);
static void _slurmctld_free_batch_job_launch_msg(batch_job_launch_msg_t * msg);
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr);
static void *_thread_per_group_rpc(void *args);
static int   _valid_agent_arg(agent_arg_t *agent_arg_ptr);
static void *_wdog(void *args);

static mail_info_t *_mail_alloc(void);
static void  _mail_free(void *arg);
static void  _mail_proc(mail_info_t *mi);
static char *_mail_type_str(uint16_t mail_type);

static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mail_mutex  = PTHREAD_MUTEX_INITIALIZER;
static List retry_list = NULL;            /* agent_arg_t list for retry */
static List mail_list = NULL;       /* pending e-mail requests */

static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  agent_cnt_cond  = PTHREAD_COND_INITIALIZER;
static int agent_cnt = 0;

static bool run_scheduler = false;

/*
 * agent - party responsible for transmitting a common RPC in parallel 
 *    across a set of nodes. Use agent_queue_request() if immediate 
 *    execution is not essential.
 * IN pointer to agent_arg_t, which is xfree'd (including hostlist and
 *    msg_args) upon completion if AGENT_IS_THREAD is set
 * RET always NULL (function format just for use as a pthread)
 */
void *agent(void *args)
{
      int i, delay, rc, retries = 0;
      pthread_attr_t attr_wdog;
      pthread_t thread_wdog;
      agent_arg_t *agent_arg_ptr = args;
      agent_info_t *agent_info_ptr = NULL;
      thd_t *thread_ptr;
      task_info_t *task_specific_ptr;
      time_t begin_time;

      /* info("I am here and agent_cnt is %d of %d with type %d", */
/*         agent_cnt, MAX_AGENT_CNT, agent_arg_ptr->msg_type); */
      slurm_mutex_lock(&agent_cnt_mutex);
      while (slurmctld_config.shutdown_time == 0) {
            if (agent_cnt < MAX_AGENT_CNT) {
                  agent_cnt++;
                  break;
            } else {    /* wait for state change and retry */
                  pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
            }
      }
      slurm_mutex_unlock(&agent_cnt_mutex);
      if (slurmctld_config.shutdown_time)
            return NULL;
      
      /* basic argument value tests */
      begin_time = time(NULL);
      if (_valid_agent_arg(agent_arg_ptr))
            goto cleanup;

      /* initialize the agent data structures */
      agent_info_ptr = _make_agent_info(agent_arg_ptr);
      thread_ptr = agent_info_ptr->thread_struct;

      /* start the watchdog thread */
      slurm_attr_init(&attr_wdog);
      if (pthread_attr_setdetachstate
          (&attr_wdog, PTHREAD_CREATE_JOINABLE))
            error("pthread_attr_setdetachstate error %m");
      while (pthread_create(&thread_wdog, &attr_wdog, _wdog,
                        (void *) agent_info_ptr)) {
            error("pthread_create error %m");
            if (++retries > MAX_RETRIES)
                  fatal("Can't create pthread");
            sleep(1);   /* sleep and retry */
      }
      slurm_attr_destroy(&attr_wdog);
#if   AGENT_THREAD_COUNT < 1
      fatal("AGENT_THREAD_COUNT value is invalid");
#endif
      debug2("got %d threads to send out", agent_info_ptr->thread_count);
      /* start all the other threads (up to AGENT_THREAD_COUNT active) */
      for (i = 0; i < agent_info_ptr->thread_count; i++) {

            /* wait until "room" for another thread */
            slurm_mutex_lock(&agent_info_ptr->thread_mutex);
            while (agent_info_ptr->threads_active >=
                   AGENT_THREAD_COUNT) {
                  pthread_cond_wait(&agent_info_ptr->thread_cond,
                                &agent_info_ptr->thread_mutex);
            }

            /* create thread specific data, NOTE: freed from 
             *      _thread_per_group_rpc() */
            task_specific_ptr = _make_task_data(agent_info_ptr, i);

            slurm_attr_init(&thread_ptr[i].attr);
            if (pthread_attr_setdetachstate(&thread_ptr[i].attr,
                                    PTHREAD_CREATE_DETACHED))
                  error("pthread_attr_setdetachstate error %m");
            while ((rc = pthread_create(&thread_ptr[i].thread,
                                  &thread_ptr[i].attr,
                                  _thread_per_group_rpc,
                                  (void *) task_specific_ptr))) {
                  error("pthread_create error %m");
                  if (agent_info_ptr->threads_active)
                        pthread_cond_wait(&agent_info_ptr->
                                      thread_cond,
                                      &agent_info_ptr->
                                      thread_mutex);
                  else {
                        slurm_mutex_unlock(&agent_info_ptr->
                                         thread_mutex);
                        sleep(1);
                        slurm_mutex_lock(&agent_info_ptr->
                                       thread_mutex);
                  }
            }
            slurm_attr_destroy(&thread_ptr[i].attr);
            agent_info_ptr->threads_active++;
            slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
      }
            
      /* wait for termination of remaining threads */
      pthread_join(thread_wdog, NULL);
      delay = (int) difftime(time(NULL), begin_time);
      if (delay > (slurm_get_msg_timeout() * 2)) {
            info("agent msg_type=%u ran for %d seconds",
                  agent_arg_ptr->msg_type,  delay);
      }
      slurm_mutex_lock(&agent_info_ptr->thread_mutex);
      while (agent_info_ptr->threads_active != 0) {
            pthread_cond_wait(&agent_info_ptr->thread_cond,
                        &agent_info_ptr->thread_mutex);
      }
      slurm_mutex_unlock(&agent_info_ptr->thread_mutex);

      cleanup:
#if AGENT_IS_THREAD
      _purge_agent_args(agent_arg_ptr);
#endif

      if (agent_info_ptr) {
            xfree(agent_info_ptr->thread_struct);
            xfree(agent_info_ptr);
      }
      slurm_mutex_lock(&agent_cnt_mutex);
      if (agent_cnt > 0)
            agent_cnt--;
      else
            error("agent_cnt underflow");
      if (agent_cnt < MAX_AGENT_CNT)
            agent_retry(RPC_RETRY_INTERVAL);
      slurm_mutex_unlock(&agent_cnt_mutex);
      pthread_cond_broadcast(&agent_cnt_cond);

      return NULL;
}

/* Basic validity test of agent argument */
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
{
      xassert(agent_arg_ptr);
      xassert(agent_arg_ptr->hostlist);

      if (agent_arg_ptr->node_count == 0)
            return SLURM_FAILURE;   /* no messages to be sent */
      if (agent_arg_ptr->node_count 
          != hostlist_count(agent_arg_ptr->hostlist)) {
            error("you said you were going to send to %d "
                 "hosts but I only have %d",
                 agent_arg_ptr->node_count,
                 hostlist_count(agent_arg_ptr->hostlist));
            return SLURM_FAILURE;   /* node count mismatch */
      }
      return SLURM_SUCCESS;
}

static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
{
      int i = 0, j=0;
      agent_info_t *agent_info_ptr = NULL;
      thd_t *thread_ptr = NULL;
      int *span = NULL;
      int thr_count = 0;
            //forward_t forward;
      hostlist_t hl = NULL;
      char buf[8192];
      char *name = NULL;
      
      agent_info_ptr = xmalloc(sizeof(agent_info_t));
      slurm_mutex_init(&agent_info_ptr->thread_mutex);
      if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL))
            fatal("pthread_cond_init error %m");
      agent_info_ptr->thread_count   = agent_arg_ptr->node_count;
      agent_info_ptr->retry          = agent_arg_ptr->retry;
      agent_info_ptr->threads_active = 0;
      thread_ptr = xmalloc(agent_info_ptr->thread_count * sizeof(thd_t));
      memset(thread_ptr, 0, (agent_info_ptr->thread_count * sizeof(thd_t)));
      agent_info_ptr->thread_struct  = thread_ptr;
      agent_info_ptr->msg_type       = agent_arg_ptr->msg_type;
      agent_info_ptr->msg_args_pptr  = &agent_arg_ptr->msg_args;
      
      if ((agent_arg_ptr->msg_type != REQUEST_SHUTDOWN)
      &&  (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE)
      &&  (agent_arg_ptr->msg_type != SRUN_EXEC)
      &&  (agent_arg_ptr->msg_type != SRUN_TIMEOUT)
      &&  (agent_arg_ptr->msg_type != SRUN_NODE_FAIL)
      &&  (agent_arg_ptr->msg_type != SRUN_USER_MSG)
      &&  (agent_arg_ptr->msg_type != SRUN_JOB_COMPLETE)) {
            agent_info_ptr->get_reply = true;
            span = set_span(agent_arg_ptr->node_count, 0);
      } else {
            span = set_span(agent_arg_ptr->node_count, 
                        agent_arg_ptr->node_count);
      }
      i = 0;
      while(i < agent_info_ptr->thread_count) {
            thread_ptr[thr_count].state      = DSH_NEW;
            thread_ptr[thr_count].addr = agent_arg_ptr->addr;
            name = hostlist_shift(agent_arg_ptr->hostlist);
            if(!name) {
                  debug3("no more nodes to send to");
                  break;
            }
            hl = hostlist_create(name);
            if(thread_ptr[thr_count].addr && span[thr_count]) {
                  debug("warning: you will only be sending this to %s",
                        name);
                  span[thr_count] = 0;
            }
            free(name);
            i++;
            for(j = 0; j < span[thr_count]; j++) {
                  name = hostlist_shift(agent_arg_ptr->hostlist);
                  if(!name)
                        break;
                  /* info("adding %s", name); */
                  hostlist_push(hl, name);
                  free(name);
                  i++;
            }
            hostlist_uniq(hl);
            hostlist_ranged_string(hl, sizeof(buf), buf);
            hostlist_destroy(hl);
            thread_ptr[thr_count].nodelist = xstrdup(buf);
            
            /* info("sending to nodes %s",  */
/*               thread_ptr[thr_count].nodelist); */
            thr_count++;                   
      }
      xfree(span);
      agent_info_ptr->thread_count = thr_count;
      return agent_info_ptr;
}

static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
{
      task_info_t *task_info_ptr;
      task_info_ptr = xmalloc(sizeof(task_info_t));

      task_info_ptr->thread_mutex_ptr  = &agent_info_ptr->thread_mutex;
      task_info_ptr->thread_cond_ptr   = &agent_info_ptr->thread_cond;
      task_info_ptr->threads_active_ptr= &agent_info_ptr->threads_active;
      task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx];
      task_info_ptr->get_reply         = agent_info_ptr->get_reply;
      task_info_ptr->msg_type          = agent_info_ptr->msg_type;
      task_info_ptr->msg_args_ptr      = *agent_info_ptr->msg_args_pptr;

      return task_info_ptr;
}

static void _update_wdog_state(thd_t *thread_ptr, 
                         state_t *state, 
                         thd_complete_t *thd_comp)
{
      switch(*state) {
      case DSH_ACTIVE:
            thd_comp->work_done = false;
            if (thread_ptr->end_time <= thd_comp->now) {
                  debug3("agent thread %lu timed out\n", 
                         (unsigned long) 
                         thread_ptr->thread);
                  if (pthread_kill(thread_ptr->thread, SIGUSR1) == ESRCH)
                        *state = DSH_NO_RESP;
            }
            break;
      case DSH_NEW:
            thd_comp->work_done = false;
            break;
      case DSH_DONE:
            if (thd_comp->max_delay < (int)thread_ptr->end_time)
                  thd_comp->max_delay = (int)thread_ptr->end_time;
            break;
      case DSH_NO_RESP:
            thd_comp->no_resp_cnt++;
            thd_comp->retry_cnt++;
            break;
      case DSH_FAILED:
            thd_comp->fail_cnt++;
            break;
      }
}

/* 
 * _wdog - Watchdog thread. Send SIGUSR1 to threads which have been active 
 *    for too long. 
 * IN args - pointer to agent_info_t with info on threads to watch
 * Sleep between polls with exponentially increasing times (from 0.125 to 1.0 second);
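 *    i.e. successive sleeps of 0.125, 0.25, 0.5, then 1.0 second thereafter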
 */
static void *_wdog(void *args)
{
      bool srun_agent = false;
      int i;
      agent_info_t *agent_ptr = (agent_info_t *) args;
      thd_t *thread_ptr = agent_ptr->thread_struct;
      unsigned long usec = 125000;
      ListIterator itr;
      thd_complete_t thd_comp;
      ret_data_info_t *ret_data_info = NULL;

      if ( (agent_ptr->msg_type == SRUN_JOB_COMPLETE)
      ||   (agent_ptr->msg_type == SRUN_EXEC)
      ||   (agent_ptr->msg_type == SRUN_PING)
      ||   (agent_ptr->msg_type == SRUN_TIMEOUT)
      ||   (agent_ptr->msg_type == SRUN_USER_MSG)
      ||   (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION)
      ||   (agent_ptr->msg_type == SRUN_NODE_FAIL) )
            srun_agent = true;

      thd_comp.max_delay = 0;
            
      while (1) {
            thd_comp.work_done   = true;/* assume all threads complete */
            thd_comp.fail_cnt    = 0;   /* assume no threads failures */
            thd_comp.no_resp_cnt = 0;   /* assume all threads respond */
            thd_comp.retry_cnt   = 0;   /* assume no required retries */
            thd_comp.now         = time(NULL);
            
            usleep(usec);
            usec = MIN((usec * 2), 1000000);          

            slurm_mutex_lock(&agent_ptr->thread_mutex);
            for (i = 0; i < agent_ptr->thread_count; i++) {
                  //info("thread name %s",thread_ptr[i].node_name);
                  if(!thread_ptr[i].ret_list) {
                        _update_wdog_state(&thread_ptr[i],
                                       &thread_ptr[i].state,
                                       &thd_comp);
                  } else {
                        itr = list_iterator_create(
                              thread_ptr[i].ret_list);
                        while((ret_data_info = list_next(itr))) {
                              _update_wdog_state(&thread_ptr[i],
                                             &ret_data_info->err,
                                             &thd_comp);
                        }
                        list_iterator_destroy(itr);
                  }
            }
            if (thd_comp.work_done)
                  break;
            
            slurm_mutex_unlock(&agent_ptr->thread_mutex);
      }

      if (srun_agent) {
              _notify_slurmctld_jobs(agent_ptr);
      } else {
            _notify_slurmctld_nodes(agent_ptr, 
                              thd_comp.no_resp_cnt, 
                              thd_comp.retry_cnt);
      }

      for (i = 0; i < agent_ptr->thread_count; i++) {
            if (thread_ptr[i].ret_list)
                  list_destroy(thread_ptr[i].ret_list);
            xfree(thread_ptr[i].nodelist);
      }
      
      if (thd_comp.max_delay)
            debug2("agent maximum delay %d seconds", thd_comp.max_delay);
      
      slurm_mutex_unlock(&agent_ptr->thread_mutex);
      return (void *) NULL;
}

static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
{
#if AGENT_IS_THREAD
      /* Locks: Write job */
      slurmctld_lock_t job_write_lock =
          { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
      uint32_t job_id = 0, step_id = 0;
      thd_t *thread_ptr = agent_ptr->thread_struct;

      if        (agent_ptr->msg_type == SRUN_PING) {
            srun_ping_msg_t *msg = *agent_ptr->msg_args_pptr;
            job_id  = msg->job_id;
            step_id = msg->step_id;
      } else if (agent_ptr->msg_type == SRUN_TIMEOUT) {
            srun_timeout_msg_t *msg = *agent_ptr->msg_args_pptr;
            job_id  = msg->job_id;
            step_id = msg->step_id;
      } else if (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) {
            resource_allocation_response_msg_t *msg =
                  *agent_ptr->msg_args_pptr;
            job_id  = msg->job_id;
            step_id = NO_VAL;
      } else if ((agent_ptr->msg_type == SRUN_JOB_COMPLETE)
      ||         (agent_ptr->msg_type == SRUN_EXEC)
      ||         (agent_ptr->msg_type == SRUN_USER_MSG)) {
            return;           /* no need to note srun response */
      } else if (agent_ptr->msg_type == SRUN_NODE_FAIL) {
            return;           /* no need to note srun response */
      } else {
            error("_notify_slurmctld_jobs invalid msg_type %u",
                  agent_ptr->msg_type);
            return;
      }
      lock_slurmctld(job_write_lock);
      if  (thread_ptr[0].state == DSH_DONE) {
            srun_response(job_id, step_id);
      }

      unlock_slurmctld(job_write_lock);
#else
      fatal("Code development needed here if agent is not thread");
#endif
}

static void _notify_slurmctld_nodes(agent_info_t *agent_ptr, 
                            int no_resp_cnt, int retry_cnt)
{
      ListIterator itr = NULL;
      ret_data_info_t *ret_data_info = NULL;
      state_t state;
      int is_ret_list = 1;

#if AGENT_IS_THREAD
      /* Locks: Read config, write job, write node */
      slurmctld_lock_t node_write_lock =
          { READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
#endif
      thd_t *thread_ptr = agent_ptr->thread_struct;
      int i;

      /* Notify slurmctld of non-responding nodes */
      if (no_resp_cnt) {
#if AGENT_IS_THREAD
            /* Update node table data for non-responding nodes */
            lock_slurmctld(node_write_lock);
            if (agent_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
                  /* Requeue the request */
                  batch_job_launch_msg_t *launch_msg_ptr = 
                              *agent_ptr->msg_args_pptr;
                  uint32_t job_id = launch_msg_ptr->job_id;
                  job_complete(job_id, 0, true, 0);
            }
            unlock_slurmctld(node_write_lock);
#else
            fatal("Code development needed here if agent is not thread");
#endif
      }
      if (retry_cnt && agent_ptr->retry)
            _queue_agent_retry(agent_ptr, retry_cnt);

      /* Update last_response on responding nodes */
#if AGENT_IS_THREAD
      lock_slurmctld(node_write_lock);
      for (i = 0; i < agent_ptr->thread_count; i++) {
            if(!thread_ptr[i].ret_list) {
                  state = thread_ptr[i].state;
                  is_ret_list = 0;
                  goto switch_on_state;
            } 
            is_ret_list = 1;
            
            itr = list_iterator_create(thread_ptr[i].ret_list);
            while((ret_data_info = list_next(itr))) {
                  state = ret_data_info->err;
            switch_on_state:
                  switch(state) {
                  case DSH_NO_RESP:
                        if(!is_ret_list) {
                              node_not_resp(thread_ptr[i].nodelist,
                                          thread_ptr[i].
                                          start_time);
                              break;
                        }

                        node_not_resp(ret_data_info->node_name,
                                    thread_ptr[i].start_time);
                        break;
                  case DSH_FAILED:
                        if(!is_ret_list) {
                              set_node_down(thread_ptr[i].nodelist, 
                                          "Prolog/epilog failure");
                              break;
                        }
                        set_node_down(ret_data_info->node_name,
                                    "Prolog/epilog failure");
                        break;
                  case DSH_DONE:
                        if(!is_ret_list) {
                              node_did_resp(thread_ptr[i].nodelist);
                              break;
                        }
                        node_did_resp(ret_data_info->node_name);
                        break;
                  default:
                        if(!is_ret_list) {
                              error("unknown state returned for %s",
                                    thread_ptr[i].nodelist);
                              break;
                        }
                        error("unknown state returned for %s",
                              ret_data_info->node_name);
                        break;
                  }
                  if(!is_ret_list)
                        goto finished;
            }
            list_iterator_destroy(itr);
finished:   ;
      }
      unlock_slurmctld(node_write_lock);
      if (run_scheduler) {
            run_scheduler = false;
            /* below functions all have their own locking */
            if (schedule())   {
                  schedule_job_save();
                  schedule_node_save();
            }
      }
      if ((agent_ptr->msg_type == REQUEST_PING) ||
          (agent_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS))
            ping_end();
#else
      fatal("Code development needed here if agent is not thread");
#endif
}

/* Report a communications error for specified node */
static inline int _comm_err(char *node_name)
{
      int rc = 1;
#if AGENT_IS_THREAD
      if ((rc = is_node_resp (node_name)))
#endif
            error("agent/send_recv_msg: %s: %m", node_name);
      return rc;
}

/* Return a value for which WEXITSTATUS() reports an exit status of 1.
 *    The encoding of the wait(2) status word is platform dependent, so we
 *    probe each bit position; on Linux, where WEXITSTATUS(s) is
 *    ((s) >> 8) & 0xff, this returns 0x100 */
static int _wif_status(void)
{
      static int rc = 0;
      int i;

      if (rc)
            return rc;

      rc = 1;
      for (i=0; i<64; i++) {
            if (WEXITSTATUS(rc))
                  return rc;
            rc = rc << 1;
      }
      error("Could not identify WEXITSTATUS");
      rc = 1;
      return rc;
}

/*
 * _thread_per_group_rpc - thread to issue an RPC for a group of nodes
 *                         sending message out to one and forwarding it to
 *                         others if necessary.
 * IN/OUT args - pointer to task_info_t, xfree'd on completion
 */
static void *_thread_per_group_rpc(void *args)
{
      int rc = SLURM_SUCCESS;
      slurm_msg_t msg;
      task_info_t *task_ptr = (task_info_t *) args;
      /* we cache some pointers from task_info_t because we need 
       * to xfree args before being finished with their use. xfree 
       * is required for timely termination of this pthread because 
       * xfree could lock it at the end, preventing a timely
       * thread_exit */
      pthread_mutex_t *thread_mutex_ptr   = task_ptr->thread_mutex_ptr;
      pthread_cond_t  *thread_cond_ptr    = task_ptr->thread_cond_ptr;
      uint32_t        *threads_active_ptr = task_ptr->threads_active_ptr;
      thd_t           *thread_ptr         = task_ptr->thread_struct_ptr;
      state_t thread_state = DSH_NO_RESP;
      slurm_msg_type_t msg_type = task_ptr->msg_type;
      bool is_kill_msg, srun_agent;
      List ret_list = NULL;
      ListIterator itr;
      ret_data_info_t *ret_data_info = NULL;
      int found = 0;
      int sig_array[2] = {SIGUSR1, 0};

#if AGENT_IS_THREAD
      /* Locks: Write job, write node */
      slurmctld_lock_t job_write_lock = { 
            NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
#endif
      xassert(args != NULL);
      xsignal(SIGUSR1, _sig_handler);
      xsignal_unblock(sig_array);
      is_kill_msg = (   (msg_type == REQUEST_KILL_TIMELIMIT) ||
                  (msg_type == REQUEST_TERMINATE_JOB) );
      srun_agent = (    (msg_type == SRUN_PING)    ||
                  (msg_type == SRUN_EXEC)    ||
                  (msg_type == SRUN_JOB_COMPLETE) ||
                  (msg_type == SRUN_TIMEOUT) ||
                  (msg_type == SRUN_USER_MSG) ||
                  (msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
                  (msg_type == SRUN_NODE_FAIL) );

      thread_ptr->start_time = time(NULL);

      slurm_mutex_lock(thread_mutex_ptr);
      thread_ptr->state = DSH_ACTIVE;
      thread_ptr->end_time = thread_ptr->start_time + COMMAND_TIMEOUT;
      slurm_mutex_unlock(thread_mutex_ptr);

      /* send request message */
      slurm_msg_t_init(&msg);
      msg.msg_type = msg_type;
      msg.data     = task_ptr->msg_args_ptr;
/*    info("sending message type %u to %s", msg_type,
      thread_ptr->nodelist); */
      if (task_ptr->get_reply) {
            if(thread_ptr->addr) {
                  msg.address = *thread_ptr->addr;
                  if(!(ret_list = slurm_send_addr_recv_msgs(
                             &msg, thread_ptr->nodelist, 0))) {
                        error("_thread_per_group_rpc: "
                              "no ret_list given");
                        goto cleanup;
                  }
            } else {
                  if(!(ret_list = slurm_send_recv_msgs(
                             thread_ptr->nodelist,
                             &msg, 
                             0))) {
                        error("_thread_per_group_rpc: "
                              "no ret_list given");
                        goto cleanup;
                  }
            }
      } else {
            if(thread_ptr->addr) {
                  msg.address = *thread_ptr->addr;
            } else {
                  if(slurm_conf_get_addr(thread_ptr->nodelist,
                                     &msg.address) == SLURM_ERROR) {
                        error("_thread_per_group_rpc: "
                              "can't get address for "
                              "host %s", thread_ptr->nodelist);
                        goto cleanup;
                  }
            }
            if (slurm_send_only_node_msg(&msg) == SLURM_SUCCESS) {
                  thread_state = DSH_DONE;
            } else {
                  if (!srun_agent)
                        _comm_err(thread_ptr->nodelist);
            }
            goto cleanup;
      }
            
      //info("got %d messages back", list_count(ret_list));
      found = 0;
      itr = list_iterator_create(ret_list);           
      while((ret_data_info = list_next(itr)) != NULL) {
            rc = slurm_get_return_code(ret_data_info->type, 
                                 ret_data_info->data);
#if AGENT_IS_THREAD
            /* SPECIAL CASE: Mark node as IDLE if job already 
               complete */
            if (is_kill_msg && 
                (rc == ESLURMD_KILL_JOB_ALREADY_COMPLETE)) {
                  kill_job_msg_t *kill_job;
                  kill_job = (kill_job_msg_t *) 
                        task_ptr->msg_args_ptr;
                  rc = SLURM_SUCCESS;
                  lock_slurmctld(job_write_lock);
                  if (job_epilog_complete(kill_job->job_id, 
                                    ret_data_info->
                                    node_name, 
                                    rc))
                        run_scheduler = true;
                  unlock_slurmctld(job_write_lock);
            }
            /* SPECIAL CASE: Kill a non-startable batch job, but on
             * ESLURMD_PROLOG_FAILED leave it queued for requeue */
            if ((msg_type == REQUEST_BATCH_JOB_LAUNCH) && 
                (rc != SLURM_SUCCESS) && (rc != ESLURMD_PROLOG_FAILED) &&
                (ret_data_info->type != RESPONSE_FORWARD_FAILED)) {
                  batch_job_launch_msg_t *launch_msg_ptr = 
                        task_ptr->msg_args_ptr;
                  uint32_t job_id = launch_msg_ptr->job_id;
                  info("Killing non-startable batch job %u: %s", 
                       job_id, slurm_strerror(rc));
                  thread_state = DSH_DONE;
                  ret_data_info->err = thread_state;
                  lock_slurmctld(job_write_lock);
                  job_complete(job_id, 0, false, _wif_status());
                  unlock_slurmctld(job_write_lock);
                  continue;
            }
#endif
            
            
            if (((msg_type == REQUEST_SIGNAL_TASKS) || 
                 (msg_type == REQUEST_TERMINATE_TASKS)) && 
                 (rc == ESRCH)) {
                  /* process is already dead, not a real error */
                  rc = SLURM_SUCCESS;
            }
            
            switch (rc) {
            case SLURM_SUCCESS:
                  /*debug3("agent processed RPC to node %s", 
                    ret_data_info->node_name); */
                  thread_state = DSH_DONE;
                  break;
            case ESLURMD_EPILOG_FAILED:
                  error("Epilog failure on host %s, "
                        "setting DOWN", 
                        ret_data_info->node_name);
                        
                  thread_state = DSH_FAILED;
                  break;
            case ESLURMD_PROLOG_FAILED:
                  error("Prolog failure on host %s, "
                        "setting DOWN", 
                        ret_data_info->node_name);
                  thread_state = DSH_FAILED;
                  break;
            case ESLURM_INVALID_JOB_ID:  
                  /* Not indicative of a real error */
            case ESLURMD_JOB_NOTRUNNING: 
                  /* Not indicative of a real error */
                  debug2("agent processed RPC to node %s: %s",
                         ret_data_info->node_name,
                         slurm_strerror(rc));
                  
                  thread_state = DSH_DONE;
                  break;
            default:
                  if (!srun_agent) {
                        errno = ret_data_info->err;
                        rc = _comm_err(ret_data_info->node_name);
                  }
                  if(srun_agent)
                        thread_state = DSH_FAILED;
                  else if(ret_data_info->type == RESPONSE_FORWARD_FAILED)
                        /* check if a forward failed */
                        thread_state = DSH_NO_RESP;
                  else {      /* some will fail that don't mean anything went
                         * bad like a job term request on a job that is
                         * already finished, we will just exit on those
                         * cases */
                        thread_state = DSH_DONE;
                  }
            }     
            ret_data_info->err = thread_state;
      }
      list_iterator_destroy(itr);

cleanup:
      xfree(args);
      
      /* handled at end of thread just in case resend is needed */
      destroy_forward(&msg.forward);
      slurm_mutex_lock(thread_mutex_ptr);
      thread_ptr->ret_list = ret_list;
      thread_ptr->state = thread_state;
      thread_ptr->end_time = (time_t) difftime(time(NULL), 
                                     thread_ptr->start_time);
      /* Signal completion so another thread can replace us */
      (*threads_active_ptr)--;
      slurm_mutex_unlock(thread_mutex_ptr);
      pthread_cond_signal(thread_cond_ptr);
      return (void *) NULL;
}

/*
 * Signal handler.  We are really interested in interrupting hung communications
 * and causing them to return EINTR. Multiple interrupts might be required.
 */
static void _sig_handler(int dummy)
{
}

static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr, 
                    int count, int *spot)
{
      ret_data_info_t *ret_data_info = NULL;
      ListIterator itr = list_iterator_create(thread_ptr->ret_list);
      while((ret_data_info = list_next(itr))) {
            debug2("got err of %d", ret_data_info->err);
            if (ret_data_info->err != DSH_NO_RESP)
                  continue;

            debug("got the name %s to resend out of %d", 
                  ret_data_info->node_name, count);
                        
            if(agent_arg_ptr) {
                  hostlist_push(agent_arg_ptr->hostlist, 
                              ret_data_info->node_name);
                  
                  if ((++(*spot)) == count) {
                        list_iterator_destroy(itr);
                        return 1;
                  }
            }
      }
      list_iterator_destroy(itr);
      return 0;
}

/*
 * _queue_agent_retry - Queue any failed RPCs for later replay
 * IN agent_info_ptr - pointer to info on completed agent requests
 * IN count - number of agent requests which failed, count to requeue
 */
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
{
      agent_arg_t *agent_arg_ptr;
      queued_request_t *queued_req_ptr = NULL;
      thd_t *thread_ptr = agent_info_ptr->thread_struct;
      int i, j;
      
      if (count == 0)
            return;

      /* build agent argument with just the RPCs to retry */
      agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
      agent_arg_ptr->node_count = count;
      agent_arg_ptr->retry = 1;
      agent_arg_ptr->hostlist = hostlist_create("");
      agent_arg_ptr->msg_type = agent_info_ptr->msg_type;
      agent_arg_ptr->msg_args = *(agent_info_ptr->msg_args_pptr);
      *(agent_info_ptr->msg_args_pptr) = NULL;

      j = 0;
      for (i = 0; i < agent_info_ptr->thread_count; i++) {
            if(!thread_ptr[i].ret_list) {
                  if (thread_ptr[i].state != DSH_NO_RESP)
                        continue;

                  debug("got the name %s to resend", 
                        thread_ptr[i].nodelist);
                  hostlist_push(agent_arg_ptr->hostlist, 
                              thread_ptr[i].nodelist);

                  if ((++j) == count)
                        break;
            } else {
                  if(_setup_requeue(agent_arg_ptr, &thread_ptr[i], 
                                count, &j))
                        break;
            }
      }
      if (count != j) {
            error("agent: Retry count (%d) != actual count (%d)", 
                  count, j);
            agent_arg_ptr->node_count = j;
      }
      debug2("Queue RPC msg_type=%u, nodes=%d for retry", 
             agent_arg_ptr->msg_type, j);

      /* add the request to a list */
      queued_req_ptr = xmalloc(sizeof(queued_request_t));
      queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
      queued_req_ptr->last_attempt  = time(NULL);
      slurm_mutex_lock(&retry_mutex);
      if (retry_list == NULL) {
            retry_list = list_create(_list_delete_retry);
            if (retry_list == NULL)
                  fatal("list_create failed");
      }
      if (list_append(retry_list, (void *) queued_req_ptr) == 0)
            fatal("list_append failed");
      slurm_mutex_unlock(&retry_mutex);
}

/*
 * _list_delete_retry - delete an entry from the retry list, 
 *    see common/list.h for documentation
 */
static void _list_delete_retry(void *retry_entry)
{
      queued_request_t *queued_req_ptr;

      if (! retry_entry)
            return;

      queued_req_ptr = (queued_request_t *) retry_entry;
      _purge_agent_args(queued_req_ptr->agent_arg_ptr);
      xfree(queued_req_ptr);
}


/*
 * agent_retry - Agent for retrying pending RPCs. One pending request is 
 *    issued if it has been pending for at least min_wait seconds
 * IN min_wait - Minimum wait time between re-issue of a pending RPC
 * RET count of queued requests remaining
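 * Note: also called from agent() cleanup, with RPC_RETRY_INTERVAL, whenever
 *    an agent slot frees up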
 */
extern int agent_retry (int min_wait)
{
      int list_size = 0;
      time_t now = time(NULL);
      queued_request_t *queued_req_ptr = NULL;
      agent_arg_t *agent_arg_ptr = NULL;
      ListIterator retry_iter;

      slurm_mutex_lock(&retry_mutex);
      if (retry_list) {
            static time_t last_msg_time = (time_t) 0;
            uint32_t msg_type[5], i = 0;
            list_size = list_count(retry_list);
            if ((list_size > MAX_AGENT_CNT) 
            &&  (difftime(now, last_msg_time) > 300)) {
                  /* Note sizable backlog of work */
                  info("WARNING: agent retry_list size is %d", 
                        list_size);
                  retry_iter = list_iterator_create(retry_list);
                  while ((queued_req_ptr = (queued_request_t *) 
                              list_next(retry_iter))) {
                        agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
                        msg_type[i++] = agent_arg_ptr->msg_type;
                        if (i == 5)
                              break;
                  }
                  list_iterator_destroy(retry_iter);
                  info("   retry_list msg_type=%u,%u,%u,%u,%u", 
                        msg_type[0], msg_type[1], msg_type[2],
                        msg_type[3], msg_type[4]);
                  last_msg_time = now;
            }
      }
      if (agent_cnt >= MAX_AGENT_CNT) {   /* too much work already */
            slurm_mutex_unlock(&retry_mutex);
            return list_size;
      }

      if (retry_list) {
            /* first try to find a new (never tried) record */

            retry_iter = list_iterator_create(retry_list);
            while ((queued_req_ptr = (queued_request_t *)
                        list_next(retry_iter))) {
                  if (queued_req_ptr->last_attempt == 0) {
                        list_remove(retry_iter);
                        list_size--;
                        break;
                  }
            }
            list_iterator_destroy(retry_iter);
      }

      if (retry_list && (queued_req_ptr == NULL)) {
            /* now try to find a requeue request that is 
             * relatively old */
            double age = 0;

            retry_iter = list_iterator_create(retry_list);
            /* next try to find an older record to retry */
            while ((queued_req_ptr = (queued_request_t *) 
                        list_next(retry_iter))) {
                  age = difftime(now, queued_req_ptr->last_attempt);
                  if (age > min_wait) {
                        list_remove(retry_iter);
                        list_size--;
                        break;
                  }
            }
            list_iterator_destroy(retry_iter);
      }
      slurm_mutex_unlock(&retry_mutex);

      if (queued_req_ptr) {
            agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
            xfree(queued_req_ptr);
            if (agent_arg_ptr)
                  _spawn_retry_agent(agent_arg_ptr);
            else
                  error("agent_retry found record with no agent_args");
      } else {
            mail_info_t *mi = NULL;
            slurm_mutex_lock(&mail_mutex);
            if (mail_list)
                  mi = (mail_info_t *) list_dequeue(mail_list);
            slurm_mutex_unlock(&mail_mutex);
            if (mi)
                  _mail_proc(mi);
      }

      return list_size;
}

/*
 * agent_queue_request - put a new request on the queue for execution or
 *    execute now if not too busy
 * IN agent_arg_ptr - the request to enqueue
 */
void agent_queue_request(agent_arg_t *agent_arg_ptr)
{
      queued_request_t *queued_req_ptr = NULL;

      if (agent_arg_ptr->msg_type == REQUEST_SHUTDOWN) { /* execute now */
            pthread_attr_t attr_agent;
            pthread_t thread_agent;
            int rc;
            slurm_attr_init(&attr_agent);
            if (pthread_attr_setdetachstate
                        (&attr_agent, PTHREAD_CREATE_DETACHED))
                  error("pthread_attr_setdetachstate error %m");
            rc = pthread_create(&thread_agent, &attr_agent,
                            agent, (void *) agent_arg_ptr);
            slurm_attr_destroy(&attr_agent);
            if (rc == 0) {
                  sleep(1);
                  if (!pthread_kill(thread_agent, 0))
                        info("Shutdown agent still running");
                  return;
            }
      }

      queued_req_ptr = xmalloc(sizeof(queued_request_t));
      queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
/*    queued_req_ptr->last_attempt  = 0; Implicit */

      slurm_mutex_lock(&retry_mutex);
      if (retry_list == NULL) {
            retry_list = list_create(_list_delete_retry);
            if (retry_list == NULL)
                  fatal("list_create failed");
      }
      list_append(retry_list, (void *)queued_req_ptr);
      slurm_mutex_unlock(&retry_mutex);
}

/* _spawn_retry_agent - pthread_create an agent for the given task */
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr)
{
      int retries = 0;
      pthread_attr_t attr_agent;
      pthread_t thread_agent;

      if (agent_arg_ptr == NULL)
            return;

      debug2("Spawning RPC agent for msg_type %u", 
             agent_arg_ptr->msg_type);
      slurm_attr_init(&attr_agent);
      if (pthread_attr_setdetachstate(&attr_agent,
                              PTHREAD_CREATE_DETACHED))
            error("pthread_attr_setdetachstate error %m");
      while (pthread_create(&thread_agent, &attr_agent,
                  agent, (void *) agent_arg_ptr)) {
            error("pthread_create error %m");
            if (++retries > MAX_RETRIES)
                  fatal("Can't create pthread");
            sleep(1);   /* sleep and try again */
      }
      slurm_attr_destroy(&attr_agent);
}

/* _slurmctld_free_batch_job_launch_msg is a variant of 
 *    slurm_free_job_launch_msg because all environment variables are
 *    currently loaded in one xmalloc buffer (see get_job_env()), which is
 *    different from how slurmd assembles the data from a message
 */
static void _slurmctld_free_batch_job_launch_msg(batch_job_launch_msg_t * msg)
{
      if (msg) {
            if (msg->environment) {
                  xfree(msg->environment[0]);
                  xfree(msg->environment);
            }
            slurm_free_job_launch_msg(msg);
      }
}

/* agent_purge - purge all pending RPC requests */
void agent_purge(void)
{
      if (retry_list) {
            slurm_mutex_lock(&retry_mutex);
            list_destroy(retry_list);
            retry_list = NULL;
            slurm_mutex_unlock(&retry_mutex);
      }
      if (mail_list) {
            slurm_mutex_lock(&mail_mutex);
            list_destroy(mail_list);
            mail_list = NULL;
            slurm_mutex_unlock(&mail_mutex);
      }
}
extern int get_agent_count(void)
{
      return agent_cnt;
}

static void _purge_agent_args(agent_arg_t *agent_arg_ptr)
{
      if (agent_arg_ptr == NULL)
            return;

      hostlist_destroy(agent_arg_ptr->hostlist);
      xfree(agent_arg_ptr->addr);
      if (agent_arg_ptr->msg_args) {
            if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH)
                  _slurmctld_free_batch_job_launch_msg(
                        agent_arg_ptr->msg_args);
            else if (agent_arg_ptr->msg_type == 
                        RESPONSE_RESOURCE_ALLOCATION)
                  slurm_free_resource_allocation_response_msg(
                              agent_arg_ptr->msg_args);
            else if ((agent_arg_ptr->msg_type == REQUEST_TERMINATE_JOB)
            ||       (agent_arg_ptr->msg_type == REQUEST_KILL_TIMELIMIT))
                  slurm_free_kill_job_msg(agent_arg_ptr->msg_args);
            else if (agent_arg_ptr->msg_type == SRUN_USER_MSG)
                  slurm_free_srun_user_msg(agent_arg_ptr->msg_args);
            else if (agent_arg_ptr->msg_type == SRUN_EXEC)
                  slurm_free_srun_exec_msg(agent_arg_ptr->msg_args);
            else
                  xfree(agent_arg_ptr->msg_args);
      }
      xfree(agent_arg_ptr);
}

static mail_info_t *_mail_alloc(void)
{
      return xmalloc(sizeof(mail_info_t));
}

static void _mail_free(void *arg)
{
      mail_info_t *mi = (mail_info_t *) arg;

      if (mi) {
            xfree(mi->user_name);
            xfree(mi->message);
            xfree(mi);
      }
}

/* process an email request and free the record */
static void _mail_proc(mail_info_t *mi)
{
      pid_t pid;

      pid = fork();
      if (pid < 0) {          /* error */
            error("fork(): %m");
      } else if (pid == 0) {  /* child */
            int fd;
            (void) close(0);
            (void) close(1);
            (void) close(2);
            fd = open("/dev/null", O_RDWR); /* becomes fd 0 (stdin) */
            dup(fd);                        /* stdout -> /dev/null */
            dup(fd);                        /* stderr -> /dev/null */
            execle(slurmctld_conf.mail_prog, "mail", 
                  "-s", mi->message, mi->user_name,
                  NULL, NULL);
            error("Failed to exec %s: %m", 
                  slurmctld_conf.mail_prog);
            exit(1);
      } else {          /* parent */
            waitpid(pid, NULL, 0);
      }
      _mail_free(mi);
      return;
}

static char *_mail_type_str(uint16_t mail_type)
{
      if (mail_type == MAIL_JOB_BEGIN)
            return "Began";
      if (mail_type == MAIL_JOB_END)
            return "Ended";
      if (mail_type == MAIL_JOB_FAIL)
            return "Failed";
      return "unknown";
}

/*
 * mail_job_info - Send e-mail notice of job state change
 * IN job_ptr - job identification
 * IN mail_type - job transition type, see MAIL_JOB in slurm.h
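 * An example of the resulting notice (illustrative values):
 *    "SLURM Job_id=1234 Name=my_job Began"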
 */
extern void mail_job_info (struct job_record *job_ptr, uint16_t mail_type)
{
      mail_info_t *mi = _mail_alloc();

      if (!job_ptr->mail_user)
            mi->user_name = xstrdup(uid_to_string((uid_t)job_ptr->user_id));
      else
            mi->user_name = xstrdup(job_ptr->mail_user);

      mi->message = xmalloc(sizeof(char)*128);
      sprintf(mi->message, "SLURM Job_id=%u Name=%.24s %s",
            job_ptr->job_id, job_ptr->name, 
            _mail_type_str(mail_type));

      info ("msg to %s: %s", mi->user_name, mi->message);

      slurm_mutex_lock(&mail_mutex);
      if (!mail_list) {
            mail_list = list_create(_mail_free);
            if (!mail_list)
                  fatal("list_create failed");
      }
      if (!list_enqueue(mail_list, (void *) mi))
            fatal("list_enqueue failed");
      slurm_mutex_unlock(&mail_mutex);
      return;
}

