Skip to content

Commit

Permalink
GCS Proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
fkorotkov committed Apr 17, 2018
1 parent c5c2311 commit 9f343a9
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 1 deletion.
9 changes: 9 additions & 0 deletions .cirrus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
container:
image: golang:1.10

test_task:
env:
CIRRUS_WORKING_DIR: /go/src/github.com/$CIRRUS_REPO_FULL_NAME
get_script: go get -t -v ./...
vet_script: go vet -v ./...
test_script: go test -v ./...
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@

# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/

# IntelliJ
.idea/
*.iml
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# google-storage-proxy
HTTP proxy with REST API to interact with Google Cloud Storage Buckets

Simply allows to use `HEAD`, `GET` or `PUT` requests to check blob's availability, as well as downloading or uploading
blobs to a specified GCS bucket.
36 changes: 36 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"cloud.google.com/go/storage"
"context"
"flag"
"github.com/cirruslabs/google-storage-proxy/proxy"
"log"
)

func main() {
var port int64
flag.Int64Var(&port, "port", 8080, "Port to serve")
var bucketName string
flag.StringVar(&bucketName, "bucket", "", "Google Storage Bucket Name")
var defaultPrefix string
flag.StringVar(&defaultPrefix, "prefix", "", "Default prefix for all object names. For example, use --prefix=foo/.")
flag.Parse()

if bucketName == "" {
log.Fatal("Please specify Google Cloud Storage Bucket")
}

client, err := storage.NewClient(context.Background())
if err != nil {
log.Fatalf("Failed to create a storage client: %s", err)
}

bucketHandler := client.Bucket(bucketName)
storageProxy := http_cache.NewStorageProxy(bucketHandler, defaultPrefix)

err = storageProxy.Serve(port)
if err != nil {
log.Fatalf("Failed to start proxy: %s", err)
}
}
109 changes: 109 additions & 0 deletions proxy/http_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package http_cache

import (
"bufio"
"context"
"fmt"
"cloud.google.com/go/storage"
"log"
"net"
"net/http"
)

type StorageProxy struct {
bucketHandler *storage.BucketHandle
defaultPrefix string
}

func NewStorageProxy(bucketHandler *storage.BucketHandle, defaultPrefix string) *StorageProxy {
return &StorageProxy{
bucketHandler: bucketHandler,
defaultPrefix: defaultPrefix,
}
}

func (proxy StorageProxy) objectName(name string) string {
return proxy.defaultPrefix + name
}

func (proxy StorageProxy) Serve(port int64) error {
http.HandleFunc("/", proxy.handler)

listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))

if err == nil {
address := listener.Addr().String()
listener.Close()
log.Printf("Starting http cache server %s\n", address)
return http.ListenAndServe(address, nil)
}
return err
}

func (proxy StorageProxy) handler(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path
if key[0] == '/' {
key = key[1:]
}
if r.Method == "GET" {
proxy.downloadBlob(w, key)
} else if r.Method == "HEAD" {
proxy.checkBlobExists(w, key)
} else if r.Method == "POST" {
proxy.uploadBlob(w, r, key)
} else if r.Method == "PUT" {
proxy.uploadBlob(w, r, key)
}
}

func (proxy StorageProxy) downloadBlob(w http.ResponseWriter, name string) {
object := proxy.bucketHandler.Object(proxy.objectName(name))
if object == nil {
w.WriteHeader(http.StatusNotFound)
return
}
reader, err := object.NewReader(context.Background())
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
defer reader.Close()
bufferedReader := bufio.NewReader(reader)
_, err = bufferedReader.WriteTo(w)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
}

func (proxy StorageProxy) checkBlobExists(w http.ResponseWriter, name string) {
object := proxy.bucketHandler.Object(proxy.objectName(name))
if object == nil {
w.WriteHeader(http.StatusNotFound)
return
}
// lookup attributes to see if the object exists
attrs, err := object.Attrs(context.Background())
if err != nil || attrs == nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
}

func (proxy StorageProxy) uploadBlob(w http.ResponseWriter, r *http.Request, name string) {
object := proxy.bucketHandler.Object(proxy.objectName(name))

writer := object.NewWriter(context.Background())
defer writer.Close()

_, err := bufio.NewWriter(writer).ReadFrom(bufio.NewReader(r.Body))
if err != nil {
errorMsg := fmt.Sprintf("Failed read cache body! %s", err)
w.Write([]byte(errorMsg))
w.WriteHeader(http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusCreated)
}
101 changes: 101 additions & 0 deletions proxy/http_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package http_cache

import (
"testing"
"github.com/fsouza/fake-gcs-server/fakestorage"
"net/http/httptest"
"net/http"
"strings"
)

const TestBucketName = "some-bucket"

func Test_Blob_Exists(t *testing.T) {
server := fakestorage.NewServer([]fakestorage.Object{
{
BucketName: TestBucketName,
Name: "some/object/file",
},
})
defer server.Stop()
client := server.Client()
storageProxy := NewStorageProxy(client.Bucket(TestBucketName), "")

response := httptest.NewRecorder()
storageProxy.checkBlobExists(response, "some/object/file")

if response.Code == http.StatusOK {
t.Log("Passed")
} else {
t.Errorf("Wrong status: '%d'", response.Code)
}
}

func Test_Default_Prefix(t *testing.T) {
server := fakestorage.NewServer([]fakestorage.Object{
{
BucketName: TestBucketName,
Name: "some/object/file",
},
})
defer server.Stop()
client := server.Client()
storageProxy := NewStorageProxy(client.Bucket(TestBucketName), "some/object/")

response := httptest.NewRecorder()
storageProxy.checkBlobExists(response, "file")

if response.Code == http.StatusOK {
t.Log("Passed")
} else {
t.Errorf("Wrong status: '%d'", response.Code)
}
}

func Test_Blob_Download(t *testing.T) {
expectedBlobContent := "my content"
server := fakestorage.NewServer([]fakestorage.Object{
{
BucketName: TestBucketName,
Name: "some/file",
Content: []byte(expectedBlobContent),
},
})
defer server.Stop()
client := server.Client()
storageProxy := NewStorageProxy(client.Bucket(TestBucketName), "")

response := httptest.NewRecorder()
storageProxy.downloadBlob(response, "some/file")

if response.Code == http.StatusOK {
t.Log("Passed")
} else {
t.Errorf("Wrong status: '%d'", response.Code)
}

downloadedBlobContent := response.Body.String()
if downloadedBlobContent == expectedBlobContent {
t.Log("Passed")
} else {
t.Errorf("Wrong content: '%s'", downloadedBlobContent)
}
}

func Test_Blob_Upload(t *testing.T) {
expectedBlobContent := "my content"
server := fakestorage.NewServer([]fakestorage.Object{})
defer server.Stop()
client := server.Client()
storageProxy := NewStorageProxy(client.Bucket(TestBucketName), "")

response := httptest.NewRecorder()
request := httptest.NewRequest("POST", "/test-file", strings.NewReader(expectedBlobContent))
storageProxy.uploadBlob(response, request,"test-file")

if response.Code == http.StatusCreated {
t.Log("Passed")
} else {
t.Errorf("Wrong status: '%d'", response.Code)
}
}

0 comments on commit 9f343a9

Please sign in to comment.