Skip to content

Commit

Permalink
Merge pull request #369 from gwos/GROUNDWORK-3850
Browse files Browse the repository at this point in the history
GROUNDWORK-3850 reduce memory allocations
  • Loading branch information
ymkins authored Dec 24, 2024
2 parents 807eb99 + b9c0a7e commit 626d6d5
Show file tree
Hide file tree
Showing 26 changed files with 484 additions and 500 deletions.
2 changes: 1 addition & 1 deletion batcher/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (bld *EventsBatchBuilder) Build(buf *[][]byte, maxBytes int) {
c, bq = 0, transit.GroundworkEventsRequest{}
}
}
*buf = (*buf)[:0]
*buf = make([][]byte, 0)

if len(bq.Events) > 0 {
qq = append(qq, bq)
Expand Down
4 changes: 2 additions & 2 deletions batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (bld *MetricsBatchBuilder) Build(buf *[][]byte, maxBytes int) {
c, bq = 0, transit.ResourcesWithServicesRequest{}
}
}
*buf = (*buf)[:0]
*buf = make([][]byte, 0)

if len(bq.Resources) > 0 {
qq = append(qq, bq)
Expand Down Expand Up @@ -162,7 +162,7 @@ func packGroups(groups *[]transit.ResourceGroup) {
}
m[gk] = rg
}
*groups = (*groups)[:0]
*groups = make([]transit.ResourceGroup, 0)

