Skip to content

Commit

Permalink
fix constructor and command line such as url can be passed from recei…
Browse files Browse the repository at this point in the history
…ver to sender

Summary: fix constructor and command line such as url can be passed from receiver to sender

closes facebook#25, closes facebook#9, progress on facebook#16

note: there is an incompatible change that you must specify a -transfer_id if you aren't going to use the URL as the default behavior is now to generate a unique id on the receiver with the expectations the url is used on the sender to connect (which is what the wdt_e2e_simple_test.sh demonstrates) but if you use the old style command line you must now specify the same non empty -transfer_id on both side

Reviewed By: @nikunjy

Differential Revision: D2289794
  • Loading branch information
ldemailly committed Jul 29, 2015
1 parent 4cb26e3 commit c0526b7
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 105 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
# There is no C per se in WDT but if you use CXX only here many checks fail
# Version is Major.Minor.YYMMDDX for up to 10 releases per day
# Minor currently is also the protocol version - has to match with Protocol.cpp
project("WDT" LANGUAGES C CXX VERSION 1.14.1507270)
project("WDT" LANGUAGES C CXX VERSION 1.15.1507280)

# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
set(CMAKE_CXX_STANDARD 11)
Expand Down
7 changes: 1 addition & 6 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,8 @@ Receiver::Receiver(const WdtTransferRequest &transferRequest) {
}
}

Receiver::Receiver(int port, int numSockets)
: Receiver(WdtTransferRequest(port, numSockets)) {
}

