Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Azure blob storage support #560

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,35 @@ OPTIONS:
value. This flag will be removed. (default: 2)
[$BAZEL_REMOTE_S3_KEY_VERSION]

--azblob.tenant_id value The AzBlob tenant id to use when using azblob
proxy backend. [$BAZEL_REMOTE_AZBLOB_TENANT_ID, $AZURE_TENANT_ID]

--azblob.storage_account value The AzBlob storage account to use when
using azblob proxy backend. [$BAZEL_REMOTE_AZBLOB_STORAGE_ACCOUNT]

--azblob.container_name value The AzBlob container name to use when using
azblob proxy backend. [$BAZEL_REMOTE_AZBLOB_CONTAINER_NAME]

--azblob.prefix value The AzBlob object prefix to use when using azblob
proxy backend. [$BAZEL_REMOTE_AZBLOB_PREFIX]

--azblob.auth_method value The AzBlob authentication method. This argument
is required when an AzBlob proxy backend is used. Allowed values:
client_certificate, client_secret, environment_credential, device_code,
default. [$BAZEL_REMOTE_AZBLOB_AUTH_METHOD]

--azblob.client_id value The AzBlob client id to use when using AzBlob
proxy backend. Applies to AzBlob auth method(s): client_secret.
[$BAZEL_REMOTE_AZBLOB_CLIENT_ID, $AZURE_CLIENT_ID]

--azblob.client_secret value The AzBlob client secret to use when using
AzBlob proxy backend. Applies to AzBlob auth method(s): client_secret.
[$BAZEL_REMOTE_AZBLOB_SECRET_CLIENT_SECRET, $AZURE_CLIENT_SECRET]

--azblob.cert_path value Path to the certificate file. Applies to
AzBlob auth method(s): client_certificate.
[$BAZEL_REMOTE_AZBLOB_CERT_PATH, $AZURE_CLIENT_CERTIFICATE_PATH]

--disable_http_ac_validation Whether to disable ActionResult validation
for HTTP requests. (default: false, ie enable validation)
[$BAZEL_REMOTE_DISABLE_HTTP_AC_VALIDATION]
Expand Down Expand Up @@ -425,7 +454,15 @@ http_address: 0.0.0.0:8080
#
#http_proxy:
# url: https://remote-cache.com:8080/cache

#
#azblob_proxy:
# tenant_id: TENANT_ID
# storage_account: STORAGE_ACCOUNT
# container_name: CONTAINER_NAME
# auth_method: client_secret
bakjos marked this conversation as resolved.
Show resolved Hide resolved
# client_id: APP_ID
# client_secret: APP_SECRET

# If set to a valid port number, then serve /debug/pprof/* URLs here:
#profile_port: 7070
# IP address to use, if profiling is enabled:
Expand Down
19 changes: 19 additions & 0 deletions cache/azblobproxy/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library target for the azblobproxy package: the auth-method helpers
# (auth_methods.go) and the proxy implementation (azblobproxy.go).
go_library(
name = "go_default_library",
srcs = [
"auth_methods.go",
"azblobproxy.go",
],
importpath = "github.com/buchgr/bazel-remote/cache/azblobproxy",
visibility = ["//visibility:public"],
# In-repo cache packages first, then the Azure SDK and Prometheus client
# packages that azblobproxy.go imports.
deps = [
"//cache:go_default_library",
"//cache/disk/casblob:go_default_library",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:go_default_library",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
],
)
28 changes: 28 additions & 0 deletions cache/azblobproxy/auth_methods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package azblobproxy

// Names of the authentication methods recognised for the azblob proxy
// backend. GetAuthMethods returns exactly this set and IsValidAuthMethod
// checks membership in it.
const (
// AuthMethodClientCertificate is the "client_certificate" auth method name.
AuthMethodClientCertificate = "client_certificate"
// AuthMethodClientSecret is the "client_secret" auth method name.
AuthMethodClientSecret = "client_secret"
// AuthMethodEnvironmentCredential is the "environment_credential" auth method name.
AuthMethodEnvironmentCredential = "environment_credential"
// AuthMethodDeviceCode is the "device_code" auth method name.
AuthMethodDeviceCode = "device_code"
// AuthMethodDefault is the "default" auth method name.
AuthMethodDefault = "default"
)

// GetAuthMethods returns the names of all authentication methods known to
// this package. A newly allocated slice is returned on every call, so the
// caller is free to modify it.
func GetAuthMethods() []string {
	methods := make([]string, 0, 5)
	methods = append(methods,
		AuthMethodClientCertificate,
		AuthMethodClientSecret,
		AuthMethodEnvironmentCredential,
		AuthMethodDeviceCode,
		AuthMethodDefault,
	)
	return methods
}