for _, rg := range m {
g := rg.ResourceGroup
Expand Down
144 changes: 72 additions & 72 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,35 @@ func (l LogLevel) String() string {
type Nats struct {
// NatsAckWait is the time the NATS server will wait before resending a message
// Should be greater then the GWClient request duration
NatsAckWait time.Duration `yaml:"-"`
NatsAckWait time.Duration `env:"NATSACKWAIT" yaml:"-"`
// designates the maximum number of outstanding acknowledgements
// (messages that have been delivered but not acknowledged)
// that NATS Streaming will allow for a given subscription.
// When this limit is reached, NATS Streaming will suspend delivery of messages
// to this subscription until the number of unacknowledged messages falls below the specified limit
NatsMaxInflight int `yaml:"-"`
NatsMaxInflight int `env:"NATSMAXINFLIGHT" yaml:"-"`
// NatsMaxPubAcksInflight accepts number of unacknowledged messages
// that a publisher may have in-flight at any given time.
// When this maximum is reached, further async publish calls will block
// until the number of unacknowledged messages falls below the specified limit
NatsMaxPubAcksInflight int `yaml:"-"`
// NatsMaxPayload int32 `yaml:"-"`
NatsMaxPayload int32 `yaml:"natsMaxPayload"` // enabling for testing oversized payloads
NatsMaxPubAcksInflight int `env:"NATSMAXPUBACKSINFLIGHT" yaml:"-"`
NatsMaxPayload int32 `env:"NATSMAXPAYLOAD" yaml:"-"`
// NatsMonitorPort enables monitoring on http port useful for debug
// curl 'localhost:8222/streaming/channelsz?limit=0&offset=0&subs=1'
// More info: https://docs.nats.io/nats-streaming-concepts/monitoring
NatsMonitorPort int `yaml:"-"`
NatsStoreDir string `yaml:"natsFilestoreDir"`
NatsMonitorPort int `env:"NATSMONITORPORT" yaml:"-"`
NatsStoreDir string `env:"NATSSTOREDIR" yaml:"natsFilestoreDir"`
// NatsStoreType accepts "FILE"|"MEMORY"
NatsStoreType string `yaml:"natsStoreType"`
NatsStoreType string `env:"NATSSTORETYPE" yaml:"natsStoreType"`
// How long messages are kept
NatsStoreMaxAge time.Duration `yaml:"natsStoreMaxAge"`
NatsStoreMaxAge time.Duration `env:"NATSSTOREMAXAGE" yaml:"natsStoreMaxAge"`
// How many bytes are allowed per-channel
NatsStoreMaxBytes int64 `yaml:"natsStoreMaxBytes"`
NatsStoreMaxBytes int64 `env:"NATSSTOREMAXBYTES" yaml:"natsStoreMaxBytes"`
// How many messages are allowed per-channel
NatsStoreMaxMsgs int64 `yaml:"natsStoreMaxMsgs"`
NatsStoreMaxMsgs int64 `env:"NATSSTOREMAXMSGS" yaml:"natsStoreMaxMsgs"`
// NatsServerConfigFile is used to override yaml values for
// NATS server configuration (debug only).
NatsServerConfigFile string `yaml:"natsServerConfigFile"`
NatsServerConfigFile string `env:"NATSSERVERCONFIGFILE" yaml:"natsServerConfigFile"`
}

// Hashsum calculates FNV non-cryptographic hash suitable for checking the equality
Expand All @@ -95,49 +94,49 @@ func (c Nats) Hashsum() ([]byte, error) {
type Connector struct {
transit.AgentIdentity `yaml:",inline"`

BatchEvents time.Duration `yaml:"batchEvents"`
BatchMetrics time.Duration `yaml:"batchMetrics"`
BatchMaxBytes int `yaml:"batchMaxBytes"`
BatchEvents time.Duration `env:"BATCHEVENTS" yaml:"batchEvents"`
BatchMetrics time.Duration `env:"BATCHMETRICS" yaml:"batchMetrics"`
BatchMaxBytes int `env:"BATCHMAXBYTES" yaml:"batchMaxBytes"`

// ControllerAddr accepts value for combined "host:port"
// used as `http.Server{Addr}`
ControllerAddr string `yaml:"controllerAddr"`
ControllerCertFile string `yaml:"controllerCertFile"`
ControllerKeyFile string `yaml:"controllerKeyFile"`
ControllerAddr string `env:"CONTROLLERADDR" yaml:"controllerAddr"`
ControllerCertFile string `env:"CONTROLLERCERTFILE" yaml:"controllerCertFile"`
ControllerKeyFile string `env:"CONTROLLERKEYFILE" yaml:"controllerKeyFile"`
// ControllerPin accepts value from environment
// provides local access for debug
ControllerPin string `yaml:"-"`
ControllerPin string `env:"CONTROLLERPIN" yaml:"-"`
// Custom HTTP configuration
ControllerReadTimeout time.Duration `yaml:"-"`
ControllerWriteTimeout time.Duration `yaml:"-"`
ControllerStartTimeout time.Duration `yaml:"-"`
ControllerStopTimeout time.Duration `yaml:"-"`

Enabled bool `yaml:"enabled"`
InstallationMode string `yaml:"installationMode,omitempty"`
IsDynamicInventory bool `yaml:"-"`
ControllerReadTimeout time.Duration `env:"CONTROLLERREADTIMEOUT" yaml:"-"`
ControllerWriteTimeout time.Duration `env:"CONTROLLERWRITETIMEOUT" yaml:"-"`
ControllerStartTimeout time.Duration `env:"CONTROLLERSTARTTIMEOUT" yaml:"-"`
ControllerStopTimeout time.Duration `env:"CONTROLLERSTOPTIMEOUT" yaml:"-"`

Enabled bool `env:"ENABLED" yaml:"enabled"`
InstallationMode string `env:"INSTALLATIONMODE" yaml:"installationMode,omitempty"`
IsDynamicInventory bool `env:"ISDYNAMICINVENTORY" yaml:"-"`
// GWEncode defines using HTTPEncode in Groundwork client: child|force|off
// enabled for child by default
GWEncode string `yaml:"-"`
GWEncode string `env:"GWENCODE" yaml:"-"`

// LogCondense accepts time duration for condensing similar records
// if 0 turn off condensing
LogCondense time.Duration `yaml:"logCondense"`
LogCondense time.Duration `env:"LOGCONDENSE" yaml:"logCondense"`
// LogFile accepts file path to log in addition to stdout
LogFile string `yaml:"logFile"`
LogFileMaxSize int64 `yaml:"logFileMaxSize"`
LogFile string `env:"LOGFILE" yaml:"logFile"`
LogFileMaxSize int64 `env:"LOGFILEMAXSIZE" yaml:"logFileMaxSize"`
// Log files are rotated count times before being removed.
// If count is 0, old versions are removed rather than rotated.
LogFileRotate int `yaml:"logFileRotate"`
LogLevel LogLevel `yaml:"logLevel"`
LogColors bool `yaml:"logColors"`
LogTimeFormat string `yaml:"logTimeFormat"`
LogFileRotate int `env:"LOGFILEROTATE" yaml:"logFileRotate"`
LogLevel LogLevel `env:"LOGLEVEL" yaml:"logLevel"`
LogColors bool `env:"LOGCOLORS" yaml:"logColors"`
LogTimeFormat string `env:"LOGTIMEFORMAT" yaml:"logTimeFormat"`

Nats `yaml:",inline"`

TransportStartRndDelay int `yaml:"-"`
TransportStartRndDelay int `env:"TRANSPORTSTARTRNDDELAY" yaml:"-"`

ExportProm bool `yaml:"exportProm"`
ExportProm bool `env:"EXPORTPROM" yaml:"exportProm"`
}

// ConnectorDTO defines TCG Connector configuration
Expand All @@ -158,49 +157,59 @@ type ConnectorDTO struct {
// DSConnection defines DalekServices Connection configuration
type DSConnection clients.DSConnection

// AsClient returns as clients type
func (c *DSConnection) AsClient() clients.DSConnection {
return (clients.DSConnection)(*c)
}

// GWConnection defines Groundwork Connection configuration
type GWConnection clients.GWConnection

// AsClient returns as clients type
func (c *GWConnection) AsClient() clients.GWConnection {
return (clients.GWConnection)(*c)
}

// MarshalYAML implements yaml.Marshaler interface
// overrides the password field
func (con GWConnection) MarshalYAML() (interface{}, error) {
func (c GWConnection) MarshalYAML() (interface{}, error) {
type plain GWConnection
c := plain(con)
p := plain(c)
if s := os.Getenv(SecKeyEnv); s != "" {
encrypted, err := Encrypt([]byte(c.Password), []byte(s))
encrypted, err := Encrypt([]byte(p.Password), []byte(s))
if err != nil {
return nil, err
}
c.Password = fmt.Sprintf("%s%x", SecVerPrefix, encrypted)
p.Password = fmt.Sprintf("%s%x", SecVerPrefix, encrypted)
}
return c, nil
return p, nil
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
// overrides the password field
func (con *GWConnection) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (c *GWConnection) UnmarshalYAML(unmarshal func(interface{}) error) error {
type plain GWConnection
if err := unmarshal((*plain)(con)); err != nil {
if err := unmarshal((*plain)(c)); err != nil {
return err
}
if strings.HasPrefix(con.Password, SecVerPrefix) {
if strings.HasPrefix(c.Password, SecVerPrefix) {
s := os.Getenv(SecKeyEnv)
if s == "" {
return fmt.Errorf("unmarshaler error: %s SecKeyEnv is empty", SecVerPrefix)
}
var encrypted []byte
fmt.Sscanf(con.Password, SecVerPrefix+"%x", &encrypted)
fmt.Sscanf(c.Password, SecVerPrefix+"%x", &encrypted)
decrypted, err := Decrypt(encrypted, []byte(s))
if err != nil {
return err
}
con.Password = string(decrypted)
c.Password = string(decrypted)
}
return nil
}

// GWConnections defines a set of configurations
type GWConnections []*GWConnection
type GWConnections []GWConnection

// UnmarshalYAML implements the yaml.Unmarshaler interface.
// Applies decode to items in collection for setting only fields present in yaml.
Expand All @@ -212,7 +221,7 @@ func (cc *GWConnections) UnmarshalYAML(value *yaml.Node) error {
if len(*cc) < i+1 {
*cc = append(*cc, GWConnections{{}}...)
}
if err := node.Decode((*cc)[i]); err != nil {
if err := node.Decode(&(*cc)[i]); err != nil {
return err
}
}
Expand All @@ -221,9 +230,9 @@ func (cc *GWConnections) UnmarshalYAML(value *yaml.Node) error {

// Config defines TCG Agent configuration
type Config struct {
Connector Connector `yaml:"connector"`
DSConnection DSConnection `yaml:"dsConnection"`
GWConnections GWConnections `yaml:"gwConnections"`
Connector Connector `envPrefix:"CONNECTOR_" yaml:"connector"`
DSConnection DSConnection `envPrefix:"DSCONNECTION_" yaml:"dsConnection"`
GWConnections GWConnections `envPrefix:"GWCONNECTIONS_" yaml:"gwConnections"`
}

func defaults() Config {
Expand Down Expand Up @@ -293,14 +302,7 @@ func GetConfig() *Config {
Msg("could not parse config")
}
}
if data, err := yaml.Marshal(cfg); err == nil {
data = applyEnv(data)
if err := yaml.Unmarshal(data, cfg); err != nil {
log.Err(err).
Str("configData", string(data)).
Msg("could not apply env vars")
}
} else {
if err := applyEnv(cfg); err != nil {
log.Warn().Err(err).
Msg("could not apply env vars")
}
Expand Down Expand Up @@ -355,20 +357,20 @@ func (cfg *Config) loadAdvancedPrefixes(data []byte) error {
log.Err(err).Msg("could not parse advanced")
return err
}
for _, c := range cfg.GWConnections {
c.PrefixResourceNames = false
c.ResourceNamePrefix = ""
for i := range cfg.GWConnections {
cfg.GWConnections[i].PrefixResourceNames = false
cfg.GWConnections[i].ResourceNamePrefix = ""
for _, p := range s.Advanced.Prefixes {
if c.ID == p.GWConnectionID && p.Prefix != "" {
c.PrefixResourceNames = true
c.ResourceNamePrefix = p.Prefix
if cfg.GWConnections[i].ID == p.GWConnectionID && p.Prefix != "" {
cfg.GWConnections[i].PrefixResourceNames = true
cfg.GWConnections[i].ResourceNamePrefix = p.Prefix
}
}
}
return nil
}

func (cfg *Config) loadDynamicInventoryFlag(data []byte) error {
func (cfg *Config) loadDynamicInventoryFlag(_ []byte) error {
/* TODO: Support dynamic flag in UI
var s struct {
Connection struct {
Expand Down Expand Up @@ -437,10 +439,8 @@ func (cfg *Config) LoadConnectorDTO(data []byte) (*ConnectorDTO, error) {
Msg("could not write config")
}
/* load environment */
output = applyEnv(output)
if err := yaml.Unmarshal(output, newCfg); err != nil {
log.Err(err).
Str("configData", string(output)).
if err := applyEnv(newCfg); err != nil {
log.Warn().Err(err).
Msg("could not apply env vars")
}
}
Expand Down Expand Up @@ -514,5 +514,5 @@ func (cfg Config) initLogger() {
With().Timestamp().Caller().
Logger()
/* adapt SDK logger */
sdklog.Logger = slog.New((&logzer.SLogHandler{CallerSkipFrame: 3}))
sdklog.Logger = slog.New(&logzer.SLogHandler{CallerSkipFrame: 3})
}
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dsConnection: # empty set
assert.Equal(t, false, cfg.Connector.Enabled)
assert.Equal(t, 111, cfg.Connector.BatchMaxBytes)
assert.Equal(t, LogLevel(2), cfg.Connector.LogLevel)
// assert.Equal(t, "localhost:3001", cfg.DSConnection.HostName)
assert.Equal(t, "localhost:3001", cfg.DSConnection.HostName)
assert.Equal(t, "SEC RET", cfg.GWConnections[0].Password)
assert.Equal(t, "SEC_RET", cfg.GWConnections[1].Password)
}
Expand Down
Loading

0 comments on commit 626d6d5

Please sign in to comment.