Index: trunk/torque_gss/src/server/req_runjob.c =================================================================== --- branches/VENDOR/torque_gss/torque_gss/src/server/req_runjob.c (revision 528) +++ trunk/torque_gss/src/server/req_runjob.c (revision 684) @@ -91,6 +91,7 @@ #include #include #include +#include "dis.h" #include "libpbs.h" #include "server_limits.h" #include "list_link.h" @@ -105,6 +106,7 @@ #include "pbs_error.h" #include "log.h" #include "acct.h" +#include "dis.h" #include "svrfunc.h" #include "net_connect.h" #include "pbs_proto.h" @@ -122,7 +125,7 @@ void stream_eof(int, u_long, int); extern int job_set_wait(attribute *, void *, int); extern void stat_mom_job(job *); - +extern struct connection svr_conn[]; extern int LOGLEVEL; /* Public Functions in this file */ @@ -673,17 +676,26 @@ { int f; int rc; -#ifdef BOEING +#if defined(BOEING) || defined(GSSAPI) int sock, nodenum; struct hostent *hp; char *nodestr, *cp, *hostlist; int size; - + enum conn_type cntype = ToServerDIS; struct sockaddr_in saddr; badplace *bp; - char *id = "svr_startjob"; + char * lasthost=NULL; + char * hostname, *ccname=NULL; + extern char *path_creds; + int retries,i; + int con; + int port= pbs_mom_port; + pbs_net_t hostaddr; + int broke; + char *id = "svr_startjob"; + #endif if (FailHost != NULL) @@ -747,7 +759,7 @@ return(rc); } -#ifdef BOEING +#if defined(BOEING) || defined(GSSAPI) /* Verify that all the nodes are alive via a TCP connect. */ /* NOTE: Copy the nodes into a temp string because strtok() is destructive. */ @@ -769,174 +781,289 @@ hostlist[size] = '\0'; nodestr = strtok(hostlist, "+"); } - + lasthost = NULL; while (nodestr != NULL) - { - /* truncate from trailing slash on (if one exists). */ - - if ((cp = strchr(nodestr, '/')) != NULL) - { - cp[0] = '\0'; - } - - /* Lookup IP address of host. */ - - if ((hp = gethostbyname(nodestr)) == NULL) - { - sprintf(log_buffer, "could not contact %s (gethostbyname failed, errno: %d (%s))", - nodestr, - errno, - pbs_strerror(errno)); - - if (FailHost != NULL) - strncpy(FailHost, nodestr, 1024); - - if (EMsg != NULL) - strncpy(EMsg, log_buffer, 1024); - - log_record( - PBSEVENT_JOB, - PBS_EVENTCLASS_JOB, - pjob->ji_qs.ji_jobid, - log_buffer); - - /* Add this host to the reject destination list for the job */ - - bp = (badplace *)malloc(sizeof(badplace)); - - if (bp == NULL) - { - log_err(errno, id, msg_err_malloc); - - return; - } - - CLEAR_LINK(bp->bp_link); - - strcpy(bp->bp_dest, nodestr); - - append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); - - /* FAILURE - cannot lookup master compute host */ - - return(PBSE_RESCUNAV); - } - - /* open a socket. */ - - /* NOTE: should change to PF_* */ - - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) - { - sprintf(log_buffer, "could not contact %s (cannot create socket, errno: %d (%s))", - nodestr, - errno, - pbs_strerror(errno)); - - if (FailHost != NULL) - strncpy(FailHost, nodestr, 1024); - - if (EMsg != NULL) - strncpy(EMsg, log_buffer, 1024); - - log_record( - PBSEVENT_JOB, - PBS_EVENTCLASS_JOB, - pjob->ji_qs.ji_jobid, - log_buffer); - - /* Add this host to the reject destination list for the job */ - - bp = (badplace *)malloc(sizeof(badplace)); - - if (bp == NULL) - { - /* FAILURE - cannot allocate memory */ - - log_err(errno, id, msg_err_malloc); - - return(PBSE_RESCUNAV); - } - - CLEAR_LINK(bp->bp_link); - - strcpy(bp->bp_dest, nodestr); - - append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); - - /* FAILURE - cannot create socket for master compute host */ - - return(PBSE_RESCUNAV); - } - - /* Set the host information. */ - - memset(&saddr, '\0', sizeof(saddr)); - - saddr.sin_family = AF_INET; - - memcpy(&saddr.sin_addr, hp->h_addr, hp->h_length); - - saddr.sin_port = htons(pbs_rm_port); - - /* Connect to the host. */ - - if (connect(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) - { - sprintf(log_buffer, "could not contact %s (connect failed, errno: %d (%s))", - nodestr, - errno, - pbs_strerror(errno)); - - if (FailHost != NULL) - strncpy(FailHost, nodestr, 1024); - - if (EMsg != NULL) - strncpy(EMsg, log_buffer, 1024); - - log_record( - PBSEVENT_JOB, - PBS_EVENTCLASS_JOB, - pjob->ji_qs.ji_jobid, - log_buffer); - - /* Add this host to the reject list for the job */ - - bp = (badplace *)malloc(sizeof(badplace)); - - if (bp == NULL) - { - /* FAILURE - cannot allocate memory */ - - log_err(errno, id, msg_err_malloc); - - return(PBSE_RESCUNAV); - } - - CLEAR_LINK(bp->bp_link); - - strcpy(bp->bp_dest, nodestr); - - append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); - - /* FAILURE - cannot connect to master compute host */ - - return(PBSE_RESCUNAV); - } - - /* clean up and get next host. */ - - close(sock); - - nodestr = strtok(NULL, "+"); - } /* END while (nodestr != NULL) */ + { + /* truncate from trailing slash on (if one exists). */ + + if ((cp = strchr(nodestr, '/')) != NULL) + { + cp[0] = '\0'; + } +/* No need to do duplicate node checks only helps if nodestr is + * in host name order, noticed to be generally the case */ + if(lasthost == NULL){ + lasthost=nodestr; + }else + { + if((strlen(lasthost)== strlen(nodestr))&&(strcmp(lasthost,nodestr)==0)) + { + nodestr = strtok(NULL, "+"); + continue; + }else{ + lasthost=nodestr; + } + } + + /* Lookup IP address of host. */ + + if ((hp = gethostbyname(nodestr)) == NULL) + { + sprintf(log_buffer, "could not contact %s (gethostbyname failed, errno: %d (%s))", + nodestr, + errno, + pbs_strerror(errno)); + + if (FailHost != NULL) + strncpy(FailHost, nodestr, 1024); + + if (EMsg != NULL) + strncpy(EMsg, log_buffer, 1024); + + log_record( + PBSEVENT_JOB, + PBS_EVENTCLASS_JOB, + pjob->ji_qs.ji_jobid, + log_buffer); + + /* Add this host to the reject destination list for the job */ + + bp = (badplace *)malloc(sizeof(badplace)); + + if (bp == NULL) + { + log_err(errno, id, msg_err_malloc); + + return; + } + + CLEAR_LINK(bp->bp_link); + + strcpy(bp->bp_dest, nodestr); + + append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); + + /* FAILURE - cannot lookup master compute host */ + + return(PBSE_RESCUNAV); + } + + /* open a socket. */ +#ifdef BOEING + /* NOTE: should change to PF_* */ + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) + { + sprintf(log_buffer, "could not contact %s (cannot create socket, errno: %d (%s))", + nodestr, + errno, + pbs_strerror(errno)); + + if (FailHost != NULL) + strncpy(FailHost, nodestr, 1024); + + if (EMsg != NULL) + strncpy(EMsg, log_buffer, 1024); + + log_record( + PBSEVENT_JOB, + PBS_EVENTCLASS_JOB, + pjob->ji_qs.ji_jobid, + log_buffer); + + /* Add this host to the reject destination list for the job */ + + bp = (badplace *)malloc(sizeof(badplace)); + + if (bp == NULL) + { + /* FAILURE - cannot allocate memory */ + + log_err(errno, id, msg_err_malloc); + + return(PBSE_RESCUNAV); + } + + CLEAR_LINK(bp->bp_link); + + strcpy(bp->bp_dest, nodestr); + + append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); + + /* FAILURE - cannot create socket for master compute host */ + + return(PBSE_RESCUNAV); + } + + /* Set the host information. */ + + memset(&saddr, '\0', sizeof(saddr)); + + saddr.sin_family = AF_INET; + + memcpy(&saddr.sin_addr, hp->h_addr, hp->h_length); + + saddr.sin_port = htons(pbs_rm_port); + + /* Connect to the host. */ + + if (connect(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) + { + sprintf(log_buffer, "could not contact %s (connect failed, errno: %d (%s))", + nodestr, + errno, + pbs_strerror(errno)); + + if (FailHost != NULL) + strncpy(FailHost, nodestr, 1024); + + if (EMsg != NULL) + strncpy(EMsg, log_buffer, 1024); + + log_record( + PBSEVENT_JOB, + PBS_EVENTCLASS_JOB, + pjob->ji_qs.ji_jobid, + log_buffer); + + /* Add this host to the reject list for the job */ + + bp = (badplace *)malloc(sizeof(badplace)); + + if (bp == NULL) + { + /* FAILURE - cannot allocate memory */ + + log_err(errno, id, msg_err_malloc); + + return(PBSE_RESCUNAV); + } + + CLEAR_LINK(bp->bp_link); + + strcpy(bp->bp_dest, nodestr); + + append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); + + /* FAILURE - cannot connect to master compute host */ + + return(PBSE_RESCUNAV); + } + /* clean up and get next host. */ + close(sock); + +#else /* GSSAPI */ + cntype = ToServerDIS; + hostaddr=get_hostaddr(nodestr); + broke=0; + if ((con = svr_connect(hostaddr, port, 0, cntype)) == PBS_NET_RC_FATAL) + { + sprintf(log_buffer, "send_job failed to %lx port %d", + hostaddr, + port); + + /* log_err(pbs_errno, id, log_buffer); */ + /* exit(1); */ + /*push bad node info*/ + broke=1; + + } + + if (con == PBS_NET_RC_RETRY) + { + pbs_errno = 0; /* should retry */ + + continue; + } +#ifdef GSIAPI + if ( globus_module_activate(GLOBUS_GSI_GSSAPI_MODULE) !=GLOBUS_SUCCESS || + globus_module_activate(GLOBUS_GSI_GSS_ASSIST_MODULE) !=GLOBUS_SUCCESS ){ + /* return -1; */ + return(PBSE_RESCUNAV); + } +#endif + DIS_tcp_setup(connection[con].ch_socket); + if (encode_DIS_ReqHdr(connection[con].ch_socket, + PBS_BATCH_ForwardCreds, + pbs_current_user) || + encode_DIS_JobId(connection[con].ch_socket,pjob->ji_qs.ji_jobid) || + encode_DIS_ReqExtend(connection[con].ch_socket,0)) { + /* exit(1); */ + return(PBSE_RESCUNAV); + } + DIS_tcp_wflush(connection[con].ch_socket); + + /* do client gss auth */ + hostname = get_hostnamefromaddr(hostaddr); + if (hostname == NULL) { + sprintf(log_buffer,"send job failed: couldn't get hostname for %lx\n",hostaddr); + log_err(0,"svr_movejob get_hostname",log_buffer); + /* exit(1); */ + broke=1; + } + + /* Forward job's credentials to the server. + in a child, so ok to block */ + ccname = ccname_for_job(pjob->ji_qs.ji_jobid,path_creds); + if (setenv("X509_USER_PROXY",ccname,1)) { + perror("Couldn't put X509_USER_PROXY into environment"); + /* exit(1); */ + return(PBSE_RESCUNAV); + } + if(getenv("GLOBUS_LOCATION") == NULL){ + if (setenv("GLOBUS_LOCATION","/opt/vdt/globus",1)) { + perror("svr_movejob_gsi:Couldn't put default GLOBUS_LOCATION into environment"); + return(PBSE_RESCUNAV); + } + } + sprintf(log_buffer,"svr_movejob::hostname for %lx ccname %s\n",hostaddr,ccname); + log_err(0,"svr_movejob ccname",log_buffer); + retries = 0; + if(!broke){ + while ((i = pbsgss_client_authenticate(hostname, connection[con].ch_socket,1,0)) != 0) { + fprintf(stderr,"send job failed: Couldn't authenticate as user to %s:%d : %d\n",hostname,con,i); + if(i== -1){ + broke=1; + break; + } + if (retries++ > 2) { + /* exit(1); */ + broke=1; + break; + } + } + } + free(ccname); + free(hostname); + /* clean up and get next host. */ + close(connection[con].ch_socket); + DIS_tcp_release(connection[con].ch_socket); + svr_disconnect(con); + if(broke){ + bp = (badplace *)malloc(sizeof(badplace)); + if (bp == NULL) + { + /* FAILURE - cannot allocate memory */ + log_err(errno, id, msg_err_malloc); + return(PBSE_RESCUNAV); + } + CLEAR_LINK(bp->bp_link); + strcpy(bp->bp_dest, nodestr); + append_link(&pjob->ji_rejectdest, &bp->bp_link, bp); + /* FAILURE - cannot create socket for master compute host */ + } + broke=0; +#endif /* end GSSAPI */ + + nodestr = strtok(NULL, "+"); + } /* END while (nodestr != NULL) */ if (hostlist != NULL) free(hostlist); /* END MOM verification check via TCP. */ -#endif /* END BOEING */ +#endif /* END BOEING or GSSAPI */ /* Next, are there files to be staged-in? */ @@ -1005,7 +1132,7 @@ pattr = &pjob->ji_wattr[(int)JOB_ATR_start_time]; - if ((pattr->at_flags & ATR_VFLAG_SET) == 0) + if ((pjob->ji_wattr[(int)JOB_ATR_restart_name].at_flags & ATR_VFLAG_SET) == 0) { pattr->at_val.at_long = time(NULL); pattr->at_flags |= ATR_VFLAG_SET; @@ -1019,6 +1146,14 @@ /* send the job to MOM */ svr_setjobstate(pjob,JOB_STATE_RUNNING,JOB_SUBSTATE_PRERUN); + + /* if job start timeout attribute is set use its value */ + + if (((server.sv_attr[(int)SRV_ATR_JobStartTimeout].at_flags & ATR_VFLAG_SET) != 0) + && (server.sv_attr[(int)SRV_ATR_JobStartTimeout].at_val.at_long > 0)) + { + DIS_tcp_settimeout(server.sv_attr[(int)SRV_ATR_JobStartTimeout].at_val.at_long); + } if (send_job( pjob, @@ -1030,8 +1165,12 @@ { /* SUCCESS */ + DIS_tcp_settimeout(server.sv_attr[(int)SRV_ATR_tcp_timeout].at_val.at_long); + return(0); } + + DIS_tcp_settimeout(server.sv_attr[(int)SRV_ATR_tcp_timeout].at_val.at_long); sprintf(tmpLine, "unable to run job, send to MOM '%s' failed",