Skip to content

Commit

Permalink
to be reverted: more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Sep 22, 2024
1 parent 1307d92 commit 060595a
Showing 1 changed file with 48 additions and 44 deletions.
92 changes: 48 additions & 44 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func newPeer(
msg := proto.Clone(mt)
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
p.Logger.Error("before panic", "msg", msg, "channel", chID, "type", mt, "bytes", hex.EncodeToString(msgBytes))
p.Logger.Error("before panic", "msg", msg, "channel", chID, "type", mt, "bytes", hex.EncodeToString(msgBytes), "raw_bytes", msgBytes)
return
}

Expand Down Expand Up @@ -429,49 +429,6 @@ func (p *peer) getStream(chID byte) (quic.Stream, bool) {
return stream, has
}

// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
// SendEnvelope replaces Send which will be deprecated in a future release.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
stream, has := p.getStream(chID)
if !has {
newStream, err := p.conn.OpenStreamSync(context.Background())
if err != nil {
p.Logger.Error("error opening quic stream", "err", err.Error())
return false
}
p.addStream(newStream, chID)
stream = newStream
err = binary.Write(stream, binary.BigEndian, chID)
if err != nil {
p.Logger.Error("error sending channel ID", "err", err.Error())
return false
}
}

if err := binary.Write(stream, binary.BigEndian, uint32(len(msgBytes))); err != nil {
p.Logger.Error("Send len failed", "err", err, "stream_id", stream.StreamID(), "msgBytes", log.NewLazySprintf("%X", msgBytes))
return false
}
err := binary.Write(stream, binary.BigEndian, msgBytes)
if err != nil {
p.Logger.Info("Send failed", "channel", "stream_id", stream.StreamID(), "msgBytes", log.NewLazySprintf("%X", msgBytes))
return false
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))

return true
}

// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
// TrySendEnvelope replaces TrySend which will be deprecated in a future release.
Expand Down Expand Up @@ -570,6 +527,51 @@ func PeerMetrics(metrics *Metrics) PeerOption {
func (p *peer) metricsReporter() {
}

// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
// SendEnvelope replaces Send which will be deprecated in a future release.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
stream, has := p.getStream(chID)
if !has {
newStream, err := p.conn.OpenStreamSync(context.Background())
if err != nil {
p.Logger.Error("error opening quic stream", "err", err.Error())
return false
}
p.addStream(newStream, chID)
stream = newStream
err = binary.Write(stream, binary.BigEndian, chID)
if err != nil {
p.Logger.Error("error sending channel ID", "err", err.Error())
return false
}
}

if err := binary.Write(stream, binary.BigEndian, uint32(len(msgBytes))); err != nil {
p.Logger.Error("Send len failed", "err", err, "stream_id", stream.StreamID(), "msgBytes", log.NewLazySprintf("%X", msgBytes))
return false
}
err := binary.Write(stream, binary.BigEndian, msgBytes)
p.Logger.Info("sent data_len", "len", len(msgBytes))
if err != nil {
p.Logger.Info("Send failed", "channel", "stream_id", stream.StreamID(), "msgBytes", log.NewLazySprintf("%X", msgBytes))
return false
}
p.Logger.Info("sent data", "data", hex.EncodeToString(msgBytes), "raw_data", msgBytes)
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))

return true
}

func (p *peer) StartReceiving() error {
for {
stream, err := p.conn.AcceptStream(context.Background())
Expand All @@ -592,12 +594,14 @@ func (p *peer) StartReceiving() error {
p.Logger.Debug("failed to read size from stream", "err", err.Error())
return
}
p.Logger.Info("received data len", "len", dataLen)
data := make([]byte, dataLen)
_, err = io.ReadFull(stream, data)
if err != nil {
p.Logger.Debug("failed to read data from stream", "err", err.Error())
return
}
p.Logger.Info("received data", "bytes", hex.EncodeToString(data), "raw_bytes", data)
p.onReceive(chID, data)
}
}()
Expand Down

0 comments on commit 060595a

Please sign in to comment.