IllegalArgumentException: frame txn not sequencing #217

Closed
StrongestNumber9 opened this issue Jul 1, 2024 · 6 comments · Fixed by #224

@StrongestNumber9
Contributor

Describe the bug

Multithreaded feeding through cfe_35 (router) to rlp_08 (receiver) caused rlp_08 to print this:

11:35:13.694 [pool-1-thread-1] ERROR com.teragrep.rlp_03.channel.context.RelpReadImpl - IOException <java.nio.channels.ClosedChannelException> while reading from socket. Closing establishedContext PeerAddress </127.0.0.1> PeerPort <50142>.
11:35:13.707 [pool-1-thread-7] ERROR com.teragrep.rlp_03.channel.context.RelpReadImpl - run() threw
java.lang.IllegalArgumentException: frame txn not sequencing
        at com.teragrep.rlp_03.frame.delegate.SequencingDelegate.accept(SequencingDelegate.java:85) ~[rlp_08-jar-with-dependencies.jar:0.0.1-SNAPSHOT]
        at com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate.accept(DefaultFrameDelegate.java:77) ~[rlp_08-jar-with-dependencies.jar:0.0.1-SNAPSHOT]
        at com.teragrep.rlp_03.channel.context.RelpReadImpl.processFrame(RelpReadImpl.java:234) ~[rlp_08-jar-with-dependencies.jar:0.0.1-SNAPSHOT]
        at com.teragrep.rlp_03.channel.context.RelpReadImpl.run(RelpReadImpl.java:147) ~[rlp_08-jar-with-dependencies.jar:0.0.1-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.lang.Thread.run(Thread.java:1583) ~[?:?]

Tcpflood command and logs

$ while true; do tcpflood -t 127.0.0.1 -T relp-plain -p 4600 -m 10000000 -M "<14>1 2020-05-15T13:24:03.603Z performance-test-host performance-test-tag - - - hello" -c 10 -Y; done;
00010 open connections
starting run 1
00010 close connectionsnections
00010 open connections
starting run 1
00999900tcpflood: onErr 'error waiting on required session state, session broken'

relpCltSendSyslog() failed with relp error code 10015
00128
send() failed "Success" at socket 1, index 128, msgNum 129
error sending messages
00010 close connections
00010 open connections
starting run 1
00010 close connectionsnections
00010 open connections
starting run 1

Expected behavior

This exception should not occur.

How to reproduce

Probably by sending enough events: multithreaded input, multithreaded processing and a multithreaded receiving server.

Screenshots

Software version

cfe_35 is https://github.com/kortemik/cfe_35/tree/to-9 which uses newest rlp_03

StrongestNumber9 added the bug label on Jul 1, 2024
@StrongestNumber9
Contributor Author

I have not been able to reproduce this so far. I also updated rlp_08 to rlp_03 v 8.0.0.

@StrongestNumber9
Contributor Author

Occasionally managed to reproduce it with cfe_35 -> rlp_08 using tcpflood -t 127.0.0.1 -T relp-plain -p 4600 -m 123 -M "<14>1 2020-05-15T13:24:03.603Z performance-test-host performance-test-tag - - - hello" -c 10 -Y after a fresh start of both servers.

@StrongestNumber9
Contributor Author

Seems like it will eventually (couple of minutes+?) happen if I let the following run:

while true; do tcpflood -t 127.0.0.1 -T relp-plain -p 4600 -m 123 -M "<14>1 2020-05-15T13:24:03.603Z performance-test-host performance-test-tag - - - hello" -c 100 -Y; done;

@StrongestNumber9
Contributor Author

Seems like using locks in

public CompletableFuture<RelpFrame> transmit(RelpFrame relpFrame) {
is the correct fix

diff --git a/src/main/java/com/teragrep/rlp_03/client/RelpClientImpl.java b/src/main/java/com/teragrep/rlp_03/client/RelpClientImpl.java
index 8c5d860..4d48f25 100644
--- a/src/main/java/com/teragrep/rlp_03/client/RelpClientImpl.java
+++ b/src/main/java/com/teragrep/rlp_03/client/RelpClientImpl.java
@@ -53,6 +53,8 @@ import com.teragrep.rlp_03.frame.fragment.FragmentFactory;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Simple client with asynchronous transmit and {@link java.util.concurrent.Future} based receive.
@@ -63,12 +65,14 @@ public final class RelpClientImpl implements RelpClient {
     private final TransactionService transactionService;
     private final AtomicInteger txnCounter;
     private final FragmentFactory fragmentFactory;
+    private final Lock lock;
 
     RelpClientImpl(EstablishedContext establishedContext, TransactionService transactionService) {
         this.establishedContext = establishedContext;
         this.transactionService = transactionService;
         this.txnCounter = new AtomicInteger();
         this.fragmentFactory = new FragmentFactory();
+        this.lock = new ReentrantLock();
     }
 
     /**
@@ -79,19 +83,26 @@ public final class RelpClientImpl implements RelpClient {
      */
     @Override
     public CompletableFuture<RelpFrame> transmit(RelpFrame relpFrame) {
-        int txnInt = txnCounter.incrementAndGet();
-        Fragment txn = fragmentFactory.create(txnInt);
+        lock.lock();
+        CompletableFuture<RelpFrame> future;
+        try {
+            int txnInt = txnCounter.incrementAndGet();
+            Fragment txn = fragmentFactory.create(txnInt);
 
-        RelpFrame relpFrameToXmit = new RelpFrameImpl(
-                txn,
-                relpFrame.command(),
-                relpFrame.payloadLength(),
-                relpFrame.payload(),
-                relpFrame.endOfTransfer()
-        );
-        CompletableFuture<RelpFrame> future = transactionService.create(relpFrameToXmit);
+            RelpFrame relpFrameToXmit = new RelpFrameImpl(
+                    txn,
+                    relpFrame.command(),
+                    relpFrame.payloadLength(),
+                    relpFrame.payload(),
+                    relpFrame.endOfTransfer()
+            );
+            future = transactionService.create(relpFrameToXmit);
 
-        establishedContext.egress().accept(relpFrameToXmit.toWriteable());
+            establishedContext.egress().accept(relpFrameToXmit.toWriteable());
+        }
+        finally {
+            lock.unlock();
+        }
         return future;
     }
 
@@ -100,8 +111,14 @@ public final class RelpClientImpl implements RelpClient {
      */
     @Override
     public void close() {
-        transactionService.close();
-        establishedContext.close();
+        lock.lock();
+        try {
+            transactionService.close();
+            establishedContext.close();
+        }
+        finally {
+            lock.unlock();
+        }
     }

@kortemik
Member

Yes, the fix looks mostly correct. Two thread-safe resources are being used, but their combined use needs synchronization, hence the lock.
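
To illustrate the point about combined use, here is a minimal standalone sketch. It is not rlp_03 code: the class `SequencingSketch`, the `wire` queue and the `transmitLocked`/`transmitUnlocked` methods are hypothetical stand-ins for the txn counter and the egress path. It assumes the receiver expects txn numbers to arrive strictly in order. Each call (`incrementAndGet`, `add`) is thread safe on its own, but without a lock another thread can run between them, so numbers may reach the queue out of order.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class SequencingSketch {

    // Hypothetical stand-ins: counter ~ txnCounter, wire ~ the egress queue.
    private final AtomicInteger counter = new AtomicInteger();
    private final ConcurrentLinkedQueue<Integer> wire = new ConcurrentLinkedQueue<>();
    private final Lock lock = new ReentrantLock();

    // Two thread-safe calls, but another thread may interleave between them.
    void transmitUnlocked() {
        int txn = counter.incrementAndGet();
        wire.add(txn);
    }

    // The lock makes "take next txn number" and "enqueue" one atomic step.
    void transmitLocked() {
        lock.lock();
        try {
            int txn = counter.incrementAndGet();
            wire.add(txn);
        } finally {
            lock.unlock();
        }
    }

    // True if the queue holds 1, 2, 3, ... in strictly consecutive order.
    boolean inSequence() {
        int expected = 1;
        for (int txn : wire) {
            if (txn != expected++) {
                return false;
            }
        }
        return true;
    }

    // Runs the given number of sender threads and reports whether order held.
    public static boolean run(boolean locked, int threads, int perThread) {
        SequencingSketch sketch = new SequencingSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (locked) {
                        sketch.transmitLocked();
                    } else {
                        sketch.transmitUnlocked();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("senders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.inSequence();
    }

    public static void main(String[] args) {
        System.out.println("locked in sequence:   " + run(true, 8, 100_000));
        System.out.println("unlocked in sequence: " + run(false, 8, 100_000));
    }
}
```

The locked variant always stays in sequence. The unlocked variant usually does not under contention, but like the original bug it is timing dependent, so an individual run may happen to pass.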

@kortemik
Member

kortemik commented Sep 25, 2024

Please open a pull request for this, @StrongestNumber9.
