Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make periodic jobs service requests disabled by default #1529

Merged
merged 2 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions demos/jobs/jobs_demo_mosquitto/demo_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,10 @@
#define MQTT_WAIT_TIME ( 10U * 1000U )

/**
* @brief How often in seconds to ask for a new job.
* @brief Maximum interval in seconds for pollinv and updateinv command line arguments.
* (arbitrarily chosen to be a week; must be less than LONG_MAX)
*/
#define PROMPT_INTERVAL ( 120U )

/**
* @brief How often in seconds to send updates for a running job.
*/
#define UPDATE_INTERVAL ( 10U )
#define INTERVAL_MAX ( 60U * 60U * 24U * 7U )

/**
* @brief Parent directory to contain download directories and files.
Expand Down
127 changes: 74 additions & 53 deletions demos/jobs/jobs_demo_mosquitto/jobs_demo_mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,27 @@ static void usage( const char * programName )
);
fprintf( stderr,
"\nusage: %s "
"[-o] -n name -h host [-p port] {--cafile file | --capath dir} --certfile file --keyfile file\n"
"[-o] -n name -h host [-p port] {--cafile file | --capath dir} --certfile file --keyfile file [--pollinv seconds] [--updateinv seconds]\n"
"\n"
"-o : run once, exit after the first job is finished.\n"
"-n : thing name\n"
"-h : mqtt host to connect to.\n"
"-p : network port to connect to. Defaults to %d.\n",
programName, DEFAULT_MQTT_PORT );
fprintf( stderr,
"--cafile : path to a file containing trusted CA certificates to enable encrypted\n"
" certificate based communication.\n"
"--capath : path to a directory containing trusted CA certificates to enable encrypted\n"
" communication. Defaults to %s.\n"
"--certfile : client certificate for authentication in PEM format.\n"
"--keyfile : client private key for authentication in PEM format.\n\n",
"--cafile : path to a file containing trusted CA certificates to enable encrypted\n"
" certificate based communication.\n"
"--capath : path to a directory containing trusted CA certificates to enable encrypted\n"
" communication. Defaults to %s.\n"
"--certfile : client certificate for authentication in PEM format.\n"
"--keyfile : client private key for authentication in PEM format.\n",
DEFAULT_CA_DIRECTORY );
fprintf( stderr,
"--pollinv : after this many idle seconds, request a job.\n"
" Without this option and a positive value, no polling is done.\n"
"--updateinv : after this many seconds running a job, resend the current status to the jobs service.\n"
" Without this option and a positive value, status is not resent.\n\n"
);
}

/*-----------------------------------------------------------*/
Expand Down Expand Up @@ -170,6 +176,8 @@ typedef struct
char * capath;
char * certfile;
char * keyfile;
uint32_t pollinv; /* 0 (default) disables polling for new jobs */
uint32_t updateinv; /* 0 (default) disables periodic resending of status */
/* flags */
bool runOnce;
/* callback-populated values */
Expand All @@ -184,8 +192,11 @@ typedef struct
size_t urlLength;
/* internal state tracking */
runStatus_t runStatus;
char * report;
time_t lastPrompt;
time_t lastUpdate;
bool forcePrompt;
bool forceUpdate;
pid_t child;
} handle_t;

