-
Notifications
You must be signed in to change notification settings - Fork 8.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FAB-1360] Introduce ChainPartition for Kafka
https://jira.hyperledger.org/browse/FAB-1360 This changeset introduces ChainPartition, a construct that will be used to identify the Kafka topic/partition that the ordering shims should interact with when dealing with a particular chain. Note that it encodes the given chain ID in base-16 as a means of escaping characters that are not allowed in Kafka topic names. All the restrictions for topic names are linked to in line 38. Filtering for complying chain IDs should probably happen during transaction filtering at ingress. This is marked as a TODO, and a JIRA issue has been created for it. This changeset merely introduces the ChainPartition construct and does not integrate it with the rest of the code yet. This will happen in a follow-up changeset. Change-Id: I33d4e80758d7dfd7e7d827caa385d0507a39a40a Signed-off-by: Kostas Christidis <kostas@christidis.io>
- Loading branch information
1 parent
95094cd
commit b9db02d
Showing
2 changed files
with
98 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
Copyright IBM Corp. 2016 All Rights Reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package kafka | ||
|
||
import "fmt" | ||
|
||
const rawPartition = 0 | ||
|
||
// ChainPartition identifies the Kafka partition the orderer interacts with. | ||
type ChainPartition interface { | ||
Topic() string | ||
Partition() int32 | ||
fmt.Stringer | ||
} | ||
|
||
type chainPartitionImpl struct { | ||
tpc string | ||
prt int32 | ||
} | ||
|
||
// Returns a new chain partition for a given chain ID and partition. | ||
func newChainPartition(chainID string, partition int32) ChainPartition { | ||
return &chainPartitionImpl{ | ||
// TODO https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/common/Topic.scala#L29 | ||
tpc: fmt.Sprintf("%x", chainID), | ||
prt: partition, | ||
} | ||
} | ||
|
||
// Topic returns the Kafka topic of this chain partition. | ||
func (cp *chainPartitionImpl) Topic() string { | ||
return cp.tpc | ||
} | ||
|
||
// Partition returns the Kafka partition of this chain partition. | ||
func (cp *chainPartitionImpl) Partition() int32 { | ||
return cp.prt | ||
} | ||
|
||
// String returns a string identifying the chain partition. | ||
func (cp *chainPartitionImpl) String() string { | ||
return fmt.Sprintf("%s/%d", cp.tpc, cp.prt) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
Copyright IBM Corp. 2016 All Rights Reserved. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package kafka | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/hyperledger/fabric/orderer/common/bootstrap/static" | ||
) | ||
|
||
func TestChainPartition(t *testing.T) { | ||
cp := newChainPartition(static.TestChainID, rawPartition) | ||
|
||
expectedTopic := fmt.Sprintf("%x", static.TestChainID) | ||
actualTopic := cp.Topic() | ||
if strings.Compare(expectedTopic, actualTopic) != 0 { | ||
t.Fatalf("Got the wrong topic, expected %s, got %s instead", expectedTopic, actualTopic) | ||
} | ||
|
||
expectedPartition := int32(rawPartition) | ||
actualPartition := cp.Partition() | ||
if actualPartition != expectedPartition { | ||
t.Fatalf("Got the wrong partition, expected %d, got %d instead", expectedPartition, actualPartition) | ||
} | ||
} |