Skip to content

Commit

Permalink
fix(plugins/plugin-kubectl): kubectl get --all-namespaces --watch doe…
Browse files Browse the repository at this point in the history
…s not update

Fixes #4581
  • Loading branch information
starpit committed May 14, 2020
1 parent 23711f6 commit 663a972
Showing 1 changed file with 81 additions and 17 deletions.
98 changes: 81 additions & 17 deletions plugins/plugin-kubectl/src/controller/kubectl/watch/get-watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@
*/

import Debug from 'debug'
import { Table, Arguments, CodedError, Streamable, Abortable, Watchable, Watcher, WatchPusher } from '@kui-shell/core'
import {
Table,
Arguments,
CodedError,
Streamable,
Abortable,
Watchable,
Watcher,
WatchPusher,
flatten
} from '@kui-shell/core'

import { kindPart } from '../fqn'
import { formatOf, KubeOptions, KubeExecOptions } from '../options'
Expand Down Expand Up @@ -66,6 +76,10 @@ function preprocessTable(raw: string, nCols): { rows: Pair[][]; leftover: string
}
}

function notEmpty<TValue>(value: TValue | void | null | undefined): value is TValue {
return value !== null && value !== undefined
}

class KubectlWatcher implements Abortable, Watcher {
/**
* We expect k columns; see the custom-columns below.
Expand Down Expand Up @@ -109,6 +123,71 @@ class KubectlWatcher implements Abortable, Watcher {
}
}

private groupByNamespace(rows: Pair[][]) {
return rows.reduce((groups, row) => {
const namespace = row[3].value || 'default'

const group = groups[namespace] || []
if (!groups[namespace]) {
groups[namespace] = group
}

group.push(row)
return groups
}, {} as Record<string, Pair[][]>)
}

/**
* Fetch the user-formatted rows for the so-named resources in the
* given namespace.
*
*/
private getRowsForNamespace(
apiVersion: string,
kind: string,
namespace: string,
rowNames: string[]
): Promise<Table | void> {
const getCommand = `${getCommandFromArgs(this.args)} get ${kindPart(apiVersion, kind)} ${rowNames.join(
' '
)} -n ${namespace} ${this.output ? `-o ${this.output}` : ''}`

return this.args.REPL.qexec<Table>(getCommand).catch((err: CodedError) => {
if (err.code !== 404) {
console.error(err)
}
// mark as all offline, if we got a 404 for the bulk get
if (typeof rowNames === 'string') {
this.pusher.offline(rowNames)
} else {
rowNames.forEach(name => this.pusher.offline(name))
}
})
}

/** Get rows as specified by user's -o */
private async getRowsForUser(rows: Pair[][]): Promise<void | Table> {
const kind = rows[0][1].value
const apiVersion = rows[0][2].value

const groups = this.groupByNamespace(rows)

const tables = (
await Promise.all(
Object.keys(groups).map(namespace => {
const rowNames = groups[namespace].map(group => group[0].value)
return this.getRowsForNamespace(apiVersion, kind, namespace, rowNames)
})
)
).filter(notEmpty)

return {
title: tables[0].title,
header: tables[0].header,
body: flatten(tables.map(_ => _.body))
}
}

/**
* Our impl of the `onInit` streaming PTY API: the PTY calls us with
* the PTY job (so that we can abort it, if we want). In return, we
Expand Down Expand Up @@ -146,22 +225,7 @@ class KubectlWatcher implements Abortable, Watcher {
const { rows } = preprocessed

// now process the full rows into table view updates
const kind = rows[0][1].value
const apiVersion = rows[0][2].value
const namespace = rows[0][3].value || 'default'
const rowNames = rows.map(_ => _[0].value)

const getCommand = `${getCommandFromArgs(this.args)} get ${kindPart(apiVersion, kind)} ${rowNames.join(
' '
)} -n ${namespace} ${this.output ? `-o ${this.output}` : ''}`

const table = await this.args.REPL.qexec<Table>(getCommand).catch((err: CodedError) => {
if (err.code !== 404) {
console.error(err)
}
// mark as all offline, if we got a 404 for the bulk get
rowNames.forEach(name => this.pusher.offline(name))
})
const table = await this.getRowsForUser(rows)

if (table) {
// in case the initial get was empty, we add the header to the
Expand Down

0 comments on commit 663a972

Please sign in to comment.