Skip to content

Commit

Permalink
Fix improperly merged 14.10 job array changes to job (especially
Browse files Browse the repository at this point in the history
update_status). Fixes #15.
  • Loading branch information
natefoo committed Nov 13, 2018
1 parent e9e511a commit a2319d7
Showing 1 changed file with 225 additions and 48 deletions.
273 changes: 225 additions & 48 deletions slurm_drmaa/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
#include <slurm/slurmdb.h>
#include <stdint.h>


static int
slurmdrmaa_id_in_array_expr( const char *array_expr, uint32_t id );


static void
slurmdrmaa_job_control( fsd_job_t *self, int action )
{
Expand All @@ -55,8 +60,12 @@ slurmdrmaa_job_control( fsd_job_t *self, int action )
switch( action )
{
case DRMAA_CONTROL_SUSPEND:
#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(14,10,0)
if(slurm_suspend2(self->job_id, NULL) == -1) {
#else
if(slurm_suspend(fsd_atoi(self->job_id)) == -1) {
fsd_exc_raise_fmt( FSD_ERRNO_INTERNAL_ERROR,"slurm_suspend error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
#endif
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_suspend error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
slurm_self->user_suspended = true;
break;
Expand All @@ -68,12 +77,16 @@ slurmdrmaa_job_control( fsd_job_t *self, int action )
job_desc.priority = 0;
job_desc.alloc_sid = 0;
if(slurm_update_job(&job_desc) == -1) {
fsd_exc_raise_fmt( FSD_ERRNO_INTERNAL_ERROR,"slurm_update_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_update_job error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
break;
case DRMAA_CONTROL_RESUME:
#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(14,10,0)
if(slurm_resume2(self->job_id, NULL) == -1) {
#else
if(slurm_resume(fsd_atoi(self->job_id)) == -1) {
fsd_exc_raise_fmt( FSD_ERRNO_INTERNAL_ERROR,"slurm_resume error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
#endif
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_resume error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
slurm_self->user_suspended = false;
break;
Expand All @@ -83,12 +96,16 @@ slurmdrmaa_job_control( fsd_job_t *self, int action )
job_desc.priority = INFINITE;
job_desc.job_id = atoi(self->job_id);
if(slurm_update_job(&job_desc) == -1) {
fsd_exc_raise_fmt( FSD_ERRNO_INTERNAL_ERROR,"slurm_update_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_update_job error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
break;
case DRMAA_CONTROL_TERMINATE:
if(slurm_kill_job(fsd_atoi(self->job_id),SIGKILL,0) == -1) {
fsd_exc_raise_fmt( FSD_ERRNO_INTERNAL_ERROR,"slurm_terminate_job error: %s,job_id: %s",slurm_strerror(slurm_get_errno()),self->job_id);
#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(14,10,0)
if(slurm_kill_job2(self->job_id, SIGKILL, 0) == -1) {
#else
if(slurm_kill_job(fsd_atoi(self->job_id), SIGKILL, 0) == -1) {
#endif
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_terminate_job error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
break;
default:
Expand All @@ -110,42 +127,205 @@ slurmdrmaa_job_control( fsd_job_t *self, int action )
}


static void
slurmdrmaa_job_update_status( fsd_job_t *self )
{
job_info_msg_t *job_info = NULL;
slurmdrmaa_job_t * slurm_self = (slurmdrmaa_job_t *) self;
job_id_spec_t job_id_spec;
static slurm_job_info_t*
slurmdrmaa_find_job_info( fsd_job_t *self, job_info_msg_t **job_info ) {
const char* str_i;

fsd_assert( job_info );

fsd_log_enter(( "({job_id=%s})", self->job_id ));

fsd_mutex_lock( &self->session->drm_connection_mutex );
TRY
{
job_id_spec.original = self->job_id;
self->job_id = slurmdrmaa_set_job_id(&job_id_spec);
if (! (str_i = strchr( self->job_id, '_' ))) {
/* single job */
if ( slurm_load_job( job_info, fsd_atoi( self->job_id ), SHOW_ALL) ) {
int _slurm_errno = slurm_get_errno();

if ( slurm_load_job( &job_info, fsd_atoi(self->job_id), SHOW_ALL) ) {
if (_slurm_errno == ESLURM_INVALID_JOB_ID) {
self->on_missing(self);
} else if (_slurm_errno == SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT ||
_slurm_errno == SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR) {
fsd_exc_raise_fmt(FSD_ERRNO_DRM_COMMUNICATION_FAILURE, "slurm_load_jobs error: %s,job_id: %s", slurm_strerror(_slurm_errno), self->job_id);
} else {
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_load_jobs error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
}

if ( *job_info ) {
return &(*job_info)->job_array[0];
}

return NULL;
} else {
/* subtask of array job */

char parent_job[128];
uint32_t sub_len;
uint32_t task_id;
uint32_t r_i;

sub_len = str_i - self->job_id;
task_id = atol( str_i + 1 );

fsd_assert( sub_len + 1 < sizeof( parent_job ) );

if ( sub_len >= sizeof( parent_job ))
sub_len = sizeof( parent_job ) - 1;

memset( parent_job, 0, sizeof(parent_job) );
strncpy( parent_job, self->job_id, sub_len );

fsd_log_debug(( "looking for task (%u) of job (%s)", task_id, parent_job ));

if ( slurm_load_job( job_info, fsd_atoi( parent_job ), SHOW_ALL) ) {
int _slurm_errno = slurm_get_errno();

if (_slurm_errno == ESLURM_INVALID_JOB_ID) {
self->on_missing(self);
} else if (_slurm_errno == SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT ||
_slurm_errno == SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR) {
fsd_exc_raise_fmt(FSD_ERRNO_DRM_COMMUNICATION_FAILURE,"slurm_load_jobs error: %s,job_id: %s", slurm_strerror(_slurm_errno), self->job_id);
fsd_exc_raise_fmt(FSD_ERRNO_DRM_COMMUNICATION_FAILURE, "slurm_load_jobs error: %s,job_id: %s", slurm_strerror(_slurm_errno), self->job_id);
} else {
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR,"slurm_load_jobs error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
fsd_exc_raise_fmt(FSD_ERRNO_INTERNAL_ERROR, "slurm_load_jobs error: %s,job_id: %s", slurm_strerror(slurm_get_errno()), self->job_id);
}
}
if (job_info) {
fsd_log_debug(("state = %d, state_reason = %d", job_info->job_array[0].job_state, job_info->job_array[0].state_reason));

fsd_log_debug(( "got (%u) subtasks of job (%s)", (*job_info)->record_count, parent_job) );

for ( r_i = 0; r_i < (*job_info)->record_count; ++r_i ) {
slurm_job_info_t *subtask = &((*job_info)->job_array[r_i]);

fsd_log_debug(( "checking array_task_str(%s), array_task_id(%d)", subtask->array_task_str, subtask->array_task_id ));

if ( subtask->array_task_str ) {
if ( slurmdrmaa_id_in_array_expr( subtask->array_task_str, task_id ) )
return subtask;
} else if ( subtask->array_task_id > 0 ) {
if ( subtask->array_task_id == task_id )
return subtask;
}
}

fsd_log_debug(( "subtask (%d) doesn't exist in slurm job (%s)", task_id, parent_job ));
}

return NULL;
}


static int
slurmdrmaa_id_in_array_expr( const char *array_expr, uint32_t id ) {
char *str_i;

if ( ! array_expr )
return 0;

/* array_expr may contain:
- values: {v1},{v2},...,{vn}
- loops: {start}-{end}[:{step}]
*/
if ( ( str_i = strchr( array_expr, ',' ) ) ) {
/* values */
char *saveptr, *token;
char *array_expr_cpy, *expr;
int result = 0;

fsd_log_debug(( "checking values expr "));

array_expr_cpy = fsd_strdup( array_expr );

TRY {
for ( expr = array_expr_cpy; ; expr = NULL ) {
token = strtok_r( expr, ",", &saveptr );

if ( ! token )
break;

fsd_log_debug(( "checking value (%s:%d) against %d ", token, fsd_atoi( token ), id ));

if ( (uint32_t)fsd_atoi( token ) == id ) {
result = 1;
break;
}
}
} EXCEPT_DEFAULT {
fsd_log_debug(( "unknown format of array expression: (%s)", array_expr ));
} FINALLY {
fsd_free( array_expr_cpy );
} END_TRY

return result;
} else {
/* loop */
char *start_end_s, *start_s, *end_s, *step_s;
char *expr = NULL, *expr2 = NULL, *saveptr;
uint32_t start_i, end_i, step_i;
int result = 0;

TRY {
expr = fsd_strdup( array_expr );
start_end_s = strtok_r( expr, ":", &saveptr );

if ( start_end_s )
step_s = strtok_r( NULL, ":", &saveptr );

if ( ! start_end_s )
fsd_exc_raise_code( FSD_ERRNO_INVALID_VALUE_FORMAT );

if ( ! step_s )
step_s = "1";

switch(job_info->job_array[0].job_state & JOB_STATE_BASE)
{
expr2 = fsd_strdup( start_end_s );
start_s = strtok_r( expr2, "-", &saveptr );

if ( start_s )
end_s = strtok_r( NULL, "-", &saveptr );

if ( ! start_s || ! end_s )
fsd_exc_raise_code( FSD_ERRNO_INVALID_VALUE_FORMAT );

start_i = fsd_atoi( start_s );
end_i = fsd_atoi( end_s );
step_i = fsd_atoi( step_s );

fsd_log_debug(( "checking loop (%d-%d:%d) against %d ", start_i, end_i, step_i, id ));

if ( id >= start_i && id <= end_i && ( ( id - start_i ) % step_i ) == 0 )
result = 1;
} EXCEPT_DEFAULT {
fsd_log_debug(( "unknown format of array expression: (%s)", array_expr ));
} FINALLY {
fsd_free( expr );
fsd_free( expr2 );
} END_TRY

fsd_log_debug(( "%s found ", result ? "YES" : "NOT" ));

return result;
}
}


static void
slurmdrmaa_job_update_status( fsd_job_t *self )
{
job_info_msg_t *job_info = NULL;
slurm_job_info_t *subtask = NULL;
slurmdrmaa_job_t * slurm_self = (slurmdrmaa_job_t *) self;
job_id_spec_t job_id_spec;
fsd_log_enter(( "({job_id=%s})", self->job_id ));

fsd_mutex_lock( &self->session->drm_connection_mutex );
TRY {
job_id_spec.original = self->job_id;
self->job_id = slurmdrmaa_set_job_id(&job_id_spec);
subtask = slurmdrmaa_find_job_info( self, &job_info );

if ( subtask ) {
fsd_log_debug(("state = %d, state_reason = %d", subtask->job_state, subtask->state_reason));

switch ( subtask->job_state & JOB_STATE_BASE ) {
case JOB_PENDING:
switch(job_info->job_array[0].state_reason)
{
switch ( subtask->state_reason ) {
case WAIT_HELD_USER: /* job is held by user */
fsd_log_debug(("interpreting as DRMAA_PS_USER_ON_HOLD"));
self->state = DRMAA_PS_USER_ON_HOLD;
Expand All @@ -164,7 +344,7 @@ slurmdrmaa_job_update_status( fsd_job_t *self )
self->state = DRMAA_PS_RUNNING;
break;
case JOB_SUSPENDED:
if(slurm_self->user_suspended == true) {
if (slurm_self->user_suspended == true) {
fsd_log_debug(("interpreting as DRMAA_PS_USER_SUSPENDED"));
self->state = DRMAA_PS_USER_SUSPENDED;
} else {
Expand All @@ -175,15 +355,13 @@ slurmdrmaa_job_update_status( fsd_job_t *self )
case JOB_COMPLETE:
fsd_log_debug(("interpreting as DRMAA_PS_DONE"));
self->state = DRMAA_PS_DONE;
self->exit_status = job_info->job_array[0].exit_code;
fsd_log_debug(("exit_status = %d -> %d",self->exit_status, WEXITSTATUS(self->exit_status)));
self->exit_status = subtask->exit_code;
fsd_log_debug(("exit_status = %d -> %d", self->exit_status, WEXITSTATUS(self->exit_status)));
break;
case JOB_CANCELLED:
fsd_log_debug(("interpreting as DRMAA_PS_FAILED (aborted)"));
self->state = DRMAA_PS_FAILED;
self->exit_status = -1;
fsd_log_debug(("exit_status (set internally) = %d -> %d",self->exit_status, WEXITSTATUS(self->exit_status)));
break;
case JOB_FAILED:
case JOB_TIMEOUT:
case JOB_NODE_FAIL:
Expand All @@ -199,44 +377,43 @@ slurmdrmaa_job_update_status( fsd_job_t *self )
#endif
fsd_log_debug(("interpreting as DRMAA_PS_FAILED"));
self->state = DRMAA_PS_FAILED;
self->exit_status = job_info->job_array[0].exit_code;
fsd_log_debug(("exit_status = %d -> %d",self->exit_status, WEXITSTATUS(self->exit_status)));
self->exit_status = subtask->exit_code;
fsd_log_debug(("exit_status = %d -> %d", self->exit_status, WEXITSTATUS(self->exit_status)));
break;
default: /*unknown state */
fsd_log_error(("Unknown job state: %d. Please send bug report: http://apps.man.poznan.pl/trac/slurm-drmaa", job_info->job_array[0].job_state));
fsd_log_error(("Unknown job state: %d. Please send bug report: http://apps.man.poznan.pl/trac/slurm-drmaa", subtask->job_state));
}

if (job_info->job_array[0].job_state & JOB_STATE_FLAGS & JOB_COMPLETING) {
if (subtask->job_state & JOB_STATE_FLAGS & JOB_COMPLETING) {
fsd_log_debug(("Epilog completing"));
}

if (job_info->job_array[0].job_state & JOB_STATE_FLAGS & JOB_CONFIGURING) {
if (subtask->job_state & JOB_STATE_FLAGS & JOB_CONFIGURING) {
fsd_log_debug(("Nodes booting"));
}

if (self->exit_status == -1) /* input,output,error path failure etc*/
if (self->exit_status == -1 ) /* input,output,error path failure etc*/
self->state = DRMAA_PS_FAILED;

self->last_update_time = time(NULL);
self->last_update_time = time( NULL );

if( self->state >= DRMAA_PS_DONE ) {
if (self->state >= DRMAA_PS_DONE) {
fsd_log_debug(("exit_status = %d, WEXITSTATUS(exit_status) = %d", self->exit_status, WEXITSTATUS(self->exit_status)));
fsd_cond_broadcast( &self->status_cond );
fsd_cond_broadcast(&self->status_cond);
}
}
}
FINALLY
{
if(job_info != NULL)
slurm_free_job_info_msg (job_info);
} FINALLY {
if (job_info != NULL)
slurm_free_job_info_msg(job_info);
self->job_id = slurmdrmaa_unset_job_id(&job_id_spec);

fsd_mutex_unlock( &self->session->drm_connection_mutex );
}
END_TRY

} END_TRY

fsd_log_return(( "" ));
}


static void
slurmdrmaa_job_on_missing( fsd_job_t *self )
{
Expand Down

0 comments on commit a2319d7

Please sign in to comment.