// IsValidAuthMethod reports whether authMethod exactly matches (case
// sensitively) one of the names returned by GetAuthMethods.
func IsValidAuthMethod(authMethod string) bool {
	known := GetAuthMethods()
	for i := 0; i < len(known); i++ {
		if known[i] == authMethod {
			return true
		}
	}
	return false
}
249 changes: 249 additions & 0 deletions cache/azblobproxy/azblobproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package azblobproxy

import (
"context"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/buchgr/bazel-remote/cache"
"github.com/buchgr/bazel-remote/cache/disk/casblob"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"io"
"log"
"path"
bakjos marked this conversation as resolved.
Show resolved Hide resolved
)

// Prometheus counters for the azblob backend, registered via promauto when
// the package is initialised.
var (
// cacheHits is incremented by Get each time a blob download succeeds.
cacheHits = promauto.NewCounter(prometheus.CounterOpts{
Name: "bazel_remote_azblob_cache_hits",
Help: "The total number of azblob backend cache hits",
})
// cacheMisses is incremented by Get each time a download attempt fails.
cacheMisses = promauto.NewCounter(prometheus.CounterOpts{
Name: "bazel_remote_azblob_cache_misses",
Help: "The total number of azblob backend cache misses",
})
)

// uploadReq is one queued upload: Put builds it and sends it on the upload
// queue, and an uploader goroutine hands it to uploadFile.
type uploadReq struct {
// hash and kind together select the object key of the blob.
hash string
// size is the value passed to Put; uploadFile does not read it.
size int64
kind cache.EntryKind
// rc supplies the blob contents; uploadFile closes it after uploading.
rc io.ReadCloser
// cxt is the context handed to the upload call (Put stores
// context.Background() here).
cxt context.Context
}

// azBlobCache is the proxy backend returned by New; it reads and writes
// cache entries as blobs in a single Azure storage container.
type azBlobCache struct {
// containerClient creates the per-blob clients used for every operation.
containerClient *azblob.ContainerClient
// storageAccount and container are kept for log output only.
storageAccount string
container string
// prefix, when non-empty, is prepended to object keys.
prefix string
// v2mode is true when New was called with storage mode "zstd".
v2mode bool
// uploadQueue feeds the uploader goroutines; it is nil when uploads are
// disabled, in which case Put discards its input.
uploadQueue chan<- uploadReq
accessLogger cache.Logger
errorLogger cache.Logger
// objectKey maps (hash, kind) to a blob name; New points it at
// objectKeyV2 or objectKeyV1 depending on v2mode.
objectKey func(hash string, kind cache.EntryKind) string
}

func (c *azBlobCache) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
if c.uploadQueue == nil {
rc.Close()
return
}

select {
case c.uploadQueue <- uploadReq{
hash: hash,
size: size,
kind: kind,
rc: rc,
cxt: context.Background(),
bakjos marked this conversation as resolved.
Show resolved Hide resolved
}:
default:
c.errorLogger.Printf("too many uploads queued\n")
rc.Close()
}
}

func (c *azBlobCache) Get(ctx context.Context, kind cache.EntryKind, hash string) (rc io.ReadCloser, size int64, err error) {
key := c.objectKey(hash, kind)
if c.prefix != "" {
key = c.prefix + "/" + key
}
client, err := c.containerClient.NewBlockBlobClient(key)

if err != nil {
cacheMisses.Inc()
logResponse(c.accessLogger, "DOWNLOAD", c.storageAccount, c.container, key, err)
return nil, -1, err
}

resp, err := client.Download(context.Background(), nil)

if err != nil {

bakjos marked this conversation as resolved.
Show resolved Hide resolved
cacheMisses.Inc()
logResponse(c.accessLogger, "DOWNLOAD", c.storageAccount, c.container, key, err)
return nil, -1, err
}
cacheHits.Inc()

logResponse(c.accessLogger, "DOWNLOAD", c.storageAccount, c.container, key, err)

if kind == cache.CAS && c.v2mode {
return casblob.ExtractLogicalSize(rc)
}

if resp.ContentLength != nil {
size = *resp.ContentLength
}

rc = resp.Body(&azblob.RetryReaderOptions{MaxRetryRequests: 2})

return rc, size, nil
}

// errNotFound is the error Contains passes to logResponse when it treats a
// blob as absent.
var errNotFound = errors.New("NOT FOUND")

func (c *azBlobCache) Contains(ctx context.Context, kind cache.EntryKind, hash string) (bool, int64) {
key := c.objectKey(hash, kind)
if c.prefix != "" {
key = c.prefix + "/" + key
}

size := int64(-1)
exists := false

client, err := c.containerClient.NewBlobClient(key)

exists = (err == nil)
if err != nil {
err = errNotFound
} else if kind != cache.CAS || !c.v2mode {
props, e := client.GetProperties(context.Background(), nil)
err = e
if props.ContentLength != nil {
size = *props.ContentLength
}
}

logResponse(c.accessLogger, "CONTAINS", c.storageAccount, c.container, key, err)

return exists, size

bakjos marked this conversation as resolved.
Show resolved Hide resolved
}

