TeachMeBitcoin

Custom Python TCP Stream Parser

From TeachMeBitcoin, the free encyclopedia

Building a Real-Time TCP Packet Stream Parser in Pure Python

To understand how a Bitcoin full node parses continuous TCP byte streams on a live network, we can implement a custom Stream Demultiplexer in Python.

By writing a stream parser from scratch, we can model how incoming socket data is buffered, aligned via sliding-window scanning, and demultiplexed into valid, cryptographically checked protocol packets.
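As a minimal sketch of the framing the parser depends on, the snippet below packs and unpacks a single 24-byte message header: 4 magic bytes, a 12-byte NUL-padded command, a little-endian 4-byte payload length, and the first 4 bytes of the payload's double-SHA256. The helper names `build_header` and `parse_header` are illustrative assumptions and do not belong to any library.

```python
import hashlib
import struct

MAINNET_MAGIC = b"\xf9\xbe\xb4\xd9"
HEADER_FORMAT = "<4s12sI4s"  # magic, command, payload length, checksum
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 4 + 12 + 4 + 4 = 24 bytes


def checksum(payload: bytes) -> bytes:
    """Return the first 4 bytes of SHA256(SHA256(payload))."""
    return hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]


def build_header(command: str, payload: bytes) -> bytes:
    """Pack a header for `payload` (hypothetical helper for this sketch)."""
    cmd = command.encode("ascii").ljust(12, b"\x00")
    return struct.pack(HEADER_FORMAT, MAINNET_MAGIC, cmd, len(payload), checksum(payload))


def parse_header(header: bytes):
    """Unpack a 24-byte header into (magic, command, size, checksum)."""
    magic, cmd_raw, size, csum = struct.unpack(HEADER_FORMAT, header)
    command = cmd_raw.rstrip(b"\x00").decode("ascii")
    return magic, command, size, csum


header = build_header("ping", b"\x11\x22\x33\x44\x55\x66\x77\x88")
print(len(header), parse_header(header)[1:3])  # prints: 24 ('ping', 8)
```

Because the length field sits inside the header, a receiver cannot know where a message ends until it has buffered at least these 24 bytes, which is why the parser refuses to proceed on shorter input.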


💻 1. The Stream Demultiplexer Source Code

Below is a complete, zero-dependency Python script. It feeds a simulated network stream containing leading garbage, a fragmented packet, and trailing noise into the demultiplexer, which must recover the valid messages.

import hashlib
import struct
import io

class BitcoinStreamDemuxer:
    def __init__(self, magic_bytes=b"\xf9\xbe\xb4\xd9"):
        self.magic_bytes = magic_bytes
        self.stream_buffer = io.BytesIO()

    def feed_bytes(self, data: bytes):
        """Appends raw network bytes to our local sliding-window stream buffer."""
        # Save current position, write to the end, restore position
        pos = self.stream_buffer.tell()
        self.stream_buffer.seek(0, io.SEEK_END)
        self.stream_buffer.write(data)
        self.stream_buffer.seek(pos)

    def double_sha256(self, payload: bytes) -> bytes:
        """Computes double-SHA256 for checksum validation."""
        return hashlib.sha256(hashlib.sha256(payload).digest()).digest()

    def parse_next_packet(self):
        """
        Attempts to align the byte stream and extract the next complete message packet.
        Returns a tuple: (command, payload) or None if stream is incomplete.
        """
        while True:
            current_pos = self.stream_buffer.tell()

            # Fetch remaining unread buffer bytes
            remaining_bytes = self.stream_buffer.read()
            self.stream_buffer.seek(current_pos) # Reset read pointer

            # We need at least 24 bytes (Header size) to proceed
            if len(remaining_bytes) < 24:
                return None

            # Check if current stream starts with the magic bytes
            if remaining_bytes[:4] == self.magic_bytes:
                # Magic bytes found at current head! Attempt to parse header.
                header_data = remaining_bytes[:24]
                magic, cmd_raw, size, checksum = struct.unpack("<4s12sI4s", header_data)

                command = cmd_raw.split(b"\x00")[0].decode("ascii", errors="replace")

                # Check if we have the complete payload in our buffer
                total_expected_len = 24 + size
                if len(remaining_bytes) < total_expected_len:
                    # Incomplete payload. Wait for more bytes to be fed.
                    return None

                # Extract payload bytes
                payload = remaining_bytes[24:total_expected_len]

                # Verify cryptographic checksum
                calculated_checksum = self.double_sha256(payload)[:4]
                if calculated_checksum != checksum:
                    print(f"[!] Checksum mismatch for '{command}'! Dropping 1 corrupt byte.")
                    self.stream_buffer.seek(current_pos + 1)
                    continue

                # Success! Advance the buffer pointer past this packet
                self.stream_buffer.seek(current_pos + total_expected_len)
                return command, payload
            else:
                # No magic bytes found at current head! Slide window forward 1 byte.
                # This drops garbage bytes and seeks alignment.
                self.stream_buffer.seek(current_pos + 1)

    def clean_buffer(self):
        """Discards already-processed bytes so the buffer does not grow without bound."""
        # read() returns only the unread tail; everything before the read pointer is dropped
        remaining = self.stream_buffer.read()
        # A BytesIO created from initial bytes starts with its read pointer at 0
        self.stream_buffer = io.BytesIO(remaining)

# --- SIMULATOR EXECUTION ---
if __name__ == "__main__":
    demuxer = BitcoinStreamDemuxer()

    # Let's craft two mock messages: 'ping' and 'pong'
    def create_mock_packet(command, payload):
        magic = b"\xf9\xbe\xb4\xd9"
        cmd_padded = command.encode("ascii").ljust(12, b"\x00")
        size = len(payload)
        checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
        header = struct.pack("<4s12sI4s", magic, cmd_padded, size, checksum)
        return header + payload

    packet_ping = create_mock_packet("ping", b"\x11\x22\x33\x44\x55\x66\x77\x88")
    packet_pong = create_mock_packet("pong", b"\x88\x77\x66\x55\x44\x33\x22\x11")

    # 1. Feed garbage bytes (representing socket noise before sync)
    print("[*] Feeding 5 bytes of garbage socket noise...")
    demuxer.feed_bytes(b"\xAA\xBB\xCC\xDD\xEE")

    # Try parsing
    res = demuxer.parse_next_packet()
    print(f"    └─ Parse result (expect None): {res}")

    # 2. Feed fragmented first half of Ping packet (only header + 3 bytes payload)
    print("\n[*] Feeding fragmented 'ping' packet (Header + 3 bytes)...")
    demuxer.feed_bytes(packet_ping[:27])
    res = demuxer.parse_next_packet()
    print(f"    └─ Parse result (expect None): {res}")

    # 3. Feed the remainder of Ping packet + full Pong packet + trailing noise
    print("\n[*] Feeding remainder of 'ping' + full 'pong' packet + garbage trailing noise...")
    demuxer.feed_bytes(packet_ping[27:] + packet_pong + b"\xFF\xFF")

    # Parse first packet
    res_1 = demuxer.parse_next_packet()
    print(f"    ├─ Parse result 1 (expect 'ping'): {res_1}")

    # Parse second packet
    res_2 = demuxer.parse_next_packet()
    print(f"    ├─ Parse result 2 (expect 'pong'): {res_2}")

    # Try parsing remainder
    res_3 = demuxer.parse_next_packet()
    print(f"    └─ Parse result 3 (expect None): {res_3}")

    # Clean buffer
    demuxer.clean_buffer()
    print(f"\n[✔] Buffer pruned. Unprocessed buffer bytes remaining: {demuxer.stream_buffer.read()}")

🛠️ 2. Executing the Parser Simulator

Save the script as python_stream_parser.py and run it with python3:

python3 python_stream_parser.py

Expected Output Logs:

[*] Feeding 5 bytes of garbage socket noise...
    └─ Parse result (expect None): None

[*] Feeding fragmented 'ping' packet (Header + 3 bytes)...
    └─ Parse result (expect None): None

[*] Feeding remainder of 'ping' + full 'pong' packet + garbage trailing noise...
    ├─ Parse result 1 (expect 'ping'): ('ping', b'\x11"3DUfw\x88')
    ├─ Parse result 2 (expect 'pong'): ('pong', b'\x88wfUD3"\x11')
    └─ Parse result 3 (expect None): None

[✔] Buffer pruned. Unprocessed buffer bytes remaining: b'\xff\xff'

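As the output shows, the sliding window skipped the 5 leading garbage bytes, waited until the full ping payload had arrived, extracted both messages in order, and left only the 2 trailing garbage bytes \xff\xff in the buffer. Python prints payload bytes that map to printable ASCII as characters, so \x22\x33\x44\x55\x66\x77 appears as "3DUfw.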
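One limitation of the simulator is that it re-reads the entire unread buffer for every byte it slides past, and it waits indefinitely if a header announces more payload than will ever arrive. The following is a hedged sketch of an alternative, not the method used in the script above: it keeps bytes in a `bytearray`, jumps straight to the next magic sequence with `bytearray.find`, and rejects implausible length fields. The names `extract_messages` and `make_frame`, and the 4,000,000-byte `MAX_PAYLOAD` cap, are assumptions chosen for illustration.

```python
import hashlib
import struct

MAGIC = b"\xf9\xbe\xb4\xd9"
HEADER_SIZE = 24
MAX_PAYLOAD = 4_000_000  # assumed upper bound for this sketch


def extract_messages(buf: bytearray) -> list:
    """Remove and return every complete, checksum-valid message in buf.

    Consumed and garbage bytes are deleted from buf in place; a trailing
    partial message (or partial magic) is left for the next call.
    """
    messages = []
    while True:
        start = buf.find(MAGIC)
        if start == -1:
            # No magic anywhere: keep only the last 3 bytes, which could be
            # the start of a magic sequence split across two reads.
            del buf[:max(0, len(buf) - (len(MAGIC) - 1))]
            return messages
        del buf[:start]  # discard garbage before the magic
        if len(buf) < HEADER_SIZE:
            return messages  # header still incomplete
        _, cmd_raw, size, csum = struct.unpack("<4s12sI4s", bytes(buf[:HEADER_SIZE]))
        if size > MAX_PAYLOAD:
            del buf[:1]  # implausible length: treat this magic as noise
            continue
        if len(buf) < HEADER_SIZE + size:
            return messages  # payload still incomplete
        payload = bytes(buf[HEADER_SIZE:HEADER_SIZE + size])
        digest = hashlib.sha256(hashlib.sha256(payload).digest()).digest()
        if digest[:4] != csum:
            del buf[:1]  # corrupt frame: resume scanning one byte later
            continue
        command = cmd_raw.split(b"\x00")[0].decode("ascii", errors="replace")
        messages.append((command, payload))
        del buf[:HEADER_SIZE + size]


def make_frame(command: str, payload: bytes) -> bytes:
    """Build a header + payload frame (hypothetical test helper)."""
    csum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    cmd = command.encode("ascii").ljust(12, b"\x00")
    return struct.pack("<4s12sI4s", MAGIC, cmd, len(payload), csum) + payload


# Two garbage bytes, one complete 'ping', and the first 10 bytes of a 'pong'.
buf = bytearray(b"\xaa\xbb" + make_frame("ping", b"\x01") + make_frame("pong", b"\x02")[:10])
print(extract_messages(buf))  # prints: [('ping', b'\x01')]
print(len(buf))               # prints: 10 (the partial 'pong' stays buffered)
```

Deleting consumed bytes as they are parsed removes the need for a separate pruning step, at the cost of shifting the remaining bytes on each deletion.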
