Skip to content

Commit

Permalink
Merge pull request #363 from Shopify/http_server_example
Browse files Browse the repository at this point in the history
Add http_server example
  • Loading branch information
wvanbergen committed Mar 18, 2015
2 parents 086643f + ba21d54 commit 8e3dfac
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 0 deletions.
7 changes: 7 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Sarama examples

This folder contains example applications to demonstrate the use of Sarama. For code snippet examples on how to use the different types in Sarama, see [Sarama's API documentation on godoc.org](https://godoc.org/github.com/Shopify/sarama).

#### HTTP server

[http_server](./http_server) is a simple HTTP server that uses both the sync producer to produce data as part of the request handling cycle, as well as the async producer to maintain an access log. It also uses the [mocks subpackage](https://godoc.org/github.com/Shopify/sarama/mocks) to test both.
2 changes: 2 additions & 0 deletions examples/http_server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
http_server
http_server.test
7 changes: 7 additions & 0 deletions examples/http_server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# HTTP server example

This HTTP server example shows you how to use the AsyncProducer and SyncProducer, and how to test them using mocks. The server simply sends the data of the HTTP request's query string to Kafka, and sends a 200 result if that succeeds. For every request, it will send an access log entry to Kafka as well in the background.

If you need to know whether a message was successfully sent to the Kafka cluster before you can send your HTTP response, using the `SyncProducer` is probably the simplest way to achieve this. If you don't care, e.g. for the access log, using the `AsyncProducer` will let you fire and forget. You can send the HTTP response, while the message is being produced in the background.

One important thing to note is that both the `SyncProducer` and `AsyncProducer` are **thread-safe**. Go's `http.Server` handles requests concurrently in different goroutines, but you can use a single producer safely. This will actually achieve efficiency gains as the producer will be able to batch messages from concurrent requests together.
204 changes: 204 additions & 0 deletions examples/http_server/http_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)

// Command-line flags. They are dereferenced in main after flag.Parse.
var (
// addr is the listen address that main passes to Server.Run.
addr = flag.String("addr", ":8080", "The address to bind to")
// brokers is the comma separated broker list; it defaults to the value of
// the KAFKA_PEERS environment variable.
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
// verbose makes main install a STDOUT logger as sarama.Logger.
verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
)

// main parses the command-line flags, connects the two Kafka producers and
// serves HTTP requests until the listener fails.
//
// The process exits with status 1 when no brokers are configured, and also
// after the HTTP server stops (Run always returns a non-nil error).
func main() {
	flag.Parse()

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}

	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}

	// Run blocks until the HTTP server stops.
	runErr := server.Run(*addr)

	// log.Fatal terminates the process through os.Exit, which does not run
	// deferred functions. Close the producers explicitly before exiting so
	// that they get a chance to shut down cleanly.
	if err := server.Close(); err != nil {
		log.Println("Failed to close server", err)
	}

	log.Fatal(runErr)
}

// Server bundles the two Kafka producers that the HTTP handlers write to.
type Server struct {
// DataCollector is the synchronous producer that collectQueryStringData
// uses to send each request's raw query string.
DataCollector sarama.SyncProducer
// AccessLogProducer is the asynchronous producer that withAccessLog feeds
// with one access log entry per request.
AccessLogProducer sarama.AsyncProducer
}

// Close shuts down both producers. Both are always closed, even when the
// first one fails; every failure is logged, and the first error encountered
// is returned so that callers checking the result can detect an unclean
// shutdown. It returns nil when both producers closed cleanly.
func (s *Server) Close() error {
	var firstErr error

	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
		firstErr = err
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
		if firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}

// Handler assembles the request pipeline: the query string collector wrapped
// by the access log middleware.
func (s *Server) Handler() http.Handler {
	collector := s.collectQueryStringData()
	return s.withAccessLog(collector)
}

