Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade Argo Dataflow to v0.0.104 #6749

Merged
merged 4 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
461 changes: 165 additions & 296 deletions api/jsonschema/schema.json

Large diffs are not rendered by default.

467 changes: 168 additions & 299 deletions api/openapi-spec/swagger.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/TwinProduction/go-color v0.0.3
github.com/aliyun/aliyun-oss-go-sdk v2.1.8+incompatible
github.com/antonmedv/expr v1.8.9
github.com/argoproj-labs/argo-dataflow v0.0.98
github.com/argoproj-labs/argo-dataflow v0.0.107
github.com/argoproj/argo-events v1.4.0
github.com/argoproj/pkg v0.11.0
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
Expand All @@ -35,6 +35,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/klauspost/pgzip v1.2.5
github.com/minio/minio-go/v7 v7.0.2
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
Expand Down
51 changes: 37 additions & 14 deletions go.sum

Large diffs are not rendered by default.

52 changes: 17 additions & 35 deletions ui/src/app/pipelines/components/pipeline-details/pipeline-graph.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import {Pipeline} from '../../../../models/pipeline';
import {Metrics, Step} from '../../../../models/step';
import {Step} from '../../../../models/step';
import {Graph} from '../../../shared/components/graph/types';
import {Icon} from '../../../shared/components/icon';
import {totalRate} from '../total-rate';

type Type = '' | 'cat' | 'code' | 'container' | 'dedupe' | 'expand' | 'filter' | 'flatten' | 'git' | 'group' | 'map' | 'split';

Expand Down Expand Up @@ -34,19 +33,6 @@ const stepIcon = (type: Type): Icon => {
}
};

const pendingSymbol = '◷';

function formatRates(metrics: Metrics, replicas: number): string {
const rates = Object.entries(metrics || {})
// the rate will remain after scale-down, so we must filter out, as it'll be wrong
.filter(([replica]) => parseInt(replica, 10) < replicas);
return rates.length > 0 ? 'Δ' + totalRate(metrics, replicas) : '';
}

function formatPending(pending: number) {
return pending ? ' ' + pendingSymbol + pending.toLocaleString() + ' ' : '';
}

