Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement solana wallet #1

Merged
merged 9 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ $ python example/balance.py --net mainnet --addr 6ASf5EcmmEHTgDJ4X4ZT5vT6iHVJBXP
# 0.002030181
```

**example/transfer.py**

Transfer sol to other.

```sh
$ python example/transfer.py --prikey 0x1 --to 8pM1DN3RiT8vbom5u1sNryaNT1nyL8CTTW3b5PwWXRBH --value 0.05

# 4GhcAygac8krnrJgF2tCSNxRyWsquCZ26NPM6o9oP3bPQFkAzi22CGn9RszBXzqPErujVxwzenTHoTMHuiZm98Wu
```

## Test

```sh
Expand All @@ -51,9 +61,7 @@ $ cd solana-release-x86_64-unknown-linux-gnu.tar.bz2
# Run test validator
$ solana-test-validator -l /tmp/test-ledger
$ solana config set --url localhost
# Create default wallet
$ solana-keygen new
$ solana transfer --allow-unfunded-recipient 6ASf5EcmmEHTgDJ4X4ZT5vT6iHVJBXPg5AN5YoTCpGWt 10000
$ solana airdrop 99 6ASf5EcmmEHTgDJ4X4ZT5vT6iHVJBXPg5AN5YoTCpGWt

$ pytest -v
```
Expand Down
24 changes: 24 additions & 0 deletions example/transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import argparse
import sol

# Transfer sol to other.

parser = argparse.ArgumentParser()
parser.add_argument('--net', type=str, choices=['develop', 'mainnet', 'testnet'], default='develop')
parser.add_argument('--prikey', type=str, help='private key')
parser.add_argument('--to', type=str, required=True, help='to address')
parser.add_argument('--value', type=float, help='sol value')
args = parser.parse_args()

if args.net == 'develop':
sol.config.current = sol.config.develop
if args.net == 'mainnet':
sol.config.current = sol.config.mainnet
if args.net == 'testnet':
sol.config.current = sol.config.testnet

user = sol.wallet.Wallet(sol.core.PriKey(bytearray(int(args.prikey, 0).to_bytes(32))))
hole = sol.core.PubKey.base58_decode(args.to)
hash = user.transfer(hole, 0.05 * sol.denomination.sol)
sol.rpc.wait(sol.base58.encode(hash))
print(sol.base58.encode(hash))
1 change: 1 addition & 0 deletions sol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from . import ed25519
from . import eddsa
from . import rpc
from . import wallet
4 changes: 4 additions & 0 deletions sol/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ def __setattr__(self, name, value):

develop = ObjectDict({
'url': 'http://127.0.0.1:8899',
# Solana's base fee is a fixed 5000 lamports (0.000005 SOL) per signature, and most transactions have one signature.
'base_fee': 5000,
})

mainnet = ObjectDict({
'url': 'https://api.mainnet-beta.solana.com',
'base_fee': 5000,
})

testnet = ObjectDict({
'url': 'https://api.devnet.solana.com',
'base_fee': 5000,
})


Expand Down
290 changes: 290 additions & 0 deletions sol/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import json
import sol.base58
import sol.eddsa
Expand Down Expand Up @@ -58,3 +59,292 @@ def hex(self) -> str:
@staticmethod
def hex_decode(data: str) -> Self:
return PriKey(bytearray.fromhex(data))


class SystemProgram:

pubkey = PubKey(bytearray(32))

@staticmethod
def create():
pass

@staticmethod
def assign():
pass

@staticmethod
def transfer(value: int) -> bytearray:
r = bytearray()
r.extend(bytearray(int(2).to_bytes(4, 'little')))
r.extend(bytearray(int(value).to_bytes(8, 'little')))
return r

@staticmethod
def create_with_seed():
pass

@staticmethod
def advance_nonce_account():
pass

@staticmethod
def withdraw_nonce_account():
pass

@staticmethod
def initialize_nonce_account():
pass

@staticmethod
def authorize_nonce_account():
pass

@staticmethod
def allocate():
pass

@staticmethod
def allocate_with_seed():
pass

@staticmethod
def assign_with_seed():
pass

@staticmethod
def transfer_with_seed():
pass

@staticmethod
def upgrade_nonce_account():
pass


def compact_u16_encode(n: int) -> bytearray:
# Same as u16, but serialized with 1 to 3 bytes. If the value is above 0x7f, the top bit is set and the remaining
# value is stored in the next bytes. Each byte follows the same pattern until the 3rd byte. The 3rd byte, if
# needed, uses all 8 bits to store the last byte of the original value.
assert n >= 0
assert n <= 0xffff
if n <= 0x7f:
return bytearray([n])
if n <= 0x3fff:
a = n & 0x7f | 0x80
b = n >> 7
return bytearray([a, b])
if n <= 0xffff:
a = n & 0x7f | 0x80
n = n >> 7
b = n & 0x7f | 0x80
c = n >> 7
return bytearray([a, b, c])
raise Exception


def compact_u16_decode(data: bytearray) -> int:
return compact_u16_decode_reader(io.BytesIO(data))


def compact_u16_decode_reader(reader: typing.BinaryIO) -> int:
c = reader.read(1)[0]
if c <= 0x7f:
return c
n = c & 0x7f
c = reader.read(1)[0]
m = c & 0x7f
n += m << 7
if c <= 0x7f:
return n
c = reader.read(1)[0]
n += c << 14
return n


class Instruction:
# A compact encoding of an instruction.
def __init__(self, program_id_index: int, accounts: typing.List[int], data: bytearray) -> None:
self.program_id_index = program_id_index
self.accounts = accounts
self.data = data

def __repr__(self) -> str:
return json.dumps(self.json())

def json(self) -> typing.Dict:
return {
'programIdIndex': self.program_id_index,
'accounts': self.accounts,
'data': sol.base58.encode(self.data)
}

@staticmethod
def json_decode(data: typing.Dict) -> Self:
return Instruction(data['programIdIndex'], data['accounts'], sol.base58.decode(data['data']))

def serialize(self) -> bytearray:
r = bytearray()
r.append(self.program_id_index)
r.extend(compact_u16_encode(len(self.accounts)))
for e in self.accounts:
r.append(e)
r.extend(compact_u16_encode(len(self.data)))
r.extend(self.data)
return r

@staticmethod
def serialize_decode(data: bytearray) -> Self:
return Instruction.serialize_decode_reader(io.BytesIO(data))

@staticmethod
def serialize_decode_reader(reader: io.BytesIO) -> Self:
i = Instruction(0, [], bytearray())
i.program_id_index = int(reader.read(1)[0])
for _ in range(compact_u16_decode_reader(reader)):
i.accounts.append(int(reader.read(1)[0]))
i.data = bytearray(reader.read(compact_u16_decode_reader(reader)))
return i


class MessageHeader:
def __init__(
self,
num_required_signatures: int,
num_readonly_signed_accounts: int,
num_readonly_unsigned_accounts: int
) -> None:
self.num_required_signatures = num_required_signatures
self.num_readonly_signed_accounts = num_readonly_signed_accounts
self.num_readonly_unsigned_accounts = num_readonly_unsigned_accounts

def __repr__(self) -> str:
return json.dumps(self.json())

def json(self) -> typing.Dict:
return {
'numRequiredSignatures': self.num_required_signatures,
'numReadonlySignedAccounts': self.num_readonly_signed_accounts,
'numReadonlyUnsignedAccounts': self.num_readonly_unsigned_accounts,
}

@staticmethod
def json_decode(data: str) -> Self:
return MessageHeader(
data['numRequiredSignatures'],
data['numReadonlySignedAccounts'],
data['numReadonlyUnsignedAccounts'],
)

def serialize(self) -> bytearray:
return bytearray([
self.num_required_signatures,
self.num_readonly_signed_accounts,
self.num_readonly_unsigned_accounts,
])

@staticmethod
def serialize_decode(data: bytearray) -> Self:
assert len(data) == 3
return MessageHeader(data[0], data[1], data[2])

@staticmethod
def serialize_decode_reader(reader: io.BytesIO) -> Self:
return MessageHeader.serialize_decode(bytearray(reader.read(3)))


class Message:
def __init__(
self,
header: MessageHeader,
account_keys: typing.List[PubKey],
recent_blockhash: bytearray,
instructions: typing.List[Instruction]
) -> None:
self.header = header
self.account_keys = account_keys
self.recent_blockhash = recent_blockhash
self.instructions = instructions

def __repr__(self) -> str:
return json.dumps(self.json())

def json(self) -> typing.Dict:
return {
'header': self.header.json(),
'accountKeys': [e.base58() for e in self.account_keys],
'recentBlockhash': sol.base58.encode(self.recent_blockhash),
'instructions': [e.json() for e in self.instructions],
}

@staticmethod
def json_decode(data: str) -> Self:
return Message(
MessageHeader.json_decode(data['header']),
[PubKey.base58_decode(e) for e in data['accountKeys']],
sol.base58.decode(data['recentBlockhash']),
[Instruction.json_decode(e) for e in data['instructions']]
)

def serialize(self) -> bytearray:
r = bytearray()
r.extend(self.header.serialize())
r.extend(compact_u16_encode(len(self.account_keys)))
for e in self.account_keys:
r.extend(e.p)
r.extend(self.recent_blockhash)
r.extend(compact_u16_encode(len(self.instructions)))
for e in self.instructions:
r.extend(e.serialize())
return r

@staticmethod
def serialize_decode(data: bytearray) -> Self:
return Message.serialize_decode_reader(io.BytesIO(data))

@staticmethod
def serialize_decode_reader(reader: io.BytesIO) -> Self:
m = Message(MessageHeader.serialize_decode_reader(reader), [], bytearray(), [])
for _ in range(compact_u16_decode_reader(reader)):
m.account_keys.append(PubKey(bytearray(reader.read(32))))
m.recent_blockhash = bytearray(reader.read(32))
for _ in range(compact_u16_decode_reader(reader)):
m. instructions.append(Instruction.serialize_decode_reader(reader))
return m


class Transaction:
def __init__(self, signatures: typing.List[bytearray], message: Message) -> None:
self.signatures = signatures
self.message = message

def __repr__(self) -> str:
return json.dumps(self.json())

def json(self) -> typing.Dict:
return {
'signatures': [sol.base58.encode(e) for e in self.signatures],
'message': self.message.json()
}

@staticmethod
def json_decode(data: typing.Dict) -> Self:
return Transaction([sol.base58.decode(e) for e in data['signatures']], Message.json_decode(data['message']))

def serialize(self) -> bytearray:
r = bytearray()
r.extend(compact_u16_encode(len(self.signatures)))
for e in self.signatures:
r.extend(e)
r.extend(self.message.serialize())
return r

@staticmethod
def serialize_decode(data: bytearray) -> Self:
return Transaction.serialize_decode_reader(io.BytesIO(data))

@staticmethod
def serialize_decode_reader(reader: io.BytesIO) -> Self:
s = []
for _ in range(compact_u16_decode_reader(reader)):
s.append(bytearray(reader.read(64)))
return Transaction(s, Message.serialize_decode_reader(reader))
Loading