Python Libraries
Python is excellent for scripting, bots, and backend services with Nostr.
pynostr
The most feature-complete Python Nostr library.
Install:
pip install pynostr
Repository: github.com/holgern/pynostr
Key Generation
from nostr.key import PrivateKey
# Generate new keypair
private_key = PrivateKey()
public_key = private_key.public_key
print(f"Private (hex): {private_key.hex()}")
print(f"Public (hex): {public_key.hex()}")
print(f"Private (bech32): {private_key.bech32()}")
print(f"Public (bech32): {public_key.bech32()}")
From Existing Key
from nostr.key import PrivateKey
# From hex
private_key = PrivateKey(bytes.fromhex("your-hex-key-here"))
# From bech32 (nsec)
private_key = PrivateKey.from_nsec("nsec1...")
Creating Events
from nostr.event import Event
from nostr.key import PrivateKey
import time
private_key = PrivateKey()
# Create event
event = Event(
public_key=private_key.public_key.hex(),
content="Hello from Python!",
kind=1,
tags=[
["t", "nostr"],
["t", "python"]
],
created_at=int(time.time())
)
# Sign the event
event.sign(private_key.hex())
print(f"Event ID: {event.id}")
print(f"Signature: {event.signature}")
Connecting to Relays
import asyncio
from nostr.relay_manager import RelayManager
from nostr.message_type import ClientMessageType
async def main():
relay_manager = RelayManager()
relay_manager.add_relay("wss://relay.damus.io")
relay_manager.add_relay("wss://nos.lol")
await relay_manager.open_connections()
# Publish event
message = event.to_message()
relay_manager.publish_message(message)
await asyncio.sleep(2) # Wait for responses
await relay_manager.close_connections()
asyncio.run(main())
Subscribing to Events
import asyncio
from nostr.relay_manager import RelayManager
from nostr.filter import Filter, Filters
from nostr.message_type import ClientMessageType
async def main():
relay_manager = RelayManager()
relay_manager.add_relay("wss://relay.damus.io")
await relay_manager.open_connections()
# Create filter
filters = Filters([
Filter(
kinds=[1],
limit=10
)
])
subscription_id = "my-sub"
relay_manager.add_subscription(subscription_id, filters)
# Handle incoming messages
while relay_manager.message_pool.has_events():
event_msg = relay_manager.message_pool.get_event()
print(f"Event: {event_msg.event.content}")
await relay_manager.close_connections()
asyncio.run(main())
python-nostr
Simpler library for basic operations.
Install:
pip install python-nostr
from nostr.key import PrivateKey
from nostr.relay_manager import RelayManager
from nostr.event import Event
# Quick start
private_key = PrivateKey()
event = Event(private_key.public_key.hex(), "Hello!")
private_key.sign_event(event)
nostr-sdk (Python Bindings)
Rust-powered bindings for Python.
Install:
pip install nostr-sdk
from nostr_sdk import Keys, Client, EventBuilder
async def main():
# Generate keys
keys = Keys.generate()
print(f"Public key: {keys.public_key().to_bech32()}")
# Create client
client = Client(keys)
# Add relays
await client.add_relay("wss://relay.damus.io")
await client.connect()
# Publish note
builder = EventBuilder.text_note("Hello from nostr-sdk!")
output = await client.send_event_builder(builder)
print(f"Event ID: {output.id.to_bech32()}")
import asyncio
asyncio.run(main())
Building a Bot
Example: Auto-responder bot
import asyncio
from nostr.key import PrivateKey
from nostr.event import Event
from nostr.relay_manager import RelayManager
from nostr.filter import Filter, Filters
import time
class NostrBot:
def __init__(self, nsec: str, relays: list):
self.private_key = PrivateKey.from_nsec(nsec)
self.public_key = self.private_key.public_key
self.relay_manager = RelayManager()
for relay in relays:
self.relay_manager.add_relay(relay)
async def start(self):
await self.relay_manager.open_connections()
# Subscribe to mentions
filters = Filters([
Filter(
kinds=[1],
pubkey_refs=[self.public_key.hex()]
)
])
self.relay_manager.add_subscription("mentions", filters)
print(f"Bot started: {self.public_key.bech32()}")
await self.listen()
async def listen(self):
while True:
if self.relay_manager.message_pool.has_events():
msg = self.relay_manager.message_pool.get_event()
await self.handle_mention(msg.event)
await asyncio.sleep(0.1)
async def handle_mention(self, event):
print(f"Mentioned by: {event.public_key}")
# Create reply
reply = Event(
public_key=self.public_key.hex(),
content="Thanks for the mention!",
kind=1,
tags=[
["e", event.id, "", "reply"],
["p", event.public_key]
],
created_at=int(time.time())
)
reply.sign(self.private_key.hex())
self.relay_manager.publish_message(reply.to_message())
# Usage
bot = NostrBot(
nsec="nsec1...",
relays=["wss://relay.damus.io", "wss://nos.lol"]
)
asyncio.run(bot.start())
NIP-05 Verification
import httpx
async def verify_nip05(identifier: str) -> dict:
"""Verify a NIP-05 identifier like alice@example.com"""
name, domain = identifier.split("@")
url = f"https://{domain}/.well-known/nostr.json?name={name}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
data = response.json()
if name in data.get("names", {}):
return {
"valid": True,
"pubkey": data["names"][name],
"relays": data.get("relays", {}).get(data["names"][name], [])
}
return {"valid": False}
# Usage
result = await verify_nip05("alice@example.com")
print(result)
Encryption (NIP-04)
from nostr.key import PrivateKey
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
def encrypt_dm(sender_privkey: PrivateKey, recipient_pubkey: str, message: str) -> str:
"""Encrypt a direct message using NIP-04"""
shared_secret = sender_privkey.compute_shared_secret(recipient_pubkey)
iv = os.urandom(16)
cipher = AESGCM(shared_secret)
ciphertext = cipher.encrypt(iv, message.encode(), None)
return f"{base64.b64encode(ciphertext).decode()}?iv={base64.b64encode(iv).decode()}"
def decrypt_dm(recipient_privkey: PrivateKey, sender_pubkey: str, encrypted: str) -> str:
"""Decrypt a direct message"""
ciphertext_b64, iv_b64 = encrypted.split("?iv=")
ciphertext = base64.b64decode(ciphertext_b64)
iv = base64.b64decode(iv_b64)
shared_secret = recipient_privkey.compute_shared_secret(sender_pubkey)
cipher = AESGCM(shared_secret)
return cipher.decrypt(iv, ciphertext, None).decode()
Best Practices
-
Use async for relay connections
-
Handle disconnections:
async def reconnect_loop():
while True:
try:
await relay_manager.open_connections()
await listen()
except Exception as e:
print(f"Disconnected: {e}")
await asyncio.sleep(5) -
Rate limit your publishing
-
Verify events before processing