Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make compression of peer forwarding configurable, rather than hard coded #208

Merged
merged 1 commit into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,15 @@ func main() {

peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
Transmission: &transmission.Honeycomb{
MaxBatchSize: 500,
BatchTimeout: libhoney.DefaultBatchTimeout,
MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
PendingWorkCapacity: uint(c.GetPeerBufferSize()),
UserAgentAddition: userAgentAddition,
Transport: peerTransport,
// gzip compression is expensive, and peers are most likely close to each other
// so we can turn off gzip when forwarding to peers
DisableGzipCompression: true,
EnableMsgpackEncoding: true,
Metrics: sdPeer,
MaxBatchSize: 500,
BatchTimeout: libhoney.DefaultBatchTimeout,
MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
PendingWorkCapacity: uint(c.GetPeerBufferSize()),
UserAgentAddition: userAgentAddition,
Transport: peerTransport,
DisableCompression: !c.GetCompressPeerCommunication(),
EnableMsgpackEncoding: true,
Metrics: sdPeer,
},
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type Config interface {
// peer traffic
GetPeerListenAddr() (string, error)

// GetCompressPeerCommunication will be true if refinery should compress
// data before forwarding it to a peer.
GetCompressPeerCommunication() bool

// GetGRPCListenAddr returns the address and port on which to listen for
// incoming events over gRPC
GetGRPCListenAddr() (string, error)
Expand Down
49 changes: 29 additions & 20 deletions config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,27 @@ func (r *RulesBasedSamplerConfig) String() string {
}

type configContents struct {
ListenAddr string `validate:"required"`
PeerListenAddr string `validate:"required"`
GRPCListenAddr string
APIKeys []string `validate:"required"`
HoneycombAPI string `validate:"required,url"`
Logger string `validate:"required,oneof= logrus honeycomb"`
LoggingLevel string `validate:"required"`
Collector string `validate:"required,oneof= InMemCollector"`
Sampler string `validate:"required,oneof= DeterministicSampler DynamicSampler EMADynamicSampler RulesBasedSampler TotalThroughputSampler"`
Metrics string `validate:"required,oneof= prometheus honeycomb"`
SendDelay time.Duration `validate:"required"`
TraceTimeout time.Duration `validate:"required"`
SendTicker time.Duration `validate:"required"`
UpstreamBufferSize int `validate:"required"`
PeerBufferSize int `validate:"required"`
DebugServiceAddr string
DryRun bool
DryRunFieldName string
PeerManagement PeerManagementConfig `validate:"required"`
InMemCollector InMemoryCollectorCacheCapacity `validate:"required"`
ListenAddr string `validate:"required"`
PeerListenAddr string `validate:"required"`
CompressPeerCommunication bool
GRPCListenAddr string
APIKeys []string `validate:"required"`
HoneycombAPI string `validate:"required,url"`
Logger string `validate:"required,oneof= logrus honeycomb"`
LoggingLevel string `validate:"required"`
Collector string `validate:"required,oneof= InMemCollector"`
Sampler string `validate:"required,oneof= DeterministicSampler DynamicSampler EMADynamicSampler RulesBasedSampler TotalThroughputSampler"`
Metrics string `validate:"required,oneof= prometheus honeycomb"`
SendDelay time.Duration `validate:"required"`
TraceTimeout time.Duration `validate:"required"`
SendTicker time.Duration `validate:"required"`
UpstreamBufferSize int `validate:"required"`
PeerBufferSize int `validate:"required"`
DebugServiceAddr string
DryRun bool
DryRunFieldName string
PeerManagement PeerManagementConfig `validate:"required"`
InMemCollector InMemoryCollectorCacheCapacity `validate:"required"`
}

type InMemoryCollectorCacheCapacity struct {
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewConfig(config, rules string, errorCallback func(error)) (Config, error)
c.BindEnv("PeerManagement.RedisPassword", "REFINERY_REDIS_PASSWORD")
c.SetDefault("ListenAddr", "0.0.0.0:8080")
c.SetDefault("PeerListenAddr", "0.0.0.0:8081")
c.SetDefault("CompressPeerCommunication", true)
c.SetDefault("APIKeys", []string{"*"})
c.SetDefault("PeerManagement.Peers", []string{"http://127.0.0.1:8081"})
c.SetDefault("PeerManagement.Type", "file")
Expand Down Expand Up @@ -380,6 +382,13 @@ func (f *fileConfig) GetPeerListenAddr() (string, error) {
return f.conf.PeerListenAddr, nil
}

func (f *fileConfig) GetCompressPeerCommunication() bool {
f.mux.RLock()
defer f.mux.RUnlock()

return f.conf.CompressPeerCommunication
}

func (f *fileConfig) GetGRPCListenAddr() (string, error) {
f.mux.RLock()
defer f.mux.RUnlock()
Expand Down
7 changes: 7 additions & 0 deletions config/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type MockConfig struct {
GetListenAddrVal string
GetPeerListenAddrErr error
GetPeerListenAddrVal string
GetCompressPeerCommunicationsVal bool
GetGRPCListenAddrErr error
GetGRPCListenAddrVal string
GetLoggerTypeErr error
Expand Down Expand Up @@ -116,6 +117,12 @@ func (m *MockConfig) GetPeerListenAddr() (string, error) {

return m.GetPeerListenAddrVal, m.GetPeerListenAddrErr
}
func (m *MockConfig) GetCompressPeerCommunication() bool {
m.Mux.RLock()
defer m.Mux.RUnlock()

return m.GetCompressPeerCommunicationsVal
}
func (m *MockConfig) GetGRPCListenAddr() (string, error) {
m.Mux.RLock()
defer m.Mux.RUnlock()
Expand Down
8 changes: 8 additions & 0 deletions config_complete.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ GRPCListenAddr = "0.0.0.0:9090"
# Not eligible for live reload.
PeerListenAddr = "0.0.0.0:8081"

# CompressPeerCommunication determines whether refinery will compress span data
# it forwards to peers. If it costs money to transmit data between refinery
# instances (e.g. they're spread across AWS availability zones), then you
# almost certainly want compression enabled to reduce your bill. The option to
# disable it is provided as an escape hatch for deployments that value lower CPU
# utilization over data transfer costs.
CompressPeerCommunication = true

# APIKeys is a list of Honeycomb API keys that the proxy will accept. This list
# only applies to events - other Honeycomb API actions will fall through to the
# upstream API directly.
Expand Down