Skip to content

Commit

Permalink
chore(ttstream): fix ci (#1579)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored Oct 17, 2024
1 parent dd52e7f commit e45145a
Show file tree
Hide file tree
Showing 55 changed files with 1,187 additions and 1,535 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
go-version: ${{ matrix.go }}
cache: false # don't use cache for self-hosted runners
- name: Unit Test
run: go test -race -covermode=atomic ./...
run: go test -v -race -covermode=atomic ./...

codegen-test:
runs-on: ubuntu-latest
Expand Down
1 change: 0 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type kClient struct {
sEps endpoint.Endpoint

// streamx
sxEps endpoint.Endpoint
sxStreamMW streamx.StreamMiddleware
sxStreamRecvMW streamx.StreamRecvMiddleware
sxStreamSendMW streamx.StreamSendMiddleware
Expand Down
16 changes: 16 additions & 0 deletions client/client_streamx.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package client

import (
Expand Down
2 changes: 1 addition & 1 deletion client/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestStreaming(t *testing.T) {
cliInfo.ConnPool = connpool
s, cr, _ := remotecli.NewStream(ctx, mockRPCInfo, new(mocks.MockCliTransHandler), cliInfo)
stream := newStream(
s, cr, kc, mockRPCInfo, serviceinfo.StreamingBidirectional,
s.(streaming.Stream), cr, kc, mockRPCInfo, serviceinfo.StreamingBidirectional,
func(stream streaming.Stream, message interface{}) (err error) {
return stream.SendMsg(message)
},
Expand Down
16 changes: 16 additions & 0 deletions client/streamxclient/client.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streamxclient

import (
Expand Down
16 changes: 16 additions & 0 deletions client/streamxclient/client_gen.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streamxclient

import (
Expand Down
16 changes: 16 additions & 0 deletions client/streamxclient/client_option.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streamxclient

import (
Expand Down
16 changes: 16 additions & 0 deletions client/streamxclient/streamxcallopt/call_option.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streamxcallopt

import (
Expand Down
3 changes: 2 additions & 1 deletion internal/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"os/signal"
"syscall"

"github.com/cloudwego/localsession/backup"

"github.com/cloudwego/kitex/internal/configutil"
"github.com/cloudwego/kitex/internal/stream"
"github.com/cloudwego/kitex/pkg/acl"
Expand All @@ -43,7 +45,6 @@ import (
"github.com/cloudwego/kitex/pkg/stats"
"github.com/cloudwego/kitex/pkg/transmeta"
"github.com/cloudwego/kitex/pkg/utils"
"github.com/cloudwego/localsession/backup"
)

func init() {
Expand Down
13 changes: 5 additions & 8 deletions pkg/remote/trans/streamx/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"errors"
"io"
"log"
"net"
"runtime/debug"
"time"
Expand Down Expand Up @@ -62,8 +61,10 @@ func (f *svrTransHandlerFactory) NewTransHandler(opt *remote.ServerOption) (remo
}, nil
}

var _ remote.ServerTransHandler = &svrTransHandler{}
var errProtocolNotMatch = errors.New("protocol not match")
var (
_ remote.ServerTransHandler = &svrTransHandler{}
errProtocolNotMatch = errors.New("protocol not match")
)

type svrTransHandler struct {
opt *remote.ServerOption
Expand Down Expand Up @@ -106,7 +107,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) error {
streamWorkerPool.GoCtx(ctx, func() {
err := t.OnStream(nctx, conn, ss)
if err != nil && !errors.Is(err, io.EOF) {
klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", nerr)
klog.CtxErrorf(ctx, "KITEX: stream ReadStream failed: err=%v", err)
}
})
}
Expand Down Expand Up @@ -139,13 +140,9 @@ func (t *svrTransHandler) OnStream(ctx context.Context, conn net.Conn, ss stream
if mutableTo := rpcinfo.AsMutableEndpointInfo(ri.To()); mutableTo != nil {
_ = mutableTo.SetMethod(ss.Method())
}
//_ = rpcinfo.AsMutableRPCConfig(ri.Config()).SetTransportProtocol(transport.JSONRPC)

ctx = t.startTracer(ctx, ri)
defer func() {
if err != nil {
log.Println("OnStream failed: ", err)
}
panicErr := recover()
if panicErr != nil {
if conn != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/rpcinfo/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (m *MockRPCConfig) TransportProtocol() (r transport.Protocol) {
return
}

func (m *MockRPCConfig) StreamRecvTimeout() time.Duration {
return time.Duration(0)
}

type MockRPCStats struct{}

func (m *MockRPCStats) Record(context.Context, stats.Event, stats.Status, string) {}
Expand Down
16 changes: 16 additions & 0 deletions pkg/streamx/client_options.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streamx

import (
Expand Down
22 changes: 20 additions & 2 deletions pkg/streamx/header_trailer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package streamx

type Header map[string]string
type Trailer map[string]string
type (
Header map[string]string
Trailer map[string]string
)
Loading

0 comments on commit e45145a

Please sign in to comment.