From cfe993d028bef9acb730e2543cf381bb214a4e43 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 2 Mar 2022 14:35:13 -0500 Subject: [PATCH 1/2] [BEAM-13903] Improve coverage of metricsx package --- .../core/runtime/metricsx/metricsx_test.go | 178 ++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go index 20492ab2e699c..6a3d53c1ec106 100644 --- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go +++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go @@ -68,6 +68,184 @@ func TestFromMonitoringInfos_Counters(t *testing.T) { } } +func TestFromMonitoringInfos_Msec(t *testing.T) { + want := metrics.MsecResult{ + Attempted: metrics.MsecValue{ + Start: 15 * time.Millisecond, + Process: 20 * time.Millisecond, + Finish: 40 * time.Millisecond, + Total: 25 * time.Millisecond, + }, + Committed: metrics.MsecValue{ + Start: 0 * time.Millisecond, + Process: 0 * time.Millisecond, + Finish: 0 * time.Millisecond, + Total: 0 * time.Millisecond, + }, + Key: metrics.StepKey{ + Step: "main.customDoFn", + Name: "customCounter", + Namespace: "customDoFn", + }, + } + + labels := map[string]string{ + "PTRANSFORM": "main.customDoFn", + "NAMESPACE": "customDoFn", + "NAME": "customCounter", + } + + startValue, err := Int64Counter(int64(15)) + if err != nil { + t.Fatalf("Failed to encode Int64Counter: %v", err) + } + processValue, err := Int64Counter(int64(20)) + if err != nil { + t.Fatalf("Failed to encode Int64Counter: %v", err) + } + finishValue, err := Int64Counter(int64(40)) + if err != nil { + t.Fatalf("Failed to encode Int64Counter: %v", err) + } + totalValue, err := Int64Counter(int64(25)) + if err != nil { + t.Fatalf("Failed to encode Int64Counter: %v", err) + } + mStartBundleInfo := &pipepb.MonitoringInfo{ + Urn: UrnToString(ExecutionMsecUrn(0)), + Type: UrnToType(ExecutionMsecUrn(0)), + Labels: labels, + Payload: startValue, + } + mProcessBundleInfo := &pipepb.MonitoringInfo{ + Urn: UrnToString(ExecutionMsecUrn(1)), + Type: UrnToType(ExecutionMsecUrn(1)), + Labels: labels, + Payload: processValue, + } + mFinishBundleInfo := &pipepb.MonitoringInfo{ + Urn: UrnToString(ExecutionMsecUrn(2)), + Type: UrnToType(ExecutionMsecUrn(2)), + Labels: labels, + Payload: finishValue, + } + mTotalTimeInfo := &pipepb.MonitoringInfo{ + Urn: UrnToString(ExecutionMsecUrn(3)), + Type: UrnToType(ExecutionMsecUrn(3)), + Labels: labels, + Payload: totalValue, + } + + attempted := []*pipepb.MonitoringInfo{mStartBundleInfo, mProcessBundleInfo, mFinishBundleInfo, mTotalTimeInfo} + committed := []*pipepb.MonitoringInfo{} + p := &pipepb.Pipeline{} + + got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Msecs() + size := len(got) + if size != 1 { + t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) + } + if d := cmp.Diff(want, got[0]); d != "" { + t.Fatalf("Invalid MsecResult: got: %v, want: %v, diff(-want,+got):\n %v", + got[0], want, d) + } +} + +func TestFromMonitoringInfos_PColCounters(t *testing.T) { + var value int64 = 15 + want := metrics.PColResult{ + Attempted: metrics.PColValue{ + ElementCount: 15, + }, + Key: metrics.StepKey{ + Step: "main.customDoFn", + Name: "customCounter", + Namespace: "customDoFn", + }} + + payload, err := Int64Counter(value) + if err != nil { + t.Fatalf("Failed to encode Int64Counter: %v", err) + } + + labels := map[string]string{ + "PTRANSFORM": "main.customDoFn", + "NAMESPACE": "customDoFn", + "NAME": "customCounter", + } + + mInfo := &pipepb.MonitoringInfo{ + Urn: UrnToString(UrnElementCount), + Type: UrnToType(UrnElementCount), + Labels: labels, + Payload: payload, + } + + attempted := []*pipepb.MonitoringInfo{mInfo} + committed := []*pipepb.MonitoringInfo{} + p := &pipepb.Pipeline{} + + got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols() + size := len(got) + if size < 1 { + t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) + } + if d := cmp.Diff(want, got[0]); d != "" { + t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v", + got[0], want, d) + } +} + +func TestFromMonitoringInfos_SampledByteSize(t *testing.T) { + want := metrics.PColResult{ + Attempted: metrics.PColValue{ + SampledByteSize: metrics.DistributionValue{ + Count: 100, + Sum: 5, + Min: -12, + Max: 30, + }, + }, + Key: metrics.StepKey{ + Step: "main.customDoFn", + Name: "customCounter", + Namespace: "customDoFn", + }} + + var count, sum, min, max int64 = 100, 5, -12, 30 + payload, err := Int64Distribution(count, sum, min, max) + if err != nil { + t.Fatalf("Failed to encode Int64Distribution: %v", err) + } + + labels := map[string]string{ + "PTRANSFORM": "main.customDoFn", + "NAMESPACE": "customDoFn", + "NAME": "customCounter", + } + + mInfo := &pipepb.MonitoringInfo{ + Urn: UrnToString(UrnSampledByteSize), + Type: UrnToType(UrnSampledByteSize), + Labels: labels, + Payload: payload, + } + + attempted := []*pipepb.MonitoringInfo{mInfo} + committed := []*pipepb.MonitoringInfo{} + p := &pipepb.Pipeline{} + + got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols() + size := len(got) + if size < 1 { + t.Fatalf("Invalid array's size: got: %v, want: %v", size, FromMonitoringInfos(p, attempted, committed).AllMetrics()) + } + if d := cmp.Diff(want, got[0]); d != "" { + t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v", + got[0], want, d) + } +} + func TestFromMonitoringInfos_Distributions(t *testing.T) { var count, sum, min, max int64 = 100, 5, -12, 30 From 8517ff3e41ff372d0504cb45fada6ab0c9751bd9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 2 Mar 2022 14:37:30 -0500 Subject: [PATCH 2/2] Exact size check --- .../go/pkg/beam/core/runtime/metricsx/metricsx_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go index 6a3d53c1ec106..95b92f929c08c 100644 --- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go +++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go @@ -59,7 +59,7 @@ func TestFromMonitoringInfos_Counters(t *testing.T) { got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Counters() size := len(got) - if size < 1 { + if size != 1 { t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) } if d := cmp.Diff(want, got[0]); d != "" { @@ -187,7 +187,7 @@ func TestFromMonitoringInfos_PColCounters(t *testing.T) { got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols() size := len(got) - if size < 1 { + if size != 1 { t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) } if d := cmp.Diff(want, got[0]); d != "" { @@ -237,7 +237,7 @@ func TestFromMonitoringInfos_SampledByteSize(t *testing.T) { got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols() size := len(got) - if size < 1 { + if size != 1 { t.Fatalf("Invalid array's size: got: %v, want: %v", size, FromMonitoringInfos(p, attempted, committed).AllMetrics()) } if d := cmp.Diff(want, got[0]); d != "" { @@ -287,7 +287,7 @@ func TestFromMonitoringInfos_Distributions(t *testing.T) { got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Distributions() size := len(got) - if size < 1 { + if size != 1 { t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) } if d := cmp.Diff(want, got[0]); d != "" { @@ -337,7 +337,7 @@ func TestFromMonitoringInfos_Gauges(t *testing.T) { got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Gauges() size := len(got) - if size < 1 { + if size != 1 { t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1) } if d := cmp.Diff(want, got[0]); d != "" {