Skip to content

Commit

Permalink
adds StartOffset to the right struct
Browse files Browse the repository at this point in the history
By mistake I added the field to the root level instead of adding to the
blocks
  • Loading branch information
d1egoaz committed Jan 17, 2020
1 parent 7728893 commit 7e314f9
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions produce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,27 @@ import (
"time"
)

// Protocol, http://kafka.apache.org/protocol.html
// v1
// v2 = v3 = v4
// v5 = v6 = v7
// Produce Response (Version: 7) => [responses] throttle_time_ms
// responses => topic [partition_responses]
// topic => STRING
// partition_responses => partition error_code base_offset log_append_time log_start_offset
// partition => INT32
// error_code => INT16
// base_offset => INT64
// log_append_time => INT64
// log_start_offset => INT64
// throttle_time_ms => INT32

// partition_responses in protocol
type ProduceResponseBlock struct {
Err KError
Offset int64
// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
Timestamp time.Time
Err KError // v0, error_code
Offset int64 // v0, base_offset
Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
StartOffset int64 // v5, log_start_offset
}

func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand All @@ -32,6 +48,13 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
}
}

if version >= 5 {
b.StartOffset, err = pd.getInt64()
if err != nil {
return err
}
}

return nil
}

Expand All @@ -49,14 +72,17 @@ func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err erro
pe.putInt64(timestamp)
}

if version >= 5 {
pe.putInt64(b.StartOffset)
}

return nil
}

type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
Version int16
ThrottleTime time.Duration // only provided if Version >= 1
StartOffset int64 // only provided if Version >= 5
ThrottleTime time.Duration // v1, throttle_time_ms
}

func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -96,13 +122,6 @@ func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
}
}

if version >= 5 {
r.StartOffset, err = pd.getInt64()
if err != nil {
return err
}
}

if r.Version >= 1 {
millis, err := pd.getInt32()
if err != nil {
Expand Down Expand Up @@ -137,9 +156,6 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
}
}
}
if r.Version >= 5 {
pe.putInt64(r.StartOffset)
}

if r.Version >= 1 {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
Expand Down

0 comments on commit 7e314f9

Please sign in to comment.