// Run serves the Server's handler on addr and blocks until ListenAndServe
// returns, passing its error on to the caller.
func (s *Server) Run(addr string) error {
	httpServer := new(http.Server)
	httpServer.Addr = addr
	httpServer.Handler = s.Handler()

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}

// collectQueryStringData returns the handler that stores the raw query string
// of every request to "/" in Kafka through the synchronous DataCollector.
//
// Requests for any other path get a 404. When producing fails the client
// receives a 500 together with the error; otherwise the response body names
// the topic, partition and offset the message was stored at.
func (s *Server) collectQueryStringData() http.Handler {
	// The topic is used both for producing and in the response body; keeping
	// it in one place prevents the two from drifting apart.
	const topic = "important"

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(r.URL.RawQuery),
		})
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data: %s", err)
			return
		}

		// The tuple (topic, partition, offset) can be used as a unique identifier
		// for a message in a Kafka cluster.
		fmt.Fprintf(w, "Your data is stored with unique identifier %s/%d/%d", topic, partition, offset)
	})
}

// accessLogEntry describes one handled HTTP request. It is serialized to JSON
// lazily, and the result (or the marshalling error) is cached on the entry so
// that Length and Encode agree with each other.
type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	// Cached outcome of the JSON encoding. These fields are unexported, so
	// encoding/json leaves them out of the output.
	encoded []byte
	err     error
}

// ensureEncoded marshals the entry to JSON unless an earlier call already
// produced either a payload or an error.
func (e *accessLogEntry) ensureEncoded() {
	if e.encoded != nil || e.err != nil {
		return
	}
	e.encoded, e.err = json.Marshal(e)
}

// Length reports the size in bytes of the JSON payload; it is zero when
// marshalling failed.
func (e *accessLogEntry) Length() int {
	payload, _ := e.Encode()
	return len(payload)
}

// Encode returns the JSON payload together with the marshalling error, if any.
func (e *accessLogEntry) Encode() ([]byte, error) {
	e.ensureEncoded()
	return e.encoded, e.err
}

// withAccessLog wraps next so that, after every request has been handled, an
// access log entry is queued on the asynchronous AccessLogProducer.
//
// The entry is sent after next has returned, so producing the access log
// never delays the work done by the wrapped handler.
func (s *Server) withAccessLog(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		// RemoteAddr has the form "host:port". The port changes between
		// connections, so it has to be stripped before the address can
		// serve as a stable per-client key.
		ip := clientIP(r.RemoteAddr)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           ip,
			ResponseTime: time.Since(started).Seconds(),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(ip),
			Value: entry,
		}
	})
}

// clientIP extracts the host part of a "host:port" address. When the address
// cannot be split (for example because it is empty or has no port) it is
// returned unchanged.
func clientIP(remoteAddr string) string {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return remoteAddr
	}
	return host
}

// newDataCollector connects a synchronous producer to brokerList, tuned for
// consistency rather than throughput. The process is terminated when the
// producer cannot be started.
func newDataCollector(brokerList []string) sarama.SyncProducer {
	// The flush settings are left at their defaults, so sarama produces
	// messages as fast as possible and latency stays low.
	cfg := sarama.NewConfig()
	cfg.Producer.Retry.Max = 10                   // Give up only after 10 retries
	cfg.Producer.RequiredAcks = sarama.WaitForAll // Every in-sync replica has to ack the message

	// Broker-side settings that strengthen the consistency guarantees further:
	// - On the broker, set `unclean.leader.election.enable` to false.
	// - On the topic, consider increasing `min.insync.replicas`.

	syncProducer, err := sarama.NewSyncProducer(brokerList, cfg)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return syncProducer
}

