From 003add76bf10b38ecdb7de3f6dc8b37856b81a6a Mon Sep 17 00:00:00 2001 From: Azeem Shaikh Date: Tue, 10 Aug 2021 18:03:45 -0700 Subject: [PATCH] Post to webhook on successful cron job completion --- cron/bq/main.go | 37 +++++++++++++++++++++++++++++++++---- cron/config/config.go | 11 +++++++++++ cron/config/config.yaml | 1 + cron/config/config_test.go | 2 ++ cron/controller/main.go | 8 -------- 5 files changed, 47 insertions(+), 12 deletions(-) diff --git a/cron/bq/main.go b/cron/bq/main.go index 66cd7f0e1dc..5aa541873b5 100644 --- a/cron/bq/main.go +++ b/cron/bq/main.go @@ -16,17 +16,23 @@ package main import ( + "bytes" "context" "fmt" + "log" + "net/http" "strconv" "strings" "time" + "google.golang.org/protobuf/encoding/protojson" + "github.com/ossf/scorecard/v2/cron/config" "github.com/ossf/scorecard/v2/cron/data" ) type shardSummary struct { + shardMetadata []byte shardsExpected int shardsCreated int isTransferred bool @@ -60,6 +66,7 @@ func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, er return nil, fmt.Errorf("error parsing Blob key: %w", err) } switch { + // TODO(azeems): Remove this case once all instances stop producing .shard_num file. case filename == config.ShardNumFilename: keyData, err := data.GetBlobContent(ctx, bucketURL, key) if err != nil { @@ -74,8 +81,16 @@ func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, er case filename == config.TransferStatusFilename: summary.getOrCreate(creationTime).isTransferred = true case filename == config.ShardMetadataFilename: - // TODO(azeems): Handle shard_metadata file. 
- continue + keyData, err := data.GetBlobContent(ctx, bucketURL, key) + if err != nil { + return nil, fmt.Errorf("error during GetBlobContent: %w", err) + } + var metadata data.ShardMetadata + if err := protojson.Unmarshal(keyData, &metadata); err != nil { + return nil, fmt.Errorf("error parsing data as ShardMetadata: %w", err) + } + summary.getOrCreate(creationTime).shardsExpected = int(metadata.GetNumShard()) + summary.getOrCreate(creationTime).shardMetadata = keyData default: // nolint: goerr113 return nil, fmt.Errorf("found unrecognized file: %s", key) @@ -85,7 +100,7 @@ func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, er } func transferDataToBq(ctx context.Context, - bucketURL, projectID, datasetName, tableName string, + bucketURL, projectID, datasetName, tableName, webhookURL string, summary *bucketSummary) error { for creationTime, shards := range summary.shards { if shards.isTransferred || shards.shardsExpected != shards.shardsCreated { @@ -103,6 +118,16 @@ func transferDataToBq(ctx context.Context, if err := data.WriteToBlobStore(ctx, bucketURL, transferStatusFilename, nil); err != nil { return fmt.Errorf("error during WriteToBlobStore: %w", err) } + if webhookURL == "" { + continue + } + // nolint: noctx, gosec // variable URL is ok here. 
+ resp, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(shards.shardMetadata)) + if err != nil { + return fmt.Errorf("error during http.Post to %s: %w", webhookURL, err) + } + log.Println(resp.Status) + _ = resp.Body.Close() // Closed here, not deferred: a defer inside the loop only runs at function return. } return nil } @@ -129,6 +154,10 @@ func main() { if err != nil { panic(err) } + webhookURL, err := config.GetWebhookURL() + if err != nil { + panic(err) + } projectID, datasetName, tableName, err := getBQConfig() if err != nil { panic(err) @@ -140,7 +169,7 @@ func main() { } if err := transferDataToBq(ctx, - bucketURL, projectID, datasetName, tableName, + bucketURL, projectID, datasetName, tableName, webhookURL, summary); err != nil { panic(err) } diff --git a/cron/config/config.go b/cron/config/config.go index 1281a5ec0d7..40c69a815d9 100644 --- a/cron/config/config.go +++ b/cron/config/config.go @@ -42,6 +42,7 @@ const ( bigqueryDataset string = "SCORECARD_BIGQUERY_DATASET" bigqueryTable string = "SCORECARD_BIGQUERY_TABLE" shardSize string = "SCORECARD_SHARD_SIZE" + webhookURL string = "SCORECARD_WEBHOOK_URL" metricExporter string = "SCORECARD_METRIC_EXPORTER" ) @@ -61,6 +62,7 @@ type config struct { RequestSubscriptionURL string `yaml:"request-subscription-url"` BigQueryDataset string `yaml:"bigquery-dataset"` BigQueryTable string `yaml:"bigquery-table"` + WebhookURL string `yaml:"webhook-url"` MetricExporter string `yaml:"metric-exporter"` ShardSize int `yaml:"shard-size"` } @@ -150,6 +152,15 @@ func GetShardSize() (int, error) { return getIntConfigValue(shardSize, configYAML, "ShardSize", "shard-size") } +// GetWebhookURL returns the webhook URL to ping on a successful cron job completion. An empty value is not an error. +func GetWebhookURL() (string, error) { + url, err := getStringConfigValue(webhookURL, configYAML, "WebhookURL", "webhook-url") + if err != nil && !errors.Is(err, ErrorEmptyConfigValue) { + return url, err + } + return url, nil +} + // GetMetricExporter returns the opencensus exporter type. 
func GetMetricExporter() (string, error) { return getStringConfigValue(metricExporter, configYAML, "MetricExporter", "metric-exporter") diff --git a/cron/config/config.yaml b/cron/config/config.yaml index 3caa05e445e..1da9eb36e5f 100644 --- a/cron/config/config.yaml +++ b/cron/config/config.yaml @@ -19,4 +19,5 @@ request-subscription-url: gcppubsub://projects/openssf/subscriptions/scorecard-b bigquery-dataset: scorecardcron bigquery-table: scorecard shard-size: 10 +webhook-url: metric-exporter: stackdriver diff --git a/cron/config/config_test.go b/cron/config/config_test.go index e60a51b8cab..edda9af9c60 100644 --- a/cron/config/config_test.go +++ b/cron/config/config_test.go @@ -29,6 +29,7 @@ const ( prodSubscription = "gcppubsub://projects/openssf/subscriptions/scorecard-batch-worker" prodBigQueryDataset = "scorecardcron" prodBigQueryTable = "scorecard" + prodWebhookURL = "" prodShardSize int = 10 prodMetricExporter string = "stackdriver" ) @@ -58,6 +59,7 @@ func TestYAMLParsing(t *testing.T) { RequestSubscriptionURL: prodSubscription, BigQueryDataset: prodBigQueryDataset, BigQueryTable: prodBigQueryTable, + WebhookURL: prodWebhookURL, ShardSize: prodShardSize, MetricExporter: prodMetricExporter, }, diff --git a/cron/controller/main.go b/cron/controller/main.go index 6b23bbe4c2e..03a865d6f7a 100644 --- a/cron/controller/main.go +++ b/cron/controller/main.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "os" - "strconv" "time" "google.golang.org/protobuf/encoding/protojson" @@ -111,13 +110,6 @@ func main() { if err != nil { panic(err) } - // TODO(azeems): Stop populating `.shard_num` file. - err = data.WriteToBlobStore(ctx, bucket, - data.GetShardNumFilename(t), - []byte(strconv.Itoa(int(shardNum+1)))) - if err != nil { - panic(err) - } // Populate `.shard_metadata` file. metadata := data.ShardMetadata{ NumShard: new(int32),