Skip to content

Commit

Permalink
Merge pull request #6 from callebtc:feat/count_events
Browse files Browse the repository at this point in the history
Feat/count_events
  • Loading branch information
callebtc authored Feb 7, 2023
2 parents 8e39666 + 1331ca1 commit 123cfb5
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 28 deletions.
27 changes: 21 additions & 6 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from nostr.key import PublicKey
import asyncio
import threading
import time
import datetime


async def dm():
Expand All @@ -18,8 +20,7 @@ def callback(event: Event, decrypted_content):
)

client = NostrClient(privatekey_hex=pk)
await asyncio.sleep(1)

# await asyncio.sleep(1)
t = threading.Thread(
target=client.get_dm,
args=(
Expand Down Expand Up @@ -50,20 +51,34 @@ def callback(event: Event):
print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}")

sender_client = NostrClient(privatekey_hex=pk)
await asyncio.sleep(1)
# await asyncio.sleep(1)

to_pubk_hex = (
input("Enter other pubkey (enter nothing to read your own posts): ")
input(
"Enter other pubkey (enter nothing to read your own posts, enter * for all): "
)
or sender_client.public_key.hex()
)
print(f"Subscribing to posts by {to_pubk_hex}")
to_pubk = PublicKey(bytes.fromhex(to_pubk_hex))
if to_pubk_hex == "*":
to_pubk = None
else:
print(f"Subscribing to posts by {to_pubk_hex}")
to_pubk = PublicKey(bytes.fromhex(to_pubk_hex))

filters = {
"since": int(
time.mktime(
(datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple()
)
)
}

t = threading.Thread(
target=sender_client.get_post,
args=(
to_pubk,
callback,
filters,
),
)
t.start()
Expand Down
32 changes: 12 additions & 20 deletions nostr/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

class NostrClient:
relays = [
# "wss://lnbits.link/nostrrelay/client"
"wss://lnbits.link/nostrrelay/client",
"wss://nostr-pub.wellorder.net",
# "wss://nostr.zebedee.cloud",
# "wss://no.str.cr",
"wss://nostr.zebedee.cloud",
"wss://no.str.cr",
] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr"
relay_manager = RelayManager()
private_key: PrivateKey
Expand All @@ -44,6 +44,7 @@ def connect(self):
self.relay_manager.open_connections(
{"cert_reqs": ssl.CERT_NONE}
) # NOTE: This disables ssl certificate verification
self.relay_manager.start_message_workers()

def close(self):
self.relay_manager.close_connections()
Expand All @@ -52,10 +53,6 @@ def generate_keys(self, privatekey_hex: str = None):
pk = bytes.fromhex(privatekey_hex) if privatekey_hex else None
self.private_key = PrivateKey(pk)
self.public_key = self.private_key.public_key
# print(
# f"Nostr private key: {self.private_key.hex()} ({self.private_key.bech32()})"
# )
# print(f"Nostr public key: {self.public_key.hex()} ({self.public_key.bech32()})")

def post(self, message: str):
event = Event(message, self.public_key.hex(), kind=EventKind.TEXT_NOTE)
Expand All @@ -65,18 +62,21 @@ def post(self, message: str):
# print(event_json)
self.relay_manager.publish_message(event_json)

def get_post(self, sender_publickey: PublicKey, callback_func=None):
filters = Filters(
[Filter(authors=[sender_publickey.hex()], kinds=[EventKind.TEXT_NOTE])]
def get_post(
self, sender_publickey: PublicKey = None, callback_func=None, filter_kwargs={}
):
filter = Filter(
authors=[sender_publickey.hex()] if sender_publickey else None,
kinds=[EventKind.TEXT_NOTE],
**filter_kwargs,
)
filters = Filters([filter])
subscription_id = os.urandom(4).hex()
self.relay_manager.add_subscription(subscription_id, filters)

request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
message = json.dumps(request)
# print("Subscribing to events:")
# print(message)
self.relay_manager.publish_message(message)

while True:
Expand Down Expand Up @@ -108,8 +108,6 @@ def get_dm(self, sender_publickey: PublicKey, callback_func=None):
request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
message = json.dumps(request)
# print("Subscribing to events:")
# print(message)
self.relay_manager.publish_message(message)

while True:
Expand All @@ -120,21 +118,15 @@ def get_dm(self, sender_publickey: PublicKey, callback_func=None):
shared_secret = self.private_key.compute_shared_secret(
event_msg.event.public_key
)
# print("shared secret: ", shared_secret.hex())
# print("plain text:", message)
aes = cbc.AESCipher(key=shared_secret)
enc_text_b64, iv_b64 = event_msg.event.content.split("?iv=")
iv = base64.decodebytes(iv_b64.encode("utf-8"))
enc_text = base64.decodebytes(enc_text_b64.encode("utf-8"))
# print("decrypt iv: ", iv)
dec_text = aes.decrypt(iv, enc_text)
# print(f"From {event_msg.event.public_key[:5]}...: {dec_text}")
if callback_func:
callback_func(event_msg.event, dec_text)
except:
pass
# else:
# print(f"\nFrom {event_msg.event.public_key[:5]}...: {event_msg.event.content}")
break
time.sleep(0.1)

Expand Down
34 changes: 32 additions & 2 deletions nostr/relay.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import time
from queue import Queue
from threading import Lock
from websocket import WebSocketApp
from .event import Event
Expand Down Expand Up @@ -34,15 +35,21 @@ def __init__(
self.reconnect: bool = True
self.error_counter: int = 0
self.error_threshold: int = 0
self.num_received_events: int = 0
self.num_sent_events: int = 0
self.num_subscriptions: int = 0
self.ssl_options: dict = {}
self.proxy: dict = {}
self.lock = Lock()
self.queue = Queue()
self.ws = WebSocketApp(
url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
on_ping=self._on_ping,
on_pong=self._on_pong,
)

def connect(self, ssl_options: dict = None, proxy: dict = None):
Expand All @@ -53,6 +60,7 @@ def connect(self, ssl_options: dict = None, proxy: dict = None):
http_proxy_host=None if proxy is None else proxy.get("host"),
http_proxy_port=None if proxy is None else proxy.get("port"),
proxy_type=None if proxy is None else proxy.get("type"),
ping_interval=5,
)

def close(self):
Expand All @@ -68,9 +76,24 @@ def check_reconnect(self):
time.sleep(1)
self.connect(self.ssl_options, self.proxy)

def publish(self, message: str):
@property
def ping(self):
if self.connected:
self.ws.send(message)
return int((self.ws.last_pong_tm - self.ws.last_ping_tm) * 1000)
else:
return 0

def publish(self, message: str):
self.queue.put(message)

def queue_worker(self):
while True:
if self.connected:
message = self.queue.get()
self.num_sent_events += 1
self.ws.send(message)
else:
time.sleep(0.1)

def add_subscription(self, id, filters: Filters):
with self.lock:
Expand Down Expand Up @@ -105,6 +128,7 @@ def _on_close(self, class_obj, status_code, message):

def _on_message(self, class_obj, message: str):
if self._is_valid_message(message):
self.num_received_events += 1
self.message_pool.add_message(message, self.url)

def _on_error(self, class_obj, error):
Expand All @@ -115,6 +139,12 @@ def _on_error(self, class_obj, error):
else:
self.check_reconnect()

def _on_ping(self, class_obj, message):
return

def _on_pong(self, class_obj, message):
return

def _is_valid_message(self, message: str) -> bool:
message = message.strip("\n")
if not message or message[0] != "[" or message[-1] != "]":
Expand Down
7 changes: 7 additions & 0 deletions nostr/relay_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ def open_connections(self, ssl_options: dict = None, proxy: dict = None):
name=f"{relay.url}-thread",
).start()

def start_message_workers(self):
for relay in self.relays.values():
threading.Thread(
target=relay.queue_worker,
name=f"{relay.url}-queue",
).start()

def close_connections(self):
for relay in self.relays.values():
relay.close()
Expand Down

0 comments on commit 123cfb5

Please sign in to comment.