Skip to content

Commit

Permalink
Fix Kafka Partition Count (#641)
Browse files Browse the repository at this point in the history
- addresses `partitions` being returned by API Get/List operations instead of `partition_count` which is used for Create/Update operations

Co-authored-by: Andrew Starr-Bochicchio <andrewsomething@users.noreply.github.com>
  • Loading branch information
dweinshenker and andrewsomething authored Oct 16, 2023
1 parent dab707b commit 96b0f34
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 32 deletions.
29 changes: 23 additions & 6 deletions databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,26 @@ type DatabaseDB struct {

// DatabaseTopic represents a Kafka topic
type DatabaseTopic struct {
Name string `json:"name"`
PartitionCount *uint32 `json:"partition_count,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
State string `json:"state,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
Name string `json:"name"`
Partitions []*TopicPartition `json:"partitions,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
State string `json:"state,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
}

// TopicPartition represents the state of a Kafka topic partition
type TopicPartition struct {
EarliestOffset uint64 `json:"earliest_offset,omitempty"`
InSyncReplicas uint32 `json:"in_sync_replicas,omitempty"`
Id uint32 `json:"id,omitempty"`
Size uint64 `json:"size,omitempty"`
ConsumerGroups []*TopicConsumerGroup `json:"consumer_groups,omitempty"`
}

// TopicConsumerGroup represents a consumer group for a particular Kafka topic
type TopicConsumerGroup struct {
Name string `json:"name,omitempty"`
Offset uint64 `json:"offset,omitempty"`
}

// TopicConfig represents all configurable options for a Kafka topic
Expand Down Expand Up @@ -342,7 +357,9 @@ type DatabaseCreateTopicRequest struct {

// DatabaseUpdateTopicRequest ...
type DatabaseUpdateTopicRequest struct {
Topic *DatabaseTopic `json:"topic"` // note: `name` field in Topic unused on update
PartitionCount *uint32 `json:"partition_count,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
}

// DatabaseReplica represents a read-only replica of a particular database
Expand Down
179 changes: 153 additions & 26 deletions databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2323,7 +2323,6 @@ func TestDatabases_CreateTopic(t *testing.T) {

want := &DatabaseTopic{
Name: "events",
PartitionCount: &numPartitions,
ReplicationFactor: &replicationFactor,
Config: &TopicConfig{
RetentionMS: &retentionMS,
Expand Down Expand Up @@ -2392,12 +2391,10 @@ func TestDatabases_UpdateTopic(t *testing.T) {
})

_, err := client.Databases.UpdateTopic(ctx, dbID, topicName, &DatabaseUpdateTopicRequest{
Topic: &DatabaseTopic{
PartitionCount: &numPartitions,
ReplicationFactor: &replicationFactor,
Config: &TopicConfig{
RetentionMS: &retentionMS,
},
PartitionCount: &numPartitions,
ReplicationFactor: &replicationFactor,
Config: &TopicConfig{
RetentionMS: &retentionMS,
},
})

Expand Down Expand Up @@ -2430,30 +2427,73 @@ func TestDatabases_GetTopic(t *testing.T) {
var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
topicName = "events"
numPartitions = uint32(3)
replicationFactor = uint32(2)
retentionMS = int64(1000 * 60)
)

want := &DatabaseTopic{
Name: "events",
PartitionCount: &numPartitions,
Name: "events",
Partitions: []*TopicPartition{
{
Size: 0,
Id: 0,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
{
Size: 0,
Id: 1,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
{
Size: 0,
Id: 2,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
},
ReplicationFactor: &replicationFactor,
Config: &TopicConfig{
RetentionMS: &retentionMS,
},
}

body := `{
"topic": {
"name": "events",
"partition_count": 3,
"replication_factor": 2,
"config": {
"retention_ms": 60000
}
}
}`
"topic":{
"name":"events",
"replication_factor":2,
"config":{
"retention_ms":60000
},
"partitions":[
{
"size":0,
"id":0,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
},
{
"size":0,
"id":1,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
},
{
"size":0,
"id":2,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
}
]
}
}`

path := fmt.Sprintf("/v2/databases/%s/topics/%s", dbID, topicName)

Expand All @@ -2473,23 +2513,66 @@ func TestDatabases_ListTopics(t *testing.T) {

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
numPartitions = uint32(3)
replicationFactor = uint32(2)
retentionMS = int64(1000 * 60)
)

want := []DatabaseTopic{
{
Name: "events",
PartitionCount: &numPartitions,
Name: "events",
Partitions: []*TopicPartition{
{
Size: 0,
Id: 0,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
{
Size: 0,
Id: 1,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
{
Size: 0,
Id: 2,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
},
ReplicationFactor: &replicationFactor,
Config: &TopicConfig{
RetentionMS: &retentionMS,
},
},
{
Name: "events_ii",
PartitionCount: &numPartitions,
Name: "events_ii",
Partitions: []*TopicPartition{
{
Size: 0,
Id: 0,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
{
Size: 0,
Id: 1,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
{
Size: 0,
Id: 2,
InSyncReplicas: 2,
EarliestOffset: 0,
ConsumerGroups: nil,
},
},
ReplicationFactor: &replicationFactor,
Config: &TopicConfig{
RetentionMS: &retentionMS,
Expand All @@ -2501,15 +2584,59 @@ func TestDatabases_ListTopics(t *testing.T) {
"topics": [
{
"name": "events",
"partition_count": 3,
"partitions":[
{
"size":0,
"id":0,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
},
{
"size":0,
"id":1,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
},
{
"size":0,
"id":2,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
}
],
"replication_factor": 2,
"config": {
"retention_ms": 60000
}
},
{
"name": "events_ii",
"partition_count": 3,
"partitions":[
{
"size":0,
"id":0,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
},
{
"size":0,
"id":1,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
},
{
"size":0,
"id":2,
"in_sync_replicas":2,
"earliest_offset":0,
"consumer_groups":null
}
],
"replication_factor": 2,
"config": {
"retention_ms": 60000
Expand Down

0 comments on commit 96b0f34

Please sign in to comment.