diff --git a/.golangci.yml b/.golangci.yml index c1e71ac4f..25d8c1bea 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -59,7 +59,7 @@ linters: - misspell # - nakedret - nilerr - - paralleltest + # - paralleltest # - scopelint - staticcheck - structcheck diff --git a/Makefile b/Makefile index e5ba4f613..a9b95e332 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ default: fmt get update test lint GO := go GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -GOTEST := $(GO) test -gcflags='-l' -p 1 -parallel 1 -race -timeout 10m -coverprofile=profile.out -covermode=atomic +GOTEST := $(GO) test -race -timeout 10m -coverprofile=profile.out -covermode=atomic FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go') TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go') diff --git a/acl_create_request_test.go b/acl_create_request_test.go index 896b36d6c..1c636d01f 100644 --- a/acl_create_request_test.go +++ b/acl_create_request_test.go @@ -25,7 +25,6 @@ var ( ) func TestCreateAclsRequestv0(t *testing.T) { - t.Parallel() req := &CreateAclsRequest{ Version: 0, AclCreations: []*AclCreation{ @@ -48,7 +47,6 @@ func TestCreateAclsRequestv0(t *testing.T) { } func TestCreateAclsRequestv1(t *testing.T) { - t.Parallel() req := &CreateAclsRequest{ Version: 1, AclCreations: []*AclCreation{ diff --git a/acl_create_response_test.go b/acl_create_response_test.go index 020c8b378..65b934d9a 100644 --- a/acl_create_response_test.go +++ b/acl_create_response_test.go @@ -24,7 +24,6 @@ var ( ) func TestCreateAclsResponse(t *testing.T) { - t.Parallel() errmsg := "error" resp := &CreateAclsResponse{ ThrottleTime: 100 * time.Millisecond, diff --git a/acl_delete_request_test.go b/acl_delete_request_test.go index 6b456f5c8..cb126e30f 100644 --- a/acl_delete_request_test.go +++ b/acl_delete_request_test.go @@ -63,7 +63,6 @@ var ( ) func TestDeleteAclsRequest(t *testing.T) { - t.Parallel() req := &DeleteAclsRequest{ Filters: []*AclFilter{{ ResourceType: AclResourceAny, @@ -92,7 +91,6 @@ func TestDeleteAclsRequest(t *testing.T) { } func TestDeleteAclsRequestV1(t *testing.T) { - t.Parallel() req := &DeleteAclsRequest{ Version: 1, Filters: []*AclFilter{{ diff --git a/acl_delete_response_test.go b/acl_delete_response_test.go index e8ba57753..eb57d68a5 100644 --- a/acl_delete_response_test.go +++ b/acl_delete_response_test.go @@ -22,7 +22,6 @@ var deleteAclsResponse = []byte{ } func TestDeleteAclsResponse(t *testing.T) { - t.Parallel() resp := &DeleteAclsResponse{ ThrottleTime: 100 * time.Millisecond, FilterResponses: []*FilterResponse{{ diff --git a/acl_describe_request_test.go b/acl_describe_request_test.go index 6a090c330..3cb73eb79 100644 --- a/acl_describe_request_test.go +++ b/acl_describe_request_test.go @@ -25,7 +25,6 @@ var ( ) func TestAclDescribeRequestV0(t *testing.T) { - t.Parallel() resourcename := "topic" principal := "principal" host := "host" @@ -45,7 +44,6 @@ func TestAclDescribeRequestV0(t *testing.T) { } func TestAclDescribeRequestV1(t *testing.T) { - t.Parallel() resourcename := "topic" principal := "principal" host := "host" diff --git a/acl_describe_response_test.go b/acl_describe_response_test.go index 3147c4bc8..f0652cfee 100644 --- a/acl_describe_response_test.go +++ b/acl_describe_response_test.go @@ -20,7 +20,6 @@ var aclDescribeResponseError = []byte{ } func TestAclDescribeResponse(t *testing.T) { - t.Parallel() errmsg := "error" resp := &DescribeAclsResponse{ ThrottleTime: 100 * time.Millisecond, diff --git a/acl_types_test.go b/acl_types_test.go index 65dc3a84c..0b5247a14 100644 --- a/acl_types_test.go +++ b/acl_types_test.go @@ -5,7 +5,6 @@ import ( ) func TestAclOperationTextMarshal(t *testing.T) { - t.Parallel() for i := AclOperationUnknown; i <= AclOperationIdempotentWrite; i++ { text, err := i.MarshalText() if err != nil { @@ -23,7 +22,6 @@ func TestAclOperationTextMarshal(t *testing.T) { } func TestAclPermissionTypeTextMarshal(t *testing.T) { - t.Parallel() for i := AclPermissionUnknown; i <= AclPermissionAllow; i++ { text, err := i.MarshalText() if err != nil { @@ -41,7 +39,6 @@ func TestAclPermissionTypeTextMarshal(t *testing.T) { } func TestAclResourceTypeTextMarshal(t *testing.T) { - t.Parallel() for i := AclResourceUnknown; i <= AclResourceTransactionalID; i++ { text, err := i.MarshalText() if err != nil { @@ -59,7 +56,6 @@ func TestAclResourceTypeTextMarshal(t *testing.T) { } func TestAclResourcePatternTypeTextMarshal(t *testing.T) { - t.Parallel() for i := AclPatternUnknown; i <= AclPatternPrefixed; i++ { text, err := i.MarshalText() if err != nil { diff --git a/add_offsets_to_txn_request_test.go b/add_offsets_to_txn_request_test.go index c3bb3cc12..471d085cd 100644 --- a/add_offsets_to_txn_request_test.go +++ b/add_offsets_to_txn_request_test.go @@ -10,7 +10,6 @@ var addOffsetsToTxnRequest = []byte{ } func TestAddOffsetsToTxnRequest(t *testing.T) { - t.Parallel() req := &AddOffsetsToTxnRequest{ TransactionalID: "txn", ProducerID: 8000, diff --git a/add_offsets_to_txn_response_test.go b/add_offsets_to_txn_response_test.go index 1c2d47b37..d1730cee4 100644 --- a/add_offsets_to_txn_response_test.go +++ b/add_offsets_to_txn_response_test.go @@ -11,7 +11,6 @@ var addOffsetsToTxnResponse = []byte{ } func TestAddOffsetsToTxnResponse(t *testing.T) { - t.Parallel() resp := &AddOffsetsToTxnResponse{ ThrottleTime: 100 * time.Millisecond, Err: ErrInvalidProducerEpoch, diff --git a/add_partitions_to_txn_request_test.go b/add_partitions_to_txn_request_test.go index 59ae0a5d5..f60a88695 100644 --- a/add_partitions_to_txn_request_test.go +++ b/add_partitions_to_txn_request_test.go @@ -12,7 +12,6 @@ var addPartitionsToTxnRequest = []byte{ } func TestAddPartitionsToTxnRequest(t *testing.T) { - t.Parallel() req := &AddPartitionsToTxnRequest{ TransactionalID: "txn", ProducerID: 8000, diff --git a/add_partitions_to_txn_response_test.go b/add_partitions_to_txn_response_test.go index ed4902011..b3635e58d 100644 --- a/add_partitions_to_txn_response_test.go +++ b/add_partitions_to_txn_response_test.go @@ -15,7 +15,6 @@ var addPartitionsToTxnResponse = []byte{ } func TestAddPartitionsToTxnResponse(t *testing.T) { - t.Parallel() resp := &AddPartitionsToTxnResponse{ ThrottleTime: 100 * time.Millisecond, Errors: map[string][]*PartitionError{ diff --git a/admin_test.go b/admin_test.go index 9441142d7..57b59c374 100644 --- a/admin_test.go +++ b/admin_test.go @@ -7,7 +7,6 @@ import ( ) func TestClusterAdmin(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -31,7 +30,6 @@ func TestClusterAdmin(t *testing.T) { } func TestClusterAdminInvalidController(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -53,7 +51,6 @@ func TestClusterAdminInvalidController(t *testing.T) { } func TestClusterAdminCreateTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -82,7 +79,6 @@ func TestClusterAdminCreateTopic(t *testing.T) { } func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -111,7 +107,6 @@ func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { } func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -142,7 +137,6 @@ func TestClusterAdminCreateTopicWithoutAuthorization(t *testing.T) { } func TestClusterAdminListTopics(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -194,7 +188,6 @@ func TestClusterAdminListTopics(t *testing.T) { } func TestClusterAdminDeleteTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -224,7 +217,6 @@ func TestClusterAdminDeleteTopic(t *testing.T) { } func TestClusterAdminDeleteEmptyTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -254,7 +246,6 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) { } func TestClusterAdminCreatePartitions(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -284,7 +275,6 @@ func TestClusterAdminCreatePartitions(t *testing.T) { } func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -314,7 +304,6 @@ func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { } func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -344,7 +333,6 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) { } func TestClusterAdminAlterPartitionReassignments(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -385,7 +373,6 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) { } func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -425,7 +412,6 @@ func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) { } func TestClusterAdminListPartitionReassignments(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -473,7 +459,6 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) { } func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -513,7 +498,6 @@ func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) { } func TestClusterAdminDeleteRecords(t *testing.T) { - t.Parallel() topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -559,7 +543,6 @@ func TestClusterAdminDeleteRecords(t *testing.T) { } func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { - t.Parallel() topicName := "my_topic" seedBroker := NewMockBroker(t, 1) secondBroker := NewMockBroker(t, 2) @@ -611,7 +594,6 @@ func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) { } func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { - t.Parallel() topicName := "my_topic" seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -661,7 +643,6 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { } func TestClusterAdminDescribeConfig(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -728,7 +709,6 @@ func TestClusterAdminDescribeConfig(t *testing.T) { } func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -764,7 +744,6 @@ func TestClusterAdminDescribeConfigWithErrorCode(t *testing.T) { // TestClusterAdminDescribeBrokerConfig ensures that a describe broker config // is sent to the broker in the resource struct, _not_ the controller func TestClusterAdminDescribeBrokerConfig(t *testing.T) { - t.Parallel() controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) @@ -815,7 +794,6 @@ func TestClusterAdminDescribeBrokerConfig(t *testing.T) { } func TestClusterAdminAlterConfig(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -849,7 +827,6 @@ func TestClusterAdminAlterConfig(t *testing.T) { } func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -881,7 +858,6 @@ func TestClusterAdminAlterConfigWithErrorCode(t *testing.T) { } func TestClusterAdminAlterBrokerConfig(t *testing.T) { - t.Parallel() controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) @@ -936,7 +912,6 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) { } func TestClusterAdminIncrementalAlterConfig(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -978,7 +953,6 @@ func TestClusterAdminIncrementalAlterConfig(t *testing.T) { } func TestClusterAdminIncrementalAlterConfigWithErrorCode(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1018,7 +992,6 @@ func TestClusterAdminIncrementalAlterConfigWithErrorCode(t *testing.T) { } func TestClusterAdminIncrementalAlterBrokerConfig(t *testing.T) { - t.Parallel() controllerBroker := NewMockBroker(t, 1) defer controllerBroker.Close() configBroker := NewMockBroker(t, 2) @@ -1081,7 +1054,6 @@ func TestClusterAdminIncrementalAlterBrokerConfig(t *testing.T) { } func TestClusterAdminCreateAcl(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1114,7 +1086,6 @@ func TestClusterAdminCreateAcl(t *testing.T) { } func TestClusterAdminListAcls(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1162,7 +1133,6 @@ func TestClusterAdminListAcls(t *testing.T) { } func TestClusterAdminDeleteAcl(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1199,7 +1169,6 @@ func TestClusterAdminDeleteAcl(t *testing.T) { } func TestDescribeTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1238,7 +1207,6 @@ func TestDescribeTopic(t *testing.T) { } func TestDescribeTopicWithVersion0_11(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1277,7 +1245,6 @@ func TestDescribeTopicWithVersion0_11(t *testing.T) { } func TestDescribeConsumerGroup(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1321,7 +1288,6 @@ func TestDescribeConsumerGroup(t *testing.T) { } func TestListConsumerGroups(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1367,7 +1333,6 @@ func TestListConsumerGroups(t *testing.T) { } func TestListConsumerGroupsMultiBroker(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1432,7 +1397,6 @@ func TestListConsumerGroupsMultiBroker(t *testing.T) { } func TestListConsumerGroupOffsets(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1480,7 +1444,6 @@ func TestListConsumerGroupOffsets(t *testing.T) { } func TestDeleteConsumerGroup(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1510,7 +1473,6 @@ func TestDeleteConsumerGroup(t *testing.T) { } func TestDeleteOffset(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -1563,7 +1525,6 @@ func TestDeleteOffset(t *testing.T) { // TestRefreshMetaDataWithDifferentController ensures that the cached // controller can be forcibly updated from Metadata by the admin client func TestRefreshMetaDataWithDifferentController(t *testing.T) { - t.Parallel() seedBroker1 := NewMockBroker(t, 1) seedBroker2 := NewMockBroker(t, 2) defer seedBroker1.Close() @@ -1610,7 +1571,6 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) { } func TestDescribeLogDirs(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() diff --git a/alter_client_quotas_request_test.go b/alter_client_quotas_request_test.go index 404f69f92..46f92b9cb 100644 --- a/alter_client_quotas_request_test.go +++ b/alter_client_quotas_request_test.go @@ -63,7 +63,6 @@ var ( ) func TestAlterClientQuotasRequest(t *testing.T) { - t.Parallel() // default user defaultUserComponent := QuotaEntityComponent{ EntityType: QuotaEntityUser, diff --git a/alter_client_quotas_response_test.go b/alter_client_quotas_response_test.go index a767cf984..40efeb547 100644 --- a/alter_client_quotas_response_test.go +++ b/alter_client_quotas_response_test.go @@ -40,7 +40,6 @@ var ( ) func TestAlterClientQuotasResponse(t *testing.T) { - t.Parallel() // default user defaultUserComponent := QuotaEntityComponent{ EntityType: QuotaEntityUser, diff --git a/alter_configs_request_test.go b/alter_configs_request_test.go index 97e2679bb..e612244c6 100644 --- a/alter_configs_request_test.go +++ b/alter_configs_request_test.go @@ -41,7 +41,6 @@ var ( ) func TestAlterConfigsRequest(t *testing.T) { - t.Parallel() var request *AlterConfigsRequest request = &AlterConfigsRequest{ diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go index ca8afbe9e..62cd5991f 100644 --- a/alter_configs_response_test.go +++ b/alter_configs_response_test.go @@ -21,7 +21,6 @@ var ( ) func TestAlterConfigsResponse(t *testing.T) { - t.Parallel() var response *AlterConfigsResponse response = &AlterConfigsResponse{ diff --git a/alter_partition_reassignments_request_test.go b/alter_partition_reassignments_request_test.go index e60d61096..c917f2d79 100644 --- a/alter_partition_reassignments_request_test.go +++ b/alter_partition_reassignments_request_test.go @@ -33,7 +33,6 @@ var ( ) func TestAlterPartitionReassignmentRequest(t *testing.T) { - t.Parallel() var request *AlterPartitionReassignmentsRequest request = &AlterPartitionReassignmentsRequest{ diff --git a/alter_partition_reassignments_response_test.go b/alter_partition_reassignments_response_test.go index 4a513c1f3..8c39ef6ce 100644 --- a/alter_partition_reassignments_response_test.go +++ b/alter_partition_reassignments_response_test.go @@ -26,7 +26,6 @@ var ( ) func TestAlterPartitionReassignmentResponse(t *testing.T) { - t.Parallel() var response *AlterPartitionReassignmentsResponse = &AlterPartitionReassignmentsResponse{ ThrottleTimeMs: int32(10000), Version: int16(0), diff --git a/alter_user_scram_credentials_request_test.go b/alter_user_scram_credentials_request_test.go index 8ba7f9f20..6fe881906 100644 --- a/alter_user_scram_credentials_request_test.go +++ b/alter_user_scram_credentials_request_test.go @@ -31,7 +31,6 @@ var ( ) func TestAlterUserScramCredentialsRequest(t *testing.T) { - t.Parallel() request := &AlterUserScramCredentialsRequest{ Version: 0, Deletions: []AlterUserScramCredentialsDelete{}, diff --git a/alter_user_scram_credentials_response_test.go b/alter_user_scram_credentials_response_test.go index 31607051f..983500639 100644 --- a/alter_user_scram_credentials_response_test.go +++ b/alter_user_scram_credentials_response_test.go @@ -23,7 +23,6 @@ var ( ) func TestAlterUserScramCredentialsResponse(t *testing.T) { - t.Parallel() response := &AlterUserScramCredentialsResponse{ Version: 0, ThrottleTime: time.Second * 3, diff --git a/api_versions_request_test.go b/api_versions_request_test.go index 15b84f346..371ac9602 100644 --- a/api_versions_request_test.go +++ b/api_versions_request_test.go @@ -13,13 +13,11 @@ var ( ) func TestApiVersionsRequest(t *testing.T) { - t.Parallel() request := new(ApiVersionsRequest) testRequest(t, "basic", request, apiVersionRequest) } func TestApiVersionsRequestV3(t *testing.T) { - t.Parallel() request := new(ApiVersionsRequest) request.Version = 3 request.ClientSoftwareName = "sarama" diff --git a/api_versions_response_test.go b/api_versions_response_test.go index 6574c8317..26e0bf2a4 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -24,7 +24,6 @@ var ( ) func TestApiVersionsResponse(t *testing.T) { - t.Parallel() response := new(ApiVersionsResponse) testVersionDecodable(t, "no error", response, apiVersionResponse, 0) if response.ErrorCode != int16(ErrNoError) { @@ -42,7 +41,6 @@ func TestApiVersionsResponse(t *testing.T) { } func TestApiVersionsResponseV3(t *testing.T) { - t.Parallel() response := new(ApiVersionsResponse) response.Version = 3 testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3) diff --git a/async_producer_test.go b/async_producer_test.go index 60810b30a..f3b848119 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -100,7 +100,6 @@ func (f flakyEncoder) Encode() ([]byte, error) { } func TestAsyncProducer(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -150,7 +149,6 @@ done: } func TestAsyncProducerMultipleFlushes(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -186,7 +184,6 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) { } func TestAsyncProducerMultipleBrokers(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader0 := NewMockBroker(t, 2) leader1 := NewMockBroker(t, 3) @@ -227,7 +224,6 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) { } func TestAsyncProducerCustomPartitioner(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -270,7 +266,6 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) { } func TestAsyncProducerFailureRetry(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -319,7 +314,6 @@ func TestAsyncProducerFailureRetry(t *testing.T) { } func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { - t.Parallel() tt := func(t *testing.T, kErr KError) { seedBroker := NewMockBroker(t, 0) broker1 := NewMockBroker(t, 1) @@ -397,18 +391,15 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { } t.Run("retriable error", func(t *testing.T) { - t.Parallel() tt(t, ErrNotLeaderForPartition) }) t.Run("non-retriable error", func(t *testing.T) { - t.Parallel() tt(t, ErrNotController) }) } func TestAsyncProducerEncoderFailures(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -447,7 +438,6 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { // If a Kafka broker becomes unavailable and then returns back in service, then // producer reconnects to it and continues sending messages. func TestAsyncProducerBrokerBounce(t *testing.T) { - t.Parallel() // Given seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -489,7 +479,6 @@ func TestAsyncProducerBrokerBounce(t *testing.T) { } func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -533,7 +522,6 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { } func TestAsyncProducerMultipleRetries(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -590,7 +578,6 @@ func TestAsyncProducerMultipleRetries(t *testing.T) { } func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) @@ -659,7 +646,6 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { // https://github.com/Shopify/sarama/issues/2129 func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { - t.Parallel() //Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -708,7 +694,6 @@ func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { } func TestAsyncProducerOutOfRetries(t *testing.T) { - t.Parallel() t.Skip("Enable once bug #294 is fixed.") seedBroker := NewMockBroker(t, 1) @@ -766,7 +751,6 @@ func TestAsyncProducerOutOfRetries(t *testing.T) { } func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() @@ -824,7 +808,6 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) { } func TestAsyncProducerFlusherRetryCondition(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -891,7 +874,6 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) { } func TestAsyncProducerRetryShutdown(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -941,7 +923,6 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { } func TestAsyncProducerNoReturns(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -982,7 +963,6 @@ func TestAsyncProducerNoReturns(t *testing.T) { } func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { - t.Parallel() broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ @@ -1031,7 +1011,6 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { } func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { - t.Parallel() // Logger = log.New(os.Stderr, "", log.LstdFlags) tests := []struct { name string @@ -1179,7 +1158,6 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { } func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { - t.Parallel() broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ @@ -1229,7 +1207,6 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { } func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { - t.Parallel() broker := NewMockBroker(t, 1) defer broker.Close() @@ -1397,7 +1374,6 @@ func testProducerInterceptor( } func TestAsyncProducerInterceptors(t *testing.T) { - t.Parallel() tests := []struct { name string interceptors []ProducerInterceptor @@ -1451,7 +1427,6 @@ func TestAsyncProducerInterceptors(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() testProducerInterceptor(t, tt.interceptors, tt.expectationFn) }) } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index 4da317cfc..44aee3ec0 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -12,7 +12,6 @@ import ( ) func TestBalanceStrategyRange(t *testing.T) { - t.Parallel() tests := []struct { members map[string][]string topics map[string][]int32 @@ -65,7 +64,6 @@ func TestBalanceStrategyRange(t *testing.T) { } func TestBalanceStrategyRangeAssignmentData(t *testing.T) { - t.Parallel() strategy := BalanceStrategyRange members := make(map[string]ConsumerGroupMemberMetadata, 2) @@ -86,7 +84,6 @@ func TestBalanceStrategyRangeAssignmentData(t *testing.T) { } func TestBalanceStrategyRoundRobin(t *testing.T) { - t.Parallel() tests := []struct { members map[string][]string topics map[string][]int32 @@ -169,7 +166,6 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { } func Test_deserializeTopicPartitionAssignment(t *testing.T) { - t.Parallel() type args struct { userDataBytes []byte } @@ -243,7 +239,6 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes) if (err != nil) != tt.wantErr { t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr) @@ -257,7 +252,6 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { } func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { - t.Parallel() strategy := BalanceStrategyRoundRobin members := make(map[string]ConsumerGroupMemberMetadata, 2) @@ -278,7 +272,6 @@ func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { } func Test_prepopulateCurrentAssignments(t *testing.T) { - t.Parallel() type args struct { members map[string]ConsumerGroupMemberMetadata } @@ -418,7 +411,6 @@ func Test_prepopulateCurrentAssignments(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() _, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members) if (err != nil) != tt.wantErr { @@ -433,7 +425,6 @@ func Test_prepopulateCurrentAssignments(t *testing.T) { } func Test_areSubscriptionsIdentical(t *testing.T) { - t.Parallel() type args struct { partition2AllPotentialConsumers map[topicPartitionAssignment][]string consumer2AllPotentialPartitions map[string][]topicPartitionAssignment @@ -551,7 +542,6 @@ func Test_areSubscriptionsIdentical(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := areSubscriptionsIdentical(tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions); got != tt.want { t.Errorf("areSubscriptionsIdentical() = %v, want %v", got, tt.want) } @@ -560,7 +550,6 @@ func Test_areSubscriptionsIdentical(t *testing.T) { } func Test_sortMemberIDsByPartitionAssignments(t *testing.T) { - t.Parallel() type args struct { assignments map[string][]topicPartitionAssignment } @@ -610,7 +599,6 @@ func Test_sortMemberIDsByPartitionAssignments(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := sortMemberIDsByPartitionAssignments(tt.args.assignments); !reflect.DeepEqual(got, tt.want) { t.Errorf("sortMemberIDsByPartitionAssignments() = %v, want %v", got, tt.want) } @@ -619,7 +607,6 @@ func Test_sortMemberIDsByPartitionAssignments(t *testing.T) { } func Test_sortPartitions(t *testing.T) { - t.Parallel() type args struct { currentAssignment map[string][]topicPartitionAssignment partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair @@ -723,7 +710,6 @@ func Test_sortPartitions(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() got := sortPartitions(tt.args.currentAssignment, tt.args.partitionsWithADifferentPreviousAssignment, tt.args.isFreshAssignment, tt.args.partition2AllPotentialConsumers, tt.args.consumer2AllPotentialPartitions) if tt.want != nil && !reflect.DeepEqual(got, tt.want) { t.Errorf("sortPartitions() = %v, want %v", got, tt.want) @@ -733,7 +719,6 @@ func Test_sortPartitions(t *testing.T) { } func Test_filterAssignedPartitions(t *testing.T) { - t.Parallel() type args struct { currentAssignment map[string][]topicPartitionAssignment partition2AllPotentialConsumers map[topicPartitionAssignment][]string @@ -799,7 +784,6 @@ func Test_filterAssignedPartitions(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := filterAssignedPartitions(tt.args.currentAssignment, tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) { t.Errorf("filterAssignedPartitions() = %v, want %v", got, tt.want) } @@ -808,7 +792,6 @@ func Test_filterAssignedPartitions(t *testing.T) { } func Test_canConsumerParticipateInReassignment(t *testing.T) { - t.Parallel() type args struct { memberID string currentAssignment map[string][]topicPartitionAssignment @@ -903,7 +886,6 @@ func Test_canConsumerParticipateInReassignment(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := canConsumerParticipateInReassignment(tt.args.memberID, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.partition2AllPotentialConsumers); got != tt.want { t.Errorf("canConsumerParticipateInReassignment() = %v, want %v", got, tt.want) } @@ -912,7 +894,6 @@ func Test_canConsumerParticipateInReassignment(t *testing.T) { } func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) { - t.Parallel() type args struct { assignments []topicPartitionAssignment topic topicPartitionAssignment @@ -979,7 +960,6 @@ func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := removeTopicPartitionFromMemberAssignments(tt.args.assignments, tt.args.topic); !reflect.DeepEqual(got, tt.want) { t.Errorf("removeTopicPartitionFromMemberAssignments() = %v, want %v", got, tt.want) } @@ -988,7 +968,6 @@ func Test_removeTopicPartitionFromMemberAssignments(t *testing.T) { } func Test_assignPartition(t *testing.T) { - t.Parallel() type args struct { partition topicPartitionAssignment sortedCurrentSubscriptions []string @@ -1100,7 +1079,6 @@ func Test_assignPartition(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := assignPartition(tt.args.partition, tt.args.sortedCurrentSubscriptions, tt.args.currentAssignment, tt.args.consumer2AllPotentialPartitions, tt.args.currentPartitionConsumer); !reflect.DeepEqual(got, tt.want) { t.Errorf("assignPartition() = %v, want %v", got, tt.want) } @@ -1115,7 +1093,6 @@ func Test_assignPartition(t *testing.T) { } func Test_stickyBalanceStrategy_Plan(t *testing.T) { - t.Parallel() type args struct { members map[string]ConsumerGroupMemberMetadata topics map[string][]int32 @@ -1337,7 +1314,6 @@ func Test_stickyBalanceStrategy_Plan(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} plan, err := s.Plan(tt.args.members, tt.args.topics) verifyPlanIsBalancedAndSticky(t, s, tt.args.members, plan, err) @@ -1347,7 +1323,6 @@ func Test_stickyBalanceStrategy_Plan(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1388,7 +1363,6 @@ func Test_stickyBalanceStrategy_Plan_KIP54_ExampleOne(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1433,7 +1407,6 @@ func Test_stickyBalanceStrategy_Plan_KIP54_ExampleTwo(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topicNames := []string{"topic1", "topic2"} @@ -1471,7 +1444,6 @@ func Test_stickyBalanceStrategy_Plan_KIP54_ExampleThree(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1508,7 +1480,6 @@ func Test_stickyBalanceStrategy_Plan_AddRemoveConsumerOneTopic(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1540,7 +1511,6 @@ func Test_stickyBalanceStrategy_Plan_PoorRoundRobinAssignmentScenario(t *testing } func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1591,7 +1561,6 @@ func Test_stickyBalanceStrategy_Plan_AddRemoveTopicTwoConsumers(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1632,7 +1601,6 @@ func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerLeaves(t *testi } func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1657,7 +1625,6 @@ func Test_stickyBalanceStrategy_Plan_ReassignmentAfterOneConsumerAdded(t *testin } func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} // PLAN 1 @@ -1695,7 +1662,6 @@ func Test_stickyBalanceStrategy_Plan_SameSubscriptions(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -1736,7 +1702,6 @@ func Test_stickyBalanceStrategy_Plan_LargeAssignmentWithMultipleConsumersLeaving } func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} members := make(map[string]ConsumerGroupMemberMetadata, 20) @@ -1766,7 +1731,6 @@ func Test_stickyBalanceStrategy_Plan_NewSubscription(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChanges(t *testing.T) { - t.Parallel() r := rand.New(rand.NewSource(time.Now().UnixNano())) minNumConsumers := 20 @@ -1816,7 +1780,6 @@ func Test_stickyBalanceStrategy_Plan_ReassignmentWithRandomSubscriptionsAndChang } func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := make(map[string][]int32, 6) @@ -1842,7 +1805,6 @@ func Test_stickyBalanceStrategy_Plan_MoveExistingAssignments(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2}} @@ -1871,7 +1833,6 @@ func Test_stickyBalanceStrategy_Plan_Stickiness(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := make(map[string][]int32, 2) @@ -1894,7 +1855,6 @@ func Test_stickyBalanceStrategy_Plan_AssignmentUpdatedForDeletedTopic(t *testing } func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDeleted(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2}} @@ -1923,7 +1883,6 @@ func Test_stickyBalanceStrategy_Plan_NoExceptionRaisedWhenOnlySubscribedTopicDel } func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}} @@ -1974,7 +1933,6 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations1(t *testi } func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}} @@ -2021,7 +1979,6 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi } func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2, 3, 4, 5}} @@ -2045,7 +2002,6 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGeneration } func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1, 2}} @@ -2066,7 +2022,6 @@ func Test_stickyBalanceStrategy_Plan_SchemaBackwardCompatibility(t *testing.T) { } func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} topics := map[string][]int32{"topic1": {0, 1}} @@ -2086,7 +2041,6 @@ func Test_stickyBalanceStrategy_Plan_ConflictingPreviousAssignments(t *testing.T } func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) { - t.Parallel() s := &stickyBalanceStrategy{} members := make(map[string]ConsumerGroupMemberMetadata, 2) @@ -2376,7 +2330,6 @@ func getRandomSublist(r *rand.Rand, s []string) []string { } func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) { - t.Parallel() type args struct { partition2AllPotentialConsumers map[topicPartitionAssignment][]string } @@ -2456,7 +2409,6 @@ func Test_sortPartitionsByPotentialConsumerAssignments(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() if got := sortPartitionsByPotentialConsumerAssignments(tt.args.partition2AllPotentialConsumers); !reflect.DeepEqual(got, tt.want) { t.Errorf("sortPartitionsByPotentialConsumerAssignments() = %v, want %v", got, tt.want) } diff --git a/broker_test.go b/broker_test.go index 909995bf9..0573e4627 100644 --- a/broker_test.go +++ b/broker_test.go @@ -52,7 +52,6 @@ type brokerMetrics struct { } func TestBrokerAccessors(t *testing.T) { - t.Parallel() broker := NewBroker("abc:123") if broker.ID() != -1 { @@ -112,11 +111,9 @@ func (p produceResponsePromise) Get() (*ProduceResponse, error) { } func TestSimpleBrokerCommunication(t *testing.T) { - t.Parallel() for _, tt := range brokerTestTable { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() Logger.Printf("Testing broker communication for %s", tt.name) mb := NewMockBroker(t, 0) mb.Returns(&mockEncoder{tt.response}) @@ -155,11 +152,9 @@ func TestSimpleBrokerCommunication(t *testing.T) { } func TestBrokerFailedRequest(t *testing.T) { - t.Parallel() for _, tt := range brokerFailedReqTestTable { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() t.Logf("Testing broker communication for %s", tt.name) mb := NewMockBroker(t, 0) if !tt.stopBroker { @@ -217,7 +212,6 @@ func newTokenProvider(token *AccessToken, err error) *TokenProvider { } func TestSASLOAuthBearer(t *testing.T) { - t.Parallel() testTable := []struct { name string authidentity string @@ -290,7 +284,6 @@ func TestSASLOAuthBearer(t *testing.T) { for i, test := range testTable { test := test t.Run(test.name, func(t *testing.T) { - t.Parallel() // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) @@ -373,7 +366,6 @@ func (m *MockSCRAMClient) Done() bool { var _ SCRAMClient = &MockSCRAMClient{} func TestSASLSCRAMSHAXXX(t *testing.T) { - t.Parallel() testTable := []struct { name string mockHandshakeErr KError @@ -415,7 +407,6 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { for i, test := range testTable { test := test t.Run(test.name, func(t *testing.T) { - t.Parallel() // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) broker := NewBroker(mockBroker.Addr()) @@ -486,7 +477,6 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { } func TestSASLPlainAuth(t *testing.T) { - t.Parallel() testTable := []struct { name string authidentity string @@ -520,7 +510,6 @@ func TestSASLPlainAuth(t *testing.T) { for i, test := range testTable { test := test t.Run(test.name, func(t *testing.T) { - t.Parallel() // mockBroker mocks underlying network logic and broker responses mockBroker := NewMockBroker(t, 0) @@ -617,7 +606,6 @@ func TestSASLPlainAuth(t *testing.T) { // TestSASLReadTimeout ensures that the broker connection won't block forever // if the remote end never responds after the handshake func TestSASLReadTimeout(t *testing.T) { - t.Parallel() mockBroker := NewMockBroker(t, 0) defer mockBroker.Close() @@ -666,7 +654,6 @@ func TestSASLReadTimeout(t *testing.T) { } func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { - t.Parallel() testTable := []struct { name string error error @@ -717,7 +704,6 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { for i, test := range testTable { test := test t.Run(test.name, func(t *testing.T) { - t.Parallel() mockBroker := NewMockBroker(t, 0) // broker executes SASL requests against mockBroker @@ -789,7 +775,6 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { } func TestBuildClientFirstMessage(t *testing.T) { - t.Parallel() testTable := []struct { name string token *AccessToken @@ -828,7 +813,6 @@ func TestBuildClientFirstMessage(t *testing.T) { for i, test := range testTable { test := test t.Run(test.name, func(t *testing.T) { - t.Parallel() actual, err := buildClientFirstMessage(test.token) if !reflect.DeepEqual(test.expected, actual) { diff --git a/client_test.go b/client_test.go index f5253a564..412a1429f 100644 --- a/client_test.go +++ b/client_test.go @@ -16,7 +16,6 @@ func safeClose(t testing.TB, c io.Closer) { } func TestSimpleClient(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) seedBroker.Returns(new(MetadataResponse)) @@ -31,7 +30,6 @@ func TestSimpleClient(t *testing.T) { } func TestCachedPartitions(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) replicas := []int32{3, 1, 5} @@ -71,7 +69,6 @@ func TestCachedPartitions(t *testing.T) { } func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) replicas := []int32{seedBroker.BrokerID()} @@ -126,7 +123,6 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) { } func TestClientSeedBrokers(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) @@ -143,7 +139,6 @@ func TestClientSeedBrokers(t *testing.T) { } func TestClientMetadata(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -219,7 +214,6 @@ func TestClientMetadata(t *testing.T) { } func TestClientMetadataWithOfflineReplicas(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -308,7 +302,6 @@ func TestClientMetadataWithOfflineReplicas(t *testing.T) { } func TestClientGetOffset(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) leaderAddr := leader.Addr() @@ -357,7 +350,6 @@ func TestClientGetOffset(t *testing.T) { } func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) @@ -395,7 +387,6 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) { } func TestClientReceivingUnknownTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse1 := new(MetadataResponse) @@ -432,7 +423,6 @@ func TestClientReceivingUnknownTopic(t *testing.T) { } func TestClientReceivingPartialMetadata(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -488,7 +478,6 @@ func TestClientReceivingPartialMetadata(t *testing.T) { } func TestClientRefreshBehaviour(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -526,7 +515,6 @@ func TestClientRefreshBehaviour(t *testing.T) { } func TestClientRefreshBrokers(t *testing.T) { - t.Parallel() initialSeed := NewMockBroker(t, 0) leader := NewMockBroker(t, 5) @@ -558,7 +546,6 @@ func TestClientRefreshBrokers(t *testing.T) { } func TestClientRefreshMetadataBrokerOffline(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -589,7 +576,6 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) { } func TestClientGetBroker(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 5) @@ -626,7 +612,6 @@ func TestClientGetBroker(t *testing.T) { } func TestClientResurrectDeadSeeds(t *testing.T) { - t.Parallel() initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) initialSeed.Returns(emptyMetadata) @@ -733,7 +718,6 @@ func TestClientController(t *testing.T) { } func TestClientMetadataTimeout(t *testing.T) { - t.Parallel() tests := []struct { name string timeout time.Duration @@ -759,7 +743,6 @@ func TestClientMetadataTimeout(t *testing.T) { for _, tc := range tests { tc := tc t.Run(tc.name, func(t *testing.T) { - t.Parallel() // Use a responsive broker to create a working client initialSeed := NewMockBroker(t, 0) emptyMetadata := new(MetadataResponse) @@ -820,7 +803,6 @@ func TestClientMetadataTimeout(t *testing.T) { } func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) staleCoordinator := NewMockBroker(t, 2) freshCoordinator := NewMockBroker(t, 3) @@ -900,7 +882,6 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { } func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) coordinator := NewMockBroker(t, 2) @@ -954,7 +935,6 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) { } func TestClientAutorefreshShutdownRace(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) diff --git a/client_tls_test.go b/client_tls_test.go index d6d8e85be..8d7a51a33 100644 --- a/client_tls_test.go +++ b/client_tls_test.go @@ -13,7 +13,6 @@ import ( ) func TestTLS(t *testing.T) { - t.Parallel() cakey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { t.Fatal(err) @@ -176,7 +175,6 @@ func TestTLS(t *testing.T) { } { tc := tc t.Run(tc.name, func(t *testing.T) { - t.Parallel() doListenerTLSTest(t, tc.Succeed, tc.Server, tc.Client) }) } @@ -221,7 +219,6 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon } func TestSetServerName(t *testing.T) { - t.Parallel() if validServerNameTLS("kafka-server.domain.com:9093", nil).ServerName != "kafka-server.domain.com" { t.Fatal("Expected kafka-server.domain.com as tls.ServerName when tls config is nil") } diff --git a/config_test.go b/config_test.go index 391b884be..fdde01ff3 100644 --- a/config_test.go +++ b/config_test.go @@ -8,7 +8,6 @@ import ( ) func TestDefaultConfigValidates(t *testing.T) { - t.Parallel() config := NewTestConfig() if err := config.Validate(); err != nil { t.Error(err) @@ -19,7 +18,6 @@ func TestDefaultConfigValidates(t *testing.T) { } func TestInvalidClientIDConfigValidates(t *testing.T) { - t.Parallel() config := NewTestConfig() config.ClientID = "foo:bar" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { @@ -28,7 +26,6 @@ func TestInvalidClientIDConfigValidates(t *testing.T) { } func TestEmptyClientIDConfigValidates(t *testing.T) { - t.Parallel() config := NewTestConfig() config.ClientID = "" if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" { @@ -43,7 +40,6 @@ func (t *DummyTokenProvider) Token() (*AccessToken, error) { } func TestNetConfigValidates(t *testing.T) { - t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -239,7 +235,6 @@ func TestNetConfigValidates(t *testing.T) { } func TestMetadataConfigValidates(t *testing.T) { - t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -278,7 +273,6 @@ func TestMetadataConfigValidates(t *testing.T) { } func TestAdminConfigValidates(t *testing.T) { - t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -303,7 +297,6 @@ func TestAdminConfigValidates(t *testing.T) { } func TestProducerConfigValidates(t *testing.T) { - t.Parallel() tests := []struct { name string cfg func(*Config) // resorting to using a function as a param because of internal composite structs @@ -433,7 +426,6 @@ func TestProducerConfigValidates(t *testing.T) { } func TestConsumerConfigValidates(t *testing.T) { - t.Parallel() tests := []struct { name string cfg func(*Config) @@ -467,7 +459,6 @@ func TestConsumerConfigValidates(t *testing.T) { } func TestLZ4ConfigValidation(t *testing.T) { - t.Parallel() config := NewTestConfig() config.Producer.Compression = CompressionLZ4 if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" { @@ -480,7 +471,6 @@ func TestLZ4ConfigValidation(t *testing.T) { } func TestZstdConfigValidation(t *testing.T) { - t.Parallel() config := NewTestConfig() config.Producer.Compression = CompressionZSTD if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" { diff --git a/consumer_group_members_test.go b/consumer_group_members_test.go index a7ce8c350..a99de61c6 100644 --- a/consumer_group_members_test.go +++ b/consumer_group_members_test.go @@ -45,7 +45,6 @@ var ( ) func TestConsumerGroupMemberMetadata(t *testing.T) { - t.Parallel() meta := &ConsumerGroupMemberMetadata{ Version: 0, Topics: []string{"one", "two"}, @@ -69,7 +68,6 @@ func TestConsumerGroupMemberMetadata(t *testing.T) { } func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { - t.Parallel() meta := new(ConsumerGroupMemberMetadata) if err := decode(groupMemberMetadataV1, meta); err != nil { t.Error("Failed to decode V1 data", err) @@ -80,7 +78,6 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) { } func TestConsumerGroupMemberAssignment(t *testing.T) { - t.Parallel() amt := &ConsumerGroupMemberAssignment{ Version: 0, Topics: map[string][]int32{ diff --git a/consumer_metadata_request_test.go b/consumer_metadata_request_test.go index 100c9354a..3f6bcdb57 100644 --- a/consumer_metadata_request_test.go +++ b/consumer_metadata_request_test.go @@ -15,7 +15,6 @@ var ( ) func TestConsumerMetadataRequest(t *testing.T) { - t.Parallel() request := new(ConsumerMetadataRequest) testEncodable(t, "empty string", request, consumerMetadataRequestEmpty) testVersionDecodable(t, "empty string", request, consumerMetadataRequestEmpty, 0) diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index 6b0f782d2..a7d74b55b 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -19,7 +19,6 @@ var ( ) func TestConsumerMetadataResponseError(t *testing.T) { - t.Parallel() response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress} testEncodable(t, "", response, consumerMetadataResponseError) @@ -34,7 +33,6 @@ func TestConsumerMetadataResponseError(t *testing.T) { } func TestConsumerMetadataResponseSuccess(t *testing.T) { - t.Parallel() broker := NewBroker("foo:52445") broker.id = 0xAB response := ConsumerMetadataResponse{ diff --git a/consumer_test.go b/consumer_test.go index 038bbd542..48c8090e9 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -16,7 +16,6 @@ var testMsg = StringEncoder("Foo") // If a particular offset is provided then messages are consumed starting from // that offset. func TestConsumerOffsetManual(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) @@ -79,7 +78,6 @@ func TestConsumerOffsetManual(t *testing.T) { } func TestPauseResumeConsumption(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) @@ -164,7 +162,6 @@ func TestPauseResumeConsumption(t *testing.T) { // message indeed corresponds to the offset that broker claims to be the // newest in its metadata response. func TestConsumerOffsetNewest(t *testing.T) { - t.Parallel() // Given offsetNewest := int64(10) offsetNewestAfterFetchRequest := int64(50) @@ -211,7 +208,6 @@ func TestConsumerOffsetNewest(t *testing.T) { // If `OffsetOldest` is passed as the initial offset then the first consumed // message is indeed the first available in the partition. func TestConsumerOffsetOldest(t *testing.T) { - t.Parallel() // Given offsetNewest := int64(10) broker0 := NewMockBroker(t, 0) @@ -259,7 +255,6 @@ func TestConsumerOffsetOldest(t *testing.T) { // It is possible to close a partition consumer and create the same anew. func TestConsumerRecreate(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -301,7 +296,6 @@ func TestConsumerRecreate(t *testing.T) { // An attempt to consume the same partition twice should fail. func TestConsumerDuplicate(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -413,7 +407,6 @@ func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { // If consumer fails to refresh metadata it keeps retrying with frequency // specified by `Config.Consumer.Retry.Backoff`. func TestConsumerLeaderRefreshError(t *testing.T) { - t.Parallel() config := NewTestConfig() config.Net.ReadTimeout = 100 * time.Millisecond config.Consumer.Retry.Backoff = 200 * time.Millisecond @@ -424,7 +417,6 @@ func TestConsumerLeaderRefreshError(t *testing.T) { } func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { - t.Parallel() var calls int32 = 0 config := NewTestConfig() @@ -445,7 +437,6 @@ func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) { } func TestConsumerInvalidTopic(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 100) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -473,7 +464,6 @@ func TestConsumerInvalidTopic(t *testing.T) { // Nothing bad happens if a partition consumer that has no leader assigned at // the moment is closed. func TestConsumerClosePartitionWithoutLeader(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 100) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -528,7 +518,6 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) { // actual offset range for the partition, then the partition consumer stops // immediately closing its output channels. func TestConsumerShutsDownOutOfRange(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) fetchResponse := new(FetchResponse) @@ -567,7 +556,6 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { // If a fetch response contains messages with offsets that are smaller then // requested, then such messages are ignored. func TestConsumerExtraOffsets(t *testing.T) { - t.Parallel() // Given legacyFetchResponse := &FetchResponse{} legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -642,7 +630,6 @@ func TestConsumerExtraOffsets(t *testing.T) { // messages older then requested, even though there would be // more messages if higher offset was requested. func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 4} fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) @@ -691,7 +678,6 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { } func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 4} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -735,7 +721,6 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { } func TestConsumeMessageWithSessionIDs(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 7} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -785,7 +770,6 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) { } func TestConsumeMessagesFromReadReplica(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -864,7 +848,6 @@ func TestConsumeMessagesFromReadReplica(t *testing.T) { } func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) @@ -917,7 +900,6 @@ func TestConsumeMessagesFromReadReplicaLeaderFallback(t *testing.T) { } func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} block1 := fetchResponse1.getOrCreateBlock("my_topic", 0) @@ -990,7 +972,6 @@ func TestConsumeMessagesFromReadReplicaErrorReplicaNotAvailable(t *testing.T) { } func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { - t.Parallel() // Given fetchResponse1 := &FetchResponse{Version: 11} block1 := fetchResponse1.getOrCreateBlock("my_topic", 0) @@ -1069,7 +1050,6 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) { // // See https://github.com/Shopify/sarama/issues/1927 func TestConsumeMessagesTrackLeader(t *testing.T) { - t.Parallel() cfg := NewConfig() cfg.ClientID = t.Name() cfg.Metadata.RefreshFrequency = time.Millisecond * 50 @@ -1181,7 +1161,6 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { - t.Parallel() // Given legacyFetchResponse := &FetchResponse{} legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) @@ -1241,7 +1220,6 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { // If leadership for a partition is changing then consumer resolves the new // leader and switches to it. func TestConsumerRebalancingMultiplePartitions(t *testing.T) { - t.Parallel() // initial setup seedBroker := NewMockBroker(t, 10) leader0 := NewMockBroker(t, 0) @@ -1440,7 +1418,6 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { // consumer channel buffer is full then that does not affect the ability to // read messages by the other consumer. func TestConsumerInterleavedClose(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -1489,7 +1466,6 @@ func TestConsumerInterleavedClose(t *testing.T) { } func TestConsumerBounceWithReferenceOpen(t *testing.T) { - t.Parallel() broker0 := NewMockBroker(t, 0) broker0Addr := broker0.Addr() broker1 := NewMockBroker(t, 1) @@ -1588,7 +1564,6 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) { } func TestConsumerOffsetOutOfRange(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 2) broker0.SetHandlerByMap(map[string]MockResponse{ @@ -1621,7 +1596,6 @@ func TestConsumerOffsetOutOfRange(t *testing.T) { } func TestConsumerExpiryTicker(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) fetchResponse1 := &FetchResponse{} @@ -1664,7 +1638,6 @@ func TestConsumerExpiryTicker(t *testing.T) { } func TestConsumerTimestamps(t *testing.T) { - t.Parallel() now := time.Now().Truncate(time.Millisecond) type testMessage struct { key Encoder @@ -1782,7 +1755,6 @@ func TestConsumerTimestamps(t *testing.T) { // When set to ReadCommitted, no uncommitted message should be available in messages channel func TestExcludeUncommitted(t *testing.T) { - t.Parallel() // Given broker0 := NewMockBroker(t, 0) @@ -1896,7 +1868,6 @@ ConsumerLoop: } func Test_partitionConsumer_parseResponse(t *testing.T) { - t.Parallel() type args struct { response *FetchResponse } @@ -1925,7 +1896,6 @@ func Test_partitionConsumer_parseResponse(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() child := &partitionConsumer{ broker: &brokerConsumer{ broker: &Broker{}, @@ -1945,7 +1915,6 @@ func Test_partitionConsumer_parseResponse(t *testing.T) { } func Test_partitionConsumer_parseResponseEmptyBatch(t *testing.T) { - t.Parallel() lrbOffset := int64(5) block := &FetchResponseBlock{ HighWaterMarkOffset: 10, @@ -2028,7 +1997,6 @@ func testConsumerInterceptor( } func TestConsumerInterceptors(t *testing.T) { - t.Parallel() tests := []struct { name string interceptors []ConsumerInterceptor @@ -2086,7 +2054,6 @@ func TestConsumerInterceptors(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) }) } diff --git a/control_record_test.go b/control_record_test.go index 871edbab6..8a1d74863 100644 --- a/control_record_test.go +++ b/control_record_test.go @@ -46,7 +46,6 @@ func assertRecordType(t *testing.T, r *ControlRecord, expected ControlRecordType } func TestDecodingControlRecords(t *testing.T) { - t.Parallel() abortTx := testDecode(t, "abort transaction", abortTxCtrlRecKey, abortTxCtrlRecValue) assertRecordType(t, &abortTx, ControlRecordAbort) diff --git a/create_partitions_request_test.go b/create_partitions_request_test.go index 0c5923cda..8230b3866 100644 --- a/create_partitions_request_test.go +++ b/create_partitions_request_test.go @@ -30,7 +30,6 @@ var ( ) func TestCreatePartitionsRequest(t *testing.T) { - t.Parallel() req := &CreatePartitionsRequest{ TopicPartitions: map[string]*TopicPartition{ "topic": { diff --git a/create_partitions_response_test.go b/create_partitions_response_test.go index 9f9ee64ee..2f5967494 100644 --- a/create_partitions_response_test.go +++ b/create_partitions_response_test.go @@ -25,7 +25,6 @@ var ( ) func TestCreatePartitionsResponse(t *testing.T) { - t.Parallel() resp := &CreatePartitionsResponse{ ThrottleTime: 100 * time.Millisecond, TopicPartitionErrors: map[string]*TopicPartitionError{ @@ -53,7 +52,6 @@ func TestCreatePartitionsResponse(t *testing.T) { } func TestTopicPartitionError(t *testing.T) { - t.Parallel() // Assert that TopicPartitionError satisfies error interface var err error = &TopicPartitionError{ Err: ErrTopicAuthorizationFailed, diff --git a/create_topics_request_test.go b/create_topics_request_test.go index 3298e06a2..daf223c53 100644 --- a/create_topics_request_test.go +++ b/create_topics_request_test.go @@ -23,7 +23,6 @@ var ( ) func TestCreateTopicsRequest(t *testing.T) { - t.Parallel() retention := "-1" req := &CreateTopicsRequest{ diff --git a/create_topics_response_test.go b/create_topics_response_test.go index 73d803784..2dfb8d159 100644 --- a/create_topics_response_test.go +++ b/create_topics_response_test.go @@ -29,7 +29,6 @@ var ( ) func TestCreateTopicsResponse(t *testing.T) { - t.Parallel() resp := &CreateTopicsResponse{ TopicErrors: map[string]*TopicError{ "topic": { @@ -53,7 +52,6 @@ func TestCreateTopicsResponse(t *testing.T) { } func TestTopicError(t *testing.T) { - t.Parallel() // Assert that TopicError satisfies error interface var err error = &TopicError{ Err: ErrTopicAuthorizationFailed, diff --git a/delete_groups_request_test.go b/delete_groups_request_test.go index cf1144493..908172498 100644 --- a/delete_groups_request_test.go +++ b/delete_groups_request_test.go @@ -18,7 +18,6 @@ var ( ) func TestDeleteGroupsRequest(t *testing.T) { - t.Parallel() var request *DeleteGroupsRequest request = new(DeleteGroupsRequest) diff --git a/delete_groups_response_test.go b/delete_groups_response_test.go index e524d288f..6f622b5f0 100644 --- a/delete_groups_response_test.go +++ b/delete_groups_response_test.go @@ -26,7 +26,6 @@ var ( ) func TestDeleteGroupsResponse(t *testing.T) { - t.Parallel() var response *DeleteGroupsResponse response = new(DeleteGroupsResponse) diff --git a/delete_offsets_request_test.go b/delete_offsets_request_test.go index 3e9769c8f..0eea3fc9f 100644 --- a/delete_offsets_request_test.go +++ b/delete_offsets_request_test.go @@ -10,7 +10,6 @@ var ( ) func TestDeleteOffsetsRequest(t *testing.T) { - t.Parallel() var request *DeleteOffsetsRequest request = new(DeleteOffsetsRequest) diff --git a/delete_offsets_response_test.go b/delete_offsets_response_test.go index aec939bbb..8d069dbcb 100644 --- a/delete_offsets_response_test.go +++ b/delete_offsets_response_test.go @@ -33,7 +33,6 @@ var ( ) func TestDeleteOffsetsResponse(t *testing.T) { - t.Parallel() var response *DeleteOffsetsResponse response = &DeleteOffsetsResponse{ diff --git a/delete_records_request_test.go b/delete_records_request_test.go index f42d2ffe7..c72960cfb 100644 --- a/delete_records_request_test.go +++ b/delete_records_request_test.go @@ -19,7 +19,6 @@ var deleteRecordsRequest = []byte{ } func TestDeleteRecordsRequest(t *testing.T) { - t.Parallel() req := &DeleteRecordsRequest{ Topics: map[string]*DeleteRecordsRequestTopic{ "topic": { diff --git a/delete_records_response_test.go b/delete_records_response_test.go index 32e535b20..3653cdc41 100644 --- a/delete_records_response_test.go +++ b/delete_records_response_test.go @@ -21,7 +21,6 @@ var deleteRecordsResponse = []byte{ } func TestDeleteRecordsResponse(t *testing.T) { - t.Parallel() resp := &DeleteRecordsResponse{ Version: 0, ThrottleTime: 100 * time.Millisecond, diff --git a/delete_topics_request_test.go b/delete_topics_request_test.go index 6f4ea1ba3..c313a2f3b 100644 --- a/delete_topics_request_test.go +++ b/delete_topics_request_test.go @@ -13,7 +13,6 @@ var deleteTopicsRequest = []byte{ } func TestDeleteTopicsRequestV0(t *testing.T) { - t.Parallel() req := &DeleteTopicsRequest{ Version: 0, Topics: []string{"topic", "other"}, @@ -24,7 +23,6 @@ func TestDeleteTopicsRequestV0(t *testing.T) { } func TestDeleteTopicsRequestV1(t *testing.T) { - t.Parallel() req := &DeleteTopicsRequest{ Version: 1, Topics: []string{"topic", "other"}, diff --git a/delete_topics_response_test.go b/delete_topics_response_test.go index 534acd4b8..516f1a3bd 100644 --- a/delete_topics_response_test.go +++ b/delete_topics_response_test.go @@ -21,7 +21,6 @@ var ( ) func TestDeleteTopicsResponse(t *testing.T) { - t.Parallel() resp := &DeleteTopicsResponse{ TopicErrorCodes: map[string]KError{ "topic": ErrNoError, diff --git a/describe_client_quotas_request_test.go b/describe_client_quotas_request_test.go index 790443e4a..6b8c0c9c5 100644 --- a/describe_client_quotas_request_test.go +++ b/describe_client_quotas_request_test.go @@ -37,7 +37,6 @@ var ( ) func TestDescribeClientQuotasRequest(t *testing.T) { - t.Parallel() // Match All req := &DescribeClientQuotasRequest{ Components: []QuotaFilterComponent{}, diff --git a/describe_client_quotas_response_test.go b/describe_client_quotas_response_test.go index 1248838ec..6c681d11c 100644 --- a/describe_client_quotas_response_test.go +++ b/describe_client_quotas_response_test.go @@ -44,7 +44,6 @@ var ( ) func TestDescribeClientQuotasResponse(t *testing.T) { - t.Parallel() // Response With Error errMsg := "Custom entity type 'faulty' not supported" res := &DescribeClientQuotasResponse{ diff --git a/describe_configs_request_test.go b/describe_configs_request_test.go index 79160ad2f..a1148f401 100644 --- a/describe_configs_request_test.go +++ b/describe_configs_request_test.go @@ -49,7 +49,6 @@ var ( ) func TestDescribeConfigsRequestv0(t *testing.T) { - t.Parallel() var request *DescribeConfigsRequest request = &DescribeConfigsRequest{ @@ -103,7 +102,6 @@ func TestDescribeConfigsRequestv0(t *testing.T) { } func TestDescribeConfigsRequestv1(t *testing.T) { - t.Parallel() request := &DescribeConfigsRequest{ Version: 1, Resources: []*ConfigResource{ diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go index 0aa602d6a..ea8f28e57 100644 --- a/describe_configs_response_test.go +++ b/describe_configs_response_test.go @@ -93,7 +93,6 @@ var ( ) func TestDescribeConfigsResponsev0(t *testing.T) { - t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -128,7 +127,6 @@ func TestDescribeConfigsResponsev0(t *testing.T) { } func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) { - t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -163,7 +161,6 @@ func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) { } func TestDescribeConfigsResponsev1(t *testing.T) { - t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -200,7 +197,6 @@ func TestDescribeConfigsResponsev1(t *testing.T) { } func TestDescribeConfigsResponseWithSynonym(t *testing.T) { - t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -243,7 +239,6 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) { } func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) { - t.Parallel() var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ diff --git a/describe_groups_request_test.go b/describe_groups_request_test.go index 3a538263d..7d45f3fee 100644 --- a/describe_groups_request_test.go +++ b/describe_groups_request_test.go @@ -18,7 +18,6 @@ var ( ) func TestDescribeGroupsRequest(t *testing.T) { - t.Parallel() var request *DescribeGroupsRequest request = new(DescribeGroupsRequest) diff --git a/describe_groups_response_test.go b/describe_groups_response_test.go index b63b25ec3..dd3973191 100644 --- a/describe_groups_response_test.go +++ b/describe_groups_response_test.go @@ -35,7 +35,6 @@ var ( ) func TestDescribeGroupsResponse(t *testing.T) { - t.Parallel() var response *DescribeGroupsResponse response = new(DescribeGroupsResponse) diff --git a/describe_log_dirs_request_test.go b/describe_log_dirs_request_test.go index f2b140ad1..65b88c685 100644 --- a/describe_log_dirs_request_test.go +++ b/describe_log_dirs_request_test.go @@ -15,7 +15,6 @@ var ( ) func TestDescribeLogDirsRequest(t *testing.T) { - t.Parallel() request := &DescribeLogDirsRequest{ Version: 0, DescribeTopics: []DescribeLogDirsRequestTopic{}, diff --git a/describe_log_dirs_response_test.go b/describe_log_dirs_response_test.go index f533e8e0c..5a1eb5b5e 100644 --- a/describe_log_dirs_response_test.go +++ b/describe_log_dirs_response_test.go @@ -32,7 +32,6 @@ var ( ) func TestDescribeLogDirsResponse(t *testing.T) { - t.Parallel() // Test empty response response := &DescribeLogDirsResponse{ LogDirs: []DescribeLogDirsResponseDirMetadata{}, diff --git a/describe_user_scram_credentials_request_test.go b/describe_user_scram_credentials_request_test.go index a522a075b..87e52bab6 100644 --- a/describe_user_scram_credentials_request_test.go +++ b/describe_user_scram_credentials_request_test.go @@ -15,7 +15,6 @@ var ( ) func TestDescribeUserScramCredentialsRequest(t *testing.T) { - t.Parallel() request := &DescribeUserScramCredentialsRequest{ Version: 0, DescribeUsers: []DescribeUserScramCredentialsRequestUser{}, diff --git a/describe_user_scram_credentials_response_test.go b/describe_user_scram_credentials_response_test.go index 66b7a2884..a251eaf7a 100644 --- a/describe_user_scram_credentials_response_test.go +++ b/describe_user_scram_credentials_response_test.go @@ -30,7 +30,6 @@ var ( ) func TestDescribeUserScramCredentialsResponse(t *testing.T) { - t.Parallel() response := &DescribeUserScramCredentialsResponse{ Version: 0, ThrottleTime: time.Second * 3, diff --git a/end_txn_request_test.go b/end_txn_request_test.go index cc8fc7ee2..6f5d4480e 100644 --- a/end_txn_request_test.go +++ b/end_txn_request_test.go @@ -10,7 +10,6 @@ var endTxnRequest = []byte{ } func TestEndTxnRequest(t *testing.T) { - t.Parallel() req := &EndTxnRequest{ TransactionalID: "txn", ProducerID: 8000, diff --git a/end_txn_response_test.go b/end_txn_response_test.go index 291cfb653..d7ae1c988 100644 --- a/end_txn_response_test.go +++ b/end_txn_response_test.go @@ -11,7 +11,6 @@ var endTxnResponse = []byte{ } func TestEndTxnResponse(t *testing.T) { - t.Parallel() resp := &EndTxnResponse{ ThrottleTime: 100 * time.Millisecond, Err: ErrInvalidProducerIDMapping, diff --git a/examples/http_server/http_server_test.go b/examples/http_server/http_server_test.go index f171c85b2..ac3ba4d07 100644 --- a/examples/http_server/http_server_test.go +++ b/examples/http_server/http_server_test.go @@ -14,7 +14,6 @@ import ( // and one data collector entry. Let's assume both will succeed. // We should return a HTTP 200 status. func TestCollectSuccessfully(t *testing.T) { - t.Parallel() dataCollectorMock := mocks.NewSyncProducer(t, nil) dataCollectorMock.ExpectSendMessageAndSucceed() @@ -51,7 +50,6 @@ func TestCollectSuccessfully(t *testing.T) { // Now, let's see if we handle the case of not being able to produce // to the data collector properly. In this case we should return a 500 status. func TestCollectionFailure(t *testing.T) { - t.Parallel() dataCollectorMock := mocks.NewSyncProducer(t, nil) dataCollectorMock.ExpectSendMessageAndFail(sarama.ErrRequestTimedOut) @@ -80,7 +78,6 @@ func TestCollectionFailure(t *testing.T) { // so we are not setting any expectations on the dataCollectorMock. It // will still generate an access log entry though. func TestWrongPath(t *testing.T) { - t.Parallel() dataCollectorMock := mocks.NewSyncProducer(t, nil) accessLogProducerMock := mocks.NewAsyncProducer(t, nil) diff --git a/fetch_request_test.go b/fetch_request_test.go index 9fd59b40c..0b807b9f8 100644 --- a/fetch_request_test.go +++ b/fetch_request_test.go @@ -51,15 +51,12 @@ var ( ) func TestFetchRequest(t *testing.T) { - t.Parallel() t.Run("no blocks", func(t *testing.T) { - t.Parallel() request := new(FetchRequest) testRequest(t, "no blocks", request, fetchRequestNoBlocks) }) t.Run("with properties", func(t *testing.T) { - t.Parallel() request := new(FetchRequest) request.MaxWaitTime = 0x20 request.MinBytes = 0xEF @@ -67,7 +64,6 @@ func TestFetchRequest(t *testing.T) { }) t.Run("one block", func(t *testing.T) { - t.Parallel() request := new(FetchRequest) request.MaxWaitTime = 0 request.MinBytes = 0 @@ -76,7 +72,6 @@ func TestFetchRequest(t *testing.T) { }) t.Run("one block v4", func(t *testing.T) { - t.Parallel() request := new(FetchRequest) request.Version = 4 request.MaxBytes = 0xFF @@ -86,7 +81,6 @@ func TestFetchRequest(t *testing.T) { }) t.Run("one block v11 rackid", func(t *testing.T) { - t.Parallel() request := new(FetchRequest) request.Version = 11 request.MaxBytes = 0xFF diff --git a/fetch_response_test.go b/fetch_response_test.go index badbac821..3ba3eb5d1 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -202,7 +202,6 @@ var ( ) func TestEmptyFetchResponse(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0) @@ -212,7 +211,6 @@ func TestEmptyFetchResponse(t *testing.T) { } func TestOneMessageFetchResponse(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0) @@ -269,7 +267,6 @@ func TestOneMessageFetchResponse(t *testing.T) { } func TestOverflowMessageFetchResponse(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0) @@ -330,7 +327,6 @@ func TestOverflowMessageFetchResponse(t *testing.T) { } func TestOneRecordFetchResponse(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4) @@ -380,7 +376,6 @@ func TestOneRecordFetchResponse(t *testing.T) { } func TestPartailFetchResponse(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4) @@ -423,7 +418,6 @@ func TestPartailFetchResponse(t *testing.T) { } func TestEmptyRecordsFetchResponse(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "empty record", &response, emptyRecordsFetchResponsev11, 11) @@ -469,7 +463,6 @@ func TestEmptyRecordsFetchResponse(t *testing.T) { } func TestOneMessageFetchResponseV4(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4) @@ -526,7 +519,6 @@ func TestOneMessageFetchResponseV4(t *testing.T) { } func TestPreferredReplicaFetchResponseV11(t *testing.T) { - t.Parallel() response := FetchResponse{} testVersionDecodable( t, "preferred replica fetch response v11", &response, diff --git a/find_coordinator_request_test.go b/find_coordinator_request_test.go index c5c56ceb5..7e889b074 100644 --- a/find_coordinator_request_test.go +++ b/find_coordinator_request_test.go @@ -15,7 +15,6 @@ var ( ) func TestFindCoordinatorRequest(t *testing.T) { - t.Parallel() req := &FindCoordinatorRequest{ Version: 1, CoordinatorKey: "group", diff --git a/find_coordinator_response_test.go b/find_coordinator_response_test.go index f790e01d0..417a76c6e 100644 --- a/find_coordinator_response_test.go +++ b/find_coordinator_response_test.go @@ -6,7 +6,6 @@ import ( ) func TestFindCoordinatorResponse(t *testing.T) { - t.Parallel() errMsg := "kaboom" for _, tc := range []struct { diff --git a/heartbeat_request_test.go b/heartbeat_request_test.go index c2588386c..2653f82c7 100644 --- a/heartbeat_request_test.go +++ b/heartbeat_request_test.go @@ -9,7 +9,6 @@ var basicHeartbeatRequest = []byte{ } func TestHeartbeatRequest(t *testing.T) { - t.Parallel() request := new(HeartbeatRequest) request.GroupId = "foo" request.GenerationId = 66051 diff --git a/heartbeat_response_test.go b/heartbeat_response_test.go index 6ca6eab61..e60146bdc 100644 --- a/heartbeat_response_test.go +++ b/heartbeat_response_test.go @@ -7,7 +7,6 @@ var heartbeatResponseNoError = []byte{ } func TestHeartbeatResponse(t *testing.T) { - t.Parallel() response := new(HeartbeatResponse) testVersionDecodable(t, "no error", response, heartbeatResponseNoError, 0) if response.Err != ErrNoError { diff --git a/incremental_alter_configs_request_test.go b/incremental_alter_configs_request_test.go index 977869011..d239bf5f5 100644 --- a/incremental_alter_configs_request_test.go +++ b/incremental_alter_configs_request_test.go @@ -44,7 +44,6 @@ var ( ) func TestIncrementalAlterConfigsRequest(t *testing.T) { - t.Parallel() var request *IncrementalAlterConfigsRequest request = &IncrementalAlterConfigsRequest{ diff --git a/incremental_alter_configs_response_test.go b/incremental_alter_configs_response_test.go index 0e9f92adc..697156709 100644 --- a/incremental_alter_configs_response_test.go +++ b/incremental_alter_configs_response_test.go @@ -21,7 +21,6 @@ var ( ) func TestIncrementalAlterConfigsResponse(t *testing.T) { - t.Parallel() var response *IncrementalAlterConfigsResponse response = &IncrementalAlterConfigsResponse{ diff --git a/init_producer_id_request_test.go b/init_producer_id_request_test.go index 78261884c..5c83d8514 100644 --- a/init_producer_id_request_test.go +++ b/init_producer_id_request_test.go @@ -18,7 +18,6 @@ var ( ) func TestInitProducerIDRequest(t *testing.T) { - t.Parallel() req := &InitProducerIDRequest{ TransactionTimeout: 100 * time.Millisecond, } diff --git a/init_producer_id_response_test.go b/init_producer_id_response_test.go index 15f07f330..b0649386a 100644 --- a/init_producer_id_response_test.go +++ b/init_producer_id_response_test.go @@ -22,7 +22,6 @@ var ( ) func TestInitProducerIDResponse(t *testing.T) { - t.Parallel() resp := &InitProducerIDResponse{ ThrottleTime: 100 * time.Millisecond, ProducerID: 8000, diff --git a/join_group_request_test.go b/join_group_request_test.go index bcdd85575..a2e17f980 100644 --- a/join_group_request_test.go +++ b/join_group_request_test.go @@ -34,7 +34,6 @@ var ( ) func TestJoinGroupRequest(t *testing.T) { - t.Parallel() request := new(JoinGroupRequest) request.GroupId = "TestGroup" request.SessionTimeout = 100 @@ -43,7 +42,6 @@ func TestJoinGroupRequest(t *testing.T) { } func TestJoinGroupRequestV0_OneProtocol(t *testing.T) { - t.Parallel() request := new(JoinGroupRequest) request.GroupId = "TestGroup" request.SessionTimeout = 100 @@ -57,7 +55,6 @@ func TestJoinGroupRequestV0_OneProtocol(t *testing.T) { } func TestJoinGroupRequestDeprecatedEncode(t *testing.T) { - t.Parallel() request := new(JoinGroupRequest) request.GroupId = "TestGroup" request.SessionTimeout = 100 @@ -71,7 +68,6 @@ func TestJoinGroupRequestDeprecatedEncode(t *testing.T) { } func TestJoinGroupRequestV1(t *testing.T) { - t.Parallel() request := new(JoinGroupRequest) request.Version = 1 request.GroupId = "TestGroup" diff --git a/join_group_response_test.go b/join_group_response_test.go index 6ec1b643e..a43b37a95 100644 --- a/join_group_response_test.go +++ b/join_group_response_test.go @@ -56,7 +56,6 @@ var ( ) func TestJoinGroupResponseV0(t *testing.T) { - t.Parallel() var response *JoinGroupResponse response = new(JoinGroupResponse) @@ -118,7 +117,6 @@ func TestJoinGroupResponseV0(t *testing.T) { } func TestJoinGroupResponseV1(t *testing.T) { - t.Parallel() response := new(JoinGroupResponse) testVersionDecodable(t, "no error", response, joinGroupResponseV1, 1) if response.Err != ErrNoError { @@ -145,7 +143,6 @@ func TestJoinGroupResponseV1(t *testing.T) { } func TestJoinGroupResponseV2(t *testing.T) { - t.Parallel() response := new(JoinGroupResponse) testVersionDecodable(t, "no error", response, joinGroupResponseV2, 2) if response.ThrottleTime != 100 { diff --git a/kerberos_client_test.go b/kerberos_client_test.go index b1a6d48fd..7c2ed31bc 100644 --- a/kerberos_client_test.go +++ b/kerberos_client_test.go @@ -58,7 +58,6 @@ const ( ) func TestFaildToCreateKerberosConfig(t *testing.T) { - t.Parallel() expectedErr := errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory") clientConfig := NewTestConfig() clientConfig.Net.SASL.Mechanism = SASLTypeGSSAPI @@ -77,7 +76,6 @@ func TestFaildToCreateKerberosConfig(t *testing.T) { } func TestCreateWithPassword(t *testing.T) { - t.Parallel() kerberosConfig, err := krbcfg.NewFromString(krb5cfg) if err != nil { t.Fatal(err) @@ -108,7 +106,6 @@ func TestCreateWithPassword(t *testing.T) { } func TestCreateWithKeyTab(t *testing.T) { - t.Parallel() kerberosConfig, err := krbcfg.NewFromString(krb5cfg) if err != nil { t.Fatal(err) @@ -131,7 +128,6 @@ func TestCreateWithKeyTab(t *testing.T) { } func TestCreateWithDisablePAFXFAST(t *testing.T) { - t.Parallel() kerberosConfig, err := krbcfg.NewFromString(krb5cfg) if err != nil { t.Fatal(err) diff --git a/leave_group_request_test.go b/leave_group_request_test.go index 978c44962..b674e48b2 100644 --- a/leave_group_request_test.go +++ b/leave_group_request_test.go @@ -8,7 +8,6 @@ var basicLeaveGroupRequest = []byte{ } func TestLeaveGroupRequest(t *testing.T) { - t.Parallel() request := new(LeaveGroupRequest) request.GroupId = "foo" request.MemberId = "bar" diff --git a/leave_group_response_test.go b/leave_group_response_test.go index 4a24334cc..9207c6668 100644 --- a/leave_group_response_test.go +++ b/leave_group_response_test.go @@ -8,7 +8,6 @@ var ( ) func TestLeaveGroupResponse(t *testing.T) { - t.Parallel() var response *LeaveGroupResponse response = new(LeaveGroupResponse) diff --git a/list_groups_request_test.go b/list_groups_request_test.go index f5d06da16..2e977d9a5 100644 --- a/list_groups_request_test.go +++ b/list_groups_request_test.go @@ -3,6 +3,5 @@ package sarama import "testing" func TestListGroupsRequest(t *testing.T) { - t.Parallel() testRequest(t, "ListGroupsRequest", &ListGroupsRequest{}, []byte{}) } diff --git a/list_groups_response_test.go b/list_groups_response_test.go index 1c21279d2..41ab822f9 100644 --- a/list_groups_response_test.go +++ b/list_groups_response_test.go @@ -24,7 +24,6 @@ var ( ) func TestListGroupsResponse(t *testing.T) { - t.Parallel() var response *ListGroupsResponse response = new(ListGroupsResponse) diff --git a/list_partition_reassignments_request_test.go b/list_partition_reassignments_request_test.go index ec5215b6e..e50b289b9 100644 --- a/list_partition_reassignments_request_test.go +++ b/list_partition_reassignments_request_test.go @@ -12,7 +12,6 @@ var listPartitionReassignmentsRequestOneBlock = []byte{ } func TestListPartitionReassignmentRequest(t *testing.T) { - t.Parallel() var request *ListPartitionReassignmentsRequest = &ListPartitionReassignmentsRequest{ TimeoutMs: int32(10000), Version: int16(0), diff --git a/list_partition_reassignments_response_test.go b/list_partition_reassignments_response_test.go index 85338ab40..71532423d 100644 --- a/list_partition_reassignments_response_test.go +++ b/list_partition_reassignments_response_test.go @@ -17,7 +17,6 @@ var listPartitionReassignmentsResponse = []byte{ } func TestListPartitionReassignmentResponse(t *testing.T) { - t.Parallel() var response *ListPartitionReassignmentsResponse = &ListPartitionReassignmentsResponse{ ThrottleTimeMs: int32(10000), Version: int16(0), diff --git a/message_test.go b/message_test.go index 134130298..a6c7cff2a 100644 --- a/message_test.go +++ b/message_test.go @@ -117,7 +117,6 @@ var ( ) func TestMessageEncoding(t *testing.T) { - t.Parallel() message := Message{} testEncodable(t, "empty", &message, emptyMessage) @@ -139,7 +138,6 @@ func TestMessageEncoding(t *testing.T) { } func TestMessageDecoding(t *testing.T) { - t.Parallel() message := Message{} testDecodable(t, "empty", &message, emptyMessage) if message.Codec != CompressionNone { @@ -168,7 +166,6 @@ func TestMessageDecoding(t *testing.T) { } func TestMessageDecodingBulkSnappy(t *testing.T) { - t.Parallel() message := Message{} testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage) if message.Codec != CompressionSnappy { @@ -185,7 +182,6 @@ func TestMessageDecodingBulkSnappy(t *testing.T) { } func TestMessageDecodingBulkGzip(t *testing.T) { - t.Parallel() message := Message{} testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage) if message.Codec != CompressionGZIP { @@ -202,7 +198,6 @@ func TestMessageDecodingBulkGzip(t *testing.T) { } func TestMessageDecodingBulkLZ4(t *testing.T) { - t.Parallel() message := Message{} testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message) if message.Codec != CompressionLZ4 { @@ -219,7 +214,6 @@ func TestMessageDecodingBulkLZ4(t *testing.T) { } func TestMessageDecodingBulkZSTD(t *testing.T) { - t.Parallel() message := Message{} testDecodable(t, "bulk zstd", &message, emptyBulkZSTDMessage) if message.Codec != CompressionZSTD { @@ -236,13 +230,11 @@ func TestMessageDecodingBulkZSTD(t *testing.T) { } func TestMessageDecodingVersion1(t *testing.T) { - t.Parallel() message := Message{Version: 1} testDecodable(t, "decoding empty v1 message", &message, emptyV1Message) } func TestMessageDecodingUnknownVersions(t *testing.T) { - t.Parallel() message := Message{Version: 2} err := decode(emptyV2Message, &message) if err == nil { diff --git a/metadata_request_test.go b/metadata_request_test.go index c71dc7c82..16f6b5942 100644 --- a/metadata_request_test.go +++ b/metadata_request_test.go @@ -63,7 +63,6 @@ var ( ) func TestMetadataRequestV0(t *testing.T) { - t.Parallel() request := new(MetadataRequest) testRequest(t, "no topics", request, metadataRequestNoTopicsV0) @@ -75,7 +74,6 @@ func TestMetadataRequestV0(t *testing.T) { } func TestMetadataRequestV1(t *testing.T) { - t.Parallel() request := new(MetadataRequest) request.Version = 1 testRequest(t, "no topics", request, metadataRequestNoTopicsV1) @@ -88,7 +86,6 @@ func TestMetadataRequestV1(t *testing.T) { } func TestMetadataRequestV2(t *testing.T) { - t.Parallel() request := new(MetadataRequest) request.Version = 2 testRequest(t, "no topics", request, metadataRequestNoTopicsV2) @@ -101,7 +98,6 @@ func TestMetadataRequestV2(t *testing.T) { } func TestMetadataRequestV3(t *testing.T) { - t.Parallel() request := new(MetadataRequest) request.Version = 3 testRequest(t, "no topics", request, metadataRequestNoTopicsV3) @@ -114,7 +110,6 @@ func TestMetadataRequestV3(t *testing.T) { } func TestMetadataRequestV4(t *testing.T) { - t.Parallel() request := new(MetadataRequest) request.Version = 4 testRequest(t, "no topics", request, metadataRequestNoTopicsV4) @@ -129,7 +124,6 @@ func TestMetadataRequestV4(t *testing.T) { } func TestMetadataRequestV5(t *testing.T) { - t.Parallel() request := new(MetadataRequest) request.Version = 5 testRequest(t, "no topics", request, metadataRequestNoTopicsV5) diff --git a/metadata_response_test.go b/metadata_response_test.go index 248d9b05c..3970a2f31 100644 --- a/metadata_response_test.go +++ b/metadata_response_test.go @@ -109,7 +109,6 @@ var ( ) func TestEmptyMetadataResponseV0(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "empty, V0", &response, emptyMetadataResponseV0, 0) @@ -122,7 +121,6 @@ func TestEmptyMetadataResponseV0(t *testing.T) { } func TestMetadataResponseWithBrokersV0(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "brokers, no topics, V0", &response, brokersNoTopicsMetadataResponseV0, 0) @@ -149,7 +147,6 @@ func TestMetadataResponseWithBrokersV0(t *testing.T) { } func TestMetadataResponseWithTopicsV0(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "topics, no brokers, V0", &response, topicsNoBrokersMetadataResponseV0, 0) @@ -212,7 +209,6 @@ func TestMetadataResponseWithTopicsV0(t *testing.T) { } func TestMetadataResponseWithBrokersV1(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "topics, V1", &response, brokersNoTopicsMetadataResponseV1, 1) @@ -234,7 +230,6 @@ func TestMetadataResponseWithBrokersV1(t *testing.T) { } func TestMetadataResponseWithTopicsV1(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "topics, V1", &response, topicsNoBrokersMetadataResponseV1, 1) @@ -256,7 +251,6 @@ func TestMetadataResponseWithTopicsV1(t *testing.T) { } func TestMetadataResponseWithThrottleTime(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "no topics, no brokers, throttle time and cluster Id V3", &response, noBrokersNoTopicsWithThrottleTimeAndClusterIDV3, 3) @@ -278,7 +272,6 @@ func TestMetadataResponseWithThrottleTime(t *testing.T) { } func TestMetadataResponseWithOfflineReplicasV5(t *testing.T) { - t.Parallel() response := MetadataResponse{} testVersionDecodable(t, "no brokers, 1 topic with offline replica V5", &response, noBrokersOneTopicWithOfflineReplicasV5, 5) diff --git a/metrics_test.go b/metrics_test.go index 7491196d7..7572f5b90 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -7,7 +7,6 @@ import ( ) func TestGetOrRegisterHistogram(t *testing.T) { - t.Parallel() metricRegistry := metrics.NewRegistry() histogram := getOrRegisterHistogram("name", metricRegistry) @@ -31,7 +30,6 @@ func TestGetOrRegisterHistogram(t *testing.T) { } func TestGetMetricNameForBroker(t *testing.T) { - t.Parallel() metricName := getMetricNameForBroker("name", &Broker{id: 1}) if metricName != "name-for-broker-1" { diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 666c93b05..29ed9238f 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -36,7 +36,6 @@ func (trm *testReporterMock) Errorf(format string, args ...interface{}) { } func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { - t.Parallel() var mp interface{} = &AsyncProducer{} if _, ok := mp.(sarama.AsyncProducer); !ok { t.Error("The mock producer should implement the sarama.Producer interface.") @@ -44,7 +43,6 @@ func TestMockAsyncProducerImplementsAsyncProducerInterface(t *testing.T) { } func TestProducerReturnsExpectationsToChannels(t *testing.T) { - t.Parallel() config := NewTestConfig() config.Producer.Return.Successes = true mp := NewAsyncProducer(t, config). @@ -78,7 +76,6 @@ func TestProducerReturnsExpectationsToChannels(t *testing.T) { } func TestProducerWithTooFewExpectations(t *testing.T) { - t.Parallel() trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil) mp.ExpectInputAndSucceed() @@ -96,7 +93,6 @@ func TestProducerWithTooFewExpectations(t *testing.T) { } func TestProducerWithTooManyExpectations(t *testing.T) { - t.Parallel() trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil). ExpectInputAndSucceed(). @@ -113,7 +109,6 @@ func TestProducerWithTooManyExpectations(t *testing.T) { } func TestProducerWithCheckerFunction(t *testing.T) { - t.Parallel() trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil). ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")). @@ -136,7 +131,6 @@ func TestProducerWithCheckerFunction(t *testing.T) { } func TestProducerWithBrokenPartitioner(t *testing.T) { - t.Parallel() trm := newTestReporterMock() config := sarama.NewConfig() config.Producer.Partitioner = func(string) sarama.Partitioner { diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 2efe183d1..2a408af97 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -8,7 +8,6 @@ import ( ) func TestMockConsumerImplementsConsumerInterface(t *testing.T) { - t.Parallel() var c interface{} = &Consumer{} if _, ok := c.(sarama.Consumer); !ok { t.Error("The mock consumer should implement the sarama.Consumer interface.") @@ -21,7 +20,6 @@ func TestMockConsumerImplementsConsumerInterface(t *testing.T) { } func TestConsumerHandlesExpectations(t *testing.T) { - t.Parallel() consumer := NewConsumer(t, NewTestConfig()) defer func() { if err := consumer.Close(); err != nil { @@ -67,7 +65,6 @@ func TestConsumerHandlesExpectations(t *testing.T) { } func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) { - t.Parallel() consumer := NewConsumer(t, NewTestConfig()) defer func() { if err := consumer.Close(); err != nil { @@ -147,7 +144,6 @@ func TestConsumerHandlesExpectationsPausingResuming(t *testing.T) { } func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { - t.Parallel() consumer := NewConsumer(t, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers) @@ -173,7 +169,6 @@ func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) { } func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -192,7 +187,6 @@ func TestConsumerWithoutExpectationsOnPartition(t *testing.T) { } func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")}) @@ -207,7 +201,6 @@ func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) { } func TestConsumerWithWrongOffsetExpectation(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) @@ -227,7 +220,6 @@ func TestConsumerWithWrongOffsetExpectation(t *testing.T) { } func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest). @@ -253,7 +245,6 @@ func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) { } func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -281,7 +272,6 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) { } func TestConsumerTopicMetadata(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -325,7 +315,6 @@ func TestConsumerTopicMetadata(t *testing.T) { } func TestConsumerUnexpectedTopicMetadata(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) @@ -339,7 +328,6 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) { } func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { - t.Parallel() trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) @@ -372,7 +360,6 @@ func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { } func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { - t.Parallel() startingOffset := int64(123) trm := newTestReporterMock() consumer := NewConsumer(trm, NewTestConfig()) diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index 00b876394..ca1f65f5d 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -9,7 +9,6 @@ import ( ) func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) { - t.Parallel() var mp interface{} = &SyncProducer{} if _, ok := mp.(sarama.SyncProducer); !ok { t.Error("The mock async producer should implement the sarama.SyncProducer interface.") @@ -17,7 +16,6 @@ func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) { } func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) { - t.Parallel() sp := NewSyncProducer(t, nil) defer func() { if err := sp.Close(); err != nil { @@ -58,7 +56,6 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) { } func TestSyncProducerWithTooManyExpectations(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -80,7 +77,6 @@ func TestSyncProducerWithTooManyExpectations(t *testing.T) { } func TestSyncProducerWithTooFewExpectations(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil).ExpectSendMessageAndSucceed() @@ -103,7 +99,6 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) { } func TestSyncProducerWithCheckerFunction(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -129,7 +124,6 @@ func TestSyncProducerWithCheckerFunction(t *testing.T) { } func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -154,7 +148,6 @@ func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { } func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -184,7 +177,6 @@ func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T } func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -209,7 +201,6 @@ func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { } func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). @@ -233,7 +224,6 @@ func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { } func TestSyncProducerSendMessagesFaultyEncoder(t *testing.T) { - t.Parallel() trm := newTestReporterMock() sp := NewSyncProducer(trm, nil). diff --git a/offset_commit_request_test.go b/offset_commit_request_test.go index 8d6ba003b..06bbd4036 100644 --- a/offset_commit_request_test.go +++ b/offset_commit_request_test.go @@ -64,7 +64,6 @@ var ( ) func TestOffsetCommitRequestV0(t *testing.T) { - t.Parallel() request := new(OffsetCommitRequest) request.Version = 0 request.ConsumerGroup = "foobar" @@ -75,7 +74,6 @@ func TestOffsetCommitRequestV0(t *testing.T) { } func TestOffsetCommitRequestV1(t *testing.T) { - t.Parallel() request := new(OffsetCommitRequest) request.ConsumerGroup = "foobar" request.ConsumerID = "cons" @@ -88,7 +86,6 @@ func TestOffsetCommitRequestV1(t *testing.T) { } func TestOffsetCommitRequestV2ToV4(t *testing.T) { - t.Parallel() for version := 2; version <= 4; version++ { request := new(OffsetCommitRequest) request.ConsumerGroup = "foobar" diff --git a/offset_commit_response_test.go b/offset_commit_response_test.go index 7e0bb295c..f35ca54ec 100644 --- a/offset_commit_response_test.go +++ b/offset_commit_response_test.go @@ -10,13 +10,11 @@ var emptyOffsetCommitResponse = []byte{ } func TestEmptyOffsetCommitResponse(t *testing.T) { - t.Parallel() response := OffsetCommitResponse{} testResponse(t, "empty", &response, emptyOffsetCommitResponse) } func TestNormalOffsetCommitResponse(t *testing.T) { - t.Parallel() response := OffsetCommitResponse{} response.AddError("t", 0, ErrNotLeaderForPartition) response.Errors["m"] = make(map[int32]KError) @@ -26,7 +24,6 @@ func TestNormalOffsetCommitResponse(t *testing.T) { } func TestOffsetCommitResponseWithThrottleTime(t *testing.T) { - t.Parallel() for version := 3; version <= 4; version++ { response := OffsetCommitResponse{ Version: int16(version), diff --git a/offset_fetch_request_test.go b/offset_fetch_request_test.go index 6657b05e5..d4497a8b8 100644 --- a/offset_fetch_request_test.go +++ b/offset_fetch_request_test.go @@ -55,7 +55,6 @@ var ( ) func TestOffsetFetchRequestNoPartitions(t *testing.T) { - t.Parallel() for version := 0; version <= 5; version++ { request := new(OffsetFetchRequest) request.Version = int16(version) @@ -89,7 +88,6 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) { } func TestOffsetFetchRequest(t *testing.T) { - t.Parallel() for version := 0; version <= 5; version++ { request := new(OffsetFetchRequest) request.Version = int16(version) @@ -118,7 +116,6 @@ func TestOffsetFetchRequest(t *testing.T) { } func TestOffsetFetchRequestAllPartitions(t *testing.T) { - t.Parallel() for version := 2; version <= 5; version++ { request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"} testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions) diff --git a/offset_fetch_response_test.go b/offset_fetch_response_test.go index 2801a46d9..d70894ab2 100644 --- a/offset_fetch_response_test.go +++ b/offset_fetch_response_test.go @@ -23,7 +23,6 @@ var ( ) func TestEmptyOffsetFetchResponse(t *testing.T) { - t.Parallel() for version := 0; version <= 1; version++ { response := OffsetFetchResponse{Version: int16(version)} testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse) @@ -42,7 +41,6 @@ func TestNormalOffsetFetchResponse(t *testing.T) { // The response encoded form cannot be checked for it varies due to // unpredictable map traversal order. // Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls - t.Parallel() for version := 0; version <= 1; version++ { response := OffsetFetchResponse{Version: int16(version)} diff --git a/offset_manager_test.go b/offset_manager_test.go index 36ea0a0d2..3aff080f6 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -72,7 +72,6 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager, } func TestNewOffsetManager(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) seedBroker.Returns(new(MetadataResponse)) defer seedBroker.Close() @@ -118,12 +117,10 @@ var offsetsautocommitTestTable = []struct { } func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { - t.Parallel() // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` for _, tt := range offsetsautocommitTestTable { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() config := NewTestConfig() if tt.set { config.Consumer.Offsets.AutoCommit.Enable = tt.enable @@ -174,7 +171,6 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { } func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { - t.Parallel() // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false config := NewTestConfig() config.Consumer.Offsets.AutoCommit.Enable = false @@ -235,7 +231,6 @@ func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { // Test recovery from ErrNotCoordinatorForConsumer // on first fetchInitialOffset call func TestOffsetManagerFetchInitialFail(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) // Error on first fetchInitialOffset call @@ -279,7 +274,6 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { - t.Parallel() retryCount := int32(0) backoff := func(retries, maxRetries int) time.Duration { atomic.AddInt32(&retryCount, 1) @@ -322,7 +316,6 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { } func TestPartitionOffsetManagerInitialOffset(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) testClient.Config().Consumer.Offsets.Initial = OffsetOldest @@ -345,7 +338,6 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) { } func TestPartitionOffsetManagerNextOffset(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta") @@ -365,7 +357,6 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) { } func TestPartitionOffsetManagerResetOffset(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -392,7 +383,6 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) { } func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -429,7 +419,6 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { } func TestPartitionOffsetManagerMarkOffset(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -455,7 +444,6 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) { } func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") @@ -491,7 +479,6 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { } func TestPartitionOffsetManagerCommitErr(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") @@ -556,7 +543,6 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { // Test of recovery from abort func TestAbortPartitionOffsetManager(t *testing.T) { - t.Parallel() om, testClient, broker, coordinator := initOffsetManager(t, 0) pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") diff --git a/offset_request_test.go b/offset_request_test.go index 57dd5f7c7..8ea533a6b 100644 --- a/offset_request_test.go +++ b/offset_request_test.go @@ -49,7 +49,6 @@ var ( ) func TestOffsetRequest(t *testing.T) { - t.Parallel() request := new(OffsetRequest) testRequest(t, "no blocks", request, offsetRequestNoBlocksV1) @@ -58,7 +57,6 @@ func TestOffsetRequest(t *testing.T) { } func TestOffsetRequestV1(t *testing.T) { - t.Parallel() request := new(OffsetRequest) request.Version = 1 testRequest(t, "no blocks", request, offsetRequestNoBlocksV1) @@ -68,7 +66,6 @@ func TestOffsetRequestV1(t *testing.T) { } func TestOffsetRequestV2(t *testing.T) { - t.Parallel() request := new(OffsetRequest) request.Version = 2 testRequest(t, "no blocks", request, offsetRequestNoBlocksV2) @@ -79,7 +76,6 @@ func TestOffsetRequestV2(t *testing.T) { } func TestOffsetRequestReplicaID(t *testing.T) { - t.Parallel() request := new(OffsetRequest) replicaID := int32(42) request.SetReplicaID(replicaID) diff --git a/offset_response_test.go b/offset_response_test.go index 8dcb3bfdd..683c11e43 100644 --- a/offset_response_test.go +++ b/offset_response_test.go @@ -38,7 +38,6 @@ var ( ) func TestEmptyOffsetResponse(t *testing.T) { - t.Parallel() response := OffsetResponse{} testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 0) @@ -55,7 +54,6 @@ func TestEmptyOffsetResponse(t *testing.T) { } func TestNormalOffsetResponse(t *testing.T) { - t.Parallel() response := OffsetResponse{} testVersionDecodable(t, "normal", &response, normalOffsetResponse, 0) @@ -86,7 +84,6 @@ func TestNormalOffsetResponse(t *testing.T) { } func TestNormalOffsetResponseV1(t *testing.T) { - t.Parallel() response := OffsetResponse{} testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1) diff --git a/partitioner_test.go b/partitioner_test.go index 230f92c26..06a4ad725 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -27,7 +27,6 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message } func TestRandomPartitioner(t *testing.T) { - t.Parallel() partitioner := NewRandomPartitioner("mytopic") choice, err := partitioner.Partition(nil, 1) @@ -50,7 +49,6 @@ func TestRandomPartitioner(t *testing.T) { } func TestRoundRobinPartitioner(t *testing.T) { - t.Parallel() partitioner := NewRoundRobinPartitioner("mytopic") choice, err := partitioner.Partition(nil, 1) @@ -74,7 +72,6 @@ func TestRoundRobinPartitioner(t *testing.T) { } func TestNewHashPartitionerWithHasher(t *testing.T) { - t.Parallel() // use the current default hasher fnv.New32a() partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") @@ -106,7 +103,6 @@ func TestNewHashPartitionerWithHasher(t *testing.T) { } func TestHashPartitionerWithHasherMinInt32(t *testing.T) { - t.Parallel() // use the current default hasher fnv.New32a() partitioner := NewCustomHashPartitioner(fnv.New32a)("mytopic") @@ -125,7 +121,6 @@ func TestHashPartitionerWithHasherMinInt32(t *testing.T) { } func TestHashPartitioner(t *testing.T) { - t.Parallel() partitioner := NewHashPartitioner("mytopic") choice, err := partitioner.Partition(&ProducerMessage{}, 1) @@ -156,7 +151,6 @@ func TestHashPartitioner(t *testing.T) { } func TestHashPartitionerConsistency(t *testing.T) { - t.Parallel() partitioner := NewHashPartitioner("mytopic") ep, ok := partitioner.(DynamicConsistencyPartitioner) @@ -175,7 +169,6 @@ func TestHashPartitionerConsistency(t *testing.T) { } func TestHashPartitionerMinInt32(t *testing.T) { - t.Parallel() partitioner := NewHashPartitioner("mytopic") msg := ProducerMessage{} @@ -193,7 +186,6 @@ func TestHashPartitionerMinInt32(t *testing.T) { } func TestManualPartitioner(t *testing.T) { - t.Parallel() partitioner := NewManualPartitioner("mytopic") choice, err := partitioner.Partition(&ProducerMessage{}, 1) @@ -216,7 +208,6 @@ func TestManualPartitioner(t *testing.T) { } func TestWithCustomFallbackPartitioner(t *testing.T) { - t.Parallel() topic := "mytopic" partitioner := NewCustomPartitioner( diff --git a/produce_request_test.go b/produce_request_test.go index 628b7f905..e8684ffd2 100644 --- a/produce_request_test.go +++ b/produce_request_test.go @@ -74,7 +74,6 @@ var ( ) func TestProduceRequest(t *testing.T) { - t.Parallel() request := new(ProduceRequest) testRequest(t, "empty", request, produceRequestEmpty) diff --git a/produce_response_test.go b/produce_response_test.go index b989e5a08..0509bb569 100644 --- a/produce_response_test.go +++ b/produce_response_test.go @@ -66,7 +66,6 @@ var ( ) func TestProduceResponseDecode(t *testing.T) { - t.Parallel() response := ProduceResponse{} testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0) @@ -113,7 +112,6 @@ func TestProduceResponseDecode(t *testing.T) { } func TestProduceResponseEncode(t *testing.T) { - t.Parallel() response := ProduceResponse{} response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) testEncodable(t, "empty", &response, produceResponseNoBlocksV0) @@ -133,7 +131,6 @@ func TestProduceResponseEncode(t *testing.T) { } func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) { - t.Parallel() response := ProduceResponse{} response.Version = 2 response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) diff --git a/produce_set_test.go b/produce_set_test.go index 2dede467f..652d6d7d9 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -23,7 +23,6 @@ func safeAddMessage(t *testing.T, ps *produceSet, msg *ProducerMessage) { } func TestProduceSetInitial(t *testing.T) { - t.Parallel() _, ps := makeProduceSet() if !ps.empty() { @@ -36,7 +35,6 @@ func TestProduceSetInitial(t *testing.T) { } func TestProduceSetAddingMessages(t *testing.T) { - t.Parallel() _, ps := makeProduceSet() msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)} @@ -52,7 +50,6 @@ func TestProduceSetAddingMessages(t *testing.T) { } func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) { - t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.Flush.MaxMessages = 1000 @@ -71,7 +68,6 @@ func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) { } func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) { - t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.MaxMessageBytes = 1000 @@ -90,7 +86,6 @@ func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) { } func TestProduceSetPartitionTracking(t *testing.T) { - t.Parallel() _, ps := makeProduceSet() m1 := &ProducerMessage{Topic: "t1", Partition: 0} @@ -138,7 +133,6 @@ func TestProduceSetPartitionTracking(t *testing.T) { } func TestProduceSetRequestBuilding(t *testing.T) { - t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.RequiredAcks = WaitForAll parent.conf.Producer.Timeout = 10 * time.Second @@ -177,7 +171,6 @@ func TestProduceSetRequestBuilding(t *testing.T) { } func TestProduceSetCompressedRequestBuilding(t *testing.T) { - t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.RequiredAcks = WaitForAll parent.conf.Producer.Timeout = 10 * time.Second @@ -223,7 +216,6 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { } func TestProduceSetV3RequestBuilding(t *testing.T) { - t.Parallel() parent, ps := makeProduceSet() parent.conf.Producer.RequiredAcks = WaitForAll parent.conf.Producer.Timeout = 10 * time.Second @@ -290,7 +282,6 @@ func TestProduceSetV3RequestBuilding(t *testing.T) { } func TestProduceSetIdempotentRequestBuilding(t *testing.T) { - t.Parallel() const pID = 1000 const pEpoch = 1234 @@ -379,7 +370,6 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) { } func TestProduceSetConsistentTimestamps(t *testing.T) { - t.Parallel() parent, ps1 := makeProduceSet() ps2 := newProduceSet(parent) parent.conf.Producer.RequiredAcks = WaitForAll diff --git a/record_test.go b/record_test.go index b92b2e503..ba87feb4d 100644 --- a/record_test.go +++ b/record_test.go @@ -270,7 +270,6 @@ func isOldGo(t *testing.T) bool { } func TestRecordBatchEncoding(t *testing.T) { - t.Parallel() for _, tc := range recordBatchTestCases() { if tc.oldGoEncoded != nil && isOldGo(t) { testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded) @@ -281,7 +280,6 @@ func TestRecordBatchEncoding(t *testing.T) { } func TestRecordBatchDecoding(t *testing.T) { - t.Parallel() for _, tc := range recordBatchTestCases() { batch := RecordBatch{} testDecodable(t, tc.name, &batch, tc.encoded) diff --git a/records_test.go b/records_test.go index fd985f56c..34f1b4a6e 100644 --- a/records_test.go +++ b/records_test.go @@ -7,7 +7,6 @@ import ( ) func TestLegacyRecords(t *testing.T) { - t.Parallel() set := &MessageSet{ Messages: []*MessageBlock{ { @@ -83,7 +82,6 @@ func TestLegacyRecords(t *testing.T) { } func TestDefaultRecords(t *testing.T) { - t.Parallel() batch := &RecordBatch{ IsTransactional: true, Version: 2, diff --git a/response_header_test.go b/response_header_test.go index 8a5daae1a..c7c68eae7 100644 --- a/response_header_test.go +++ b/response_header_test.go @@ -15,7 +15,6 @@ var ( ) func TestResponseHeaderV0(t *testing.T) { - t.Parallel() header := responseHeader{} testVersionDecodable(t, "response header", &header, responseHeaderBytesV0, 0) @@ -28,7 +27,6 @@ func TestResponseHeaderV0(t *testing.T) { } func TestResponseHeaderV1(t *testing.T) { - t.Parallel() header := responseHeader{} testVersionDecodable(t, "response header", &header, responseHeaderBytesV1, 1) diff --git a/sasl_authenticate_request_test.go b/sasl_authenticate_request_test.go index ba3fe47e0..bf75004d2 100644 --- a/sasl_authenticate_request_test.go +++ b/sasl_authenticate_request_test.go @@ -7,7 +7,6 @@ var saslAuthenticateRequest = []byte{ } func TestSaslAuthenticateRequest(t *testing.T) { - t.Parallel() request := new(SaslAuthenticateRequest) request.SaslAuthBytes = []byte(`foo`) testRequest(t, "basic", request, saslAuthenticateRequest) diff --git a/sasl_authenticate_response_test.go b/sasl_authenticate_response_test.go index 018bdd35a..048dade19 100644 --- a/sasl_authenticate_response_test.go +++ b/sasl_authenticate_response_test.go @@ -9,7 +9,6 @@ var saslAuthenticatResponseErr = []byte{ } func TestSaslAuthenticateResponse(t *testing.T) { - t.Parallel() response := new(SaslAuthenticateResponse) response.Err = ErrSASLAuthenticationFailed msg := "err" diff --git a/sasl_handshake_request_test.go b/sasl_handshake_request_test.go index 5940c2471..e100ad5b9 100644 --- a/sasl_handshake_request_test.go +++ b/sasl_handshake_request_test.go @@ -7,7 +7,6 @@ var baseSaslRequest = []byte{ } func TestSaslHandshakeRequest(t *testing.T) { - t.Parallel() request := new(SaslHandshakeRequest) request.Mechanism = "foo" testRequest(t, "basic", request, baseSaslRequest) diff --git a/sasl_handshake_response_test.go b/sasl_handshake_response_test.go index 780bcf577..40441fd85 100644 --- a/sasl_handshake_response_test.go +++ b/sasl_handshake_response_test.go @@ -9,7 +9,6 @@ var saslHandshakeResponse = []byte{ } func TestSaslHandshakeResponse(t *testing.T) { - t.Parallel() response := new(SaslHandshakeResponse) testVersionDecodable(t, "no error", response, saslHandshakeResponse, 0) if response.Err != ErrNoError { diff --git a/scram_formatter_test.go b/scram_formatter_test.go index 7c9f43bb3..b673a6a7d 100644 --- a/scram_formatter_test.go +++ b/scram_formatter_test.go @@ -36,7 +36,6 @@ public class App { */ func TestScramSaltedPasswordSha512(t *testing.T) { - t.Parallel() password := []byte("hello") salt := []byte("world") @@ -61,7 +60,6 @@ func TestScramSaltedPasswordSha512(t *testing.T) { } func TestScramSaltedPasswordSha256(t *testing.T) { - t.Parallel() password := []byte("hello") salt := []byte("world") diff --git a/sticky_assignor_user_data_test.go b/sticky_assignor_user_data_test.go index bca39be78..9eb09a615 100644 --- a/sticky_assignor_user_data_test.go +++ b/sticky_assignor_user_data_test.go @@ -6,7 +6,6 @@ import ( ) func TestStickyAssignorUserDataV0(t *testing.T) { - t.Parallel() // Single topic with deterministic ordering across encode-decode req := &StickyAssignorUserDataV0{} data := decodeUserDataBytes(t, "AAAAAQADdDAzAAAAAQAAAAU=") @@ -25,7 +24,6 @@ func TestStickyAssignorUserDataV0(t *testing.T) { } func TestStickyAssignorUserDataV1(t *testing.T) { - t.Parallel() // Single topic with deterministic ordering across encode-decode req := &StickyAssignorUserDataV1{} data := decodeUserDataBytes(t, "AAAAAQADdDA2AAAAAgAAAAAAAAAE/////w==") diff --git a/sync_group_request_test.go b/sync_group_request_test.go index f03ae5f83..3f537ef9f 100644 --- a/sync_group_request_test.go +++ b/sync_group_request_test.go @@ -21,7 +21,6 @@ var ( ) func TestSyncGroupRequest(t *testing.T) { - t.Parallel() var request *SyncGroupRequest request = new(SyncGroupRequest) diff --git a/sync_group_response_test.go b/sync_group_response_test.go index 85faec4ad..6fb708858 100644 --- a/sync_group_response_test.go +++ b/sync_group_response_test.go @@ -18,7 +18,6 @@ var ( ) func TestSyncGroupResponse(t *testing.T) { - t.Parallel() var response *SyncGroupResponse response = new(SyncGroupResponse) diff --git a/sync_producer_test.go b/sync_producer_test.go index b59d872c8..5f14f7be0 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -7,7 +7,6 @@ import ( ) func TestSyncProducer(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -58,7 +57,6 @@ func TestSyncProducer(t *testing.T) { } func TestSyncProducerBatch(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -107,7 +105,6 @@ func TestSyncProducerBatch(t *testing.T) { } func TestConcurrentSyncProducer(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) @@ -152,7 +149,6 @@ func TestConcurrentSyncProducer(t *testing.T) { } func TestSyncProducerToNonExistingTopic(t *testing.T) { - t.Parallel() broker := NewMockBroker(t, 1) metadataResponse := new(MetadataResponse) @@ -184,7 +180,6 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { } func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) { - t.Parallel() seedBroker := NewMockBroker(t, 1) leader1 := NewMockBroker(t, 2) leader2 := NewMockBroker(t, 3) diff --git a/txn_offset_commit_request_test.go b/txn_offset_commit_request_test.go index 76f208ca6..714ec47eb 100644 --- a/txn_offset_commit_request_test.go +++ b/txn_offset_commit_request_test.go @@ -16,7 +16,6 @@ var txnOffsetCommitRequest = []byte{ } func TestTxnOffsetCommitRequest(t *testing.T) { - t.Parallel() req := &TxnOffsetCommitRequest{ TransactionalID: "txn", GroupID: "groupid", diff --git a/txn_offset_commit_response_test.go b/txn_offset_commit_response_test.go index 4f152fb77..b4caa69b9 100644 --- a/txn_offset_commit_response_test.go +++ b/txn_offset_commit_response_test.go @@ -15,7 +15,6 @@ var txnOffsetCommitResponse = []byte{ } func TestTxnOffsetCommitResponse(t *testing.T) { - t.Parallel() resp := &TxnOffsetCommitResponse{ ThrottleTime: 100 * time.Millisecond, Topics: map[string][]*PartitionError{ diff --git a/utils_test.go b/utils_test.go index a7448d07c..3d2c3ac89 100644 --- a/utils_test.go +++ b/utils_test.go @@ -3,7 +3,6 @@ package sarama import "testing" func TestVersionCompare(t *testing.T) { - t.Parallel() if V0_8_2_0.IsAtLeast(V0_8_2_1) { t.Error("0.8.2.0 >= 0.8.2.1") } @@ -28,7 +27,6 @@ func TestVersionCompare(t *testing.T) { } func TestVersionParsing(t *testing.T) { - t.Parallel() validVersions := []string{ "0.8.2.0", "0.8.2.1",