Skip to content

Commit

Permalink
AccessLogFilter support
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Sep 22, 2019
1 parent dcbd967 commit c5082fd
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 2 deletions.
2 changes: 2 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
GENERIC_KEY = "generic"
CLASSIFIER_KEY = "classifier"
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
)

const (
Expand Down
208 changes: 208 additions & 0 deletions filter/impl/access_log_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package impl

import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
"os"
"reflect"
"strings"
"time"
)

const (
//usd in URL.
AccessLogKey = "accesslog"
FileDateFormat = "2006-01-02"
MessageDateLayout = "2006-01-02 15:04:05"
LogMaxBuffer = 5000
LogFileMode = 0600

// those fields are the data collected by this filter
Types = "types"
Arguments = "arguments"
)

func init() {
extension.SetFilter(AccessLogKey, GetAccessLogFilter)
}

type AccessLogFilter struct {
logChan chan AccessLogData
}

func (ef *AccessLogFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
accessLog := invoker.GetUrl().GetParam(AccessLogKey, "")
logger.Warnf(invoker.GetUrl().String())
if len(accessLog) > 0 {
accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog}
ef.logIntoChannel(accessLogData)
}
return invoker.Invoke(invocation)
}

// it won't block the invocation
func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
select {
case ef.logChan <- accessLogData:
return
default:
logger.Warn("The channel is full and the access logIntoChannel data will be dropped")
return
}
}

func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string)
attachments := invocation.Attachments()
dataMap[constant.INTERFACE_KEY] = attachments[constant.INTERFACE_KEY]
dataMap[constant.METHOD_KEY] = invocation.MethodName()
dataMap[constant.VERSION_KEY] = attachments[constant.VERSION_KEY]
dataMap[constant.GROUP_KEY] = attachments[constant.GROUP_KEY]
dataMap[constant.TIMESTAMP_KEY] = time.Now().Format(MessageDateLayout)
dataMap[constant.LOCAL_ADDR], _ = attachments[constant.LOCAL_ADDR]
dataMap[constant.REMOTE_ADDR], _ = attachments[constant.REMOTE_ADDR]

if len(invocation.Arguments()) > 0 {
builder := strings.Builder{}
// todo(after the paramTypes were set to the invocation. we should change this implementation)
typeBuilder := strings.Builder{}
first := true
for _, arg := range invocation.Arguments() {
if first {
first = false
} else {
builder.WriteString(",")
typeBuilder.WriteString(",")
}
builder.WriteString(reflect.ValueOf(arg).String())
typeBuilder.WriteString(reflect.TypeOf(arg).Name())
}
dataMap[Arguments] = builder.String()
dataMap[Types] = typeBuilder.String()
}

return dataMap
}

func (ef *AccessLogFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}

func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
accessLog := data.accessLog
if isDefault(accessLog) {
logger.Info(data.toLogMessage())
} else {
logFile, err := ef.openLogFile(accessLog)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return
}
logger.Debugf("Append log to %s", accessLog)
message := data.toLogMessage()
message = message + "\n"
_, err = logFile.WriteString(message)
if err != nil {
logger.Warnf("Can not write the log into access log file: %s, %v", accessLog, err)
}
}
}

func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
logFile, err := os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return nil, err
}
now := time.Now().Format(FileDateFormat)
fileInfo, err := logFile.Stat()
if err != nil {
logger.Warnf("Can not get the info of access log file: %s, %v", accessLog, err)
return nil, err
}
last := fileInfo.ModTime().Format(FileDateFormat)
if now != last {
err = os.Rename(fileInfo.Name(), fileInfo.Name()+"."+now)
if err != nil {
logger.Warnf("Can not rename access log file: %s, %v", fileInfo.Name(), err)
return nil, err
} else {
logFile, err = os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
}
}
return logFile, err
}

func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}

func GetAccessLogFilter() filter.Filter {
accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
}
}()
return accessLogFilter
}

type AccessLogData struct {
accessLog string
data map[string]string
}

func (ef *AccessLogData) toLogMessage() string {
builder := strings.Builder{}
builder.WriteString("[")
builder.WriteString(ef.data[constant.TIMESTAMP_KEY])
builder.WriteString("] ")
builder.WriteString(ef.data[constant.REMOTE_ADDR])
builder.WriteString(" -> ")
builder.WriteString(ef.data[constant.LOCAL_ADDR])
builder.WriteString(" - ")
if len(ef.data[constant.GROUP_KEY]) > 0 {
builder.WriteString(ef.data[constant.GROUP_KEY])
builder.WriteString("/")
}

builder.WriteString(ef.data[constant.INTERFACE_KEY])

if len(ef.data[constant.VERSION_KEY]) > 0 {
builder.WriteString(":")
builder.WriteString(ef.data[constant.VERSION_KEY])
}

builder.WriteString(" ")
builder.WriteString(ef.data[constant.METHOD_KEY])
builder.WriteString("(")
if len(ef.data[Types]) > 0 {
builder.WriteString(ef.data[Types])
}
builder.WriteString(") ")

if len(ef.data[Arguments]) > 0 {
builder.WriteString(ef.data[Arguments])
}
return builder.String()
}
66 changes: 66 additions & 0 deletions filter/impl/access_log_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package impl

import (
"context"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"testing"
)

func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(context.Background(),
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider"+
"&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser."+
"loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name="+
"BDTService&organization=ikurento.com&owner=ZX&registry.role=3&retries=&"+
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)

attach := make(map[string]string, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attach)

accessLogFilter := GetAccessLogFilter()
result := accessLogFilter.Invoke(invoker, inv)
assert.Nil(t, result.Error())
}

func TestAccessLogFilter_Invoke_Default_Config(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(context.Background(),
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider"+
"&cluster=failover&accesslog=true&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser."+
"loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name="+
"BDTService&organization=ikurento.com&owner=ZX&registry.role=3&retries=&"+
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)

attach := make(map[string]string, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attach)

accessLogFilter := GetAccessLogFilter()
result := accessLogFilter.Invoke(invoker, inv)
assert.Nil(t, result.Error())
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/api v0.0.0-20180829000535-087779f1d2c9 h1:z1TeLUmxf9ws9KLICfmX+KGXTs+rjm+aGWzfsv7MZ9w=
google.golang.org/api v0.0.0-20180829000535-087779f1d2c9/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
Expand Down
9 changes: 7 additions & 2 deletions protocol/dubbo/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,13 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker != nil {
result := invoker.Invoke(invocation.NewRPCInvocation(p.Service.Method, p.Body.(map[string]interface{})["args"].([]interface{}),
p.Body.(map[string]interface{})["attachments"].(map[string]string)))
attachments := p.Body.(map[string]interface{})["attachments"].(map[string]string)
attachments[constant.LOCAL_ADDR] = session.LocalAddr()
attachments[constant.REMOTE_ADDR] = session.RemoteAddr()

args := p.Body.(map[string]interface{})["args"].([]interface{})
inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments)
result := invoker.Invoke(inv)
if err := result.Error(); err != nil {
p.Header.ResponseStatus = hessian.Response_OK
p.Body = hessian.NewResponse(nil, err, result.Attachments())
Expand Down

0 comments on commit c5082fd

Please sign in to comment.