✨ Post to webhook on successful cron job completion #829

Merged (2 commits) on Aug 11, 2021
37 changes: 33 additions & 4 deletions cron/bq/main.go
@@ -16,17 +16,23 @@
package main

import (
"bytes"
"context"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"time"

"google.golang.org/protobuf/encoding/protojson"

"github.com/ossf/scorecard/v2/cron/config"
"github.com/ossf/scorecard/v2/cron/data"
)

type shardSummary struct {
shardMetadata []byte
shardsExpected int
shardsCreated int
isTransferred bool
@@ -60,6 +66,7 @@ func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, er
return nil, fmt.Errorf("error parsing Blob key: %w", err)
}
switch {
// TODO(azeems): Remove this case once all instances stop producing .shard_num file.
case filename == config.ShardNumFilename:
keyData, err := data.GetBlobContent(ctx, bucketURL, key)
if err != nil {
@@ -74,8 +81,16 @@ func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, er
case filename == config.TransferStatusFilename:
summary.getOrCreate(creationTime).isTransferred = true
case filename == config.ShardMetadataFilename:
// TODO(azeems): Handle shard_metadata file.
continue
keyData, err := data.GetBlobContent(ctx, bucketURL, key)
if err != nil {
return nil, fmt.Errorf("error during GetBlobContent: %w", err)
}
var metadata data.ShardMetadata
if err := protojson.Unmarshal(keyData, &metadata); err != nil {
return nil, fmt.Errorf("error parsing data as ShardMetadata: %w", err)
}
summary.getOrCreate(creationTime).shardsExpected = int(metadata.GetNumShard())
summary.getOrCreate(creationTime).shardMetadata = keyData
default:
// nolint: goerr113
return nil, fmt.Errorf("found unrecognized file: %s", key)
@@ -85,7 +100,7 @@ func getBucketSummary(ctx context.Context, bucketURL string) (*bucketSummary, er
}

func transferDataToBq(ctx context.Context,
bucketURL, projectID, datasetName, tableName string,
bucketURL, projectID, datasetName, tableName, webhookURL string,
summary *bucketSummary) error {
for creationTime, shards := range summary.shards {
if shards.isTransferred || shards.shardsExpected != shards.shardsCreated {
@@ -103,6 +118,16 @@ func transferDataToBq(ctx context.Context,
if err := data.WriteToBlobStore(ctx, bucketURL, transferStatusFilename, nil); err != nil {
return fmt.Errorf("error during WriteToBlobStore: %w", err)
}
if webhookURL == "" {
continue
}
// nolint: noctx, gosec // variable URL is ok here.
resp, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(shards.shardMetadata))
if err != nil {
return fmt.Errorf("error during http.Post to %s: %w", webhookURL, err)
}
defer resp.Body.Close()
log.Println(resp.Status)
}
return nil
}
@@ -129,6 +154,10 @@ func main() {
if err != nil {
panic(err)
}
webhookURL, err := config.GetWebhookURL()
if err != nil {
panic(err)
}
projectID, datasetName, tableName, err := getBQConfig()
if err != nil {
panic(err)
@@ -140,7 +169,7 @@
}

if err := transferDataToBq(ctx,
bucketURL, projectID, datasetName, tableName,
bucketURL, projectID, datasetName, tableName, webhookURL,
summary); err != nil {
panic(err)
}
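For reference, a minimal sketch of a receiver for this webhook call, assuming the endpoint only needs to acknowledge and log the protojson-encoded ShardMetadata body that transferDataToBq posts; the listen address and handler path are hypothetical.

package main

import (
	"io"
	"log"
	"net/http"
)

func main() {
	// Hypothetical endpoint that SCORECARD_WEBHOOK_URL could point at.
	http.HandleFunc("/scorecard-cron-complete", func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// The body is the raw .shard_metadata content (protojson-encoded ShardMetadata).
		log.Printf("cron job completed: %s", body)
		w.WriteHeader(http.StatusOK)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}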
11 changes: 11 additions & 0 deletions cron/config/config.go
@@ -42,6 +42,7 @@ const (
bigqueryDataset string = "SCORECARD_BIGQUERY_DATASET"
bigqueryTable string = "SCORECARD_BIGQUERY_TABLE"
shardSize string = "SCORECARD_SHARD_SIZE"
webhookURL string = "SCORECARD_WEBHOOK_URL"
metricExporter string = "SCORECARD_METRIC_EXPORTER"
)

@@ -61,6 +62,7 @@ type config struct {
RequestSubscriptionURL string `yaml:"request-subscription-url"`
BigQueryDataset string `yaml:"bigquery-dataset"`
BigQueryTable string `yaml:"bigquery-table"`
WebhookURL string `yaml:"webhook-url"`
MetricExporter string `yaml:"metric-exporter"`
ShardSize int `yaml:"shard-size"`
}
@@ -150,6 +152,15 @@ func GetShardSize() (int, error) {
return getIntConfigValue(shardSize, configYAML, "ShardSize", "shard-size")
}

// GetWebhookURL returns the webhook URL to ping on a successful cron job completion.
func GetWebhookURL() (string, error) {
url, err := getStringConfigValue(webhookURL, configYAML, "WebhookURL", "webhook-url")
if err != nil && !errors.As(err, &ErrorEmptyConfigValue) {
return url, err
}
return url, nil
}

// GetMetricExporter returns the opencensus exporter type.
func GetMetricExporter() (string, error) {
return getStringConfigValue(metricExporter, configYAML, "MetricExporter", "metric-exporter")
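A hedged usage sketch for the new getter: an unset value is not an error (the webhook is simply disabled), and, assuming the same lookup order as the other getters, the SCORECARD_WEBHOOK_URL environment variable takes precedence over the webhook-url key in config.yaml. The URL below is hypothetical.

package main

import (
	"log"
	"os"

	"github.com/ossf/scorecard/v2/cron/config"
)

func main() {
	// Hypothetical override; omit it to fall back to config.yaml, where
	// webhook-url is left empty by default.
	if err := os.Setenv("SCORECARD_WEBHOOK_URL", "https://example.com/scorecard-hook"); err != nil {
		log.Fatal(err)
	}

	url, err := config.GetWebhookURL()
	if err != nil {
		log.Fatal(err)
	}
	if url == "" {
		log.Println("webhook disabled")
		return
	}
	log.Printf("completion events will be posted to %s", url)
}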
1 change: 1 addition & 0 deletions cron/config/config.yaml
@@ -19,4 +19,5 @@ request-subscription-url: gcppubsub://projects/openssf/subscriptions/scorecard-b
bigquery-dataset: scorecardcron
bigquery-table: scorecard
shard-size: 10
webhook-url:
metric-exporter: stackdriver
2 changes: 2 additions & 0 deletions cron/config/config_test.go
@@ -29,6 +29,7 @@ const (
prodSubscription = "gcppubsub://projects/openssf/subscriptions/scorecard-batch-worker"
prodBigQueryDataset = "scorecardcron"
prodBigQueryTable = "scorecard"
prodWebhookURL = ""
prodShardSize int = 10
prodMetricExporter string = "stackdriver"
)
@@ -58,6 +59,7 @@ func TestYAMLParsing(t *testing.T) {
RequestSubscriptionURL: prodSubscription,
BigQueryDataset: prodBigQueryDataset,
BigQueryTable: prodBigQueryTable,
WebhookURL: prodWebhookURL,
ShardSize: prodShardSize,
MetricExporter: prodMetricExporter,
},
8 changes: 0 additions & 8 deletions cron/controller/main.go
@@ -19,7 +19,6 @@ import (
"context"
"fmt"
"os"
"strconv"
"time"

"google.golang.org/protobuf/encoding/protojson"
@@ -111,13 +110,6 @@ func main() {
if err != nil {
panic(err)
}
// TODO(azeems): Stop populating `.shard_num` file.
err = data.WriteToBlobStore(ctx, bucket,
data.GetShardNumFilename(t),
[]byte(strconv.Itoa(int(shardNum+1))))
if err != nil {
panic(err)
}
// Populate `.shard_metadata` file.
metadata := data.ShardMetadata{
NumShard: new(int32),