Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decode message on receive #5

Merged
merged 10 commits into from
May 10, 2020
206 changes: 141 additions & 65 deletions balancesharing/balancesharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
import json
import os
import struct
import time
from binascii import hexlify, unhexlify

hexlify(struct.pack(">H", 437))

QUERY_FOAF_BALANCES = 437
REPLY_FOAF_BALANCES = 439

CHAIN_HASH = r'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'
BTC_CHAIN_HASH = CHAIN_HASH
foaf_network = None

plugin = pyln.client.Plugin()


# __version__ was introduced in 0.0.7.1, with utf8 passthrough support.
try:
if version.parse(lightning.__version__) >= version.parse("0.0.7.1"):
Expand All @@ -51,6 +50,7 @@ def get_funds(plugin):
return outputs, channels


# TODO: remove when finished, or keep for test cases
def list_funds_mock():
""""read funds from file"""
dir_path = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -62,79 +62,146 @@ def list_funds_mock():


# TODO: we need to extend this, if we want to handle multiple channels per peer


def get_channel(channels, peer_id):
""""searches for ONE channel with the given peer_id"""
for ch in channels:
if ch["peer_id"] == peer_id:
return ch

return None


def encode_reply_foaf_balances(short_channels, amt_to_rebalance, plugin):
"""Encode flow_value and amt_to_rebalance"""
"""
H -> type: short
32s -> chain_hash: 32byte char
Q -> timestamp: unsigned long
Q -> amt_to_rebalance: unsigned long long
H -> number_of_short_channels: unsigned short
{len*8}s -> short_channel_id
"""
global REPLY_FOAF_BALANCES
global CHAIN_HASH

# fund_list = list_funds_mock()
# channels = fund_list["channels"]
#
# # TODO: remove mock
# mock_short_channels = []
# for ch in channels:
# mock_short_channels.append(ch["short_channel_id"])
# short_channels = mock_short_channels

# time.time() returns a float with 4 decimal places
timestamp = int(time.time() * 1000)
number_of_short_channels = len(short_channels)
channel_array_sign = str(number_of_short_channels * 8) + 's'
plugin.log("Channel sign: {channel_array_sign}"
.format(channel_array_sign=channel_array_sign))
return hexlify(struct.pack(
"!H32sQQH", REPLY_FOAF_BALANCES, CHAIN_HASH.encode('ASCII'),
timestamp, number_of_short_channels, amt_to_rebalance)
).decode('ASCII')


def encode_query_foaf_balances(flow_value, amt_to_rebalance):
"""Encode flow_value (char) and amount (unsigend long long) """
return struct.pack("!cQ", flow_value, amt_to_rebalance)
"""Encode flow_value and amt_to_rebalance"""
"""
H -> type: short
32s -> chain_hash: 32byte char
c -> flow_value: char
Q -> amt_to_rebalance: unsigned long long
"""
global QUERY_FOAF_BALANCES
global CHAIN_HASH
return hexlify(
struct.pack("!H32scQ", QUERY_FOAF_BALANCES, CHAIN_HASH.encode('ASCII'), flow_value, amt_to_rebalance)).decode(
'ASCII')


def decode_query_foaf_balances(data):
"""Decode query_foaf_balances. Returns a byte and int"""
return struct.unpack("!cQ", data)
"""Decode query_foaf_balances. Return type, chain_hash, flow_value and amt_to_rebalance"""
msg_type, chain_hash, flow, amt = struct.unpack("!H32scQ", unhexlify(data.encode('ASCII')))
chain_hash = chain_hash.decode('ASCII')
return msg_type, chain_hash, flow, amt


def get_flow_value(flow):
if type(flow) is not int:
return None

if flow == 1:
return b'\x01'
elif flow == 0:
return b'\x00'
return None


def get_amount(amt):
if type(amt) is not int or amt <= 0:
return None
return amt


def log_error(msg):
plugin.log("Error in balancesharing plugin: {msg}".format(msg=msg))


@plugin.method("foafbalance")
def foafbalance(plugin):
def foafbalance(plugin, flow, amount):
"""gets the balance of our friends channels"""
global foaf_network
foaf_network = nx.DiGraph()
flow_value = b'\x01'
amt_to_rebalance = int(123)
plugin.log("Building query_foaf_balances message...")
# Read input data
flow_value = get_flow_value(flow)
if flow_value is None:
log_error("argument 'flow_value' for function 'foafbalance' was not 0 or 1")
return

amt_to_rebalance = get_amount(amount)
if amt_to_rebalance is None:
log_error("argument 'amt_to_rebalance' for function 'foafbalance' was not valid")
return

data = encode_query_foaf_balances(flow_value, amt_to_rebalance)
new_flow_value, new_amt_to_rebalance = decode_query_foaf_balances(data)

plugin.log(str(data))
plugin.log("New values: {flow_value} {amt_to_rebalance}".format(
# todo: remove. only for debugging
msg_type, chain_hash, new_flow_value, new_amt_to_rebalance = decode_query_foaf_balances(data)
plugin.log("Test decoding: {msg_type} -- {chain_hash} -- {flow_value} -- {amt_to_rebalance}".format(
msg_type=msg_type,
chain_hash=chain_hash,
flow_value=new_flow_value,
amt_to_rebalance=new_amt_to_rebalance
))

reply = {}
info = plugin.rpc.getinfo()
#msg = r'01b5' + str(hexlify(struct.pack(">H", 437))) + '126182746121'
msg = r'01b5126182746121'

dir_path = os.path.dirname(os.path.realpath(__file__))
plugin.log(dir_path)

outputs, channels = get_funds(plugin)
global foaf_network
foaf_network = nx.DiGraph()

counter = 0
for peer in plugin.rpc.listpeers()["peers"]:
# check if peer is the desired state
if not peer["connected"] or not has_feature(peer["features"]):
continue
peer_id = peer["id"]

res = plugin.rpc.dev_sendcustommsg(peer_id, msg)
plugin.log("RPC response" + str(res))
res = plugin.rpc.dev_sendcustommsg(peer_id, data)
plugin.log("Sent query_foaf_balances message to {peer_id}. Response: {res}".format(
peer_id=peer_id,
res=res
))

counter = counter + 1

nid = info["id"]
reply["id"] = nid
reply["change"] = nid
nid = plugin.rpc.getinfo()["id"]
reply = {"id": nid, "num_sent_queries": counter}
return reply


def get_message_type(message):
"""takes the 4 hex digits of a string and returns them as an integer
"""takes the 4 hex digits of a string and returns them as an integer
if they are a well known message type"""
assert len(message) > 4
message_type = message[:4]
# >>> hexlify(pack(">H",437))
# b'01b5'
# >>> unpack(">H",unhexlify(b'01b5'))
# (437,)
# >>> unpack(">H",unhexlify(b'01b5'))[0]
return struct.unpack(">H", unhexlify(message_type))[0]
return struct.unpack(">H", unhexlify(message_type.encode('ASCII')))[0]


def get_message_payload(message):
Expand All @@ -160,45 +227,47 @@ def helper_compute_channel_balance_coefficients(channels):
return channels


def handle_query_foaf_balances(payload, plugin):
# TODO: parse from payload
amt_to_rebalance = 150000
# TODO: parse from payload but 1 means Outgoing forwarding
flow_value = 0

if flow_value == 0:
def handle_query_foaf_balances(flow_value, amt_to_rebalance, plugin):
if flow_value == b'\x00':
plugin.log("compute channels on which I want {} satoshis incoming while rebalancing".format(
amt_to_rebalance))
elif flow_value == 1:
elif flow_value == b'\x01':
plugin.log("compute channels on which I want to forward {} satoshis while rebalancing".format(
amt_to_rebalance))

_, channels = get_funds(plugin)

kappa, tau = helper_compute_node_parameters(channels)
nu = float(tau)/kappa
nu = float(tau) / kappa
channels = helper_compute_channel_balance_coefficients(channels)

result = []
channel_ids = []
for channel in channels:
reserve = int(int(channel["channel_total_sat"]) * 0.01) + 1
if flow_value == 1:
if channel["zeta"] > nu:
if int(channel["channel_sat"]) > amt_to_rebalance + reserve:
result.append(channel["short_channel_id"])
channel_ids.append(channel["short_channel_id"])
elif flow_value == 0:
if channel["zeta"] < nu:
if int(channel["channel_total_sat"])-int(channel["channel_sat"]) > amt_to_rebalance + reserve:
result.append(channel["short_channel_id"])
if int(channel["channel_total_sat"]) - int(channel["channel_sat"]) > amt_to_rebalance + reserve:
channel_ids.append(channel["short_channel_id"])
plugin.log("{} of {} channels are good for rebalancing {} satoshis they are: {}".format(
len(result), len(channels), amt_to_rebalance, ", ".join(result)))
return result
len(channel_ids), len(channels), amt_to_rebalance, ", ".join(channel_ids)))
return channel_ids


def send_reply_foaf_balances(peer, channels, plugin):
def send_reply_foaf_balances(peer, amt, channels, plugin):
encode_reply_foaf_balances(channels, amt, plugin)

# TODO: CHECK if 107b is the correct little endian of 439
plugin.rpc.dev_sendcustommsg(peer, "107b123412341234")
return
res = plugin.rpc.dev_sendcustommsg(peer, "107b123412341234")
plugin.log("Sent query_foaf_balances message to {peer_id}. Response: {res}".format(
peer_id=peer,
res=res
))
reply = {"peer_id": peer, "response": res}
return reply


@plugin.hook('peer_connected')
Expand All @@ -209,6 +278,7 @@ def on_connected(plugin, **kwargs):

@plugin.hook('custommsg')
def on_custommsg(peer_id, message, plugin, **kwargs):
global BTC_CHAIN_HASH
plugin.log("Got a custom message {msg} from peer {peer_id}".format(
msg=message,
peer_id=peer_id
Expand All @@ -218,19 +288,25 @@ def on_custommsg(peer_id, message, plugin, **kwargs):
# remove prefix:
message = message[8:]
message_type = get_message_type(message)
message_payload = get_message_payload(message)
# message_payload = get_message_payload(message)

# query_foaf_balances message has type 437 which is 01b5 in hex
return_value = {}
if message_type == QUERY_FOAF_BALANCES:
plugin.log("received query_foaf_balances message")
result = handle_query_foaf_balances(message_payload, plugin)
send_reply_foaf_balances(peer_id, result, plugin)
_, chain_hash, flow, amt = decode_query_foaf_balances(message)

if chain_hash == BTC_CHAIN_HASH:
result = handle_query_foaf_balances(flow, amt, plugin)
r = send_reply_foaf_balances(peer_id, amt, result, plugin)
return_value = {"result": r}
else:
plugin.log("not handling non bitcoin chains for now")

if message_type == REPLY_FOAF_BALANCES:
elif message_type == REPLY_FOAF_BALANCES:
plugin.log("received a reply_foaf_balances message")
plugin.log(message)
plugin.log(str(type(message)))
return {'result': 'continue'}

return return_value


@plugin.init()
Expand Down