Skip to content

Commit

Permalink
Make accessLogEntry a sarama.Encoder.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 18, 2015
1 parent ffc518b commit ba21d54
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions examples/http_server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,40 +103,55 @@ func (s *Server) collectQueryStringData() http.Handler {
})
}

func (s *Server) withAccessLog(next http.Handler) http.Handler {
type accessLogEntry struct {
Method string `json:"method"`
Host string `json:"host"`
Path string `json:"path"`
IP string `json:"ip"`
ResponseTime float64 `json:"response_time"`
type accessLogEntry struct {
Method string `json:"method"`
Host string `json:"host"`
Path string `json:"path"`
IP string `json:"ip"`
ResponseTime float64 `json:"response_time"`

encoded []byte
err error
}

func (ale *accessLogEntry) ensureEncoded() {
if ale.encoded == nil && ale.err == nil {
ale.encoded, ale.err = json.Marshal(ale)
}
}

func (ale *accessLogEntry) Length() int {
ale.ensureEncoded()
return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
ale.ensureEncoded()
return ale.encoded, ale.err
}

func (s *Server) withAccessLog(next http.Handler) http.Handler {

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
started := time.Now()

next.ServeHTTP(w, r)

entry := accessLogEntry{
entry := &accessLogEntry{
Method: r.Method,
Host: r.Host,
Path: r.RequestURI,
IP: r.RemoteAddr,
ResponseTime: float64(time.Since(started)) / float64(time.Second),
}

jsonEntry, err := json.Marshal(entry)
if err != nil {
log.Println("Failed to marshal JSON access log entry:", err)
}

// We will use the client's IP address as key. This will cause
// all the access log entries of the same IP address to end up
// on the same partition.
s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
Topic: "access_log",
Key: sarama.StringEncoder(r.RemoteAddr),
Value: sarama.ByteEncoder(jsonEntry),
Value: entry,
}
})
}
Expand Down Expand Up @@ -177,7 +192,7 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
log.Fatalln("Failed to start Sarama producer:", err)
}

// We wil just log to STDOUT if we're not able to produce messages.
// We will just log to STDOUT if we're not able to produce messages.
// Note: messages will only be returned here after all retry attempts are exhausted.
go func() {
for err := range producer.Errors() {
Expand Down

0 comments on commit ba21d54

Please sign in to comment.