Receiver::Receiver(int port, int numSockets, const std::string &destDir)
: Receiver(port, numSockets) {
setDir(destDir);
: Receiver(WdtTransferRequest(port, numSockets, destDir)) {
}

WdtTransferRequest Receiver::init() {
Expand Down
8 changes: 1 addition & 7 deletions Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,9 @@ class Receiver : public WdtBase {
explicit Receiver(const WdtTransferRequest &transferRequest);

/**
* Constructor that needs start port and number of ports.
* Constructor with start port, number of ports and directory to write to.
* If the start port is specified as zero, it auto configures the ports
*/
Receiver(int startPort, int numPorts);

/**
* Constructor which also takes the directory where receiver
* will be writing files to.
*/
Receiver(int port, int numSockets, const std::string &destDir);

/// Setup before starting (@see WdtBase.h)
Expand Down
8 changes: 6 additions & 2 deletions WdtBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ WdtTransferRequest::WdtTransferRequest(const vector<int32_t>& ports) {
this->ports = ports;
}

WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts) {
WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts,
const string& directory) {
this->directory = directory;
int portNum = startPort;
for (int i = 0; i < numPorts; i++) {
ports.push_back(portNum);
Expand All @@ -147,7 +149,7 @@ WdtTransferRequest::WdtTransferRequest(const string& uriString) {
errorCode = URI_PARSE_ERROR;
}
string portsStr(wdtUri.getQueryParam(PORTS_PARAM));
StringPiece portsList(portsStr); // pointers into portsStr
StringPiece portsList(portsStr); // pointers into portsStr
do {
StringPiece portNum = portsList.split_step(',');
int port;
Expand Down Expand Up @@ -262,6 +264,8 @@ void WdtBase::setTransferId(const std::string& transferId) {
}

void WdtBase::setProtocolVersion(int64_t protocolVersion) {
WDT_CHECK(protocolVersion > 0) << "Protocol version can't be <= 0 "
<< protocolVersion;
WDT_CHECK(Protocol::negotiateProtocol(protocolVersion) == protocolVersion)
<< "Can not support wdt version " << protocolVersion;
protocolVersion_ = protocolVersion;
Expand Down
2 changes: 1 addition & 1 deletion WdtBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ struct WdtTransferRequest {
* Constructor with start port and num ports. Fills the vector with
* ports from [startPort, startPort + numPorts)
*/
WdtTransferRequest(int startPort, int numPorts);
WdtTransferRequest(int startPort, int numPorts, const std::string& directory);

/// Constructor to construct the request object from a url string
explicit WdtTransferRequest(const std::string& uriString);
Expand Down
6 changes: 3 additions & 3 deletions WdtConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#pragma once

#define WDT_VERSION_MAJOR 1
#define WDT_VERSION_MINOR 14
#define WDT_VERSION_BUILD 1507270
#define WDT_VERSION_MINOR 15
#define WDT_VERSION_BUILD 1507280
// Add -fbcode to version str
#define WDT_VERSION_STR "1.14.1507270-fbcode"
#define WDT_VERSION_STR "1.15.1507280-fbcode"
// Tie minor and proto version
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR

Expand Down
7 changes: 3 additions & 4 deletions WdtResourceControllerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ class WdtResourceControllerTest : public WdtResourceController {
}

WdtTransferRequest makeTransferRequest(const string &transferId) {
WdtTransferRequest request(startPort, numPorts);
WdtTransferRequest request(startPort, numPorts, directory);
request.hostName = hostName;
request.transferId = transferId;
request.protocolVersion = protocolVersion;
request.directory = directory;
return request;
}
};
Expand Down Expand Up @@ -356,7 +355,7 @@ void WdtResourceControllerTest::RequestSerializationTest() {
EXPECT_EQ(dummy, transferRequest);
}
{
WdtTransferRequest transferRequest(0, 1);
WdtTransferRequest transferRequest(0, 1, "dir1/dir2");
// Lets not populate anything else
transferRequest.hostName = "localhost";
string serializedString = transferRequest.generateUrl(true);
Expand All @@ -366,7 +365,7 @@ void WdtResourceControllerTest::RequestSerializationTest() {
EXPECT_EQ(transferRequest, dummy);
}
{
WdtTransferRequest transferRequest(0, 8);
WdtTransferRequest transferRequest(0, 8, "/dir3/dir4");
Receiver receiver(transferRequest);
transferRequest = receiver.init();
ASSERT_TRUE(!receiver.getTransferId().empty());
Expand Down
85 changes: 41 additions & 44 deletions wdtCmdLine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
#include "Sender.h"
#include "Receiver.h"
#include "Protocol.h"
#include "WdtResourceController.h"
#include <chrono>
#include <future>
Expand Down Expand Up @@ -43,13 +44,16 @@ DEFINE_string(
DEFINE_bool(parse_transfer_log, false,
"If true, transfer log is parsed and fixed");

DEFINE_string(transfer_id, "", "Transfer id (optional, should match");
DEFINE_string(transfer_id, "",
"Transfer id. Receiver will generate one to be used (via URL) on"
" the sender if not set explictly");
DEFINE_int32(
protocol_version, 0,
protocol_version, facebook::wdt::Protocol::protocol_version,
"Protocol version to use, this is used to simulate protocol negotiation");

DEFINE_string(connection_url, "",
"Provide the connection string to connect to receiver");
"Provide the connection string to connect to receiver"
" (incl. transfer_id and other parameters)");

DECLARE_bool(logtostderr); // default of standard glog is off - let's set it on

Expand Down Expand Up @@ -118,27 +122,37 @@ int main(int argc, char *argv[]) {

LOG(INFO) << "Running WDT " << Protocol::getFullVersion();
ErrorCode retCode = OK;

// Odd ball case of log parsing
if (FLAGS_parse_transfer_log) {
// Log parsing mode
TransferLogManager transferLogManager;
transferLogManager.setRootDir(FLAGS_directory);
if (!transferLogManager.parseAndPrint()) {
LOG(ERROR) << "Transfer log parsing failed";
retCode = ERROR;
}
} else if (FLAGS_destination.empty() && FLAGS_connection_url.empty()) {
Receiver receiver(FLAGS_start_port, FLAGS_num_ports, FLAGS_directory);
receiver.setTransferId(FLAGS_transfer_id);
if (FLAGS_protocol_version > 0) {
receiver.setProtocolVersion(FLAGS_protocol_version);
return ERROR;
}
WdtTransferRequest transferRequest = receiver.init();
if (transferRequest.errorCode == ERROR) {
return OK;
}

// General case : Sender or Receiver
const auto &options = WdtOptions::get();
WdtTransferRequest req(options.start_port, options.num_ports,
FLAGS_directory);
req.transferId = FLAGS_transfer_id;
if (FLAGS_protocol_version > 0) {
req.protocolVersion = FLAGS_protocol_version;
}

if (FLAGS_destination.empty() && FLAGS_connection_url.empty()) {
Receiver receiver(req);
WdtTransferRequest augmentedReq = receiver.init();
if (augmentedReq.errorCode == ERROR) {
LOG(ERROR) << "Error setting up receiver";
return transferRequest.errorCode;
return augmentedReq.errorCode;
}
LOG(INFO) << "Starting receiver with connection url ";
std::cout << transferRequest.generateUrl() << std::endl;
std::cout << augmentedReq.generateUrl() << std::endl;
std::cout.flush();
setUpAbort(receiver);
if (!FLAGS_run_as_daemon) {
Expand Down Expand Up @@ -167,40 +181,23 @@ int main(int argc, char *argv[]) {
fileInfo.emplace_back(fields[0], filesize);
}
}
std::vector<int32_t> ports;
const auto &options = WdtOptions::get();
for (int i = 0; i < options.num_ports; i++) {
ports.push_back(options.start_port + i);
}
std::unique_ptr<Sender> sender;
if (FLAGS_connection_url.empty()) {
sender.reset(
new Sender(FLAGS_destination, FLAGS_directory, ports, fileInfo));
if (FLAGS_protocol_version > 0) {
sender->setProtocolVersion(FLAGS_protocol_version);
}
sender->setTransferId(FLAGS_transfer_id);
} else {
// If you are using a connection url it is
// expected that you set protocol version, ports
// and transfer id in the url
WdtTransferRequest transferRequest(FLAGS_connection_url);
LOG(INFO) << transferRequest.generateUrl(true);
if (transferRequest.directory.empty()) {
transferRequest.directory = FLAGS_directory;
}
sender.reset(new Sender(transferRequest));
req.hostName = FLAGS_destination;
if (!FLAGS_connection_url.empty()) {
LOG(INFO) << "Input url: " << FLAGS_connection_url;
// TODO: merge instead
req = WdtTransferRequest(FLAGS_connection_url);
req.directory = FLAGS_directory; // re-set it for now
}
WdtTransferRequest processedRequest = sender->init();
Sender sender(req);
WdtTransferRequest processedRequest = sender.init();
LOG(INFO) << "Starting sender with details "
<< processedRequest.generateUrl(true);
ADDITIONAL_SENDER_SETUP
setUpAbort(*sender);
sender->setIncludeRegex(FLAGS_include_regex);
sender->setExcludeRegex(FLAGS_exclude_regex);
sender->setPruneDirRegex(FLAGS_prune_dir_regex);
// TODO fix that
std::unique_ptr<TransferReport> report = sender->transfer();
setUpAbort(sender);
sender.setIncludeRegex(FLAGS_include_regex);
sender.setExcludeRegex(FLAGS_exclude_regex);
sender.setPruneDirRegex(FLAGS_prune_dir_regex);
std::unique_ptr<TransferReport> report = sender.transfer();
retCode = report->getSummary().getErrorCode();
}
cancelAbort();
Expand Down
4 changes: 2 additions & 2 deletions wdt_download_resumption_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ checkLastCmdStatusExpectingFailure() {
if [ $LAST_STATUS -eq 0 ] ; then
sudo iptables-restore < $DIR/iptable
echo "expecting wdt failure, but transfer was successful, failing test"
exit 1
exit 1
fi
}

Expand Down Expand Up @@ -165,7 +165,7 @@ TEST_COUNT=$((TEST_COUNT + 1))

echo "Download resumption with network error test(3)"
startNewTransfer
sleep 10
sleep 10
killCurrentTransfer
STARTING_PORT=$((STARTING_PORT + threads))
startNewTransfer
Expand Down
38 changes: 15 additions & 23 deletions wdt_e2e_simple_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,34 +60,28 @@ echo "done with setup"
(cd $DIR/src ; touch a; ln -s doesntexist badlink; dd if=/dev/zero of=c bs=1024 count=1; mkdir d; ln -s ../d d/e; ln -s ../c d/a)
(cd $DIR/extsrc; mkdir TestDir; mkdir TestDir/test; cd TestDir; echo "Text1" >> file1; cd test; echo "Text2" >> file1; ln -s $DIR/extsrc/TestDir; cp -R $DIR/extsrc/TestDir $DIR/src)

# Can't have both client and server send to stdout in parallel or log lines
# get mangled/are missing - so we redirect the server one
echo "$WDTBIN -minloglevel=1 -directory $DIR/dst > $DIR/server.log 2>&1 &"
$WDTBIN -minloglevel=1 -directory $DIR/dst > $DIR/server.log 2>&1 &
# client now retries connects so no need wait for server to be up
pidofreceiver=$!
# To test only 1 socket (single threaded send/receive)
#$WDTBIN -num_sockets=1 -directory $DIR/src -destination ::1
# Normal

CMD="$WDTBIN -minloglevel=1 -directory $DIR/dst 2> $DIR/server.log | head -1 | \
xargs -I URL time $WDTBIN -directory $DIR/src -connection_url URL 2>&1 | \
tee $DIR/client.log"
echo "First transfer: $CMD"
eval $CMD
STATUS=$?
# TODO check for $? / crash... though diff will indirectly find that case

echo "$WDTBIN -directory $DIR/src -destination $HOSTNAME 2>&1 | tee $DIR/client.log"
time $WDTBIN -directory $DIR/src -destination $HOSTNAME 2>&1 | tee $DIR/client.log

# 2nd Receiver:
echo "$WDTBIN -directory $DIR/dst_symlinks >> $DIR/server.log 2>&1 &"
$WDTBIN -directory $DIR/dst_symlinks >> $DIR/server.log 2>&1 &


echo "$WDTBIN -follow_symlinks -directory $DIR/src -destination $HOSTNAME 2>&1 | tee -a $DIR/client.log"
time $WDTBIN -follow_symlinks -directory $DIR/src -destination $HOSTNAME 2>&1 | tee -a $DIR/client.log
CMD="$WDTBIN -minloglevel=1 -directory $DIR/dst_symlinks 2>> $DIR/server.log |\
head -1 | xargs -I URL time $WDTBIN -follow_symlinks -directory $DIR/src \
-connection_url URL 2>&1 | tee $DIR/client.log"
echo "Second transfer: $CMD"
eval $CMD
# TODO check for $? / crash... though diff will indirectly find that case


if [ $DO_VERIFY -eq 1 ] ; then
echo "Verifying for run without follow_symlinks"
echo "Checking for difference `date`"

NUM_FILES=`(cd $DIR/dst ; ( find . -type f | wc -l))`
NUM_FILES=`(cd $DIR/dst && ( find . -type f | wc -l))`
echo "Transfered `du -ks $DIR/dst` kbytes across $NUM_FILES files"

(cd $DIR/src ; ( find . -type f -print0 | xargs -0 md5sum | sort ) \
Expand All @@ -103,7 +97,7 @@ if [ $DO_VERIFY -eq 1 ] ; then
echo "Verifying for run with follow_symlinks"
echo "Checking for difference `date`"

NUM_FILES=`(cd $DIR/dst_symlinks; ( find . -type f | wc -l))`
NUM_FILES=`(cd $DIR/dst_symlinks && ( find . -type f | wc -l))`
echo "Transfered `du -ks $DIR/dst_symlinks` kbytes across $NUM_FILES files"

(cd $DIR/src ; ( find -L . -type f -print0 | xargs -0 md5sum | sort ) \
Expand All @@ -117,10 +111,8 @@ if [ $DO_VERIFY -eq 1 ] ; then
if [ $STATUS -eq 0 ] ; then
STATUS=$SYMLINK_STATUS
fi
#(cd $DIR; ls -lR src/ dst/ )
else
echo "Skipping independant verification"
STATUS=0
fi


Expand Down
4 changes: 3 additions & 1 deletion wdt_e2e_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ printf "(Sockets,Average rate, Max_rate, Save local?, Delay)=%s,%s,%s,%s,%s\n" "
#WDTBIN_OPTS="-buffer_size=$BS -num_sockets=8 -minloglevel 2 -sleep_ms 1 -max_retries 999"
WDTBIN_OPTS="-minloglevel=0 -sleep_millis 1 -max_retries 999 -full_reporting "\
"-avg_mbytes_per_sec=$avg_rate -max_mbytes_per_sec=$max_rate "\
"-num_ports=$threads -throttler_log_time_millis=200 -enable_checksum=true"
"-num_ports=$threads -throttler_log_time_millis=200 -enable_checksum=true "\
"-transfer_id=$$"
WDTBIN="_bin/wdt/wdt $WDTBIN_OPTS"

BASEDIR=/dev/shm/tmpWDT
Expand All @@ -107,6 +108,7 @@ mkdir $DIR/extsrc
#cp -R wdt folly /usr/bin /usr/lib /usr/lib64 /usr/libexec /usr/share $DIR/src
#cp -R wdt folly /usr/bin /usr/lib /usr/lib64 /usr/libexec $DIR/src
cp -R wdt folly /usr/bin /usr/lib $DIR/src
#cp -R wdt folly /usr/bin $DIR/src
#cp -R wdt folly $DIR/src
# Removing symlinks which point to the same source tree
for link in `find -L $DIR/src -xtype l`
Expand Down
4 changes: 3 additions & 1 deletion wdt_max_send_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ fi
REMOTE=::1
SKIP_WRITES="true"

# TODO: switch to url

# Without throttling:
#WDTBIN_OPTS="-sleep_millis 1 -max_retries 3 -num_sockets 13"
# With, still gets almost same max (21G) with throttling set high enough
WDTBIN_OPTS="-sleep_millis 1 -max_retries 3 -num_ports 13
WDTBIN_OPTS="-sleep_millis 1 -max_retries 3 -num_ports 13 -transfer_id=$$
--avg_mbytes_per_sec=26000 --max_mbytes_per_sec=26001 --enable_checksum=false"
CLIENT_PROFILE_FORMAT="%Uuser %Ssystem %Eelapsed %PCPU (%Xtext+%Ddata \
%Mmax)k\n%Iinputs+%Ooutputs (%Fmajor+%Rminor)pagefaults %Wswaps\nCLIENT_PROFILE %U \
Expand Down
Loading

0 comments on commit c0526b7

Please sign in to comment.