// newAccessLogProducer connects an asynchronous producer to brokerList, tuned
// for throughput: messages are compressed and flushed in batches, trading
// latency for less network I/O. The process is terminated when the producer
// cannot be started.
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	cfg := sarama.NewConfig()
	cfg.Producer.Flush.Frequency = 500 * time.Millisecond // Send a batch every 500ms
	cfg.Producer.Compression = sarama.CompressionSnappy   // Snappy-compress the messages
	cfg.Producer.RequiredAcks = sarama.WaitForLocal       // The leader's ack is enough

	asyncProducer, err := sarama.NewAsyncProducer(brokerList, cfg)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// Delivery failures are only reported on STDOUT. A message shows up here
	// once all retry attempts for it are exhausted. The goroutine ends when
	// the errors channel is closed.
	go func() {
		failures := asyncProducer.Errors()
		for {
			failure, open := <-failures
			if !open {
				return
			}
			log.Println("Failed to write access log entry:", failure)
		}
	}()

	return asyncProducer
}
109 changes: 109 additions & 0 deletions examples/http_server/http_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package main

import (
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
)

// TestCollectSuccessfully covers the happy path: one message goes to the data
// collector, one entry goes to the access log, both succeed, and the client
// receives a 200 response naming where the data was stored.
func TestCollectSuccessfully(t *testing.T) {
	collector := mocks.NewSyncProducer(t, nil)
	collector.ExpectSendMessageAndSucceed()

	accessLog := mocks.NewAsyncProducer(t, nil)
	accessLog.ExpectInputAndSucceed()

	// Inject the mocks in place of real producers.
	srv := &Server{DataCollector: collector, AccessLogProducer: accessLog}

	// Closing the Server closes both mocks, which is the point at which
	// they verify that every expectation has been met.
	defer safeClose(t, srv)

	req, err := http.NewRequest("GET", "http://example.com/?data", nil)
	if err != nil {
		t.Fatal(err)
	}

	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, req)

	if status := rec.Code; status != http.StatusOK {
		t.Errorf("Expected HTTP status 200, found %d", status)
	}

	if body := rec.Body.String(); body != "Your data is stored with unique identifier important/0/1" {
		t.Error("Unexpected response body", rec.Body)
	}
}

// TestCollectionFailure checks that a produce failure on the data collector
// is reported to the client as a 500 status, while the access log entry is
// still written.
func TestCollectionFailure(t *testing.T) {
	collector := mocks.NewSyncProducer(t, nil)
	collector.ExpectSendMessageAndFail(sarama.ErrRequestTimedOut)

	accessLog := mocks.NewAsyncProducer(t, nil)
	accessLog.ExpectInputAndSucceed()

	srv := &Server{DataCollector: collector, AccessLogProducer: accessLog}
	defer safeClose(t, srv)

	req, err := http.NewRequest("GET", "http://example.com/?data", nil)
	if err != nil {
		t.Fatal(err)
	}

	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, req)

	if status := rec.Code; status != http.StatusInternalServerError {
		t.Errorf("Expected HTTP status 500, found %d", status)
	}
}

// TestWrongPath requests an unknown path. No expectation is registered on the
// data collector mock because nothing should be produced to it; the access
// log entry is still expected, and the client should receive a 404.
func TestWrongPath(t *testing.T) {
	collector := mocks.NewSyncProducer(t, nil)

	accessLog := mocks.NewAsyncProducer(t, nil)
	accessLog.ExpectInputAndSucceed()

	srv := &Server{DataCollector: collector, AccessLogProducer: accessLog}
	defer safeClose(t, srv)

	req, err := http.NewRequest("GET", "http://example.com/wrong?data", nil)
	if err != nil {
		t.Fatal(err)
	}

	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, req)

	if status := rec.Code; status != http.StatusNotFound {
		t.Errorf("Expected HTTP status 404, found %d", status)
	}
}

// safeClose closes o and records a test error on t when closing fails.
func safeClose(t *testing.T, o io.Closer) {
	err := o.Close()
	if err == nil {
		return
	}
	t.Error(err)
}

0 comments on commit 8e3dfac

Please sign in to comment.