Skip to content

Commit

Permalink
Merge pull request #832 from sasjs/parent-session-state-check
Browse files Browse the repository at this point in the history
feat(job-state): added session state check to doPoll func
  • Loading branch information
medjedovicm committed Sep 11, 2023
2 parents 0359fcb + 3a186bc commit 5dfee30
Show file tree
Hide file tree
Showing 9 changed files with 423 additions and 53 deletions.
8 changes: 4 additions & 4 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ module.exports = {
// An object that configures minimum threshold enforcement for coverage results
coverageThreshold: {
global: {
statements: 63.61,
branches: 44.72,
functions: 53.94,
lines: 64.07
statements: 64.01,
branches: 45.11,
functions: 54.1,
lines: 64.51
}
},

Expand Down
43 changes: 27 additions & 16 deletions src/SessionManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Session, Context, SessionVariable } from './types'
import { Session, Context, SessionVariable, SessionState } from './types'
import { NoSessionStateError } from './types/errors'
import { asyncForEach, isUrl } from './utils'
import { prefixMessage } from '@sasjs/utils/error'
Expand All @@ -12,6 +12,7 @@ interface ApiErrorResponse {

export class SessionManager {
private loggedErrors: NoSessionStateError[] = []
private sessionStateLinkError = 'Error while getting session state link. '

constructor(
private serverUrl: string,
Expand All @@ -28,7 +29,7 @@ export class SessionManager {
private _debug: boolean = false
private printedSessionState = {
printed: false,
state: ''
state: SessionState.NoState
}

public get debug() {
Expand Down Expand Up @@ -265,6 +266,18 @@ export class SessionManager {
)
})

// Add response etag to Session object.
createdSession.etag = etag

// Get session state link.
const stateLink = createdSession.links.find((link) => link.rel === 'state')

// Throw error if session state link is not present.
if (!stateLink) throw this.sessionStateLinkError

// Add session state link to Session object.
createdSession.stateUrl = stateLink.href

await this.waitForSession(createdSession, etag, accessToken)

this.sessions.push(createdSession)
Expand Down Expand Up @@ -327,32 +340,30 @@ export class SessionManager {
etag: string | null,
accessToken?: string
): Promise<string> {
let { state: sessionState } = session
const { stateUrl } = session
const logger = process.logger || console

let sessionState = session.state

const stateLink = session.links.find((l: any) => l.rel === 'state')

if (
sessionState === 'pending' ||
sessionState === 'running' ||
sessionState === ''
sessionState === SessionState.Pending ||
sessionState === SessionState.Running ||
sessionState === SessionState.NoState
) {
if (stateLink) {
if (stateUrl) {
if (this.debug && !this.printedSessionState.printed) {
logger.info(`Polling: ${this.serverUrl + stateLink.href}`)
logger.info(`Polling: ${this.serverUrl + stateUrl}`)

this.printedSessionState.printed = true
}

const url = `${this.serverUrl}${stateLink.href}?wait=30`
const url = `${this.serverUrl}${stateUrl}?wait=30`

const { result: state, responseStatus: responseStatus } =
await this.getSessionState(url, etag!, accessToken).catch((err) => {
throw prefixMessage(err, 'Error while waiting for session. ')
})

sessionState = state.trim()
sessionState = state.trim() as SessionState

if (this.debug && this.printedSessionState.state !== sessionState) {
logger.info(`Current session state is '${sessionState}'`)
Expand All @@ -364,7 +375,7 @@ export class SessionManager {
if (!sessionState) {
const stateError = new NoSessionStateError(
responseStatus,
this.serverUrl + stateLink.href,
this.serverUrl + stateUrl,
session.links.find((l: any) => l.rel === 'log')?.href as string
)

Expand All @@ -386,7 +397,7 @@ export class SessionManager {

return sessionState
} else {
throw 'Error while getting session state link. '
throw this.sessionStateLinkError
}
} else {
this.loggedErrors = []
Expand All @@ -413,7 +424,7 @@ export class SessionManager {
return await this.requestClient
.get(url, accessToken, 'text/plain', { 'If-None-Match': etag })
.then((res) => ({
result: res.result as string,
result: res.result as SessionState,
responseStatus: res.status
}))
.catch((err) => {
Expand Down
19 changes: 12 additions & 7 deletions src/api/viya/executeOnComputeApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,29 @@ export async function executeOnComputeApi(
postedJob,
debug,
authConfig,
pollOptions
pollOptions,
{
session,
sessionManager
}
).catch(async (err) => {
const error = err?.response?.data
const result = /err=[0-9]*,/.exec(error)

const errorCode = '5113'

if (result?.[0]?.slice(4, -1) === errorCode) {
const logCount = 1000000
const sessionLogUrl =
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
const logCount = 1000000

err.log = await fetchLogByChunks(
requestClient,
access_token!,
sessionLogUrl,
logCount
)
}

throw prefixMessage(err, 'Error while polling job status. ')
})

Expand All @@ -205,12 +211,12 @@ export async function executeOnComputeApi(

let jobResult
let log = ''

const logLink = currentJob.links.find((l) => l.rel === 'log')

if (debug && logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000

log = await fetchLogByChunks(
requestClient,
access_token!,
Expand All @@ -223,9 +229,7 @@ export async function executeOnComputeApi(
throw new ComputeJobExecutionError(currentJob, log)
}

if (!expectWebout) {
return { job: currentJob, log }
}
if (!expectWebout) return { job: currentJob, log }

const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`

Expand All @@ -236,6 +240,7 @@ export async function executeOnComputeApi(
if (logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000

log = await fetchLogByChunks(
requestClient,
access_token!,
Expand Down
60 changes: 54 additions & 6 deletions src/api/viya/pollJobState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Job, PollOptions, PollStrategy } from '../..'
import { getTokens } from '../../auth/getTokens'
import { RequestClient } from '../../request/RequestClient'
import { JobStatePollError } from '../../types/errors'
import { Link, WriteStream } from '../../types'
import { Link, WriteStream, SessionState, JobSessionManager } from '../../types'
import { delay, isNode } from '../../utils'

export enum JobState {
Expand Down Expand Up @@ -37,14 +37,16 @@ export enum JobState {
* { maxPollCount: 500, pollInterval: 30000 }, // approximately ~50.5 mins (including time to get response (~300ms))
* { maxPollCount: 3400, pollInterval: 60000 } // approximately ~3015 mins (~125 hours) (including time to get response (~300ms))
* ]
* @param jobSessionManager - job session object containing session object and an instance of Session Manager. Job session object is used to periodically (every 10th job state poll) check parent session state.
* @returns - a promise which resolves with a job state
*/
export async function pollJobState(
requestClient: RequestClient,
postedJob: Job,
debug: boolean,
authConfig?: AuthConfig,
pollOptions?: PollOptions
pollOptions?: PollOptions,
jobSessionManager?: JobSessionManager
): Promise<JobState> {
const logger = process.logger || console

Expand Down Expand Up @@ -127,7 +129,8 @@ export async function pollJobState(
pollOptions,
authConfig,
streamLog,
logFileStream
logFileStream,
jobSessionManager
)

currentState = result.state
Expand Down Expand Up @@ -158,7 +161,8 @@ export async function pollJobState(
defaultPollOptions,
authConfig,
streamLog,
logFileStream
logFileStream,
jobSessionManager
)

currentState = result.state
Expand Down Expand Up @@ -208,7 +212,21 @@ const needsRetry = (state: string) =>
state === JobState.Pending ||
state === JobState.Unavailable

const doPoll = async (
/**
* Polls job state.
* @param requestClient - the pre-configured HTTP request client.
* @param postedJob - the relative or absolute path to the job.
* @param currentState - current job state.
* @param debug - sets the _debug flag in the job arguments.
* @param pollCount - current poll count.
* @param pollOptions - an object containing maxPollCount, pollInterval, streamLog and logFolderPath.
* @param authConfig - an access token, refresh token, client and secret for an authorized user.
* @param streamLog - indicates if job log should be streamed.
* @param logStream - job log stream.
* @param jobSessionManager - job session object containing session object and an instance of Session Manager. Job session object is used to periodically (every 10th job state poll) check parent session state.
* @returns - a promise which resolves with a job state
*/
export const doPoll = async (
requestClient: RequestClient,
postedJob: Job,
currentState: JobState,
Expand All @@ -217,7 +235,8 @@ const doPoll = async (
pollOptions: PollOptions,
authConfig?: AuthConfig,
streamLog?: boolean,
logStream?: WriteStream
logStream?: WriteStream,
jobSessionManager?: JobSessionManager
): Promise<{ state: JobState; pollCount: number }> => {
const { maxPollCount, pollInterval } = pollOptions
const logger = process.logger || console
Expand All @@ -229,6 +248,35 @@ const doPoll = async (
let startLogLine = 0

while (needsRetry(state) && pollCount <= maxPollCount) {
// Check parent session state on every 10th job state poll.
if (jobSessionManager && pollCount && pollCount % 10 === 0 && authConfig) {
const { session, sessionManager } = jobSessionManager
const { stateUrl, etag, id: sessionId } = session
const { access_token } = authConfig
const { id: jobId } = postedJob

// Get session state.
const { result: sessionState, responseStatus } = await sessionManager[
'getSessionState'
](stateUrl, etag, access_token).catch((err) => {
// Handle error while getting session state.
throw new JobStatePollError(jobId, err)
})

// Clear parent session and throw an error if session state is not
// 'running' or response status is not 200.
if (sessionState !== SessionState.Running || responseStatus !== 200) {
sessionManager.clearSession(sessionId, access_token)

const sessionError =
sessionState !== SessionState.Running
? `Session state of the job is not 'running'. Session state is '${sessionState}'`
: `Session response status is not 200. Session response status is ${responseStatus}.`

throw new JobStatePollError(jobId, new Error(sessionError))
}
}

state = await getJobState(
requestClient,
postedJob,
Expand Down
10 changes: 8 additions & 2 deletions src/api/viya/spec/executeScript.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as uploadTablesModule from '../uploadTables'
import * as getTokensModule from '../../../auth/getTokens'
import * as formatDataModule from '../../../utils/formatDataForRequest'
import * as fetchLogsModule from '../../../utils/fetchLogByChunks'
import { PollOptions } from '../../../types'
import { PollOptions, JobSessionManager } from '../../../types'
import { ComputeJobExecutionError, NotFoundError } from '../../../types/errors'
import { Logger, LogLevel } from '@sasjs/utils/logger'

Expand Down Expand Up @@ -308,6 +308,11 @@ describe('executeScript', () => {
})

it('should poll for job completion when waitForResult is true', async () => {
const jobSessionManager: JobSessionManager = {
session: mockSession,
sessionManager: sessionManager
}

await executeOnComputeApi(
requestClient,
sessionManager,
Expand All @@ -329,7 +334,8 @@ describe('executeScript', () => {
mockJob,
false,
mockAuthConfig,
defaultPollOptions
defaultPollOptions,
jobSessionManager
)
})

Expand Down
8 changes: 5 additions & 3 deletions src/api/viya/spec/mockResponses.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { AuthConfig } from '@sasjs/utils/types'
import { Job, Session } from '../../../types'
import { Job, Session, SessionState } from '../../../types'

export const mockSession: Session = {
id: 's35510n',
state: 'idle',
state: SessionState.Idle,
stateUrl: '',
links: [],
attributes: {
sessionInactiveTimeout: 1
},
creationTimeStamp: new Date().valueOf().toString()
creationTimeStamp: new Date().valueOf().toString(),
etag: 'etag-string'
}

export const mockJob: Job = {
Expand Down
Loading

0 comments on commit 5dfee30

Please sign in to comment.