Expand Down Expand Up @@ -334,13 +345,11 @@ void on_message( struct mosquitto * m,
static bool sendStartNext( handle_t * h );

/**
* @brief Reports status of the download process.
* @brief Checks status of the download process.
*
* @param[in] h runtime state handle
*
* @return the result of sendUpdate()
*/
static bool update( handle_t * h );
static void checkChild( handle_t * h );

/**
* @brief Launch a download process.
Expand Down Expand Up @@ -521,19 +530,21 @@ static bool parseArgs( handle_t * h,
long x;
static struct option long_options[] =
{
{ "once", no_argument, NULL, 'o' },
{ "name", required_argument, NULL, 'n' },
{ "host", required_argument, NULL, 'h' },
{ "port", required_argument, NULL, 'p' },
{ "cafile", required_argument, NULL, 'f' },
{ "capath", required_argument, NULL, 'd' },
{ "certfile", required_argument, NULL, 'c' },
{ "keyfile", required_argument, NULL, 'k' },
{ "help", no_argument, NULL, '?' },
{ NULL, 0, NULL, 0 }
{ "once", no_argument, NULL, 'o' },
{ "name", required_argument, NULL, 'n' },
{ "host", required_argument, NULL, 'h' },
{ "port", required_argument, NULL, 'p' },
{ "cafile", required_argument, NULL, 'f' },
{ "capath", required_argument, NULL, 'd' },
{ "certfile", required_argument, NULL, 'c' },
{ "keyfile", required_argument, NULL, 'k' },
{ "pollinv", required_argument, NULL, 'P' },
{ "updateinv", required_argument, NULL, 'u' },
{ "help", no_argument, NULL, '?' },
{ NULL, 0, NULL, 0 }
};

c = getopt_long( argc, argv, "on:h:p:f:d:c:k:?",
c = getopt_long( argc, argv, "on:h:p:P:u:f:d:c:k:?",
long_options, &option_index );

if( c == -1 )
Expand All @@ -556,19 +567,29 @@ static bool parseArgs( handle_t * h,
h->host = optarg;
break;

#define optargToInt( element, min, max ) \
x = strtol( optarg, NULL, 0 ); \
\
if( ( x > min ) && ( x <= max ) ) \
{ \
h->element = x; \
} \
else \
{ \
ret = false; \
warnx( "bad %s value: %s", # element, optarg ); \
}

case 'p':
x = strtol( optarg, NULL, 0 );
optargToInt( port, 0, 0xFFFF );
break;

if( ( x > 0 ) && ( x <= 0xFFFF ) )
{
h->port = x;
}
else
{
ret = false;
warnx( "bad port value: %s", optarg );
}
case 'P':
optargToInt( pollinv, 0, INTERVAL_MAX );
break;

case 'u':
optargToInt( updateinv, 0, INTERVAL_MAX );
break;

case 'f':
Expand Down Expand Up @@ -897,8 +918,8 @@ void on_message( struct mosquitto * m,
{
/* a job has been added or the current job was canceled */
case JobsNextJobChanged:
h->lastPrompt = 0;
h->lastUpdate = 0;
h->forceUpdate = ( h->runStatus == Running ) ? true : false;
h->forcePrompt = true;
break;

/* response to a request to start the next job */
Expand Down Expand Up @@ -981,11 +1002,10 @@ static bool sendStartNext( handle_t * h )

/*-----------------------------------------------------------*/

static bool update( handle_t * h )
static void checkChild( handle_t * h )
{
pid_t pid;
int status;
char * report;

assert( h != NULL );
assert( h->child > 0 );
Expand All @@ -998,7 +1018,7 @@ static bool update( handle_t * h )
{
/* still running */
case 0:
report = makeReport_( "IN_PROGRESS" );
h->report = makeReport_( "IN_PROGRESS" );
break;

/* exited */
Expand All @@ -1010,19 +1030,17 @@ static bool update( handle_t * h )
( WEXITSTATUS( status ) == 0 ) )
{
info( "completed job id: %s", h->jobid );
report = makeReport_( "SUCCEEDED" );
h->report = makeReport_( "SUCCEEDED" );
}
else
{
info( "failed job id: %s", h->jobid );
report = makeReport_( "FAILED" );
h->report = makeReport_( "FAILED" );
}

h->child = 0;
h->runStatus = None;
}

return sendUpdate( h, h->jobid, h->jobidLength, report );
}

/*-----------------------------------------------------------*/
Expand Down Expand Up @@ -1181,10 +1199,7 @@ int main( int argc,
char * argv[] )
{
handle_t h_, * h = &h_;
size_t i;
time_t now;
/* subscribe to these topics */
JobsTopic_t topics[] = { JobsNextJobChanged, JobsStartNextSuccess, JobsUpdateFailed };

initHandle( h );

Expand All @@ -1200,12 +1215,9 @@ int main( int argc,
errx( 1, "fatal error" );
}

for( i = 0; i < ( sizeof( topics ) / sizeof( topics[ 0 ] ) ); i++ )
if( subscribe( h, JobsNextJobChanged ) == false )
{
if( subscribe( h, topics[ i ] ) == false )
{
errx( 1, "fatal error" );
}
errx( 1, "fatal error" );
}

if( sendStartNext( h ) == false )

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendStartNext is a POST api which scales separate from Describe Job with $next as the ID which is a GET api. sendStartNext transactions will compete with the Update api which might result in throttling at scale.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The jobs library only uses MQTT.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies - I meant to say that the category of the API is for modification instead of retrieval. SendStartNext and DescribeJob are handled and scaled differently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to understand this better. Any such changes will merit a new PR.

Expand Down Expand Up @@ -1233,11 +1245,13 @@ int main( int argc,
{
case None:

if( now > ( h->lastPrompt + PROMPT_INTERVAL ) )
if( ( h->forcePrompt == true ) ||
( ( h->pollinv != 0 ) && ( now > ( h->lastPrompt + h->pollinv ) ) ) )
{
h->lastPrompt = now;
info( "requesting job" );
ret = sendStartNext( h );
h->forcePrompt = false;
}

break;
Expand All @@ -1250,19 +1264,26 @@ int main( int argc,
{
h->runStatus = Running;
info( "sending first update" );
ret = update( h );
checkChild( h );
ret = sendUpdate( h, h->jobid, h->jobidLength, h->report );
h->lastUpdate = now;
}

break;

case Running:

if( now > ( h->lastUpdate + UPDATE_INTERVAL ) )
checkChild( h );

/* send an update if the job exited, was "force" canceled, or a periodic update is due */
if( ( h->runStatus == None ) ||
( h->forceUpdate == true ) ||
( ( h->updateinv != 0 ) && ( now > ( h->lastUpdate + h->updateinv ) ) ) )
{
info( "updating job id: %s", h->jobid );
ret = update( h );
ret = sendUpdate( h, h->jobid, h->jobidLength, h->report );
h->lastUpdate = now;
h->forceUpdate = false;
}

break;
Expand Down
2 changes: 2 additions & 0 deletions demos/lexicon.txt
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ pmqttcontext
pmsg
pmsg
pnetworkcontext
pollinv
poly
popenportsarray
portsarraylength
Expand Down Expand Up @@ -602,6 +603,7 @@ ulslotcount
unix
unsub
unsuback
updateinv
updatejobexecution
urandom
uri
Expand Down