Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditional Toxics #519

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ToxicLink struct {
stubs []*toxics.ToxicStub
proxy *Proxy
toxics *ToxicCollection
input *stream.ChanWriter
input *stream.MultiChanWriter
output *stream.ChanReader
direction stream.Direction
Logger *zerolog.Logger
Expand All @@ -36,6 +36,7 @@ func NewToxicLink(
collection *ToxicCollection,
direction stream.Direction,
logger zerolog.Logger,
additionalStreamChans []chan<- *stream.StreamChunk,
) *ToxicLink {
link := &ToxicLink{
stubs: make(
Expand All @@ -50,7 +51,7 @@ func NewToxicLink(
}
// Initialize the link with ToxicStubs
last := make(chan *stream.StreamChunk) // The first toxic is always a noop
link.input = stream.NewChanWriter(last)
link.input = stream.NewMultiChanWriter(append([]chan<- *stream.StreamChunk{last}, additionalStreamChans...)...)
for i := 0; i < len(link.stubs); i++ {
var next chan *stream.StreamChunk
if i+1 < len(link.stubs) {
Expand Down
32 changes: 32 additions & 0 deletions matchers/http_request_header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package matchers

import (
"bufio"
"bytes"
"net/http"
"regexp"
)

// HttpRequestHeaderMatcher matches when the value of a named HTTP request
// header satisfies a regular expression.
type HttpRequestHeaderMatcher struct {
	HeaderKey        string `json:"headerKey"`
	HeaderValueRegex string `json:"headerValueRegex"`
}

// TryMatch parses data as an HTTP request and reports whether the configured
// header's value matches the configured regular expression. It returns an
// error when data is not a parseable HTTP request or when the regular
// expression is invalid. A request without the header matches against the
// empty string.
func (m *HttpRequestHeaderMatcher) TryMatch(data []byte) (bool, error) {
	req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
	if err != nil {
		return false, err
	}
	return regexp.MatchString(m.HeaderValueRegex, req.Header.Get(m.HeaderKey))
}
42 changes: 42 additions & 0 deletions matchers/matcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package matchers

import (
"reflect"
"sync"
)

// Matcher is the interface implemented by all matcher types. TryMatch
// inspects one raw chunk of stream data and reports whether it matches,
// returning an error when the data cannot be evaluated (for example, it
// does not parse as the protocol the matcher expects).
type Matcher interface {
	TryMatch([]byte) (bool, error)
}

var (
	// MatcherRegistry maps a matcher type name to its registered prototype
	// instance; New clones prototypes from it.
	// NOTE(review): the map is exported while its mutex is not, so other
	// packages can access it without locking — confirm whether direct
	// access is intended.
	MatcherRegistry map[string]Matcher
	// registryMutex serializes access to MatcherRegistry.
	registryMutex sync.RWMutex
)

// RegisterMatcher stores matcher as the prototype for typeName, lazily
// creating the registry on first use. It is safe for concurrent use.
func RegisterMatcher(typeName string, matcher Matcher) {
	registryMutex.Lock()
	defer registryMutex.Unlock()

	if MatcherRegistry == nil {
		MatcherRegistry = map[string]Matcher{}
	}
	MatcherRegistry[typeName] = matcher
}

// New returns a fresh zero-valued instance of the matcher registered under
// matcherType, or nil when the type is unknown. The registered prototype is
// never returned directly, so callers can unmarshal parameters into the
// result without mutating the registry.
func New(matcherType string) Matcher {
	registryMutex.RLock()
	defer registryMutex.RUnlock()

	orig, ok := MatcherRegistry[matcherType]
	if !ok {
		return nil
	}

	// Prototypes are expected to be registered as pointers (see init);
	// guard the Elem() call so a value-registered matcher returns nil
	// instead of panicking here.
	origType := reflect.TypeOf(orig)
	if origType == nil || origType.Kind() != reflect.Ptr {
		return nil
	}
	matcher, ok := reflect.New(origType.Elem()).Interface().(Matcher)
	if !ok {
		return nil
	}
	return matcher
}

// init registers the built-in matchers so they are available through New
// as soon as the package is imported.
func init() {
	RegisterMatcher("httpRequestHeaderMatcher", new(HttpRequestHeaderMatcher))
}
28 changes: 28 additions & 0 deletions stream/io_chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ type StreamChunk struct {
Timestamp time.Time
}

// MultiChanWriter implements the io.WriteCloser interface, fanning each
// write out to every output channel as a *StreamChunk.
type MultiChanWriter struct {
	outputs []chan<- *StreamChunk
}

// NewMultiChanWriter returns a MultiChanWriter that writes each chunk to all
// of the given output channels.
func NewMultiChanWriter(outputs ...chan<- *StreamChunk) *MultiChanWriter {
	return &MultiChanWriter{outputs}
}

// Write copies buf into a single new StreamChunk and sends that chunk to
// every output channel in order. The full buffer is always reported as
// written and the returned error is always nil. Calling Write after Close
// panics (send on a closed channel).
func (m *MultiChanWriter) Write(buf []byte) (int, error) {
	data := make([]byte, len(buf))
	copy(data, buf) // Detach from the caller's buffer before sending.
	chunk := &StreamChunk{Data: data, Timestamp: time.Now()}
	for _, ch := range m.outputs {
		ch <- chunk
	}
	return len(buf), nil
}

// Close closes every output channel and always returns nil.
func (m *MultiChanWriter) Close() error {
	for i := range m.outputs {
		close(m.outputs[i])
	}
	return nil
}

// Implements the io.WriteCloser interface for a chan []byte.
type ChanWriter struct {
output chan<- *StreamChunk
Expand Down
106 changes: 94 additions & 12 deletions toxic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,30 @@ import (
type ToxicCollection struct {
sync.Mutex

noop *toxics.ToxicWrapper
proxy *Proxy
chain [][]*toxics.ToxicWrapper
links map[string]*ToxicLink
noop *toxics.ToxicWrapper
proxy *Proxy
chain [][]*toxics.ToxicWrapper
toxicConditions [][]*toxics.ToxicCondition
links map[string]*ToxicLink
}

func NewToxicCollection(proxy *Proxy) *ToxicCollection {
collection := &ToxicCollection{
noop: &toxics.ToxicWrapper{
Toxic: new(toxics.NoopToxic),
Type: "noop",
Toxic: new(toxics.NoopToxic),
Type: "noop",
Enabled: true,
},
proxy: proxy,
chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
links: make(map[string]*ToxicLink),
proxy: proxy,
chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
toxicConditions: make([][]*toxics.ToxicCondition, stream.NumDirections),
links: make(map[string]*ToxicLink),
}
for dir := range collection.chain {
collection.chain[dir] = make([]*toxics.ToxicWrapper, 1, toxics.Count()+1)
collection.chain[dir][0] = collection.noop
collection.toxicConditions[dir] = make([]*toxics.ToxicCondition, 1, toxics.Count()+1)
collection.toxicConditions[dir][0] = nil
}
return collection
}
Expand Down Expand Up @@ -107,10 +112,19 @@ func (c *ToxicCollection) AddToxicJson(data io.Reader) (*toxics.ToxicWrapper, er
wrapper.Name = fmt.Sprintf("%s_%s", wrapper.Type, wrapper.Stream)
}

// Initialize the toxic
if toxics.New(wrapper) == nil {
return nil, ErrInvalidToxicType
}

// Set the wrapper to be enabled if no condition is specified.
if wrapper.Condition == nil {
wrapper.Enabled = true
} else {
wrapper.Condition.ToxicWrapper = wrapper
}

// Check if toxic already exists
found := c.findToxicByName(wrapper.Name)
if found != nil {
return nil, ErrToxicAlreadyExists
Expand Down Expand Up @@ -200,9 +214,74 @@ func (c *ToxicCollection) StartLink(
logger = zerolog.Nop()
}

link := NewToxicLink(c.proxy, c, direction, logger)
link.Start(server, name, input, output)
c.links[name] = link
// If the direction is upstream, we need to run matchers and update
// toxics if matched.
if direction == stream.Upstream {
// Write input to the matcher writer so that we can match the input
// in parallel while piping it through the link.
streamChan := make(chan *stream.StreamChunk)
streamChanWriter := stream.NewChanWriter(streamChan)
forkedInput := io.TeeReader(input, streamChanWriter)

// Fire of a goroutine to match all conditions separately.
go c.matchAllToxicConditions(streamChan, direction)

link := NewToxicLink(c.proxy, c, direction, logger, []chan<- *stream.StreamChunk{streamChan})
link.Start(server, name, forkedInput, output)
c.links[name] = link
} else {
link := NewToxicLink(c.proxy, c, direction, logger, nil)
link.Start(server, name, input, output)
c.links[name] = link
}
}

// matchAllToxicConditions drains streamChan, evaluating every condition
// registered for the given direction against each chunk of stream data, and
// enables the toxic attached to any condition that matches. It runs until
// streamChan is closed, so it must be started in its own goroutine.
func (c *ToxicCollection) matchAllToxicConditions(
	streamChan chan *stream.StreamChunk,
	direction stream.Direction,
) {
	var logger zerolog.Logger
	if c.proxy.Logger != nil {
		logger = *c.proxy.Logger
	} else {
		logger = zerolog.Nop()
	}

	// range exits once the writer side closes streamChan.
	for streamChunk := range streamChan {
		if streamChunk == nil {
			// Defensive: a nil chunk carries no data to match on.
			continue
		}

		// Lock per chunk rather than for the whole lifetime of this
		// goroutine. Holding the collection mutex until the link closes
		// would block every other toxic operation (add/update/remove) on
		// this proxy for as long as the connection stays open.
		c.Lock()
		for _, condition := range c.toxicConditions[direction] {
			if condition == nil {
				continue
			}

			matched, err := condition.TryMatch(streamChunk.Data)
			if err != nil {
				logger.Warn().Err(err).Msg("Error matching condition")
				continue
			}

			if matched {
				// Enable the toxic attached to this condition.
				condition.ToxicWrapper.Enabled = true

				// TODO: Do I need to call this? Currently fails when uncommented, though.
				// c.chainUpdateToxic(condition.ToxicWrapper)
			}
		}
		c.Unlock()
	}
	logger.Debug().Msg("Stream channel closed, stopping condition matching")
}

func (c *ToxicCollection) RemoveLink(name string) {
Expand All @@ -228,6 +307,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {
dir := toxic.Direction
toxic.Index = len(c.chain[dir])
c.chain[dir] = append(c.chain[dir], toxic)
c.toxicConditions[dir] = append(c.toxicConditions[dir], toxic.Condition)

// Asynchronously add the toxic to each link
wg := sync.WaitGroup{}
Expand All @@ -245,6 +325,7 @@ func (c *ToxicCollection) chainAddToxic(toxic *toxics.ToxicWrapper) {

func (c *ToxicCollection) chainUpdateToxic(toxic *toxics.ToxicWrapper) {
c.chain[toxic.Direction][toxic.Index] = toxic
c.toxicConditions[toxic.Direction][toxic.Index] = toxic.Condition

// Asynchronously update the toxic in each link
group := sync.WaitGroup{}
Expand All @@ -271,6 +352,7 @@ func (c *ToxicCollection) chainRemoveToxic(ctx context.Context, toxic *toxics.To

dir := toxic.Direction
c.chain[dir] = append(c.chain[dir][:toxic.Index], c.chain[dir][toxic.Index+1:]...)
c.toxicConditions[dir] = append(c.toxicConditions[dir][:toxic.Index], c.toxicConditions[dir][toxic.Index+1:]...)
for i := toxic.Index; i < len(c.chain[dir]); i++ {
c.chain[dir][i].Index = i
}
Expand Down
45 changes: 44 additions & 1 deletion toxics/toxic.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package toxics

import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"sync"
"time"

"github.com/Shopify/toxiproxy/v2/matchers"
"github.com/Shopify/toxiproxy/v2/stream"
)

Expand Down Expand Up @@ -47,6 +50,40 @@ type StatefulToxic interface {
NewState() interface{}
}

// ToxicCondition gates a toxic: the toxic is only enabled once the embedded
// matcher matches data flowing through the link the toxic is attached to.
type ToxicCondition struct {
	// ToxicWrapper points back at the toxic this condition controls; it is
	// wired up when the toxic is added and is never serialized.
	ToxicWrapper *ToxicWrapper `json:"-"`
	// MatcherType names the registered matcher this condition uses.
	MatcherType string `json:"matcherType"`

	// A matcher means this toxic is only enabled when the matcher matches on any data
	// passing through the link this toxic is attached to.
	matchers.Matcher
}

// UnmarshalJSON decodes a condition in two passes: first it reads the
// matcher type and instantiates the corresponding registered matcher, then
// it decodes the matcher-specific parameters directly into that instance.
// Unlike a plain struct decode, this rejects unknown matcher types up front;
// otherwise the condition would carry a nil Matcher and panic on the first
// TryMatch call.
func (t *ToxicCondition) UnmarshalJSON(data []byte) error {
	var head struct {
		MatcherType string `json:"matcherType"`
	}
	if err := json.Unmarshal(data, &head); err != nil {
		return err
	}

	t.MatcherType = head.MatcherType
	t.Matcher = matchers.New(head.MatcherType)
	if t.Matcher == nil {
		return fmt.Errorf("unknown matcher type %q", head.MatcherType)
	}

	// Second pass: decode the parameters into the freshly created matcher.
	params := &struct {
		MatcherParameters interface{} `json:"matcherParameters"`
	}{t.Matcher}
	if err := json.NewDecoder(bytes.NewReader(data)).Decode(params); err != nil {
		return err
	}

	return nil
}

type ToxicWrapper struct {
Toxic `json:"attributes"`
Name string `json:"name"`
Expand All @@ -56,6 +93,12 @@ type ToxicWrapper struct {
Direction stream.Direction `json:"-"`
Index int `json:"-"`
BufferSize int `json:"-"`

// A non-nil condition means this toxic is only enabled when the condition is met.
Condition *ToxicCondition `json:"condition"`

// Enabled is true if this toxic is enabled, false otherwise
Enabled bool `json:"-"`
}

type ToxicStub struct {
Expand All @@ -82,7 +125,7 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) {
s.running = make(chan struct{})
defer close(s.running)
//#nosec
if rand.Float32() < toxic.Toxicity {
if toxic.Enabled && rand.Float32() < toxic.Toxicity {
toxic.Pipe(s)
} else {
new(NoopToxic).Pipe(s)
Expand Down