export const graph = (pipeline: Pipeline, steps: Step[]) => {
const g = new Graph();

Expand Down Expand Up @@ -82,85 +68,81 @@ export const graph = (pipeline: Pipeline, steps: Step[]) => {

const classNames = status.phase === 'Running' ? 'flow' : '';
(spec.sources || []).forEach(x => {
const ss = (status.sourceStatuses || {})[x.name || ''] || {};
const label = formatPending(ss.pending) + formatRates(ss.metrics, step.status.replicas);
if (x.cron) {
const cronId = 'cron/' + stepId + '/sources/' + x.cron.schedule;
g.nodes.set(cronId, {genre: 'cron', icon: 'clock', label: x.cron.schedule});
g.edges.set({v: cronId, w: stepId}, {classNames, label});
g.edges.set({v: cronId, w: stepId}, {classNames});
} else if (x.db) {
const id = 'db/' + +stepId + '/sources/' + x.name;
g.nodes.set(id, {genre: 'db', icon: 'database', label: x.name});
g.edges.set({v: id, w: stepId}, {classNames, label});
g.edges.set({v: id, w: stepId}, {classNames});
} else if (x.kafka) {
const kafkaId = x.kafka.name || x.kafka.url || 'default';
const topicId = 'kafka/' + kafkaId + '/' + x.kafka.topic;
g.nodes.set(topicId, {genre: 'kafka', icon: 'stream', label: x.kafka.topic});
g.edges.set({v: topicId, w: stepId}, {classNames, label});
g.edges.set({v: topicId, w: stepId}, {classNames});
} else if (x.stan) {
const stanId = x.stan.name || x.stan.url || 'default';
const subjectId = 'stan/' + stanId + '/' + x.stan.subject;
g.nodes.set(subjectId, {genre: 'stan', icon: 'stream', label: x.stan.subject});
g.edges.set({v: subjectId, w: stepId}, {classNames, label});
g.edges.set({v: subjectId, w: stepId}, {classNames});
} else if (x.http) {
const y = new URL('http://' + (x.http.serviceName || pipeline.metadata.name + '-' + step.spec.name) + '/sources/' + x.name);
const subjectId = 'http/' + y;
g.nodes.set(subjectId, {genre: 'http', icon: 'cloud', label: y.hostname});
g.edges.set({v: subjectId, w: stepId}, {classNames, label});
g.edges.set({v: subjectId, w: stepId}, {classNames});
} else if (x.s3) {
const bucket = x.s3.bucket;
const id = 's3/' + bucket;
g.nodes.set(id, {genre: 's3', icon: 'hdd', label: bucket});
g.edges.set({v: id, w: stepId}, {classNames, label});
g.edges.set({v: id, w: stepId}, {classNames});
} else if (x.volume) {
const id = 'volume/' + stepId + '/sources/' + x.name;
g.nodes.set(id, {genre: 'volume', icon: 'hdd', label: x.name});
g.edges.set({v: id, w: stepId}, {classNames, label});
g.edges.set({v: id, w: stepId}, {classNames});
} else {
const id = 'unknown/' + stepId + '/sources/' + x.name;
g.nodes.set(id, {genre: 'unknown', icon: 'square', label: x.name});
g.edges.set({v: id, w: stepId}, {classNames, label});
g.edges.set({v: id, w: stepId}, {classNames});
}
});
(spec.sinks || []).forEach(x => {
const ss = (status.sinkStatuses || {})[x.name || ''] || {};
const label = formatRates(ss.metrics, step.status.replicas);
if (x.db) {
const id = 'db/' + stepId + '/sinks/' + x.name;
g.nodes.set(id, {genre: 'db', icon: 'database', label: x.name});
g.edges.set({v: stepId, w: id}, {classNames, label});
g.edges.set({v: stepId, w: id}, {classNames});
} else if (x.kafka) {
const kafkaId = x.kafka.name || x.kafka.url || 'default';
const topicId = 'kafka/' + kafkaId + '/' + x.kafka.topic;
g.nodes.set(topicId, {genre: 'kafka', icon: 'stream', label: x.kafka.topic});
g.edges.set({v: stepId, w: topicId}, {classNames, label});
g.edges.set({v: stepId, w: topicId}, {classNames});
} else if (x.log) {
const logId = 'log/' + stepId + '/sinks/' + x.name;
g.nodes.set(logId, {genre: 'log', icon: 'file-alt', label: 'log'});
g.edges.set({v: stepId, w: logId}, {classNames, label});
g.edges.set({v: stepId, w: logId}, {classNames});
} else if (x.stan) {
const stanId = x.stan.name || x.stan.url || 'default';
const subjectId = 'stan/' + stanId + '/' + x.stan.subject;
g.nodes.set(subjectId, {genre: 'stan', icon: 'stream', label: x.stan.subject});
g.edges.set({v: stepId, w: subjectId}, {classNames, label});
g.edges.set({v: stepId, w: subjectId}, {classNames});
} else if (x.http) {
const y = new URL(x.http.url);
const subjectId = 'http/' + y;
g.nodes.set(subjectId, {genre: 'http', icon: 'cloud', label: y.hostname});
g.edges.set({v: stepId, w: subjectId}, {classNames, label});
g.edges.set({v: stepId, w: subjectId}, {classNames});
} else if (x.s3) {
const bucket = x.s3.bucket;
const id = 's3/' + bucket;
g.nodes.set(id, {genre: 's3', icon: 'hdd', label: bucket});
g.edges.set({v: stepId, w: id}, {classNames, label});
g.edges.set({v: stepId, w: id}, {classNames});
} else if (x.volume) {
const id = 'volume/' + stepId + '/sinks/' + x.name;
g.nodes.set(id, {genre: 'volume', icon: 'hdd', label: x.name});
g.edges.set({v: stepId, w: id}, {classNames, label});
g.edges.set({v: stepId, w: id}, {classNames});
} else {
const id = 'unknown/' + stepId + '/sinks/' + x.name;
g.nodes.set(id, {genre: 'unknown', icon: 'square', label: x.name});
g.edges.set({v: stepId, w: id}, {classNames, label});
g.edges.set({v: stepId, w: id}, {classNames});
}
});
});
Expand Down
117 changes: 0 additions & 117 deletions ui/src/app/pipelines/components/step-side-panel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ import * as React from 'react';
import {Step, StepStatus} from '../../../models/step';
import {ObjectEditor} from '../../shared/components/object-editor/object-editor';
import {Phase} from '../../shared/components/phase';
import {TickMeter} from '../../shared/components/tick-meter';
import {Timestamp} from '../../shared/components/timestamp';
import {parseResourceQuantity} from '../../shared/resource-quantity';
import {EventsPanel} from '../../workflows/components/events-panel';
import {PipelineLogsViewer} from './pipeline-logs-viewer';
import {totalRate} from './total-rate';

const prettyNumber = (x: number): number => (x < 1 ? x : Math.round(x));