func New(
storageAccount string,
containerName string,
prefix string,
creds azcore.TokenCredential,
storageMode string, accessLogger cache.Logger,
errorLogger cache.Logger, numUploaders, maxQueuedUploads int,
) cache.Proxy {

url := fmt.Sprintf("https://%s.blob.core.windows.net/", storageAccount)
serviceClient, err := azblob.NewServiceClient(url, creds, nil)
if err != nil {
log.Fatalln(err)
}

containerClient, err := serviceClient.NewContainerClient(containerName)
if err != nil {
log.Fatalln(err)
}

if storageMode != "zstd" && storageMode != "uncompressed" {
log.Fatalf("Unsupported storage mode for the s3proxy backend: %q, must be one of \"zstd\" or \"uncompressed\"",
mostynb marked this conversation as resolved.
Show resolved Hide resolved
storageMode)
}

c := &azBlobCache{
containerClient: containerClient,
prefix: prefix,
storageAccount: storageAccount,
container: containerName,
accessLogger: accessLogger,
errorLogger: errorLogger,
v2mode: storageMode == "zstd",
}

if c.v2mode {
c.objectKey = func(hash string, kind cache.EntryKind) string {
return objectKeyV2(c.prefix, hash, kind)
}
} else {
c.objectKey = func(hash string, kind cache.EntryKind) string {
return objectKeyV1(c.prefix, hash, kind)
}
}

if maxQueuedUploads > 0 && numUploaders > 0 {
uploadQueue := make(chan uploadReq, maxQueuedUploads)
for uploader := 0; uploader < numUploaders; uploader++ {
go func() {
for item := range uploadQueue {
c.uploadFile(item)
}
}()
}

c.uploadQueue = uploadQueue
}

return c
}

func (c *azBlobCache) uploadFile(item uploadReq) {
key := c.objectKey(item.hash, item.kind)
if c.prefix != "" {
key = c.prefix + "/" + key
}
client, err := c.containerClient.NewBlockBlobClient(key)

if err != nil {
logResponse(c.accessLogger, "UPLOAD", c.storageAccount, c.container, key, err)
return
}

_, err = client.Upload(item.cxt, item.rc.(io.ReadSeekCloser), nil)

logResponse(c.accessLogger, "UPLOAD", c.storageAccount, c.container, key, err)

item.rc.Close()
bakjos marked this conversation as resolved.
Show resolved Hide resolved
}

// objectKeyV2 builds the blob name used in "zstd" storage mode:
// [prefix/]<dir>/<first two characters of hash>/<hash>, where <dir> is
// "cas.v2" for CAS entries and kind.String() for every other kind.
// hash must be at least two characters long.
func objectKeyV2(prefix string, hash string, kind cache.EntryKind) string {
	var dir string
	switch kind {
	case cache.CAS:
		// Use "cas.v2" to distinguish new from old format blobs.
		dir = "cas.v2"
	default:
		dir = kind.String()
	}

	key := path.Join(dir, hash[:2], hash)
	if prefix != "" {
		key = path.Join(prefix, key)
	}
	return key
}

// objectKeyV1 builds the blob name used in "uncompressed" storage mode:
// [prefix/]<kind.String()>/<first two characters of hash>/<hash>.
// hash must be at least two characters long.
func objectKeyV1(prefix string, hash string, kind cache.EntryKind) string {
	parts := []string{kind.String(), hash[:2], hash}
	if prefix != "" {
		parts = append([]string{prefix}, parts...)
	}
	return path.Join(parts...)
}

// logResponse writes one line describing the outcome of an operation to
// logger, in the form
//
//	AZBLOB <method> <storageAccount> <container> <key> <status>
//
// where status is "OK" when err is nil and err.Error() otherwise.
func logResponse(logger cache.Logger, method, storageAccount, container, key string, err error) {
	status := "OK"
	if err != nil {
		status = err.Error()
	}

	// One verb per argument: with only four verbs the key was printed in the
	// status slot and the real status showed up as a %!(EXTRA ...) suffix.
	logger.Printf("AZBLOB %s %s %s %s %s", method, storageAccount, container, key, status)
}
4 changes: 4 additions & 0 deletions config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"azblob.go",
"config.go",
"logger.go",
"proxy.go",
Expand All @@ -13,9 +14,12 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//cache:go_default_library",
"//cache/azblobproxy:go_default_library",
"//cache/gcsproxy:go_default_library",
"//cache/httpproxy:go_default_library",
"//cache/s3proxy:go_default_library",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:go_default_library",
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:go_default_library",
"@com_github_minio_minio_go_v7//pkg/credentials:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@in_gopkg_yaml_v3//:go_default_library",
Expand Down
Loading