Skip to content
This repository has been archived by the owner on Dec 5, 2024. It is now read-only.

Commit

Permalink
Outgoing transaction size is limited
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 committed Apr 24, 2018
1 parent 057dea7 commit a3535b0
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 36 deletions.
27 changes: 25 additions & 2 deletions ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.ethereum.net.shh.ShhMessageFactory;
import org.ethereum.net.swarm.bzz.BzzHandler;
import org.ethereum.net.swarm.bzz.BzzMessageFactory;
import org.ethereum.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -121,6 +122,8 @@ public class Channel {

private PeerStatistics peerStats = new PeerStatistics();

public static final int MAX_SAFE_TXS = 192;

public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode, ChannelManager channelManager) {
this.channelManager = channelManager;
this.remoteId = remoteId;
Expand Down Expand Up @@ -372,8 +375,28 @@ public void prohibitTransactionProcessing() {
eth.disableTransactions();
}

public void sendTransaction(List<Transaction> tx) {
eth.sendTransaction(tx);
/**
* Send transactions from input to peer corresponded with channel
* Using {@link #sendTransactionsSafely(List)} is recommended instead
* @param txs Transactions
*/
public void sendTransactions(List<Transaction> txs) {
eth.sendTransaction(txs);
}

/**
* Sames as {@link #sendTransactions(List)} but input list is randomly sliced to
* contain not more than {@link MAX_SAFE_TXS} if needed
* @param txs List of txs to send
*/
public void sendTransactionsSafely(List<Transaction> txs) {
List<Transaction> slicedTxs;
if (txs.size() <= MAX_SAFE_TXS) {
slicedTxs = txs;
} else {
slicedTxs = CollectionUtils.truncateRand(txs, MAX_SAFE_TXS);
}
eth.sendTransaction(slicedTxs);
}

public void sendNewBlock(Block block) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private void process(Channel peer) {
public void sendTransaction(List<Transaction> txs, Channel receivedFrom) {
for (Channel channel : activePeers.values()) {
if (channel != receivedFrom) {
channel.sendTransaction(txs);
channel.sendTransactionsSafely(txs);
}
}
}
Expand Down Expand Up @@ -289,7 +289,7 @@ private void newTxDistributeLoop() {
channel = newActivePeers.take();
List<Transaction> pendingTransactions = pendingState.getPendingTransactions();
if (!pendingTransactions.isEmpty()) {
channel.sendTransaction(pendingTransactions);
channel.sendTransactionsSafely(pendingTransactions);
}
} catch (InterruptedException e) {
break;
Expand Down
59 changes: 27 additions & 32 deletions ethereumj-core/src/main/java/org/ethereum/util/CollectionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.ethereum.util;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -26,24 +27,7 @@
* @since 14.07.2015
*/
public class CollectionUtils {

public static <K, V> List<V> collectList(Collection<K> items, Function<K, V> collector) {
List<V> collected = new ArrayList<>(items.size());
for(K item : items) {
collected.add(collector.apply(item));
}
return collected;
}

public static <K, V> Set<V> collectSet(Collection<K> items, Function<K, V> collector) {
Set<V> collected = new HashSet<>();
for(K item : items) {
collected.add(collector.apply(item));
}
return collected;
}

public static <T> List<T> truncate(List<T> items, int limit) {
public static <T> List<T> truncate(final List<T> items, int limit) {
if(limit > items.size()) {
return new ArrayList<>(items);
}
Expand All @@ -57,23 +41,34 @@ public static <T> List<T> truncate(List<T> items, int limit) {
return truncated;
}

public static <T> List<T> selectList(Collection<T> items, Predicate<T> predicate) {
List<T> selected = new ArrayList<>();
for(T item : items) {
if(predicate.test(item)) {
selected.add(item);
}
public static <T> List<T> truncateRand(final List<T> items, int limit) {
if(limit > items.size()) {
return new ArrayList<>(items);
}
return selected;
}
List<T> truncated = new ArrayList<>(limit);

public static <T> Set<T> selectSet(Collection<T> items, Predicate<T> predicate) {
Set<T> selected = new HashSet<>();
for(T item : items) {
if(predicate.test(item)) {
selected.add(item);
LinkedList<Integer> index = new LinkedList<>();
for (int i = 0; i < items.size(); ++i) {
index.add(i);
}

if (limit * 2 < items.size()) {
// Limit is very small comparing to items.size()
Set<Integer> smallIndex = new HashSet<>();
for (int i = 0; i < limit; ++i) {
int randomNum = ThreadLocalRandom.current().nextInt(0, index.size());
smallIndex.add(index.remove(randomNum));
}
smallIndex.forEach(i -> truncated.add(items.get(i)));
} else {
// Limit is compared to items.size()
for (int i = 0; i < items.size() - limit; ++i) {
int randomNum = ThreadLocalRandom.current().nextInt(0, index.size());
index.remove(randomNum);
}
index.forEach(i -> truncated.add(items.get(i)));
}
return selected;

return truncated;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.ethereum.util;

import org.junit.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class CollectionUtilsTest {

@Test
public void test() {
final List<Integer> input = Arrays.asList(2, 4, 6, 8, 10, 12, 14, 16, 18, 20);
assertEquals(10, input.size());

List<Integer> resEqual = CollectionUtils.truncateRand(input, 10);
assertArrayEquals(input.toArray(), resEqual.toArray());

List<Integer> resEqual2 = CollectionUtils.truncateRand(input, 20);
assertArrayEquals(input.toArray(), resEqual2.toArray());

Set<Integer> excluded = new HashSet<>();
for (int i = 0; i < 1000; ++i) {
List<Integer> resMinusOne = CollectionUtils.truncateRand(input, 9);
Set<Integer> resMinusOneSet = new HashSet<>(resMinusOne);
assertEquals(resMinusOne.size(), resMinusOneSet.size());
AtomicInteger exclusionCounter = new AtomicInteger(0);
input.forEach(x -> {
if(!resMinusOneSet.contains(x)) {
excluded.add(x);
exclusionCounter.getAndIncrement();
}
});
assertEquals(1, exclusionCounter.get());
}
assertEquals("Someday I'll fail due to the nature of random", 10, excluded.size());

Set<Integer> included = new HashSet<>();
for (int i = 0; i < 1000; ++i) {
List<Integer> resOne = CollectionUtils.truncateRand(input, 1);
included.add(resOne.get(0));
assertTrue(input.contains(resOne.get(0)));
}
assertEquals("Someday I'll fail due to the nature of random", 10, included.size());

assertEquals(3, CollectionUtils.truncateRand(input, 3).size());
}
}

0 comments on commit a3535b0

Please sign in to comment.