Skip to content

Commit

Permalink
Fix avro union null string initialization error (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
Peiling-Ding authored May 17, 2023
1 parent 3cbf9a9 commit 11cd3ab
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
15 changes: 11 additions & 4 deletions exchange/msp_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func addContextToMoa(moa *model.Msp_openrtb2_auction, mspContexts []*MspContext)
if mspContexts != nil && len(mspContexts) > 0 {
data := mspContexts[0].Data
if data.SessionId != nil && len(data.SessionId) > 0 {
moa.Session_id = &model.UnionNullString{String: data.SessionId[0]}
moa.Session_id = newUnionNullString(data.SessionId[0])
}
if data.DocId != nil && len(data.DocId) > 0 {
moa.Doc_id = &model.UnionNullString{String: data.DocId[0]}
moa.Doc_id = newUnionNullString(data.DocId[0])
}
if data.PubDomainSource != nil && len(data.PubDomainSource) > 0 {
moa.Pub_domain_source = &model.UnionNullString{String: data.PubDomainSource[0]}
moa.Pub_domain_source = newUnionNullString(data.PubDomainSource[0])
}
moa.Art_first_cat = &model.UnionNullString{String: data.ArtFirstCat}
moa.Art_first_cat = newUnionNullString(data.ArtFirstCat)
}
}

Expand Down Expand Up @@ -170,3 +170,10 @@ func addBiddingDetailsToMoa(
moa.Bidder_context_list = append(moa.Bidder_context_list, bidderCtx)
}
}

func newUnionNullString(str string) *model.UnionNullString {
s := model.NewUnionNullString()
s.String = str
s.UnionType = model.UnionNullStringTypeEnumString
return s
}
23 changes: 15 additions & 8 deletions msp/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"log"

"github.com/golang/glog"
"github.com/prebid/prebid-server/config"
model "github.com/prebid/prebid-server/msp/etl/model"
kafka "github.com/segmentio/kafka-go"
Expand All @@ -29,18 +28,22 @@ func (k *KafkaDataProducer) ProduceOpenrtb2Auction(moa model.Msp_openrtb2_auctio
// print(moa)

var buf bytes.Buffer
moa.Serialize(&buf)
if err := moa.Serialize(&buf); err != nil {
glog.Errorln("Failed to serialize MSP Open Auction data.")
return
}

msg, _ := addSchema(k.auctionAvroSchema, buf.Bytes())

if err := k.kafkaWriter.WriteMessages(context.Background(), kafka.Message{Value: msg}); err != nil {
// Only validation errors would be reported in this case.
log.Fatal("failed to write messages:", err)
glog.Errorln("Failed to write messages to Kafka:", err)
}
}

func NewKafkaDataProducer(cfg config.ETL) MspEtlDataProducer {
log.Println("ETL Kafka Host: ", cfg.KafkaHost)
log.Println("ETL Kafka Topic: ", cfg.KafkaTopic)
glog.Infoln("ETL Kafka Host: ", cfg.KafkaHost)
glog.Infoln("ETL Kafka Topic: ", cfg.KafkaTopic)
return &KafkaDataProducer{
auctionAvroSchema: cfg.AvroSchemaAuction,
kafkaWriter: &kafka.Writer{
Expand Down Expand Up @@ -72,6 +75,10 @@ func addSchema(id int, msgBytes []byte) ([]byte, error) {
}

func print(moa model.Msp_openrtb2_auction) {
out, _ := json.Marshal(moa)
fmt.Println("** MSP SERVER LOG **", string(out))
out, err := json.Marshal(moa)
if err != nil {
glog.Errorln("Failed to print MOA: ", err)
} else {
glog.Infoln("** MSP SERVER LOG **", string(out))
}
}

0 comments on commit 11cd3ab

Please sign in to comment.