diff --git a/outputs/awssecuritylake.go b/outputs/awssecuritylake.go index f6760330f..cfb181439 100644 --- a/outputs/awssecuritylake.go +++ b/outputs/awssecuritylake.go @@ -201,10 +201,10 @@ func (c *Client) EnqueueSecurityLake(falcopayload types.FalcoPayload) { go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"}) c.Stats.AWSSecurityLake.Add(Error, 1) c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc() - log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - %v\n", c.OutputType, err) return } - log.Printf("[INFO] : %v SecurityLake. - Event queued (%v)\n", c.OutputType, falcopayload.UUID) + log.Printf("[INFO] : %v SecurityLake - Event queued (%v)\n", c.OutputType, falcopayload.UUID) *c.Config.AWS.SecurityLake.WriteOffset = offset } @@ -231,7 +231,7 @@ func (c *Client) processNextBatch() error { go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"}) c.Stats.AWSSecurityLake.Add(Error, 1) c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc() - log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - %v\n", c.OutputType, err) // ctx currently not handled in main // https://github.com/falcosecurity/falcosidekick/pull/390#discussion_r1081690326 return err @@ -250,7 +250,7 @@ func (c *Client) processNextBatch() error { earliest, err, ) - log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, msg) + log.Printf("[ERROR] : %v SecurityLake - %v\n", c.OutputType, msg) awslake.ReadOffset = &earliest return err } @@ -260,7 +260,7 @@ func (c *Client) processNextBatch() error { go c.CountMetric(Outputs, 1, []string{"output:awssecuritylake.", "status:error"}) c.Stats.AWSSecurityLake.Add(Error, 1) c.PromStats.Outputs.With(map[string]string{"destination": "awssecuritylake.", "status": Error}).Inc() - log.Printf("[ERROR] : %v SecurityLake. - %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - %v\n", c.OutputType, err) return err } } @@ -290,7 +290,8 @@ func (c *Client) processNextBatch() error { func (c *Client) writeParquet(uid string, records []memlog.Record) error { fw, err := mem.NewMemFileWriter(uid+".parquet", func(name string, r io.Reader) error { t := time.Now() - key := fmt.Sprintf("/%s/region=%s/accountId=%s/eventHour=%s/%s.parquet", c.Config.AWS.SecurityLake.Prefix, c.Config.AWS.SecurityLake.Region, c.Config.AWS.SecurityLake.AccountID, t.Format("2006010215"), uid) + key := fmt.Sprintf("/%s/region=%s/accountId=%s/eventDay=%s/%s.parquet", c.Config.AWS.SecurityLake.Prefix, c.Config.AWS.SecurityLake.Region, c.Config.AWS.SecurityLake.AccountID, t.Format("20060102"), uid) + fmt.Println(key) ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFn() resp, err := s3.New(c.AWSSession).PutObjectWithContext(ctx, &s3.PutObjectInput{ @@ -301,42 +302,42 @@ func (c *Client) writeParquet(uid string, records []memlog.Record) error { ACL: aws.String(s3.ObjectCannedACLBucketOwnerFullControl), }) if err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Upload parquet file %s.parquet Failed: %v\n", c.OutputType, uid, err) + log.Printf("[ERROR] : %v SecurityLake - Upload parquet file %s.parquet Failed: %v\n", c.OutputType, uid, err) return err } if resp.SSECustomerAlgorithm != nil { - log.Printf("[INFO] : %v SecurityLake. - Upload parquet file %s.parquet OK (%v) (%v events) \n", c.OutputType, uid, *resp.SSECustomerKeyMD5, len(records)) + log.Printf("[INFO] : %v SecurityLake - Upload parquet file %s.parquet OK (%v) (%v events) \n", c.OutputType, uid, *resp.SSECustomerKeyMD5, len(records)) } else { - log.Printf("[INFO] : %v SecurityLake. - Upload parquet file %s.parquet OK (%v events)\n", c.OutputType, uid, len(records)) + log.Printf("[INFO] : %v SecurityLake - Upload parquet file %s.parquet OK (%v events)\n", c.OutputType, uid, len(records)) } return nil }) if err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Can't create the parquet file %s.parquet: %v\n", c.OutputType, uid, err) + log.Printf("[ERROR] : %v SecurityLake - Can't create the parquet file %s.parquet: %v\n", c.OutputType, uid, err) return err } pw, err := writer.NewParquetWriter(fw, new(OCSFSecurityFinding), 10) if err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Can't create the parquet writer: %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - Can't create the parquet writer: %v\n", c.OutputType, err) return err } for _, i := range records { var f types.FalcoPayload if err := json.Unmarshal(i.Data, &f); err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Unmarshalling error: %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - Unmarshalling error: %v\n", c.OutputType, err) continue } o := NewOCSFSecurityFinding(f) if err = pw.Write(o); err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Parquet writer error: %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - Parquet writer error: %v\n", c.OutputType, err) continue } } if err = pw.WriteStop(); err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Can't stop the parquet writer: %v\n", c.OutputType, err) + log.Printf("[ERROR] : %v SecurityLake - Can't stop the parquet writer: %v\n", c.OutputType, err) } if err = fw.Close(); err != nil { - log.Printf("[ERROR] : %v SecurityLake. - Can't close the parquet file %s.parquet: %v\n", c.OutputType, uid, err) + log.Printf("[ERROR] : %v SecurityLake - Can't close the parquet file %s.parquet: %v\n", c.OutputType, uid, err) return err } return nil