TeachMeBitcoin

Building a Real-Time TCP Packet Stream Parser in Pure Python

From TeachMeBitcoin, the free encyclopedia Reading time: 4 min

Building a Real-Time TCP Packet Stream Parser in Pure Python

To understand how a Bitcoin full node parses continuous TCP byte streams on a live network, we can implement a custom Stream Demultiplexer in Python.

By writing a stream parser from scratch, we can model how incoming socket data is buffered, aligned via sliding-window scanning, and demultiplexed into valid, cryptographically checked protocol packets.


1. The Stream Demultiplexer Source Code

Below is a complete, production-grade, zero-dependency Python script simulating a live, corrupted, or fragmented network buffer stream.

import hashlib
import struct
import io

class BitcoinStreamDemuxer:
 def __init__(self, magic_bytes=b"\xf9\xbe\xb4\xd9"):
 self.magic_bytes = magic_bytes
 self.stream_buffer = io.BytesIO()

 def feed_bytes(self, data: bytes):
 """Appends raw network bytes to our local sliding-window stream buffer."""
 # Save current position, write to the end, restore position
 pos = self.stream_buffer.tell()
 self.stream_buffer.seek(0, io.SEEK_END)
 self.stream_buffer.write(data)
 self.stream_buffer.seek(pos)

 def double_sha256(self, payload: bytes) -> bytes:
 """Computes double-SHA256 for checksum validation."""
 return hashlib.sha256(hashlib.sha256(payload).digest()).digest()

 def parse_next_packet(self):
 """
 Attempts to align the byte stream and extract the next complete message packet.
 Returns a tuple: (command, payload) or None if stream is incomplete.
 """
 while True:
 current_pos = self.stream_buffer.tell()

 # Fetch remaining unread buffer bytes
 remaining_bytes = self.stream_buffer.read()
 self.stream_buffer.seek(current_pos) # Reset read pointer

 # We need at least 24 bytes (Header size) to proceed
 if len(remaining_bytes) < 24:
 return None

 # Check if current stream starts with the magic bytes
 if remaining_bytes[:4] == self.magic_bytes:
 # Magic bytes found at current head! Attempt to parse header.
 header_data = remaining_bytes[:24]
 magic, cmd_raw, size, checksum = struct.unpack("<4s12sI4s", header_data)

 command = cmd_raw.split(b"\x00")[0].decode("ascii", errors="replace")

 # Check if we have the complete payload in our buffer
 total_expected_len = 24 + size
 if len(remaining_bytes) < total_expected_len:
 # Incomplete payload. Wait for more bytes to be fed.
 return None

 # Extract payload bytes
 payload = remaining_bytes[24:total_expected_len]

 # Verify cryptographic checksum
 calculated_checksum = self.double_sha256(payload)[:4]
 if calculated_checksum != checksum:
 print(f"[!] Checksum mismatch for '{command}'! Dropping 1 corrupt byte.")
 self.stream_buffer.seek(current_pos + 1)
 continue

 # Success! Advance the buffer pointer past this packet
 self.stream_buffer.seek(current_pos + total_expected_len)
 return command, payload
 else:
 # No magic bytes found at current head! Slide window forward 1 byte.
 # This drops garbage bytes and seeks alignment.
 self.stream_buffer.seek(current_pos + 1)

 def clean_buffer(self):
 """Truncates processed bytes from memory to prevent memory leaks."""
 pos = self.stream_buffer.tell()
 remaining = self.stream_buffer.read()
 self.stream_buffer = io.BytesIO(remaining)
 self.stream_buffer.seek(0)

# --- SIMULATOR EXECUTION ---
if __name__ == "__main__":
 demuxer = BitcoinStreamDemuxer()

 # Let's craft two mock messages: 'ping' and 'pong'
 def create_mock_packet(command, payload):
 magic = b"\xf9\xbe\xb4\xd9"
 cmd_padded = command.encode("ascii").ljust(12, b"\x00")
 size = len(payload)
 checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
 header = struct.pack("<4s12sI4s", magic, cmd_padded, size, checksum)
 return header + payload

 packet_ping = create_mock_packet("ping", b"\x11\x22\x33\x44\x55\x66\x77\x88")
 packet_pong = create_mock_packet("pong", b"\x88\x77\x66\x55\x44\x33\x22\x11")

 # 1. Feed garbage bytes (representing socket noise before sync)
 print("[*] Feeding 5 bytes of garbage socket noise...")
 demuxer.feed_bytes(b"\xAA\xBB\xCC\xDD\xEE")

 # Try parsing
 res = demuxer.parse_next_packet()
 print(f" └─ Parse result (expect None): {res}")

 # 2. Feed fragmented first half of Ping packet (only header + 3 bytes payload)
 print("\n[*] Feeding fragmented 'ping' packet (Header + 3 bytes)...")
 demuxer.feed_bytes(packet_ping[:27])
 res = demuxer.parse_next_packet()
 print(f" └─ Parse result (expect None): {res}")

 # 3. Feed the remainder of Ping packet + full Pong packet + trailing noise
 print("\n[*] Feeding remainder of 'ping' + full 'pong' packet + garbage trailing noise...")
 demuxer.feed_bytes(packet_ping[27:] + packet_pong + b"\xFF\xFF")

 # Parse first packet
 res_1 = demuxer.parse_next_packet()
 print(f" ├─ Parse result 1 (expect 'ping'): {res_1}")

 # Parse second packet
 res_2 = demuxer.parse_next_packet()
 print(f" ├─ Parse result 2 (expect 'pong'): {res_2}")

 # Try parsing remainder
 res_3 = demuxer.parse_next_packet()
 print(f" └─ Parse result 3 (expect None): {res_3}")

 # Clean buffer
 demuxer.clean_buffer()
 print(f"\n[✔] Buffer pruned. Unprocessed buffer bytes remaining: {demuxer.stream_buffer.read()}")

️ 2. Executing the Parser Simulator

Save the script and execute with python3:

python3 python_stream_parser.py

Expected Output Logs:

[*] Feeding 5 bytes of garbage socket noise...
 └─ Parse result (expect None): None

[*] Feeding fragmented 'ping' packet (Header + 3 bytes)...
 └─ Parse result (expect None): None

[*] Feeding remainder of 'ping' + full 'pong' packet + garbage trailing noise...
 ├─ Parse result 1 (expect 'ping'): ('ping', b'\x11\x22\x33\x44\x55\x66\x77\x88')
 ├─ Parse result 2 (expect 'pong'): ('pong', b'\x88\x77\x66\x55\x44\x33\x22\x11')
 └─ Parse result 3 (expect None): None

[✔] Buffer pruned. Unprocessed buffer bytes remaining: b'\xff\xff'

As displayed, the sliding window dropped the 5 initial garbage bytes, waited until the full ping payload was completed, extracted both messages sequentially, and left only the trailing garbage bytes \xff\xff in queue.

☕ Help support TeachMeBitcoin

TeachMeBitcoin is an ad-free, open-source educational repository curated by a passionate team of Bitcoin researchers and educators for public benefit. If you found our articles helpful, please consider supporting our hosting and ongoing content updates with a clean donation:

Ethereum: 0x578417C51783663D8A6A811B3544E1f779D39A85
Bitcoin: bc1q77k9e95rn669kpzyjr8ke9w95zhk7pa5s63qzz
Solana: 4ycT2ayqeMucixj3wS8Ay8Tq9NRDYRPKYbj3UGESyQ4J
Address copied to clipboard!