Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rft: Filter #1299

Merged
merged 11 commits into from
Jul 8, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions common/extension/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

var (
authenticators = make(map[string]func() filter.Authenticator)
accesskeyStorages = make(map[string]func() filter.AccessKeyStorage)
accessKeyStorages = make(map[string]func() filter.AccessKeyStorage)
)

// SetAuthenticator puts the @fcn into map with name
Expand All @@ -40,16 +40,16 @@ func GetAuthenticator(name string) filter.Authenticator {
return authenticators[name]()
}

// SetAccesskeyStorages will set the @fcn into map with this name
func SetAccesskeyStorages(name string, fcn func() filter.AccessKeyStorage) {
accesskeyStorages[name] = fcn
// SetAccessKeyStorages will set the @fcn into map with this name
func SetAccessKeyStorages(name string, fcn func() filter.AccessKeyStorage) {
accessKeyStorages[name] = fcn
}

// GetAccesskeyStorages finds the storage with the @name.
// GetAccessKeyStorages finds the storage with the @name.
// Panic if not found
func GetAccesskeyStorages(name string) filter.AccessKeyStorage {
if accesskeyStorages[name] == nil {
panic("accesskeyStorages for " + name + " is not existing, make sure you have import the package.")
func GetAccessKeyStorages(name string) filter.AccessKeyStorage {
if accessKeyStorages[name] == nil {
panic("accessKeyStorages for " + name + " is not existing, make sure you have import the package.")
}
return accesskeyStorages[name]()
return accessKeyStorages[name]()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package filter_impl
package accesslog

import (
"context"
Expand All @@ -35,7 +35,6 @@ import (

const (
// used in URL.

// nolint
FileDateFormat = "2006-01-02"
// nolint
Expand All @@ -54,11 +53,13 @@ const (
)

func init() {
extension.SetFilter(constant.ACCESS_LOG_KEY, GetAccessLogFilter)
extension.SetFilter(constant.ACCESS_LOG_KEY, func() filter.Filter {
return NewFilter()
})
}

/*
* AccessLogFilter
// Filter for Access Log
/**
* Although the access log filter is a default filter,
* you should config "accesslog" in service's config to tell the filter where store the access log.
* for example:
Expand All @@ -73,27 +74,27 @@ func init() {
* If the value is one of them, the access log will be record in log file which defined in log.yml
* AccessLogFilter is designed to be singleton
*/
type AccessLogFilter struct {
logChan chan AccessLogData
type Filter struct {
logChan chan Data
}

// Invoke will check whether user wants to use this filter.
// If we find the value of key constant.ACCESS_LOG_KEY, we will log the invocation info
func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
accessLog := invoker.GetURL().GetParam(constant.ACCESS_LOG_KEY, "")

// the user do not
if len(accessLog) > 0 {
accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog}
ef.logIntoChannel(accessLogData)
accessLogData := Data{data: f.buildAccessLogData(invoker, invocation), accessLog: accessLog}
f.logIntoChannel(accessLogData)
}
return invoker.Invoke(ctx, invocation)
}

// logIntoChannel won't block the invocation
func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
func (f *Filter) logIntoChannel(accessLogData Data) {
select {
case ef.logChan <- accessLogData:
case f.logChan <- accessLogData:
return
default:
logger.Warn("The channel is full and the access logIntoChannel data will be dropped")
Expand All @@ -102,7 +103,7 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
}

// buildAccessLogData builds the access log data
func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
func (f *Filter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
itf := attachments[constant.INTERFACE_KEY]
Expand Down Expand Up @@ -154,19 +155,19 @@ func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation pro
}

// OnResponse do nothing
func (ef *AccessLogFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
func (f *Filter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}

// writeLogToFile actually write the logs into file
func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
func (f *Filter) writeLogToFile(data Data) {
accessLog := data.accessLog
if isDefault(accessLog) {
logger.Info(data.toLogMessage())
return
}

logFile, err := ef.openLogFile(accessLog)
logFile, err := f.openLogFile(accessLog)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return
Expand All @@ -186,7 +187,7 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
// You may find out that, once we want to write access log into log file,
// we open the file again and again.
// It needs to be optimized.
func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
func (f *Filter) openLogFile(accessLog string) (*os.File, error) {
logFile, err := os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
Expand Down Expand Up @@ -221,9 +222,8 @@ func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}

// GetAccessLogFilter return the instance of AccessLogFilter
func GetAccessLogFilter() filter.Filter {
accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)}
func NewFilter() *Filter {
accessLogFilter := &Filter{logChan: make(chan Data, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
Expand All @@ -232,44 +232,44 @@ func GetAccessLogFilter() filter.Filter {
return accessLogFilter
}

// AccessLogData defines the data that will be log into file
type AccessLogData struct {
// Data defines the data that will be log into file
type Data struct {
accessLog string
data map[string]string
}

// toLogMessage convert the AccessLogData to String
func (ef *AccessLogData) toLogMessage() string {
// toLogMessage convert the Data to String
func (d *Data) toLogMessage() string {
builder := strings.Builder{}
builder.WriteString("[")
builder.WriteString(ef.data[constant.TIMESTAMP_KEY])
builder.WriteString(d.data[constant.TIMESTAMP_KEY])
builder.WriteString("] ")
builder.WriteString(ef.data[constant.REMOTE_ADDR])
builder.WriteString(d.data[constant.REMOTE_ADDR])
builder.WriteString(" -> ")
builder.WriteString(ef.data[constant.LOCAL_ADDR])
builder.WriteString(d.data[constant.LOCAL_ADDR])
builder.WriteString(" - ")
if len(ef.data[constant.GROUP_KEY]) > 0 {
builder.WriteString(ef.data[constant.GROUP_KEY])
if len(d.data[constant.GROUP_KEY]) > 0 {
builder.WriteString(d.data[constant.GROUP_KEY])
builder.WriteString("/")
}

builder.WriteString(ef.data[constant.INTERFACE_KEY])
builder.WriteString(d.data[constant.INTERFACE_KEY])

if len(ef.data[constant.VERSION_KEY]) > 0 {
if len(d.data[constant.VERSION_KEY]) > 0 {
builder.WriteString(":")
builder.WriteString(ef.data[constant.VERSION_KEY])
builder.WriteString(d.data[constant.VERSION_KEY])
}

builder.WriteString(" ")
builder.WriteString(ef.data[constant.METHOD_KEY])
builder.WriteString(d.data[constant.METHOD_KEY])
builder.WriteString("(")
if len(ef.data[Types]) > 0 {
builder.WriteString(ef.data[Types])
if len(d.data[Types]) > 0 {
builder.WriteString(d.data[Types])
}
builder.WriteString(") ")

if len(ef.data[Arguments]) > 0 {
builder.WriteString(ef.data[Arguments])
if len(d.data[Arguments]) > 0 {
builder.WriteString(d.data[Arguments])
}
return builder.String()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package filter_impl
package accesslog

import (
"context"
Expand All @@ -34,7 +34,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
func TestFilter_Invoke_Not_Config(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(
Expand All @@ -48,12 +48,12 @@ func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)

accessLogFilter := GetAccessLogFilter()
accessLogFilter := &Filter{}
result := accessLogFilter.Invoke(context.Background(), invoker, inv)
assert.Nil(t, result.Error())
}

func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) {
func TestFilterInvokeDefaultConfig(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(
Expand All @@ -69,14 +69,14 @@ func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) {
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)

accessLogFilter := GetAccessLogFilter()
accessLogFilter := &Filter{}
result := accessLogFilter.Invoke(context.Background(), invoker, inv)
assert.Nil(t, result.Error())
}

func TestAccessLogFilterOnResponse(t *testing.T) {
func TestFilterOnResponse(t *testing.T) {
result := &protocol.RPCResult{}
accessLogFilter := GetAccessLogFilter()
accessLogFilter := &Filter{}
response := accessLogFilter.OnResponse(context.TODO(), result, nil, nil)
assert.Equal(t, result, response)
}
21 changes: 9 additions & 12 deletions filter/filter_impl/active_filter.go → filter/active/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package filter_impl
package active

import (
"context"
Expand All @@ -31,27 +31,29 @@ import (
)

const (
active = "active"
dubboInvokeStartTime = "dubboInvokeStartTime"
Active = "active"
)

func init() {
extension.SetFilter(active, GetActiveFilter)
extension.SetFilter(Active, func() filter.Filter {
return &Filter{}
})
}

// ActiveFilter tracks the requests status
type ActiveFilter struct{}
// Filter tracks the requests status
type Filter struct{}

// Invoke starts to record the requests status
func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetURL(), invocation.MethodName())
return invoker.Invoke(ctx, invocation)
}

// OnResponse update the active count base on the request result.
func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
result.SetError(err)
Expand All @@ -62,8 +64,3 @@ func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result,
protocol.EndCount(invoker.GetURL(), invocation.MethodName(), elapsed, result.Error() == nil)
return result
}

// GetActiveFilter creates ActiveFilter instance
func GetActiveFilter() filter.Filter {
return &ActiveFilter{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package filter_impl
package active

import (
"context"
Expand All @@ -36,10 +36,10 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/mock"
)

func TestActiveFilterInvoke(t *testing.T) {
func TestFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}))
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := ActiveFilter{}
filter := Filter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand All @@ -49,14 +49,14 @@ func TestActiveFilterInvoke(t *testing.T) {
assert.True(t, invoc.AttachmentsByKey(dubboInvokeStartTime, "") != "")
}

func TestActiveFilterOnResponse(t *testing.T) {
func TestFilterOnResponse(t *testing.T) {
c := protocol.CurrentTimeMillis()
elapsed := 100
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]interface{}{
dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
})
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
filter := ActiveFilter{}
filter := Filter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
extension.SetAccessKeyStorages(constant.DEFAULT_ACCESS_KEY_STORAGE, func() filter.AccessKeyStorage {
return &DefaultAccesskeyStorage{}
})
}

// DefaultAccesskeyStorage is the default implementation of AccesskeyStorage
type DefaultAccesskeyStorage struct{}

Expand All @@ -35,12 +41,3 @@ func (storage *DefaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Inv
SecretKey: url.GetParam(constant.SECRET_ACCESS_KEY_KEY, ""),
}
}

func init() {
extension.SetAccesskeyStorages(constant.DEFAULT_ACCESS_KEY_STORAGE, GetDefaultAccesskeyStorage)
}

// GetDefaultAccesskeyStorage initiates an empty DefaultAccesskeyStorage
func GetDefaultAccesskeyStorage() filter.AccessKeyStorage {
return &DefaultAccesskeyStorage{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestDefaultAccesskeyStorage_GetAccesskeyPair(t *testing.T) {
common.WithParamsValue(constant.SECRET_ACCESS_KEY_KEY, "skey"),
common.WithParamsValue(constant.ACCESS_KEY_ID_KEY, "akey"))
invocation := &invocation2.RPCInvocation{}
storage := GetDefaultAccesskeyStorage()
storage := &DefaultAccesskeyStorage{}
accesskeyPair := storage.GetAccessKeyPair(invocation, url)
assert.Equal(t, "skey", accesskeyPair.SecretKey)
assert.Equal(t, "akey", accesskeyPair.AccessKey)
Expand Down
Loading