From 540bad0ecb19b559cc8e4a89c8eecce1fca70c41 Mon Sep 17 00:00:00 2001 From: engelgardt Date: Sat, 16 May 2026 12:28:07 +0300 Subject: [PATCH] refactor: split into modules under src/dhcpsrv/ The previous monolithic dhcpsrv_app.py (~500 lines) is now 7 focused modules: - src/dhcpsrv/__init__.py : single source of truth for __version__ - src/dhcpsrv/__main__.py : entry for python -m dhcpsrv - src/dhcpsrv/app.py : main orchestration, wires the rest - src/dhcpsrv/platform_win.py: VT enable + UAC self-elevate - src/dhcpsrv/update_check.py: GitHub /releases/latest poll - src/dhcpsrv/network.py : NIC enumeration, netsh, ping - src/dhcpsrv/dhcp.py : DhcpConfig, DhcpServer, packet parse/build, server loop - src/dhcpsrv/ui.py : rich-based full-screen TUI Also added: - dhcpsrv-launcher.py at repo root: absolute-import entry for PyInstaller - pyproject.toml: deps + dynamic version - CONTRIBUTING.md: layout, build, and release flow CI workflow now builds from dhcpsrv-launcher.py. No user-visible behaviour change. --- .github/workflows/release.yml | 2 +- CONTRIBUTING.md | 79 ++++++ dhcpsrv-launcher.py | 13 + dhcpsrv_app.py | 493 ---------------------------------- pyproject.toml | 26 ++ src/dhcpsrv/__init__.py | 10 + src/dhcpsrv/__main__.py | 11 + src/dhcpsrv/app.py | 90 +++++++ src/dhcpsrv/dhcp.py | 235 ++++++++++++++++ src/dhcpsrv/network.py | 103 +++++++ src/dhcpsrv/platform_win.py | 50 ++++ src/dhcpsrv/ui.py | 143 ++++++++++ src/dhcpsrv/update_check.py | 53 ++++ 13 files changed, 814 insertions(+), 494 deletions(-) create mode 100644 CONTRIBUTING.md create mode 100644 dhcpsrv-launcher.py delete mode 100644 dhcpsrv_app.py create mode 100644 pyproject.toml create mode 100644 src/dhcpsrv/__init__.py create mode 100644 src/dhcpsrv/__main__.py create mode 100644 src/dhcpsrv/app.py create mode 100644 src/dhcpsrv/dhcp.py create mode 100644 src/dhcpsrv/network.py create mode 100644 src/dhcpsrv/platform_win.py create mode 100644 src/dhcpsrv/ui.py create mode 100644 src/dhcpsrv/update_check.py diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7904ef8..8e2f3a8 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -28,7 +28,7 @@ jobs: run: echo "version=${GITHUB_REF_NAME#v}" >> "$GITHUB_OUTPUT" - name: Build executable - run: python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv dhcpsrv_app.py + run: python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv --paths src dhcpsrv-launcher.py - name: Package portable folder shell: pwsh diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..88eb01b --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,79 @@ +# Contributing + +> Project layout, build, and release flow. **If you only want to use the tool — read [README](README.md) instead.** + +## Repo layout + +``` +dhcpsrv/ +├── .github/ +│ ├── workflows/release.yml ← CI: tag-driven build + GitHub Release +│ └── ISSUE_TEMPLATE/ ← bug / feature / security routing +├── src/dhcpsrv/ ← package source (≤200 lines per module) +│ ├── __init__.py ← single source of truth for __version__ +│ ├── __main__.py ← entry: python -m dhcpsrv +│ ├── app.py ← main flow, wires everything +│ ├── platform_win.py ← VT enable + UAC self-elevate +│ ├── update_check.py ← GitHub /releases/latest poll +│ ├── network.py ← list_adapters, netsh, ping (no shared state) +│ ├── dhcp.py ← DhcpConfig, DhcpServer, packet parse/build +│ └── ui.py ← rich-based full-screen TUI +├── pyproject.toml ← deps, packaging, dynamic version +├── CHANGELOG.md ← Keep a Changelog format, newest first +├── CONTRIBUTING.md ← this file +├── LICENSE / README.md / SECURITY.md +└── .gitignore +``` + +## Run from source (no exe) + +``` +python -m pip install rich +python -m dhcpsrv +``` + +`python -m dhcpsrv` finds `src/dhcpsrv/__main__.py` because the package lives under `src/`. You'll need administrator privileges for UDP/67 and `netsh` — the tool self-elevates via UAC. + +## Editable install (development) + +``` +python -m pip install -e . +dhcpsrv +``` + +`-e .` makes the entry-point `dhcpsrv` available on PATH; edits in `src/dhcpsrv/` take effect immediately. + +## Build the portable .exe + +``` +python -m pip install pyinstaller rich +python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv --paths src dhcpsrv-launcher.py +``` + +`dhcpsrv-launcher.py` (at repo root) is the PyInstaller entry — it does an +*absolute* import (`from dhcpsrv.app import main`) which is needed when +PyInstaller runs the bundled script as a standalone module. The `--paths src` +flag tells PyInstaller where to find the `dhcpsrv` package itself. Output: +`dist/dhcpsrv.exe`. + +## Cut a release + +1. Update `src/dhcpsrv/__init__.py` — bump `__version__` to `X.Y.Z`. +2. Update `CHANGELOG.md` — move items from `[Unreleased]` into a new `[X.Y.Z]` section with today's date. +3. Commit: `git commit -am "vX.Y.Z: …"`. +4. Tag: `git tag vX.Y.Z`. +5. Push: `git push && git push --tags`. + +That's it. GitHub Actions picks up the tag, builds the exe, writes the SHA-256, and creates the GitHub Release with the zip attached. + +## Where features go + +| Adding... | Touch this module | +|---|---| +| A new DHCP option in the reply | `dhcp.py` → `DhcpServer.build_reply` | +| A new adapter filter | `network.py` → `SKIP_DESCRIPTION` / `SKIP_MEDIA` / `list_adapters` | +| A new column in the clients table | `ui.py` → `Ui._render_table` | +| Something shown in the header | `ui.py` → `Ui._render_header` | +| A startup check or banner line | `app.py` → `main()` | +| A change to UAC / VT logic | `platform_win.py` | +| Tweaking the GitHub update-check UX | `update_check.py` | diff --git a/dhcpsrv-launcher.py b/dhcpsrv-launcher.py new file mode 100644 index 0000000..060d9a9 --- /dev/null +++ b/dhcpsrv-launcher.py @@ -0,0 +1,13 @@ +""" +PyInstaller entry point — sits at the repo root and uses an *absolute* import +so the bundled exe doesn't need relative-import resolution at runtime. + +For dev work without an install use `python -m dhcpsrv` instead (that path +goes through `src/dhcpsrv/__main__.py` and relative imports work). +""" + +from dhcpsrv.app import main + + +if __name__ == "__main__": + main() diff --git a/dhcpsrv_app.py b/dhcpsrv_app.py deleted file mode 100644 index 6354e13..0000000 --- a/dhcpsrv_app.py +++ /dev/null @@ -1,493 +0,0 @@ -""" -dhcpsrv v1.1.0 - portable single-exe edition. -made by engelgardt - -This file combines what previously lived in dhcpsrv.ps1 + dhcpsrv.py: - - admin check - - NIC selection (filters out wireless / VPN / virtual) - - static IP setting (netsh) - - DHCP server with rich live UI - - revert NIC prompt on exit - -Build: - pyinstaller --onefile --uac-admin --name dhcpsrv --console dhcpsrv_app.py -""" - -import os, sys, ctypes, json, subprocess, signal, socket, struct, threading, time -import urllib.request, webbrowser -from collections import deque -from datetime import datetime -from concurrent.futures import ThreadPoolExecutor - - -# Enable VT (ANSI escape) processing on Windows console BEFORE any output. -def _enable_vt(): - if os.name != "nt": - return - try: - k = ctypes.windll.kernel32 - STD_OUT, STD_ERR = -11, -12 - ENABLE_VT = 0x0004 - for std in (STD_OUT, STD_ERR): - h = k.GetStdHandle(std) - mode = ctypes.c_ulong() - if k.GetConsoleMode(h, ctypes.byref(mode)): - k.SetConsoleMode(h, mode.value | ENABLE_VT) - except Exception: - pass -_enable_vt() - -__version__ = "1.1.0" -GITHUB_REPO = "Engelgardt23/dhcpsrv" - -# Fixed heights used by the Layout — used to compute the clients table fit -HEADER_LINES = 5 -EVENTS_LINES = 14 -TBL_OVERHEAD = 6 # panel borders + table header row + table top/bottom rules - -# Hardcoded defaults — pick NIC, everything else is auto. -DEFAULT_SERVER_IP = "10.10.10.1" -DEFAULT_NETMASK = "255.255.255.0" -POOL_SIZE = 50 # addresses starting at server_ip + 1 -DEFAULT_LEASE = 7200 # 2 hours -# TFTP option always = server IP - -# Stats counters -stats = {"packets": 0, "discovers": 0, "requests": 0, "releases": 0} - -# rich -try: - from rich.console import Console, Group - from rich.live import Live - from rich.table import Table - from rich.text import Text - from rich.panel import Panel - from rich.layout import Layout - from rich.prompt import Prompt, Confirm -except ImportError: - print("'rich' missing in the bundled build") - input("Press Enter to exit") - sys.exit(10) - -console = Console(log_path=False) - - -# ---------- admin ---------- -def is_admin(): - try: return ctypes.windll.shell32.IsUserAnAdmin() != 0 - except: return False - -def require_admin(): - if not is_admin(): - # Re-launch elevated; original exits - ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(f'"{a}"' for a in sys.argv), None, 1) - sys.exit(0) - - -# ---------- update check ---------- -def _parse_version(s): - try: - s = (s or "").strip().lstrip("v") - return tuple(int(x) for x in s.split(".")[:3]) - except Exception: - return (0, 0, 0) - -def check_for_update(): - """Query GitHub for the latest release. If newer than __version__, prompt to open the page.""" - try: - url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest" - req = urllib.request.Request(url, headers={ - "Accept": "application/vnd.github+json", - "User-Agent": f"dhcpsrv/{__version__}", - }) - with urllib.request.urlopen(req, timeout=3) as r: - data = json.loads(r.read().decode("utf-8", errors="replace")) - latest = (data.get("tag_name") or "").strip() - page = data.get("html_url") or f"https://github.com/{GITHUB_REPO}/releases/latest" - if _parse_version(latest) > _parse_version(__version__): - console.rule("[bold yellow]Update available") - console.print(f"Current: [dim]v{__version__}[/] Latest: [bold green]{latest}[/]") - try: - if Confirm.ask("Open the download page in your browser?", default=True): - webbrowser.open(page) - except (EOFError, KeyboardInterrupt): - pass - console.print() - except Exception: - # Offline / GitHub rate-limit / API error — skip silently. - pass - - -# ---------- helpers ---------- -CREATE_NO_WINDOW = 0x08000000 -def run_ps(cmd, timeout=15): - return subprocess.run(["powershell.exe","-NoProfile","-NonInteractive","-Command",cmd], - capture_output=True, text=True, timeout=timeout, - creationflags=CREATE_NO_WINDOW) - -def run_netsh(args, timeout=15): - return subprocess.run(["netsh"] + args, capture_output=True, text=True, - timeout=timeout, creationflags=CREATE_NO_WINDOW) - - -# ---------- NIC enumeration ---------- -SKIP_DESCR = ("VPN","Virtual","AnyConnect","TAP-","TUN-","Bluetooth","Loopback", - "WAN Miniport","Hyper-V","VMware","VirtualBox","WireGuard","OpenVPN", - "Tailscale","ZeroTier") -SKIP_MEDIA = ("Native 802.11","Wireless WAN") - -def list_adapters(): - cmd = (r"Get-NetAdapter | ForEach-Object {" - r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | " - r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; " - r" [pscustomobject]@{" - r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; " - r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip" - r" }} | ConvertTo-Json -Depth 3 -Compress") - r = run_ps(cmd, timeout=20) - if r.returncode != 0 or not r.stdout.strip(): - return [] - data = json.loads(r.stdout) - if isinstance(data, dict): data = [data] - out = [] - for a in data: - if a["Status"] in ("Disabled","Not Present"): continue - if a.get("Virtual"): continue - if a.get("MediaType") in SKIP_MEDIA: continue - descr = (a.get("Description") or "") + " " + (a.get("Name") or "") - if any(k.lower() in descr.lower() for k in SKIP_DESCR): continue - out.append(a) - out.sort(key=lambda x: x["ifIndex"]) - return out - - -# ---------- DHCP server state ---------- -clients = {} # mac -> {ip_int, host, last, ping_ok} -clients_lock = threading.Lock() -events = deque(maxlen=200) -events_lock = threading.Lock() -refresh_evt = threading.Event() # set whenever UI should re-render - -def log_event(markup_line): - with events_lock: - events.append(markup_line) - refresh_evt.set() - -# Config (filled by main) -SERVER_IP = "" -NETMASK = "255.255.255.0" -POOL = [] -LEASE = 7200 -TFTP = "" - -def ip2int(ip): return struct.unpack("!I", socket.inet_aton(ip))[0] -def int2ip(n): return socket.inet_ntoa(struct.pack("!I", n)) -def now_s(): return datetime.now().strftime("%H:%M:%S") - - -# ---------- ping ---------- -# Windows `ping` exit code is unreliable: it can return 0 with "Destination host -# unreachable" or with a stale ARP-based "reply" from the local stack. The only -# trustworthy success marker is the "TTL=" substring in stdout (present across -# locales — e.g. "...time<1ms TTL=64" / "...время<1мс TTL=64"). -def ping_one(ip, timeout_ms=600): - try: - r = subprocess.run(["ping","-n","1","-w",str(timeout_ms),ip], - capture_output=True, timeout=2, text=True, - creationflags=CREATE_NO_WINDOW) - out = (r.stdout or "") + (r.stderr or "") - return "TTL=" in out - except Exception: - return False - -def ping_loop(): - pool_exec = ThreadPoolExecutor(max_workers=16) - while True: - with clients_lock: - items = [(m, c["ip_int"]) for m, c in clients.items()] - changed = False - if items: - ips = [int2ip(i) for _, i in items] - res = list(pool_exec.map(ping_one, ips)) - with clients_lock: - for (mac, _), ok in zip(items, res): - if mac in clients and clients[mac].get("ping_ok") != ok: - clients[mac]["ping_ok"] = ok - changed = True - if changed: - refresh_evt.set() - time.sleep(1.0) - - -# ---------- DHCP logic ---------- -def alloc_ip(mac): - with clients_lock: - if mac in clients: return clients[mac]["ip_int"] - used = {c["ip_int"] for c in clients.values()} - for ipn in POOL: - if ipn not in used: - clients[mac] = {"ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False} - return ipn - return None - -def touch_client(mac, ipn=None, host=None): - with clients_lock: - if mac not in clients: - clients[mac] = {"ip_int": ipn or 0, "host": host or "", "last": now_s(), "ping_ok": False} - else: - clients[mac]["last"] = now_s() - if ipn: clients[mac]["ip_int"] = ipn - if host: clients[mac]["host"] = host - -def parse_options(data): - opts = {}; i = 240 - while i < len(data): - code = data[i] - if code == 0: i += 1; continue - if code == 255: break - if i + 1 >= len(data): break - L = data[i+1] - opts[code] = data[i+2:i+2+L] - i += 2 + L - return opts - -def get_hostname(opts): - h = opts.get(12) - if h: - try: return h.rstrip(b"\x00").decode(errors="replace") - except: return "" - return "" - -def build_reply(req, dhcp_type, yiaddr_int): - pkt = bytearray(240) - pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0 - pkt[4:8] = req[4:8] - pkt[10:12] = req[10:12] - pkt[16:20] = struct.pack("!I", yiaddr_int) - pkt[20:24] = socket.inet_aton(SERVER_IP) - pkt[28:44] = req[28:44] - pkt[236:240] = b"\x63\x82\x53\x63" - o = bytearray() - o += bytes([53,1,dhcp_type]) - o += bytes([54,4]) + socket.inet_aton(SERVER_IP) - o += bytes([51,4]) + struct.pack("!I", LEASE) - o += bytes([1,4]) + socket.inet_aton(NETMASK) - o += bytes([3,4]) + socket.inet_aton(SERVER_IP) - o += bytes([6,4]) + socket.inet_aton(SERVER_IP) - tb = TFTP.encode() - o += bytes([66, len(tb)]) + tb - o += bytes([150,4]) + socket.inet_aton(TFTP) - o += bytes([255]) - return bytes(pkt) + bytes(o) - - -# ---------- UI ---------- -def render_table(): - t = Table(expand=True, header_style="bold") - t.add_column("#", style="dim", width=3, justify="right") - t.add_column("IP", width=16) - t.add_column("Hostname", min_width=10) - t.add_column("MAC", width=19) - t.add_column("Last seen",style="dim", width=10) - t.add_column("Ping", width=6, justify="center") - with clients_lock: - rows = sorted(clients.items(), key=lambda kv: kv[1]["ip_int"]) - - # Auto-fit to available height (header + events panels are fixed-size in Layout). - avail = max(1, console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD) - overflow = max(0, len(rows) - avail) - if overflow: - rows = rows[:avail - 1] # leave one slot for the "(+N more)" marker - - if not rows: - t.add_row("—","—","(no clients yet)","—","—","—") - else: - for i,(mac,c) in enumerate(rows,1): - ping = Text("OK", style="bold green") if c.get("ping_ok") else Text("--", style="bold red") - t.add_row(str(i), int2ip(c["ip_int"]), c.get("host") or "—", mac, c.get("last","—"), ping) - if overflow: - t.add_row("…", "", f"[dim](+{overflow} more — enlarge the window)[/]", "", "", "") - return t - -def render_header(): - with clients_lock: - leased = len(clients) - body = (f"[bold cyan]dhcpsrv v{__version__}[/] [dim]made by engelgardt[/]\n" - f"Server: [bold]{SERVER_IP}[/]/{NETMASK} " - f"Pool: [bold]{int2ip(POOL[0])}–{int2ip(POOL[-1])}[/] " - f"Lease: [bold]{LEASE}s[/] " - f"TFTP: [bold]{TFTP}[/]\n" - f"Leases: [bold]{leased}/{len(POOL)}[/] " - f"Pkts: [dim]{stats['packets']}[/] " - f"DISCOVER: [cyan]{stats['discovers']}[/] " - f"REQUEST: [green]{stats['requests']}[/] " - f"RELEASE: [yellow]{stats['releases']}[/] " - f"[dim]Ctrl+C to stop[/]") - return Panel(body, border_style="cyan") - -def render_events_panel(): - with events_lock: - last = list(events)[-20:] - body = "\n".join(last) if last else "[dim](no events yet)[/]" - return Panel(body, title="Events", border_style="dim") - -def render_screen(): - layout = Layout() - layout.split_column( - Layout(render_header(), name="hdr", size=HEADER_LINES), - Layout(Panel(render_table(), title="Clients", border_style="cyan"), name="tbl"), - Layout(render_events_panel(), name="evt", size=EVENTS_LINES), - ) - return layout - -def ui_loop(stop): - # Pure event-driven: refresh only on real state changes or terminal resize. - # Clear screen AND scrollback so wheel-scrolling won't expose pre-launch text. - sys.stdout.write("\x1b[2J\x1b[3J\x1b[H") - sys.stdout.flush() - - last_size = console.size - with Live(render_screen(), auto_refresh=False, console=console, screen=True, - redirect_stdout=False, redirect_stderr=False) as live: - live.refresh() - while not stop.is_set(): - triggered = refresh_evt.wait(timeout=0.5) - if stop.is_set(): break - - # Detect window resize without spamming refreshes - cur_size = console.size - resized = (cur_size != last_size) - if resized: - last_size = cur_size - - if triggered: - refresh_evt.clear() - - if triggered or resized: - live.update(render_screen(), refresh=True) - - -# ---------- main flow ---------- -def select_nic(): - console.rule("[bold cyan]Available adapters") - adapters = list_adapters() - if not adapters: - console.print("[red]No suitable wired adapters found.[/]") - return None - for i, a in enumerate(adapters, 1): - ip = a.get("IPv4") or "—" - console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}") - while True: - s = Prompt.ask("Select adapter number").strip() - if s.isdigit() and 1 <= int(s) <= len(adapters): - return adapters[int(s)-1] - console.print("[red]Invalid selection.[/]") - -def set_static_ip(nic_name, ip, mask): - console.print(f"[yellow]Setting {nic_name} → {ip} / {mask} ...[/]") - run_netsh(["interface","ipv4","set","address",f"name={nic_name}","static",ip,mask]) - -def revert_dhcp(nic_name): - run_netsh(["interface","ipv4","set","address",f"name={nic_name}","source=dhcp"]) - run_netsh(["interface","ipv4","set","dnsservers",f"name={nic_name}","source=dhcp"]) - -def main(): - global SERVER_IP, POOL, LEASE, TFTP - require_admin() - - console.print(f"[bold cyan]dhcpsrv v{__version__}[/] - portable laptop-side DHCP server") - console.print("[dim]made by engelgardt[/]") - console.print() - - check_for_update() - - nic = select_nic() - if not nic: input("Press Enter to exit"); return - - SERVER_IP = DEFAULT_SERVER_IP - LEASE = DEFAULT_LEASE - TFTP = SERVER_IP - server_n = ip2int(SERVER_IP) - POOL = list(range(server_n + 1, server_n + 1 + POOL_SIZE)) - - set_static_ip(nic["Name"], SERVER_IP, NETMASK) - log_event(f"[dim][{now_s()}][/] [bold]NIC[/] {nic['Name']} → {SERVER_IP}/{NETMASK}") - - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - try: - s.bind(("0.0.0.0", 67)) - except OSError as e: - console.print(f"[bold red]bind UDP/67 failed:[/] {e}") - console.print("[yellow]Another DHCP service (Tftpd32, ICS, Windows DHCP) may be running.[/]") - input("Press Enter to exit"); return - - stop = threading.Event() - threading.Thread(target=ping_loop, daemon=True).start() - threading.Thread(target=ui_loop, args=(stop,), daemon=True).start() - - def shutdown(sig=None, frm=None): - stop.set() - console.print() - console.print(f"[dim][{now_s()}] Shutting down...[/]") - try: s.close() - except: pass - # Ask revert - try: - if Confirm.ask(f"Revert {nic['Name']} back to DHCP?", default=False): - revert_dhcp(nic["Name"]) - console.print("[green]NIC reverted to DHCP[/]") - except (EOFError, KeyboardInterrupt): pass - input("Press Enter to exit") - sys.exit(0) - signal.signal(signal.SIGINT, shutdown) - signal.signal(signal.SIGTERM, shutdown) - - while True: - try: - data, _ = s.recvfrom(2048) - except KeyboardInterrupt: - shutdown(); return - except OSError: - continue - if len(data) < 240 or data[0] != 1: continue - stats["packets"] += 1 - mac = ":".join(f"{b:02x}" for b in data[28:34]) - opts = parse_options(data) - msg = opts.get(53) - if not msg: continue - mt = msg[0] - host = get_hostname(opts) - host_s = f" [{host}]" if host else "" - - if mt == 1: - stats["discovers"] += 1 - ipn = alloc_ip(mac) - if ipn is None: - log_event(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]POOL EXHAUSTED[/]") - continue - touch_client(mac, ipn, host) - s.sendto(build_reply(data, 2, ipn), ("255.255.255.255", 68)) - log_event(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}") - elif mt == 3: - stats["requests"] += 1 - req_ip = opts.get(50) - with clients_lock: - cached = clients.get(mac, {}).get("ip_int") - ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached - if ipn is None: continue - touch_client(mac, ipn, host) - s.sendto(build_reply(data, 5, ipn), ("255.255.255.255", 68)) - log_event(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}") - elif mt == 7: - stats["releases"] += 1 - with clients_lock: - old = clients.pop(mac, None) - if old: - log_event(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old['ip_int'])}") - elif mt == 8: - log_event(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}") - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..eef9f22 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "dhcpsrv" +description = "Portable laptop-side DHCP server for storage/server engineers." +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT" } +authors = [{ name = "engelgardt" }] +dependencies = ["rich>=13"] +dynamic = ["version"] + +[project.urls] +Homepage = "https://github.com/Engelgardt23/dhcpsrv" +Issues = "https://github.com/Engelgardt23/dhcpsrv/issues" + +[project.scripts] +dhcpsrv = "dhcpsrv.app:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.dynamic] +version = { attr = "dhcpsrv.__version__" } diff --git a/src/dhcpsrv/__init__.py b/src/dhcpsrv/__init__.py new file mode 100644 index 0000000..7b837c7 --- /dev/null +++ b/src/dhcpsrv/__init__.py @@ -0,0 +1,10 @@ +""" +dhcpsrv - portable laptop-side DHCP server for storage/server engineers. +made by engelgardt + +The single source of truth for the project version. Bump this before tagging +a release; CI reads the tag, the code reads this constant. +""" + +__version__ = "1.1.0" +GITHUB_REPO = "Engelgardt23/dhcpsrv" diff --git a/src/dhcpsrv/__main__.py b/src/dhcpsrv/__main__.py new file mode 100644 index 0000000..7b2ad53 --- /dev/null +++ b/src/dhcpsrv/__main__.py @@ -0,0 +1,11 @@ +"""Entry point for `python -m dhcpsrv` from a checked-out / installed package. + +The PyInstaller-bundled exe uses `dhcpsrv-launcher.py` at the repo root instead, +because PyInstaller runs the bundled script as a standalone module — relative +imports fail there.""" + +from .app import main + + +if __name__ == "__main__": + main() diff --git a/src/dhcpsrv/app.py b/src/dhcpsrv/app.py new file mode 100644 index 0000000..b685cb8 --- /dev/null +++ b/src/dhcpsrv/app.py @@ -0,0 +1,90 @@ +""" +Application entry: wires admin check, update check, NIC selection, the DHCP +server, the UI and the ping loop together.""" + +from __future__ import annotations +import signal +import sys +import threading + +from rich.console import Console +from rich.prompt import Confirm, Prompt + +from . import __version__ +from .platform_win import enable_vt, require_admin +from .update_check import check_for_update +from .network import list_adapters, set_static_ip, revert_to_dhcp +from .dhcp import DhcpConfig, DhcpServer, now_s +from .ui import Ui + + +def _select_nic(console: Console) -> dict | None: + console.rule("[bold cyan]Available adapters") + adapters = list_adapters() + if not adapters: + console.print("[red]No suitable wired adapters found.[/]") + return None + for i, a in enumerate(adapters, 1): + ip = a.get("IPv4") or "—" + console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}") + while True: + s = Prompt.ask("Select adapter number").strip() + if s.isdigit() and 1 <= int(s) <= len(adapters): + return adapters[int(s) - 1] + console.print("[red]Invalid selection.[/]") + + +def main() -> None: + enable_vt() + require_admin() + + console = Console(log_path=False) + console.print(f"[bold cyan]dhcpsrv v{__version__}[/] - portable laptop-side DHCP server") + console.print("[dim]made by engelgardt[/]") + console.print() + + check_for_update(console) + + nic = _select_nic(console) + if not nic: + input("Press Enter to exit"); return + + cfg = DhcpConfig.with_defaults() + console.print(f"[yellow]Setting {nic['Name']} → {cfg.server_ip} / {cfg.netmask} ...[/]") + set_static_ip(nic["Name"], cfg.server_ip, cfg.netmask) + + # Wire server <-> ui through callbacks so neither imports the other. + ui = Ui.__new__(Ui) # forward declaration so server callbacks can capture it + server = DhcpServer(cfg=cfg, log=lambda m: ui.log(m), on_change=lambda: ui.request_refresh()) + Ui.__init__(ui, server) # finish UI init now that server exists + + stop = threading.Event() + threading.Thread(target=server.run, args=(stop,), daemon=True).start() + threading.Thread(target=server.ping_loop, args=(stop,), daemon=True).start() + + def shutdown(sig=None, frm=None): + stop.set() + try: + print() + print(f"[{now_s()}] Shutting down...") + try: + if Confirm.ask(f"Revert {nic['Name']} back to DHCP?", default=False): + revert_to_dhcp(nic["Name"]) + print("NIC reverted to DHCP") + except (EOFError, KeyboardInterrupt): + pass + finally: + input("Press Enter to exit") + sys.exit(0) + + signal.signal(signal.SIGINT, shutdown) + signal.signal(signal.SIGTERM, shutdown) + + try: + ui.run(stop) + except KeyboardInterrupt: + shutdown() + + +if __name__ == "__main__": + main() diff --git a/src/dhcpsrv/dhcp.py b/src/dhcpsrv/dhcp.py new file mode 100644 index 0000000..ceeed3b --- /dev/null +++ b/src/dhcpsrv/dhcp.py @@ -0,0 +1,235 @@ +""" +DHCP server: packet parsing/building, the lease table, and the main socket +loop. Pure logic — UI rendering and ping live in their own modules. + +The `DhcpServer` instance owns the runtime state (clients, stats). The UI +holds a reference to it and reads it whenever it renders.""" + +from __future__ import annotations +import socket +import struct +import sys +import threading +from dataclasses import dataclass, field +from datetime import datetime +from typing import Callable, Optional + +from .network import ping_one + + +# ---------- helpers ---------- +def ip2int(ip: str) -> int: + return struct.unpack("!I", socket.inet_aton(ip))[0] + + +def int2ip(n: int) -> str: + return socket.inet_ntoa(struct.pack("!I", n)) + + +def now_s() -> str: + return datetime.now().strftime("%H:%M:%S") + + +# ---------- config ---------- +@dataclass +class DhcpConfig: + server_ip: str + netmask: str + pool: list[int] # list of int IPs to hand out + lease: int # seconds + tftp: str # value for options 66 / 150 + + @classmethod + def with_defaults(cls, server_ip: str = "10.10.10.1", + netmask: str = "255.255.255.0", + pool_size: int = 50, + lease: int = 7200) -> "DhcpConfig": + n = ip2int(server_ip) + pool = list(range(n + 1, n + 1 + pool_size)) + return cls(server_ip=server_ip, netmask=netmask, pool=pool, + lease=lease, tftp=server_ip) + + +# ---------- server ---------- +@dataclass +class DhcpServer: + cfg: DhcpConfig + log: Callable[[str], None] # how to push an event line into the UI + on_change: Callable[[], None] # called whenever something the UI cares about changed + + clients: dict[str, dict] = field(default_factory=dict) + lock: threading.Lock = field(default_factory=threading.Lock) + stats: dict[str, int] = field(default_factory=lambda: { + "packets": 0, "discovers": 0, "requests": 0, "releases": 0, + }) + + # --- lease allocation --- + def alloc_ip(self, mac: str) -> Optional[int]: + with self.lock: + if mac in self.clients: + return self.clients[mac]["ip_int"] + used = {c["ip_int"] for c in self.clients.values()} + for ipn in self.cfg.pool: + if ipn not in used: + self.clients[mac] = { + "ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False, + } + return ipn + return None + + def touch_client(self, mac: str, ipn: Optional[int] = None, host: Optional[str] = None) -> None: + with self.lock: + if mac not in self.clients: + self.clients[mac] = { + "ip_int": ipn or 0, "host": host or "", + "last": now_s(), "ping_ok": False, + } + else: + self.clients[mac]["last"] = now_s() + if ipn: self.clients[mac]["ip_int"] = ipn + if host: self.clients[mac]["host"] = host + + def release_client(self, mac: str) -> Optional[int]: + with self.lock: + old = self.clients.pop(mac, None) + return old["ip_int"] if old else None + + # --- packet parsing --- + @staticmethod + def parse_options(data: bytes) -> dict[int, bytes]: + opts: dict[int, bytes] = {} + i = 240 + while i < len(data): + code = data[i] + if code == 0: + i += 1; continue + if code == 255: + break + if i + 1 >= len(data): + break + L = data[i + 1] + opts[code] = data[i + 2 : i + 2 + L] + i += 2 + L + return opts + + @staticmethod + def get_hostname(opts: dict[int, bytes]) -> str: + h = opts.get(12) + if not h: + return "" + try: + return h.rstrip(b"\x00").decode(errors="replace") + except Exception: + return "" + + # --- packet building --- + def build_reply(self, req: bytes, dhcp_type: int, yiaddr_int: int) -> bytes: + pkt = bytearray(240) + pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0 + pkt[4:8] = req[4:8] # xid + pkt[10:12] = req[10:12] # flags + pkt[16:20] = struct.pack("!I", yiaddr_int) # yiaddr + pkt[20:24] = socket.inet_aton(self.cfg.server_ip) # siaddr + pkt[28:44] = req[28:44] # chaddr + pkt[236:240] = b"\x63\x82\x53\x63" # magic cookie + + o = bytearray() + o += bytes([53, 1, dhcp_type]) # message type + o += bytes([54, 4]) + socket.inet_aton(self.cfg.server_ip) # server id + o += bytes([51, 4]) + struct.pack("!I", self.cfg.lease) # lease time + o += bytes([1, 4]) + socket.inet_aton(self.cfg.netmask) # subnet mask + o += bytes([3, 4]) + socket.inet_aton(self.cfg.server_ip) # router + o += bytes([6, 4]) + socket.inet_aton(self.cfg.server_ip) # DNS + tftp = self.cfg.tftp.encode() + o += bytes([66, len(tftp)]) + tftp # TFTP server name + o += bytes([150, 4]) + socket.inet_aton(self.cfg.tftp) # TFTP server addr + o += bytes([255]) + return bytes(pkt) + bytes(o) + + # --- ping loop, runs in its own thread --- + def ping_loop(self, stop: threading.Event) -> None: + from concurrent.futures import ThreadPoolExecutor + pool_exec = ThreadPoolExecutor(max_workers=16) + while not stop.wait(1.0): + with self.lock: + items = [(m, c["ip_int"]) for m, c in self.clients.items()] + if not items: + continue + ips = [int2ip(i) for _, i in items] + results = list(pool_exec.map(ping_one, ips)) + changed = False + with self.lock: + for (mac, _), ok in zip(items, results): + if mac in self.clients and self.clients[mac].get("ping_ok") != ok: + self.clients[mac]["ping_ok"] = ok + changed = True + if changed: + self.on_change() + + # --- main server loop --- + def run(self, stop: threading.Event) -> None: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + try: + s.bind(("0.0.0.0", 67)) + except OSError as e: + self.log(f"[bold red]bind UDP/67 failed:[/] {e}") + self.log("[yellow]Another DHCP service (Tftpd32 DHCP, ICS, Windows DHCP) may be running.[/]") + stop.set() + return + + while not stop.is_set(): + try: + data, _ = s.recvfrom(2048) + except OSError: + continue + if len(data) < 240 or data[0] != 1: + continue + self.stats["packets"] += 1 + mac = ":".join(f"{b:02x}" for b in data[28:34]) + opts = self.parse_options(data) + msg = opts.get(53) + if not msg: + continue + mt = msg[0] + host = self.get_hostname(opts) + host_s = f" [{host}]" if host else "" + + if mt == 1: # DISCOVER + self.stats["discovers"] += 1 + ipn = self.alloc_ip(mac) + if ipn is None: + self.log(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]POOL EXHAUSTED[/]") + continue + self.touch_client(mac, ipn, host) + s.sendto(self.build_reply(data, 2, ipn), ("255.255.255.255", 68)) + self.log(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}") + + elif mt == 3: # REQUEST + self.stats["requests"] += 1 + req_ip = opts.get(50) + with self.lock: + cached = self.clients.get(mac, {}).get("ip_int") + ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached + if ipn is None: + continue + self.touch_client(mac, ipn, host) + s.sendto(self.build_reply(data, 5, ipn), ("255.255.255.255", 68)) + self.log(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}") + + elif mt == 7: # RELEASE + self.stats["releases"] += 1 + old = self.release_client(mac) + if old is not None: + self.log(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old)}") + + elif mt == 8: # INFORM + self.log(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}") + + self.on_change() + + try: + s.close() + except Exception: + pass diff --git a/src/dhcpsrv/network.py b/src/dhcpsrv/network.py new file mode 100644 index 0000000..9cf6e31 --- /dev/null +++ b/src/dhcpsrv/network.py @@ -0,0 +1,103 @@ +""" +Network plumbing: enumerate physical NICs, set / revert IP via netsh, ping hosts. + +Pure functions; no shared state. The ping loop is in `dhcp.py` because it +mutates the DHCP server's client table.""" + +from __future__ import annotations +import json +import os +import subprocess +from typing import Any + +CREATE_NO_WINDOW = 0x08000000 if os.name == "nt" else 0 + +# Adapters we never show in the picker. +SKIP_DESCRIPTION = ( + "VPN", "Virtual", "AnyConnect", "TAP-", "TUN-", "Bluetooth", "Loopback", + "WAN Miniport", "Hyper-V", "VMware", "VirtualBox", "WireGuard", "OpenVPN", + "Tailscale", "ZeroTier", +) +SKIP_MEDIA = ("Native 802.11", "Wireless WAN") + + +def _run_ps(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess: + return subprocess.run( + ["powershell.exe", "-NoProfile", "-NonInteractive", "-Command", cmd], + capture_output=True, text=True, timeout=timeout, + creationflags=CREATE_NO_WINDOW, + ) + + +def _run_netsh(args: list[str], timeout: int = 15) -> subprocess.CompletedProcess: + return subprocess.run( + ["netsh", *args], + capture_output=True, text=True, timeout=timeout, + creationflags=CREATE_NO_WINDOW, + ) + + +def list_adapters() -> list[dict[str, Any]]: + """Return physical wired adapters only — skip wireless / VPN / virtual.""" + cmd = ( + r"Get-NetAdapter | ForEach-Object {" + r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | " + r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; " + r" [pscustomobject]@{" + r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; " + r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip" + r" }} | ConvertTo-Json -Depth 3 -Compress" + ) + r = _run_ps(cmd, timeout=20) + if r.returncode != 0 or not r.stdout.strip(): + return [] + data = json.loads(r.stdout) + if isinstance(data, dict): + data = [data] + + out: list[dict[str, Any]] = [] + for a in data: + if a.get("Status") in ("Disabled", "Not Present"): + continue + if a.get("Virtual"): + continue + if a.get("MediaType") in SKIP_MEDIA: + continue + haystack = ((a.get("Description") or "") + " " + (a.get("Name") or "")).lower() + if any(k.lower() in haystack for k in SKIP_DESCRIPTION): + continue + out.append(a) + out.sort(key=lambda x: x["ifIndex"]) + return out + + +def set_static_ip(nic_name: str, ip: str, mask: str) -> None: + _run_netsh(["interface", "ipv4", "set", "address", f"name={nic_name}", "static", ip, mask]) + + +def revert_to_dhcp(nic_name: str) -> None: + _run_netsh(["interface", "ipv4", "set", "address", f"name={nic_name}", "source=dhcp"]) + _run_netsh(["interface", "ipv4", "set", "dnsservers", f"name={nic_name}", "source=dhcp"]) + + +def ping_one(ip: str, timeout_ms: int = 600) -> bool: + """Windows-`ping` based reachability test. + + Windows' `ping` exit code is unreliable — it can return 0 with + "Destination host unreachable" or with a stale-ARP-based reply from the + local stack. The only trustworthy success marker is the `TTL=` substring + in stdout (present across locales — e.g. `...time<1ms TTL=64` or + `...время<1мс TTL=64`).""" + try: + if os.name == "nt": + cmd = ["ping", "-n", "1", "-w", str(timeout_ms), ip] + else: + cmd = ["ping", "-c", "1", "-W", "1", ip] + r = subprocess.run( + cmd, capture_output=True, timeout=2, text=True, + creationflags=CREATE_NO_WINDOW, + ) + out = (r.stdout or "") + (r.stderr or "") + return "TTL=" in out + except Exception: + return False diff --git a/src/dhcpsrv/platform_win.py b/src/dhcpsrv/platform_win.py new file mode 100644 index 0000000..ae47b36 --- /dev/null +++ b/src/dhcpsrv/platform_win.py @@ -0,0 +1,50 @@ +""" +Windows-specific bits: VT (ANSI) processing in the console, UAC self-elevation. +""" + +from __future__ import annotations +import ctypes +import os +import sys + + +def enable_vt() -> None: + """Enable virtual-terminal processing on the Windows console so that ESC + escape sequences from raw stdout writes are interpreted as colours / clear + instead of being printed as literal characters. + + Rich enables this for its own writes; we still call it so direct + `sys.stdout.write("\\x1b[...")` works (used to clear the scrollback when + the alt screen starts).""" + if os.name != "nt": + return + try: + k = ctypes.windll.kernel32 + STD_OUT, STD_ERR = -11, -12 + ENABLE_VT = 0x0004 + for std in (STD_OUT, STD_ERR): + h = k.GetStdHandle(std) + mode = ctypes.c_ulong() + if k.GetConsoleMode(h, ctypes.byref(mode)): + k.SetConsoleMode(h, mode.value | ENABLE_VT) + except Exception: + pass + + +def is_admin() -> bool: + try: + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + except Exception: + return False + + +def require_admin() -> None: + """If we're not running elevated, relaunch ourselves through UAC and exit. + + Works both for the PyInstaller bundle (`sys.executable` is the exe itself) + and for a plain `python src/dhcpsrv/__main__.py` run.""" + if is_admin(): + return + args = " ".join(f'"{a}"' for a in sys.argv) + ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, args, None, 1) + sys.exit(0) diff --git a/src/dhcpsrv/ui.py b/src/dhcpsrv/ui.py new file mode 100644 index 0000000..8d4b692 --- /dev/null +++ b/src/dhcpsrv/ui.py @@ -0,0 +1,143 @@ +""" +Rich-based full-screen TUI. + +`Ui` owns the events buffer and the refresh trigger; it reads everything else +from the `DhcpServer` it's bound to.""" + +from __future__ import annotations +import sys +import threading +from collections import deque +from typing import Callable + +from rich.console import Console +from rich.layout import Layout +from rich.live import Live +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +from . import __version__ +from .dhcp import DhcpServer, int2ip + + +# Fixed-size layout slots — used to compute the clients-table fit. +HEADER_LINES = 5 +EVENTS_LINES = 14 +TBL_OVERHEAD = 6 # panel borders + table header + table rules + + +class Ui: + def __init__(self, server: DhcpServer): + self.server = server + self.console = Console(log_path=False) + self.events = deque(maxlen=200) + self.events_lock = threading.Lock() + self.refresh_evt = threading.Event() + + # --- public for other modules --- + def log(self, markup: str) -> None: + """Append an event line. Called from the DHCP server thread.""" + with self.events_lock: + self.events.append(markup) + self.refresh_evt.set() + + def request_refresh(self) -> None: + """Called when something the UI cares about changed (e.g. a ping result).""" + self.refresh_evt.set() + + # --- rendering --- + def _render_header(self) -> Panel: + with self.server.lock: + leased = len(self.server.clients) + st = self.server.stats + cfg = self.server.cfg + body = ( + f"[bold cyan]dhcpsrv v{__version__}[/] [dim]made by engelgardt[/]\n" + f"Server: [bold]{cfg.server_ip}[/]/{cfg.netmask} " + f"Pool: [bold]{int2ip(cfg.pool[0])}–{int2ip(cfg.pool[-1])}[/] " + f"Lease: [bold]{cfg.lease}s[/] " + f"TFTP: [bold]{cfg.tftp}[/]\n" + f"Leases: [bold]{leased}/{len(cfg.pool)}[/] " + f"Pkts: [dim]{st['packets']}[/] " + f"DISCOVER: [cyan]{st['discovers']}[/] " + f"REQUEST: [green]{st['requests']}[/] " + f"RELEASE: [yellow]{st['releases']}[/] " + f"[dim]Ctrl+C to stop[/]" + ) + return Panel(body, border_style="cyan") + + def _render_table(self) -> Table: + t = Table(expand=True, header_style="bold") + t.add_column("#", style="dim", width=3, justify="right") + t.add_column("IP", width=16) + t.add_column("Hostname", min_width=10) + t.add_column("MAC", width=19) + t.add_column("Last seen", style="dim", width=10) + t.add_column("Ping", width=6, justify="center") + + with self.server.lock: + rows = sorted(self.server.clients.items(), key=lambda kv: kv[1]["ip_int"]) + + avail = max(1, self.console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD) + overflow = max(0, len(rows) - avail) + if overflow: + rows = rows[: avail - 1] # leave one slot for the "(+N more)" marker + + if not rows: + t.add_row("—", "—", "(no clients yet)", "—", "—", "—") + else: + for i, (mac, c) in enumerate(rows, 1): + ping = (Text("OK", style="bold green") + if c.get("ping_ok") else Text("--", style="bold red")) + t.add_row( + str(i), + int2ip(c["ip_int"]), + c.get("host") or "—", + mac, + c.get("last", "—"), + ping, + ) + if overflow: + t.add_row("…", "", f"[dim](+{overflow} more — enlarge the window)[/]", "", "", "") + return t + + def _render_events(self) -> Panel: + with self.events_lock: + last = list(self.events)[-20:] + body = "\n".join(last) if last else "[dim](no events yet)[/]" + return Panel(body, title="Events", border_style="dim") + + def _render_screen(self) -> Layout: + layout = Layout() + layout.split_column( + Layout(self._render_header(), name="hdr", size=HEADER_LINES), + Layout(Panel(self._render_table(), title="Clients", border_style="cyan"), name="tbl"), + Layout(self._render_events(), name="evt", size=EVENTS_LINES), + ) + return layout + + # --- main loop --- + def run(self, stop: threading.Event) -> None: + """Run until `stop` is set. Event-driven: redraws only on real changes + or terminal resize.""" + # Clear screen + scrollback so wheel-scrolling can't expose pre-launch text. + sys.stdout.write("\x1b[2J\x1b[3J\x1b[H") + sys.stdout.flush() + + last_size = self.console.size + with Live(self._render_screen(), auto_refresh=False, console=self.console, + screen=True, redirect_stdout=False, redirect_stderr=False) as live: + live.refresh() + while not stop.is_set(): + triggered = self.refresh_evt.wait(timeout=0.5) + if stop.is_set(): + break + cur_size = self.console.size + resized = (cur_size != last_size) + if resized: + last_size = cur_size + if triggered: + self.refresh_evt.clear() + if triggered or resized: + live.update(self._render_screen(), refresh=True) diff --git a/src/dhcpsrv/update_check.py b/src/dhcpsrv/update_check.py new file mode 100644 index 0000000..5ef3ff9 --- /dev/null +++ b/src/dhcpsrv/update_check.py @@ -0,0 +1,53 @@ +""" +Auto-update check. + +On startup, ask GitHub for the latest release tag. If it's newer than the +local `__version__`, ask the user whether to open the download page in a +browser. Silent on any error (offline, rate-limit, etc.).""" + +from __future__ import annotations +import json +import urllib.request +import webbrowser + +from rich.console import Console +from rich.prompt import Confirm + +from . import __version__, GITHUB_REPO + + +def _parse_version(s: str) -> tuple[int, int, int]: + try: + s = (s or "").strip().lstrip("v") + parts = [int(x) for x in s.split(".")[:3]] + while len(parts) < 3: + parts.append(0) + return tuple(parts) # type: ignore[return-value] + except Exception: + return (0, 0, 0) + + +def check_for_update(console: Console) -> None: + try: + url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest" + req = urllib.request.Request(url, headers={ + "Accept": "application/vnd.github+json", + "User-Agent": f"dhcpsrv/{__version__}", + }) + with urllib.request.urlopen(req, timeout=3) as r: + data = json.loads(r.read().decode("utf-8", errors="replace")) + latest = (data.get("tag_name") or "").strip() + page = data.get("html_url") or f"https://github.com/{GITHUB_REPO}/releases/latest" + + if _parse_version(latest) > _parse_version(__version__): + console.rule("[bold yellow]Update available") + console.print(f"Current: [dim]v{__version__}[/] Latest: [bold green]{latest}[/]") + try: + if Confirm.ask("Open the download page in your browser?", default=True): + webbrowser.open(page) + except (EOFError, KeyboardInterrupt): + pass + console.print() + except Exception: + # Offline / API error — silent on purpose. + pass