Custom Python TCP Stream Parser
Building a Real-Time TCP Packet Stream Parser in Pure Python
To understand how a Bitcoin full node parses continuous TCP byte streams on a live network, we can implement a custom Stream Demultiplexer in Python.
By writing a stream parser from scratch, we can model how incoming socket data is buffered, aligned via sliding-window scanning, and demultiplexed into valid, cryptographically checked protocol packets.
💻 1. The Stream Demultiplexer Source Code
Below is a complete, self-contained, zero-dependency Python script that simulates a live network byte stream containing leading garbage, a fragmented packet, and trailing noise.
import hashlib
import struct
import io
class BitcoinStreamDemuxer:
    """Incrementally split a raw TCP byte stream into Bitcoin P2P messages.

    Bytes are appended with :meth:`feed_bytes`.  :meth:`parse_next_packet`
    then scans forward for the magic bytes, validates the 24-byte header and
    the payload checksum, and hands back one message per call.

    Header layout (little-endian, 24 bytes)::

        magic (4) | command (12, NUL-padded) | payload length (uint32) |
        checksum (4 = first bytes of double-SHA256 of the payload)
    """

    # Pre-compiled header format; its size (24) is the minimum parsable unit.
    _HEADER = struct.Struct("<4s12sI4s")

    # Largest payload accepted by default.  Mirrors Bitcoin Core's
    # MAX_PROTOCOL_MESSAGE_LENGTH (4,000,000 bytes).
    DEFAULT_MAX_PAYLOAD_SIZE = 4_000_000

    def __init__(self, magic_bytes=b"\xf9\xbe\xb4\xd9",
                 max_payload_size=DEFAULT_MAX_PAYLOAD_SIZE):
        """Create an empty demuxer.

        Args:
            magic_bytes: 4-byte network marker that starts every message.
            max_payload_size: Upper bound for the header's length field.  A
                header announcing more than this is treated as a false sync
                and skipped instead of being waited on.  Pass ``None`` to
                disable the limit.
        """
        self.magic_bytes = magic_bytes
        self.max_payload_size = max_payload_size
        self.stream_buffer = io.BytesIO()

    def feed_bytes(self, data: bytes):
        """Append raw network bytes to the end of the stream buffer.

        The read position is left untouched so parsing resumes where it
        previously stopped.
        """
        read_pos = self.stream_buffer.tell()
        self.stream_buffer.seek(0, io.SEEK_END)
        self.stream_buffer.write(data)
        self.stream_buffer.seek(read_pos)

    def double_sha256(self, payload: bytes) -> bytes:
        """Return SHA-256(SHA-256(payload)) as a 32-byte digest."""
        return hashlib.sha256(hashlib.sha256(payload).digest()).digest()

    def parse_next_packet(self):
        """Align the stream and extract the next complete message.

        Garbage in front of the next magic marker is skipped.  A candidate
        header whose checksum does not match, or whose announced length
        exceeds ``max_payload_size``, is treated as corruption: one byte is
        dropped and scanning continues.

        Returns:
            ``(command, payload)`` for the next valid message, or ``None``
            when the buffered bytes do not yet contain a complete message.
        """
        start = self.stream_buffer.tell()
        unread = self.stream_buffer.read()
        # Restore the pointer right away so an unexpected failure while
        # scanning cannot silently discard the unread bytes.
        self.stream_buffer.seek(start)

        consumed, packet = self._scan(unread)
        self.stream_buffer.seek(start + consumed)
        return packet

    def _scan(self, data):
        """Scan *data* once and return ``(bytes_consumed, packet_or_None)``."""
        header_size = self._HEADER.size
        magic = self.magic_bytes
        limit = self.max_payload_size
        # First offset at which a full header no longer fits.
        stop = len(data) - header_size + 1

        offset = 0
        while offset < stop:
            if data[offset:offset + 4] != magic:
                # Jump straight to the next magic candidate instead of
                # re-reading the buffer for every garbage byte.
                candidate = data.find(magic, offset + 1)
                offset = candidate if 0 <= candidate < stop else stop
                continue

            _, cmd_raw, size, checksum = self._HEADER.unpack_from(data, offset)
            command = cmd_raw.split(b"\x00", 1)[0].decode("ascii", errors="replace")

            if limit is not None and size > limit:
                # Waiting for an absurd payload would stall the stream
                # forever; treat the header as a false sync instead.
                print(f"[!] Implausible payload length {size} for '{command}'! Dropping 1 corrupt byte.")
                offset += 1
                continue

            end = offset + header_size + size
            if end > len(data):
                # Payload still in flight: keep the header for the next call.
                return offset, None

            payload = data[offset + header_size:end]
            if self.double_sha256(payload)[:4] != checksum:
                print(f"[!] Checksum mismatch for '{command}'! Dropping 1 corrupt byte.")
                offset += 1
                continue

            return end, (command, payload)

        # Fewer than 24 bytes left: keep them, they may begin a header.
        return max(offset, 0), None

    def clean_buffer(self):
        """Discard already-parsed bytes so the buffer does not grow forever."""
        unread = self.stream_buffer.read()
        # A fresh BytesIO starts with its position at 0, ready for parsing.
        self.stream_buffer = io.BytesIO(unread)
# --- SIMULATOR EXECUTION ---
if __name__ == "__main__":

    def create_mock_packet(command, payload):
        """Frame *payload* as one wire message: 24-byte header, then payload."""
        inner_digest = hashlib.sha256(payload).digest()
        checksum = hashlib.sha256(inner_digest).digest()[:4]
        header = struct.pack(
            "<4s12sI4s",
            b"\xf9\xbe\xb4\xd9",                          # same magic as the demuxer default
            command.encode("ascii").ljust(12, b"\x00"),   # NUL-padded command field
            len(payload),
            checksum,
        )
        return header + payload

    demuxer = BitcoinStreamDemuxer()

    # Two mock messages carrying mirrored 8-byte payloads.
    packet_ping = create_mock_packet("ping", b"\x11\x22\x33\x44\x55\x66\x77\x88")
    packet_pong = create_mock_packet("pong", b"\x88\x77\x66\x55\x44\x33\x22\x11")

    # The ping packet is delivered in two pieces: header + 3 payload bytes first.
    split_at = 27
    ping_head = packet_ping[:split_at]
    ping_tail = packet_ping[split_at:]

    # Stage 1: noise only -- too short to even hold a header.
    print("[*] Feeding 5 bytes of garbage socket noise...")
    demuxer.feed_bytes(b"\xAA\xBB\xCC\xDD\xEE")
    res = demuxer.parse_next_packet()
    print(f" └─ Parse result (expect None): {res}")

    # Stage 2: a header arrives, but its payload is still incomplete.
    print("\n[*] Feeding fragmented 'ping' packet (Header + 3 bytes)...")
    demuxer.feed_bytes(ping_head)
    res = demuxer.parse_next_packet()
    print(f" └─ Parse result (expect None): {res}")

    # Stage 3: rest of ping, a whole pong, and two stray bytes at the end.
    print("\n[*] Feeding remainder of 'ping' + full 'pong' packet + garbage trailing noise...")
    demuxer.feed_bytes(ping_tail + packet_pong + b"\xFF\xFF")

    res_1 = demuxer.parse_next_packet()
    print(f" ├─ Parse result 1 (expect 'ping'): {res_1}")

    res_2 = demuxer.parse_next_packet()
    print(f" ├─ Parse result 2 (expect 'pong'): {res_2}")

    res_3 = demuxer.parse_next_packet()
    print(f" └─ Parse result 3 (expect None): {res_3}")

    # Drop everything already parsed and show what is still waiting.
    demuxer.clean_buffer()
    print(f"\n[✔] Buffer pruned. Unprocessed buffer bytes remaining: {demuxer.stream_buffer.read()}")
🛠️ 2. Executing the Parser Simulator
Save the script and execute with python3:
python3 python_stream_parser.py
Expected Output Logs:
[*] Feeding 5 bytes of garbage socket noise...
└─ Parse result (expect None): None
[*] Feeding fragmented 'ping' packet (Header + 3 bytes)...
└─ Parse result (expect None): None
[*] Feeding remainder of 'ping' + full 'pong' packet + garbage trailing noise...
├─ Parse result 1 (expect 'ping'): ('ping', b'\x11"3DUfw\x88')
├─ Parse result 2 (expect 'pong'): ('pong', b'\x88wfUD3"\x11')
└─ Parse result 3 (expect None): None
[✔] Buffer pruned. Unprocessed buffer bytes remaining: b'\xff\xff'
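As displayed, the sliding window dropped the 5 initial garbage bytes, waited until the full ping payload had arrived, extracted both messages sequentially, and left only the trailing garbage bytes \xff\xff in the buffer. (Python prints printable payload bytes as ASCII characters, so 0x22 0x33 0x44 0x55 0x66 0x77 appears as "3DUfw.)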
TeachMeBitcoin is an ad-free, open-source educational repository curated by a passionate team of Bitcoin researchers and educators for public benefit. If you found our articles helpful, please consider supporting our hosting and ongoing content updates with a clean donation: