Skip to content

Commit

Permalink
Worker ports (#485)
Browse files Browse the repository at this point in the history
* WIP: WorkersPorts to manage workers ports

* Compile bug fixed

* Code docs for WorkersPorts

* Peers connectivity algo complete

* node: pureconfig -> ficus

* Tiny fixes

* Contract tests fixed

* Some debug outputs

* RocksDbStore on a thread;
libc6-compat added to docker image

* Tiny fixes & more logs

* Tiny fixes

* Support capacity and api port in CLI, deployment scripts and docs (#488)

* [skip ci] fixed thread pool usage in RocksDBStore

* Remove monitoring from travis.yml

* Doc comments for WorkerP2pConnectivity; tiny compile bug fix

* fix a bug with musl

* change dir from /lib64 to /usr/lib for symbolic link for musl

* fix a typo

* Calm down the logging for expected errors in p2p connectivity

* Add support for Geth & Rinkeby (#492)
  • Loading branch information
alari committed Feb 25, 2019
1 parent 2b868c0 commit b05bb89
Show file tree
Hide file tree
Showing 74 changed files with 1,289 additions and 627 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ matrix:
- <<: *_scala_fluence_template
name: Fluence integration tests
before_script:
- docker pull tendermint/tendermint:0.27.4
- docker pull tendermint/tendermint:0.30.1
script:
- 'if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then PATH=$PATH:$HOME/.cargo/bin sbt ++$TRAVIS_SCALA_VERSION compile; fi'
# Some tests in LlamadbIntegrationTest require 2 Gb RAM
Expand Down Expand Up @@ -104,6 +104,7 @@ matrix:

env:
- RUST_TEST_THREADS=1
- RUST_BACKTRACE=1

script:
- pushd ../bootstrap
Expand Down
72 changes: 41 additions & 31 deletions bootstrap/contracts/Deployer.sol
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ contract Deployer {
// Publicly reachable & verifiable node address; has `node` prefix as `address` is a reserved word
bytes24 nodeAddress;

// Next port that could be used for running worker
uint16 nextPort;
// The last port of Node's dedicated range
uint16 lastPort;
// Api port of the node
uint16 apiPort;
// Node's capacity
uint16 capacity;

// ethereum address of the miner which runs this node
address owner;
Expand Down Expand Up @@ -110,9 +110,6 @@ contract Deployer {

// IDs of participating nodes
bytes32[] nodeIDs;

// Worker's ports for each node
uint16[] ports;
}

// Emitted when there is enough Workers for some App
Expand Down Expand Up @@ -187,22 +184,18 @@ contract Deployer {
/** @dev Adds node with specified port range to the work-waiting queue
* @param nodeID Tendermint's ValidatorKey
* @param nodeAddress currently Tendermint P2P node ID + IP address, subject to change
* @param startPort starting port for node's port range
* @param endPort ending port for node's port range
* @param apiPort node's api port
* @param capacity node's capacity
* emits NewNode event about new node
* emits ClusterFormed event when there is enough nodes for some Code
*/
function addNode(bytes32 nodeID, bytes24 nodeAddress, uint16 startPort, uint16 endPort, bool isPrivate)
function addNode(bytes32 nodeID, bytes24 nodeAddress, uint16 apiPort, uint16 capacity, bool isPrivate)
external
{
require(nodes[nodeID].id == 0, "This node is already registered");

// port range is inclusive
// if startPort == endPort, then node can host just a single code
require(startPort <= endPort, "Port range is empty or incorrect");

// Save the node
nodes[nodeID] = Node(nodeID, nodeAddress, startPort, endPort, msg.sender, isPrivate, new uint256[](0));
nodes[nodeID] = Node(nodeID, nodeAddress, apiPort, capacity, msg.sender, isPrivate, new uint256[](0));
nodesIds.push(nodeID);

    // No need to add private nodes to readyNodes, as they could only be used with by-id pinning
Expand All @@ -220,7 +213,7 @@ contract Deployer {

// We should stop if there's no more ports in this node -- its addition has no more effect
Node storage node = nodes[nodeID];
if(node.nextPort > node.lastPort) break;
if(node.capacity == 0) break;
} else i++;
}
}
Expand Down Expand Up @@ -263,7 +256,7 @@ contract Deployer {
clusterSize,
msg.sender,
pinToNodes,
Cluster(0, new bytes32[](0), new uint16[](0)) // TODO: this is awful
Cluster(0, new bytes32[](0)) // TODO: this is awful
);
appIDs.push(appID);

Expand Down Expand Up @@ -301,7 +294,6 @@ contract Deployer {
* emits AppRemoved event on successful deletion
* reverts if you're not app owner
     * reverts if app or cluster aren't found
* TODO: free nodes' ports after app deletion
*/
function deleteApp(uint256 appID)
external
Expand All @@ -328,7 +320,12 @@ contract Deployer {
internal
{
for (uint i = 0; i < nodeIDsArray.length; i++) {
uint256[] storage appIDsArray = nodes[nodeIDsArray[i]].appIDs;
bytes32 nodeID = nodeIDsArray[i];
Node storage node = nodes[nodeID];

incrementCapacity(node);

uint256[] storage appIDsArray = node.appIDs;
uint idx = indexOf(appID, appIDsArray);
require(idx < nodeIDsArray.length, "error deleting app: app not found in node.appIDs");
removeArrayElement(idx, appIDsArray);
Expand Down Expand Up @@ -421,7 +418,7 @@ contract Deployer {
Node storage node = nodes[app.pinToNodes[i]];

// Return false if there's not enough capacity on pin-to node to deploy the app
if(node.nextPort > node.lastPort) {
if(node.capacity == 0) {
return false;
}

Expand Down Expand Up @@ -470,7 +467,7 @@ contract Deployer {
// arrays containing nodes' data to be sent in a `ClusterFormed` event
bytes32[] memory nodeIDs = new bytes32[](app.clusterSize);
bytes24[] memory workerAddrs = new bytes24[](app.clusterSize);
uint16[] memory workerPorts = new uint16[](app.clusterSize);
uint16[] memory apiPorts = new uint16[](app.clusterSize);

// j holds the number of currently collected nodes and a position in event data arrays
for (uint8 j = 0; j < app.clusterSize; j++) {
Expand All @@ -479,41 +476,54 @@ contract Deployer {
// copy node's data to arrays so it can be sent in event
nodeIDs[j] = node.id;
workerAddrs[j] = node.nodeAddress;
workerPorts[j] = node.nextPort;
apiPorts[j] = node.apiPort;

useNodePort(node);
decrementCapacity(node);
node.appIDs.push(app.appID);
}

uint genesisTime = now;

// saving selected nodes as a cluster with assigned app
app.cluster = Cluster(genesisTime, nodeIDs, workerPorts);
app.cluster = Cluster(genesisTime, nodeIDs);

// notify Fluence node it's time to run real-time workers and
// create a Tendermint cluster hosting selected App (defined by storageHash)
emit AppDeployed(app.appID, app.storageHash, genesisTime, nodeIDs, workerAddrs, workerPorts);
emit AppDeployed(app.appID, app.storageHash, genesisTime, nodeIDs, workerAddrs, apiPorts);
}

/** @dev increments node's currentPort
* and removes it from readyNodes if there are no more ports left
    /** @dev decrements node's capacity
      * and removes it from readyNodes if there is no more capacity left
* returns true if node was deleted from readyNodes
*/
function useNodePort(Node storage node)
function decrementCapacity(Node storage node)
internal
{
// increment port, it will be used for the next code
node.nextPort++;
// decrement capacity
node.capacity--;

        // check if node will be able to host a code next time; if not, remove it
if (node.nextPort > node.lastPort) {
if (node.capacity == 0) {
uint readyNodeIdx = indexOf(node.id, readyNodes);
if (readyNodeIdx < readyNodes.length) {
removeReadyNode(readyNodeIdx);
}
}
}

/** @dev increment node's capacity
* and add it to readyNodes if there was no capacity left
*/
function incrementCapacity(Node storage node)
internal
{
node.capacity++;

if(node.capacity == 1) {
readyNodes.push(node.id);
}
}


/** @dev Removes an element on specified position from 'readyNodes'
* @param index position in 'readyNodes' to remove
Expand Down
14 changes: 8 additions & 6 deletions bootstrap/contracts/Network.sol
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ contract Network is Deployer {
Node memory node = nodes[nodeID];
return (
node.nodeAddress,
node.nextPort,
node.lastPort,
node.apiPort,
node.capacity,
node.owner,
node.isPrivate,
node.appIDs
Expand All @@ -68,7 +68,7 @@ contract Network is Deployer {
function getApp(uint256 appID)
external
view
returns (bytes32, bytes32, uint8, address, bytes32[], uint, bytes32[], uint16[])
returns (bytes32, bytes32, uint8, address, bytes32[], uint, bytes32[])
{
App memory app = apps[appID];
require(app.appID > 0, "there is no such app");
Expand All @@ -81,8 +81,7 @@ contract Network is Deployer {
app.pinToNodes,

app.cluster.genesisTime,
app.cluster.nodeIDs,
app.cluster.ports
app.cluster.nodeIDs
);
}

Expand All @@ -98,13 +97,16 @@ contract Network is Deployer {
require(app.appID > 0, "there is no such app");

bytes24[] memory addresses = new bytes24[](app.cluster.nodeIDs.length);
uint16[] memory ports = new uint16[](app.cluster.nodeIDs.length);

for(uint8 i = 0; i < app.cluster.nodeIDs.length; i++) {
addresses[i] = nodes[app.cluster.nodeIDs[i]].nodeAddress;
ports[i] = nodes[app.cluster.nodeIDs[i]].apiPort;
}

return (
addresses,
app.cluster.ports
ports
);
}

Expand Down
5 changes: 2 additions & 3 deletions bootstrap/test/FluenceContractTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ contract('Fluence', function ([_, owner, anyone]) {
truffleAssert.eventNotEmitted(addApp.receipt, utils.appDeployedEvent)
});

it("Should reuse node until the port range is exhausted", async function() {
it("Should reuse node until the capacity is exhausted", async function() {
let count = 1;
let ports = 2;

Expand Down Expand Up @@ -164,9 +164,8 @@ contract('Fluence', function ([_, owner, anyone]) {

let genesis = app[5];
let nodeIDs = app[6];
let ports = app[7];

return genesis > 0 && nodeIDs.length > 0 && ports.length > 0;
return genesis > 0 && nodeIDs.length > 0;
});

assert(deployedApps.length, 2);
Expand Down
2 changes: 1 addition & 1 deletion bootstrap/test/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async function addNodes (contract, count, nodeIP, ownerAddress, portCount = 2, i
nodeID,
nodeIP,
1000,
1000 + portCount - 1,
portCount,
isPrivate,
{ from: ownerAddress }
);
Expand Down
14 changes: 11 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ lazy val node = project
sttp,
sttpCatsBackend,
fs2io,
pureConfig,
ficus,
circeGeneric,
circeParser,
http4sDsl,
Expand Down Expand Up @@ -290,8 +290,16 @@ lazy val node = project
from("openjdk:8-jre-alpine")
runRaw(s"wget -q $dockerBinary -O- | tar -C /usr/bin/ -zxv docker/docker --strip-components=1")

// this is needed for some binaries (e.g. rocksdb) to run properly on alpine linux since they need libc and
// alpine use musl
runRaw("ln -sf /lib/libc.musl-x86_64.so.1 /usr/lib/ld-linux-x86-64.so.2")

volume("/master") // anonymous volume to store all data

// p2p ports range
env("MIN_PORT", "10000")
env("MAX_PORT", "11000")

/*
* The following directory structure is assumed in node/src/main/resources:
* docker/
Expand All @@ -302,11 +310,11 @@ lazy val node = project

copy(artifact, artifactTargetPath)

cmd("java", "-jar", artifactTargetPath)
cmd("java", "-jar", "-Dconfig.file=/master/application.conf", artifactTargetPath)
entryPoint("sh", "/entrypoint.sh")
}
}
)
.settings(buildContractBeforeDocker())
.enablePlugins(AutomateHeaderPlugin, DockerPlugin)
.dependsOn(ethclient, swarm, `statemachine-control`)
.dependsOn(ethclient, swarm, `statemachine-control`, `kvstore`)
2 changes: 1 addition & 1 deletion cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluence"
version = "0.1.3"
version = "0.1.4"
authors = ["Fluence Labs"]
publish=false
edition = "2018"
Expand Down
7 changes: 4 additions & 3 deletions cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ The following command will register a node:
--base64_tendermint_key \
--secret_key 0xcb0799337df06a6c73881bab91304a68199a430ccd4bc378e37e51fd1b118133 \
--wait_syncing \
--start_port 25000 \
--last_port 25010
--api_port 25000 \
--capacity 10
```

Parameters are:
Expand All @@ -76,7 +76,8 @@ Parameters are:
- `--secret_key 0xcb0799337df06a6c73881bab91304a68199a430ccd4bc378e37e51fd1b118133` denotes an Ethereum private key, used for offline transaction signing. _Use your Ethereum private key here_
- using `--password` is possible instead of private key, but private key is preferred
- `--wait_syncing` so CLI waits until Ethereum node is fully synced
- `--start_port 25000` and `--last_port 25010` denote ports where apps (workers) will be hosted. 25000:25010 is inclusive, so 10 workers could be started on such a node
- `--api_port 25000` specifies the main port of the Fluence node, so other nodes and users know where to connect
- `--capacity 10` limits the number of apps that can run on the node to 10


### Publish an app
Expand Down
Loading

0 comments on commit b05bb89

Please sign in to comment.