Skip to content

Commit

Permalink
Merge pull request apache#2656, make sure serialization exception sen…
Browse files Browse the repository at this point in the history
…ds back to consumer to preventing endless waiting.

Fixes apache#1903: Our customized serialization id exceeds the maximum limit, now it cannot work on 2.6.2 anymore.
  • Loading branch information
beiwei30 authored and CrazyHZM committed Dec 6, 2018
1 parent 0541a36 commit 999d34b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byt

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
Expand All @@ -152,8 +150,9 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
try {
ObjectInput in = deserialize(channel, is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -163,12 +162,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
} else {
res.setErrorMessage(in.readUTF());
}
} else {
res.setErrorMessage(in.readUTF());
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
Expand All @@ -180,6 +179,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
ObjectInput in = deserialize(channel, is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -198,6 +198,11 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException {
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
return s.deserialize(channel.getUrl(), is);
}

protected Object getRequestData(long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
if (future == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
import org.apache.dubbo.remoting.telnet.codec.TelnetCodec;

import org.apache.dubbo.remoting.transport.CodecSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -147,6 +148,23 @@ public void test_Decode_Error_Response_Object() throws IOException {
Assert.assertEquals(90, obj.getStatus());
}

@Test
public void testInvalidSerializaitonId() throws Exception {
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x8F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
Object obj = decode(header);
Assert.assertTrue(obj instanceof Request);
Request request = (Request) obj;
Assert.assertTrue(request.isBroken());
Assert.assertTrue(request.getData() instanceof IOException);
header = new byte[]{MAGIC_HIGH, MAGIC_LOW, (byte)0x1F, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

obj = decode(header);
Assert.assertTrue(obj instanceof Response);
Response response = (Response) obj;
Assert.assertEquals(response.getStatus(), Response.CLIENT_ERROR);
Assert.assertTrue(response.getErrorMessage().contains("IOException"));
}

@Test
public void test_Decode_Check_Payload() throws IOException {
byte[] header = new byte[]{MAGIC_HIGH, MAGIC_LOW, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ protected Object decode(Channel channel, InputStream is, int readable, byte[] he

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
ObjectInput in = s.deserialize(channel.getUrl(), is);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
Expand All @@ -143,8 +141,9 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
try {
ObjectInput in = deserialize(channel, is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -154,12 +153,12 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeResponseData(channel, in, getRequestData(id));
}
res.setResult(data);
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
} else {
res.setErrorMessage(in.readUTF());
}
} else {
res.setErrorMessage(in.readUTF());
} catch (Throwable t) {
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
Expand All @@ -171,6 +170,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
ObjectInput in = deserialize(channel, is, proto);
Object data;
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
Expand All @@ -189,6 +189,11 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
}

private ObjectInput deserialize(Channel channel, InputStream is, byte proto) throws IOException {
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
return s.deserialize(channel.getUrl(), is);
}

protected Object getRequestData(long id) {
DefaultFuture future = DefaultFuture.getFuture(id);
if (future == null)
Expand Down

0 comments on commit 999d34b

Please sign in to comment.