export const StepSidePanel = ({
isShown,
Expand Down Expand Up @@ -74,10 +69,6 @@ const statusPanel = (step: Step) =>
<div className='row'>
<div className='columns small-12'>{statusHeader(step.status)}</div>
</div>
<div className='row' style={{marginTop: 10}}>
<div className='columns small-6'>{sourcesPanel(step.status)}</div>
<div className='columns small-6'>{sinksPanel(step.status)}</div>
</div>
</>
);

Expand Down Expand Up @@ -105,111 +96,3 @@ const statusHeader = (status: StepStatus) => (
</div>
</div>
);

const sourcesPanel = (status: StepStatus) => (
<>
<h5>Sources</h5>
{status.sourceStatuses ? (
Object.entries(status.sourceStatuses).map(([name, x]) => {
const total = Object.values(x.metrics || {})
.filter(m => m.total)
.reduce((a, b) => a + b.total, 0);
const rate = totalRate(x.metrics, status.replicas);
const errors = Object.values(x.metrics || {})
.filter(m => m.errors)
.reduce((a, b) => a + b.errors, 0);
const retries = Object.values(x.metrics || {})
.filter(m => m.retries)
.reduce((a, b) => a + b.retries, 0);
return (
<div className='white-box' key={name}>
<p>{name}</p>
<div className='white-box__details'>
<div className='row white-box__details-row'>
<div className='columns small-4'>Pending</div>
<div className='columns small-8'>
<TickMeter value={x.pending || 0} />
</div>
</div>
<div className='row white-box__details-row'>
<div className='columns small-4'>Retries</div>
<div className='columns small-8'>
<TickMeter value={retries} />
</div>
</div>
<div className='row white-box__details-row'>
<div className='columns small-4'>Total</div>
<div className='columns small-4'>
<TickMeter value={total} />
</div>
<div className='columns small-4' title='Rate'>
<TickMeter value={rate} /> <small>TPS</small>
</div>
</div>
<div className='row white-box__details-row'>
<div className='columns small-4'>Errors</div>
<div className='columns small-4'>
<TickMeter value={errors} />
</div>
<div className='columns small-4'>
<TickMeter value={Math.floor((10000 * errors) / total) / 100} />%
</div>
</div>
</div>
</div>
);
})
) : (
<div className='white-box'>None</div>
)}
</>
);

const sinksPanel = (status: StepStatus) => (
<>
<h5>Sinks</h5>
{status.sinkStatuses ? (
Object.entries(status.sinkStatuses).map(([name, x]) => {
const total = Object.values(x.metrics || {})
.filter(m => m.total)
.reduce((a, b) => a + b.total, 0);
const rate = Object.entries(x.metrics || {})
// the rate will remain after scale-down, so we must filter out, as it'll be wrong
.filter(([replica]) => parseInt(replica, 10) < status.replicas)
.map(([, m]) => m)
.map(m => parseResourceQuantity(m.rate))
.reduce((a, b) => a + b, 0);
const errors = Object.values(x.metrics || {})
.filter(m => m.errors)
.reduce((a, b) => a + b.errors, 0);
return (
<div className='white-box' key={name}>
<p>{name}</p>
<div className='white-box__details'>
<div className='row white-box__details-row'>
<div className='columns small-4'>Total</div>
<div className='columns small-4'>
<TickMeter value={total} />
</div>
<div className='columns small-4' title='Rate'>
<TickMeter value={prettyNumber(rate)} /> <small>TPS</small>
</div>
</div>
<div className='row white-box__details-row'>
<div className='columns small-4'>Errors</div>
<div className='columns small-4'>
<TickMeter value={errors} />
</div>
<div className='columns small-4'>
<TickMeter value={prettyNumber(Math.floor((10000 * errors) / total) / 100)} />%
</div>
</div>
</div>
</div>
);
})
) : (
<div className='white-box'>None</div>
)}
</>
);
17 changes: 0 additions & 17 deletions ui/src/app/pipelines/components/total-rate.ts

This file was deleted.

22 changes: 0 additions & 22 deletions ui/src/models/step.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
import {ObjectMeta, Time, WatchEvent} from 'argo-ui/src/models/kubernetes';

export interface Metrics {
total?: number;
errors?: number;
retries?: number;
rate?: string;
}

export interface Step {
metadata: ObjectMeta;
spec: {
Expand Down Expand Up @@ -65,21 +58,6 @@ export interface StepStatus {
message?: string;
replicas: number;
lastScaledAt?: Time;
sinkStatuses?: SinkStatuses;
sourceStatuses?: SourceStatuses;
}

interface SourceStatuses {
[name: string]: {
pending?: number;
metrics?: {[name: string]: Metrics};
};
}

interface SinkStatuses {
[name: string]: {
metrics?: {[replica: string]: Metrics};
};
}

export type StepWatchEvent = WatchEvent<Step>;