refactor: split into modules under src/dhcpsrv/

The previous monolithic dhcpsrv_app.py (~500 lines) is now 7 focused modules:

- src/dhcpsrv/__init__.py    : single source of truth for __version__
- src/dhcpsrv/__main__.py    : entry for python -m dhcpsrv
- src/dhcpsrv/app.py         : main orchestration, wires the rest
- src/dhcpsrv/platform_win.py: VT enable + UAC self-elevate
- src/dhcpsrv/update_check.py: GitHub /releases/latest poll
- src/dhcpsrv/network.py     : NIC enumeration, netsh, ping
- src/dhcpsrv/dhcp.py        : DhcpConfig, DhcpServer, packet parse/build, server loop
- src/dhcpsrv/ui.py          : rich-based full-screen TUI

Also added:
- dhcpsrv-launcher.py at repo root: absolute-import entry for PyInstaller
- pyproject.toml: deps + dynamic version
- CONTRIBUTING.md: layout, build, and release flow

CI workflow now builds from dhcpsrv-launcher.py.
No user-visible behaviour change.
This commit is contained in:
engelgardt 2026-05-16 12:28:07 +03:00
parent 7871f63c7d
commit 540bad0ecb
13 changed files with 814 additions and 494 deletions

View file

@ -28,7 +28,7 @@ jobs:
run: echo "version=${GITHUB_REF_NAME#v}" >> "$GITHUB_OUTPUT" run: echo "version=${GITHUB_REF_NAME#v}" >> "$GITHUB_OUTPUT"
- name: Build executable - name: Build executable
run: python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv dhcpsrv_app.py run: python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv --paths src dhcpsrv-launcher.py
- name: Package portable folder - name: Package portable folder
shell: pwsh shell: pwsh

79
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,79 @@
# Contributing
> Project layout, build, and release flow. **If you only want to use the tool — read [README](README.md) instead.**
## Repo layout
```
dhcpsrv/
├── .github/
│ ├── workflows/release.yml ← CI: tag-driven build + GitHub Release
│ └── ISSUE_TEMPLATE/ ← bug / feature / security routing
├── src/dhcpsrv/ ← package source (≤200 lines per module)
│ ├── __init__.py ← single source of truth for __version__
│ ├── __main__.py ← entry: python -m dhcpsrv
│ ├── app.py ← main flow, wires everything
│ ├── platform_win.py ← VT enable + UAC self-elevate
│ ├── update_check.py ← GitHub /releases/latest poll
│ ├── network.py ← list_adapters, netsh, ping (no shared state)
│ ├── dhcp.py ← DhcpConfig, DhcpServer, packet parse/build
│ └── ui.py ← rich-based full-screen TUI
├── pyproject.toml ← deps, packaging, dynamic version
├── CHANGELOG.md ← Keep a Changelog format, newest first
├── CONTRIBUTING.md ← this file
├── LICENSE / README.md / SECURITY.md
└── .gitignore
```
## Run from source (no exe)
```
python -m pip install rich
python -m dhcpsrv
```
`python -m dhcpsrv` finds `src/dhcpsrv/__main__.py` because the package lives under `src/`. You'll need administrator privileges for UDP/67 and `netsh` — the tool self-elevates via UAC.
## Editable install (development)
```
python -m pip install -e .
dhcpsrv
```
`-e .` makes the entry-point `dhcpsrv` available on PATH; edits in `src/dhcpsrv/` take effect immediately.
## Build the portable .exe
```
python -m pip install pyinstaller rich
python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv --paths src dhcpsrv-launcher.py
```
`dhcpsrv-launcher.py` (at repo root) is the PyInstaller entry — it does an
*absolute* import (`from dhcpsrv.app import main`) which is needed when
PyInstaller runs the bundled script as a standalone module. The `--paths src`
flag tells PyInstaller where to find the `dhcpsrv` package itself. Output:
`dist/dhcpsrv.exe`.
## Cut a release
1. Update `src/dhcpsrv/__init__.py` — bump `__version__` to `X.Y.Z`.
2. Update `CHANGELOG.md` — move items from `[Unreleased]` into a new `[X.Y.Z]` section with today's date.
3. Commit: `git commit -am "vX.Y.Z: …"`.
4. Tag: `git tag vX.Y.Z`.
5. Push: `git push && git push --tags`.
That's it. GitHub Actions picks up the tag, builds the exe, writes the SHA-256, and creates the GitHub Release with the zip attached.
## Where features go
| Adding... | Touch this module |
|---|---|
| A new DHCP option in the reply | `dhcp.py``DhcpServer.build_reply` |
| A new adapter filter | `network.py``SKIP_DESCRIPTION` / `SKIP_MEDIA` / `list_adapters` |
| A new column in the clients table | `ui.py``Ui._render_table` |
| Something shown in the header | `ui.py``Ui._render_header` |
| A startup check or banner line | `app.py``main()` |
| A change to UAC / VT logic | `platform_win.py` |
| Tweaking the GitHub update-check UX | `update_check.py` |

13
dhcpsrv-launcher.py Normal file
View file

@ -0,0 +1,13 @@
"""
PyInstaller entry point sits at the repo root and uses an *absolute* import
so the bundled exe doesn't need relative-import resolution at runtime.
For dev work without an install use `python -m dhcpsrv` instead (that path
goes through `src/dhcpsrv/__main__.py` and relative imports work).
"""
from dhcpsrv.app import main
if __name__ == "__main__":
main()

View file

@ -1,493 +0,0 @@
"""
dhcpsrv v1.1.0 - portable single-exe edition.
made by engelgardt
This file combines what previously lived in dhcpsrv.ps1 + dhcpsrv.py:
- admin check
- NIC selection (filters out wireless / VPN / virtual)
- static IP setting (netsh)
- DHCP server with rich live UI
- revert NIC prompt on exit
Build:
pyinstaller --onefile --uac-admin --name dhcpsrv --console dhcpsrv_app.py
"""
import os, sys, ctypes, json, subprocess, signal, socket, struct, threading, time
import urllib.request, webbrowser
from collections import deque
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
# Enable VT (ANSI escape) processing on Windows console BEFORE any output.
def _enable_vt():
if os.name != "nt":
return
try:
k = ctypes.windll.kernel32
STD_OUT, STD_ERR = -11, -12
ENABLE_VT = 0x0004
for std in (STD_OUT, STD_ERR):
h = k.GetStdHandle(std)
mode = ctypes.c_ulong()
if k.GetConsoleMode(h, ctypes.byref(mode)):
k.SetConsoleMode(h, mode.value | ENABLE_VT)
except Exception:
pass
_enable_vt()
__version__ = "1.1.0"
GITHUB_REPO = "Engelgardt23/dhcpsrv"
# Fixed heights used by the Layout — used to compute the clients table fit
HEADER_LINES = 5
EVENTS_LINES = 14
TBL_OVERHEAD = 6 # panel borders + table header row + table top/bottom rules
# Hardcoded defaults — pick NIC, everything else is auto.
DEFAULT_SERVER_IP = "10.10.10.1"
DEFAULT_NETMASK = "255.255.255.0"
POOL_SIZE = 50 # addresses starting at server_ip + 1
DEFAULT_LEASE = 7200 # 2 hours
# TFTP option always = server IP
# Stats counters
stats = {"packets": 0, "discovers": 0, "requests": 0, "releases": 0}
# rich
try:
from rich.console import Console, Group
from rich.live import Live
from rich.table import Table
from rich.text import Text
from rich.panel import Panel
from rich.layout import Layout
from rich.prompt import Prompt, Confirm
except ImportError:
print("'rich' missing in the bundled build")
input("Press Enter to exit")
sys.exit(10)
console = Console(log_path=False)
# ---------- admin ----------
def is_admin():
try: return ctypes.windll.shell32.IsUserAnAdmin() != 0
except: return False
def require_admin():
if not is_admin():
# Re-launch elevated; original exits
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(f'"{a}"' for a in sys.argv), None, 1)
sys.exit(0)
# ---------- update check ----------
def _parse_version(s):
try:
s = (s or "").strip().lstrip("v")
return tuple(int(x) for x in s.split(".")[:3])
except Exception:
return (0, 0, 0)
def check_for_update():
"""Query GitHub for the latest release. If newer than __version__, prompt to open the page."""
try:
url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest"
req = urllib.request.Request(url, headers={
"Accept": "application/vnd.github+json",
"User-Agent": f"dhcpsrv/{__version__}",
})
with urllib.request.urlopen(req, timeout=3) as r:
data = json.loads(r.read().decode("utf-8", errors="replace"))
latest = (data.get("tag_name") or "").strip()
page = data.get("html_url") or f"https://github.com/{GITHUB_REPO}/releases/latest"
if _parse_version(latest) > _parse_version(__version__):
console.rule("[bold yellow]Update available")
console.print(f"Current: [dim]v{__version__}[/] Latest: [bold green]{latest}[/]")
try:
if Confirm.ask("Open the download page in your browser?", default=True):
webbrowser.open(page)
except (EOFError, KeyboardInterrupt):
pass
console.print()
except Exception:
# Offline / GitHub rate-limit / API error — skip silently.
pass
# ---------- helpers ----------
CREATE_NO_WINDOW = 0x08000000
def run_ps(cmd, timeout=15):
return subprocess.run(["powershell.exe","-NoProfile","-NonInteractive","-Command",cmd],
capture_output=True, text=True, timeout=timeout,
creationflags=CREATE_NO_WINDOW)
def run_netsh(args, timeout=15):
return subprocess.run(["netsh"] + args, capture_output=True, text=True,
timeout=timeout, creationflags=CREATE_NO_WINDOW)
# ---------- NIC enumeration ----------
SKIP_DESCR = ("VPN","Virtual","AnyConnect","TAP-","TUN-","Bluetooth","Loopback",
"WAN Miniport","Hyper-V","VMware","VirtualBox","WireGuard","OpenVPN",
"Tailscale","ZeroTier")
SKIP_MEDIA = ("Native 802.11","Wireless WAN")
def list_adapters():
cmd = (r"Get-NetAdapter | ForEach-Object {"
r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | "
r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; "
r" [pscustomobject]@{"
r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; "
r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip"
r" }} | ConvertTo-Json -Depth 3 -Compress")
r = run_ps(cmd, timeout=20)
if r.returncode != 0 or not r.stdout.strip():
return []
data = json.loads(r.stdout)
if isinstance(data, dict): data = [data]
out = []
for a in data:
if a["Status"] in ("Disabled","Not Present"): continue
if a.get("Virtual"): continue
if a.get("MediaType") in SKIP_MEDIA: continue
descr = (a.get("Description") or "") + " " + (a.get("Name") or "")
if any(k.lower() in descr.lower() for k in SKIP_DESCR): continue
out.append(a)
out.sort(key=lambda x: x["ifIndex"])
return out
# ---------- DHCP server state ----------
clients = {} # mac -> {ip_int, host, last, ping_ok}
clients_lock = threading.Lock()
events = deque(maxlen=200)
events_lock = threading.Lock()
refresh_evt = threading.Event() # set whenever UI should re-render
def log_event(markup_line):
with events_lock:
events.append(markup_line)
refresh_evt.set()
# Config (filled by main)
SERVER_IP = ""
NETMASK = "255.255.255.0"
POOL = []
LEASE = 7200
TFTP = ""
def ip2int(ip): return struct.unpack("!I", socket.inet_aton(ip))[0]
def int2ip(n): return socket.inet_ntoa(struct.pack("!I", n))
def now_s(): return datetime.now().strftime("%H:%M:%S")
# ---------- ping ----------
# Windows `ping` exit code is unreliable: it can return 0 with "Destination host
# unreachable" or with a stale ARP-based "reply" from the local stack. The only
# trustworthy success marker is the "TTL=" substring in stdout (present across
# locales — e.g. "...time<1ms TTL=64" / "...время<1мс TTL=64").
def ping_one(ip, timeout_ms=600):
try:
r = subprocess.run(["ping","-n","1","-w",str(timeout_ms),ip],
capture_output=True, timeout=2, text=True,
creationflags=CREATE_NO_WINDOW)
out = (r.stdout or "") + (r.stderr or "")
return "TTL=" in out
except Exception:
return False
def ping_loop():
pool_exec = ThreadPoolExecutor(max_workers=16)
while True:
with clients_lock:
items = [(m, c["ip_int"]) for m, c in clients.items()]
changed = False
if items:
ips = [int2ip(i) for _, i in items]
res = list(pool_exec.map(ping_one, ips))
with clients_lock:
for (mac, _), ok in zip(items, res):
if mac in clients and clients[mac].get("ping_ok") != ok:
clients[mac]["ping_ok"] = ok
changed = True
if changed:
refresh_evt.set()
time.sleep(1.0)
# ---------- DHCP logic ----------
def alloc_ip(mac):
with clients_lock:
if mac in clients: return clients[mac]["ip_int"]
used = {c["ip_int"] for c in clients.values()}
for ipn in POOL:
if ipn not in used:
clients[mac] = {"ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False}
return ipn
return None
def touch_client(mac, ipn=None, host=None):
with clients_lock:
if mac not in clients:
clients[mac] = {"ip_int": ipn or 0, "host": host or "", "last": now_s(), "ping_ok": False}
else:
clients[mac]["last"] = now_s()
if ipn: clients[mac]["ip_int"] = ipn
if host: clients[mac]["host"] = host
def parse_options(data):
opts = {}; i = 240
while i < len(data):
code = data[i]
if code == 0: i += 1; continue
if code == 255: break
if i + 1 >= len(data): break
L = data[i+1]
opts[code] = data[i+2:i+2+L]
i += 2 + L
return opts
def get_hostname(opts):
h = opts.get(12)
if h:
try: return h.rstrip(b"\x00").decode(errors="replace")
except: return ""
return ""
def build_reply(req, dhcp_type, yiaddr_int):
pkt = bytearray(240)
pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0
pkt[4:8] = req[4:8]
pkt[10:12] = req[10:12]
pkt[16:20] = struct.pack("!I", yiaddr_int)
pkt[20:24] = socket.inet_aton(SERVER_IP)
pkt[28:44] = req[28:44]
pkt[236:240] = b"\x63\x82\x53\x63"
o = bytearray()
o += bytes([53,1,dhcp_type])
o += bytes([54,4]) + socket.inet_aton(SERVER_IP)
o += bytes([51,4]) + struct.pack("!I", LEASE)
o += bytes([1,4]) + socket.inet_aton(NETMASK)
o += bytes([3,4]) + socket.inet_aton(SERVER_IP)
o += bytes([6,4]) + socket.inet_aton(SERVER_IP)
tb = TFTP.encode()
o += bytes([66, len(tb)]) + tb
o += bytes([150,4]) + socket.inet_aton(TFTP)
o += bytes([255])
return bytes(pkt) + bytes(o)
# ---------- UI ----------
def render_table():
t = Table(expand=True, header_style="bold")
t.add_column("#", style="dim", width=3, justify="right")
t.add_column("IP", width=16)
t.add_column("Hostname", min_width=10)
t.add_column("MAC", width=19)
t.add_column("Last seen",style="dim", width=10)
t.add_column("Ping", width=6, justify="center")
with clients_lock:
rows = sorted(clients.items(), key=lambda kv: kv[1]["ip_int"])
# Auto-fit to available height (header + events panels are fixed-size in Layout).
avail = max(1, console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD)
overflow = max(0, len(rows) - avail)
if overflow:
rows = rows[:avail - 1] # leave one slot for the "(+N more)" marker
if not rows:
t.add_row("","","(no clients yet)","","","")
else:
for i,(mac,c) in enumerate(rows,1):
ping = Text("OK", style="bold green") if c.get("ping_ok") else Text("--", style="bold red")
t.add_row(str(i), int2ip(c["ip_int"]), c.get("host") or "", mac, c.get("last",""), ping)
if overflow:
t.add_row("", "", f"[dim](+{overflow} more — enlarge the window)[/]", "", "", "")
return t
def render_header():
with clients_lock:
leased = len(clients)
body = (f"[bold cyan]dhcpsrv v{__version__}[/] [dim]made by engelgardt[/]\n"
f"Server: [bold]{SERVER_IP}[/]/{NETMASK} "
f"Pool: [bold]{int2ip(POOL[0])}{int2ip(POOL[-1])}[/] "
f"Lease: [bold]{LEASE}s[/] "
f"TFTP: [bold]{TFTP}[/]\n"
f"Leases: [bold]{leased}/{len(POOL)}[/] "
f"Pkts: [dim]{stats['packets']}[/] "
f"DISCOVER: [cyan]{stats['discovers']}[/] "
f"REQUEST: [green]{stats['requests']}[/] "
f"RELEASE: [yellow]{stats['releases']}[/] "
f"[dim]Ctrl+C to stop[/]")
return Panel(body, border_style="cyan")
def render_events_panel():
with events_lock:
last = list(events)[-20:]
body = "\n".join(last) if last else "[dim](no events yet)[/]"
return Panel(body, title="Events", border_style="dim")
def render_screen():
layout = Layout()
layout.split_column(
Layout(render_header(), name="hdr", size=HEADER_LINES),
Layout(Panel(render_table(), title="Clients", border_style="cyan"), name="tbl"),
Layout(render_events_panel(), name="evt", size=EVENTS_LINES),
)
return layout
def ui_loop(stop):
# Pure event-driven: refresh only on real state changes or terminal resize.
# Clear screen AND scrollback so wheel-scrolling won't expose pre-launch text.
sys.stdout.write("\x1b[2J\x1b[3J\x1b[H")
sys.stdout.flush()
last_size = console.size
with Live(render_screen(), auto_refresh=False, console=console, screen=True,
redirect_stdout=False, redirect_stderr=False) as live:
live.refresh()
while not stop.is_set():
triggered = refresh_evt.wait(timeout=0.5)
if stop.is_set(): break
# Detect window resize without spamming refreshes
cur_size = console.size
resized = (cur_size != last_size)
if resized:
last_size = cur_size
if triggered:
refresh_evt.clear()
if triggered or resized:
live.update(render_screen(), refresh=True)
# ---------- main flow ----------
def select_nic():
console.rule("[bold cyan]Available adapters")
adapters = list_adapters()
if not adapters:
console.print("[red]No suitable wired adapters found.[/]")
return None
for i, a in enumerate(adapters, 1):
ip = a.get("IPv4") or ""
console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}")
while True:
s = Prompt.ask("Select adapter number").strip()
if s.isdigit() and 1 <= int(s) <= len(adapters):
return adapters[int(s)-1]
console.print("[red]Invalid selection.[/]")
def set_static_ip(nic_name, ip, mask):
console.print(f"[yellow]Setting {nic_name}{ip} / {mask} ...[/]")
run_netsh(["interface","ipv4","set","address",f"name={nic_name}","static",ip,mask])
def revert_dhcp(nic_name):
run_netsh(["interface","ipv4","set","address",f"name={nic_name}","source=dhcp"])
run_netsh(["interface","ipv4","set","dnsservers",f"name={nic_name}","source=dhcp"])
def main():
global SERVER_IP, POOL, LEASE, TFTP
require_admin()
console.print(f"[bold cyan]dhcpsrv v{__version__}[/] - portable laptop-side DHCP server")
console.print("[dim]made by engelgardt[/]")
console.print()
check_for_update()
nic = select_nic()
if not nic: input("Press Enter to exit"); return
SERVER_IP = DEFAULT_SERVER_IP
LEASE = DEFAULT_LEASE
TFTP = SERVER_IP
server_n = ip2int(SERVER_IP)
POOL = list(range(server_n + 1, server_n + 1 + POOL_SIZE))
set_static_ip(nic["Name"], SERVER_IP, NETMASK)
log_event(f"[dim][{now_s()}][/] [bold]NIC[/] {nic['Name']}{SERVER_IP}/{NETMASK}")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
s.bind(("0.0.0.0", 67))
except OSError as e:
console.print(f"[bold red]bind UDP/67 failed:[/] {e}")
console.print("[yellow]Another DHCP service (Tftpd32, ICS, Windows DHCP) may be running.[/]")
input("Press Enter to exit"); return
stop = threading.Event()
threading.Thread(target=ping_loop, daemon=True).start()
threading.Thread(target=ui_loop, args=(stop,), daemon=True).start()
def shutdown(sig=None, frm=None):
stop.set()
console.print()
console.print(f"[dim][{now_s()}] Shutting down...[/]")
try: s.close()
except: pass
# Ask revert
try:
if Confirm.ask(f"Revert {nic['Name']} back to DHCP?", default=False):
revert_dhcp(nic["Name"])
console.print("[green]NIC reverted to DHCP[/]")
except (EOFError, KeyboardInterrupt): pass
input("Press Enter to exit")
sys.exit(0)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
while True:
try:
data, _ = s.recvfrom(2048)
except KeyboardInterrupt:
shutdown(); return
except OSError:
continue
if len(data) < 240 or data[0] != 1: continue
stats["packets"] += 1
mac = ":".join(f"{b:02x}" for b in data[28:34])
opts = parse_options(data)
msg = opts.get(53)
if not msg: continue
mt = msg[0]
host = get_hostname(opts)
host_s = f" [{host}]" if host else ""
if mt == 1:
stats["discovers"] += 1
ipn = alloc_ip(mac)
if ipn is None:
log_event(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]POOL EXHAUSTED[/]")
continue
touch_client(mac, ipn, host)
s.sendto(build_reply(data, 2, ipn), ("255.255.255.255", 68))
log_event(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}")
elif mt == 3:
stats["requests"] += 1
req_ip = opts.get(50)
with clients_lock:
cached = clients.get(mac, {}).get("ip_int")
ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached
if ipn is None: continue
touch_client(mac, ipn, host)
s.sendto(build_reply(data, 5, ipn), ("255.255.255.255", 68))
log_event(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}")
elif mt == 7:
stats["releases"] += 1
with clients_lock:
old = clients.pop(mac, None)
if old:
log_event(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old['ip_int'])}")
elif mt == 8:
log_event(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}")
if __name__ == "__main__":
main()

26
pyproject.toml Normal file
View file

@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "dhcpsrv"
description = "Portable laptop-side DHCP server for storage/server engineers."
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [{ name = "engelgardt" }]
dependencies = ["rich>=13"]
dynamic = ["version"]
[project.urls]
Homepage = "https://github.com/Engelgardt23/dhcpsrv"
Issues = "https://github.com/Engelgardt23/dhcpsrv/issues"
[project.scripts]
dhcpsrv = "dhcpsrv.app:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.dynamic]
version = { attr = "dhcpsrv.__version__" }

10
src/dhcpsrv/__init__.py Normal file
View file

@ -0,0 +1,10 @@
"""
dhcpsrv - portable laptop-side DHCP server for storage/server engineers.
made by engelgardt
The single source of truth for the project version. Bump this before tagging
a release; CI reads the tag, the code reads this constant.
"""
__version__ = "1.1.0"
GITHUB_REPO = "Engelgardt23/dhcpsrv"

11
src/dhcpsrv/__main__.py Normal file
View file

@ -0,0 +1,11 @@
"""Entry point for `python -m dhcpsrv` from a checked-out / installed package.
The PyInstaller-bundled exe uses `dhcpsrv-launcher.py` at the repo root instead,
because PyInstaller runs the bundled script as a standalone module relative
imports fail there."""
from .app import main
if __name__ == "__main__":
main()

90
src/dhcpsrv/app.py Normal file
View file

@ -0,0 +1,90 @@
"""
Application entry: wires admin check, update check, NIC selection, the DHCP
server, the UI and the ping loop together."""
from __future__ import annotations
import signal
import sys
import threading
from rich.console import Console
from rich.prompt import Confirm, Prompt
from . import __version__
from .platform_win import enable_vt, require_admin
from .update_check import check_for_update
from .network import list_adapters, set_static_ip, revert_to_dhcp
from .dhcp import DhcpConfig, DhcpServer, now_s
from .ui import Ui
def _select_nic(console: Console) -> dict | None:
console.rule("[bold cyan]Available adapters")
adapters = list_adapters()
if not adapters:
console.print("[red]No suitable wired adapters found.[/]")
return None
for i, a in enumerate(adapters, 1):
ip = a.get("IPv4") or ""
console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}")
while True:
s = Prompt.ask("Select adapter number").strip()
if s.isdigit() and 1 <= int(s) <= len(adapters):
return adapters[int(s) - 1]
console.print("[red]Invalid selection.[/]")
def main() -> None:
enable_vt()
require_admin()
console = Console(log_path=False)
console.print(f"[bold cyan]dhcpsrv v{__version__}[/] - portable laptop-side DHCP server")
console.print("[dim]made by engelgardt[/]")
console.print()
check_for_update(console)
nic = _select_nic(console)
if not nic:
input("Press Enter to exit"); return
cfg = DhcpConfig.with_defaults()
console.print(f"[yellow]Setting {nic['Name']}{cfg.server_ip} / {cfg.netmask} ...[/]")
set_static_ip(nic["Name"], cfg.server_ip, cfg.netmask)
# Wire server <-> ui through callbacks so neither imports the other.
ui = Ui.__new__(Ui) # forward declaration so server callbacks can capture it
server = DhcpServer(cfg=cfg, log=lambda m: ui.log(m), on_change=lambda: ui.request_refresh())
Ui.__init__(ui, server) # finish UI init now that server exists
stop = threading.Event()
threading.Thread(target=server.run, args=(stop,), daemon=True).start()
threading.Thread(target=server.ping_loop, args=(stop,), daemon=True).start()
def shutdown(sig=None, frm=None):
stop.set()
try:
print()
print(f"[{now_s()}] Shutting down...")
try:
if Confirm.ask(f"Revert {nic['Name']} back to DHCP?", default=False):
revert_to_dhcp(nic["Name"])
print("NIC reverted to DHCP")
except (EOFError, KeyboardInterrupt):
pass
finally:
input("Press Enter to exit")
sys.exit(0)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
try:
ui.run(stop)
except KeyboardInterrupt:
shutdown()
if __name__ == "__main__":
main()

235
src/dhcpsrv/dhcp.py Normal file
View file

@ -0,0 +1,235 @@
"""
DHCP server: packet parsing/building, the lease table, and the main socket
loop. Pure logic UI rendering and ping live in their own modules.
The `DhcpServer` instance owns the runtime state (clients, stats). The UI
holds a reference to it and reads it whenever it renders."""
from __future__ import annotations
import socket
import struct
import sys
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional
from .network import ping_one
# ---------- helpers ----------
def ip2int(ip: str) -> int:
return struct.unpack("!I", socket.inet_aton(ip))[0]
def int2ip(n: int) -> str:
return socket.inet_ntoa(struct.pack("!I", n))
def now_s() -> str:
return datetime.now().strftime("%H:%M:%S")
# ---------- config ----------
@dataclass
class DhcpConfig:
server_ip: str
netmask: str
pool: list[int] # list of int IPs to hand out
lease: int # seconds
tftp: str # value for options 66 / 150
@classmethod
def with_defaults(cls, server_ip: str = "10.10.10.1",
netmask: str = "255.255.255.0",
pool_size: int = 50,
lease: int = 7200) -> "DhcpConfig":
n = ip2int(server_ip)
pool = list(range(n + 1, n + 1 + pool_size))
return cls(server_ip=server_ip, netmask=netmask, pool=pool,
lease=lease, tftp=server_ip)
# ---------- server ----------
@dataclass
class DhcpServer:
cfg: DhcpConfig
log: Callable[[str], None] # how to push an event line into the UI
on_change: Callable[[], None] # called whenever something the UI cares about changed
clients: dict[str, dict] = field(default_factory=dict)
lock: threading.Lock = field(default_factory=threading.Lock)
stats: dict[str, int] = field(default_factory=lambda: {
"packets": 0, "discovers": 0, "requests": 0, "releases": 0,
})
# --- lease allocation ---
def alloc_ip(self, mac: str) -> Optional[int]:
with self.lock:
if mac in self.clients:
return self.clients[mac]["ip_int"]
used = {c["ip_int"] for c in self.clients.values()}
for ipn in self.cfg.pool:
if ipn not in used:
self.clients[mac] = {
"ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False,
}
return ipn
return None
def touch_client(self, mac: str, ipn: Optional[int] = None, host: Optional[str] = None) -> None:
with self.lock:
if mac not in self.clients:
self.clients[mac] = {
"ip_int": ipn or 0, "host": host or "",
"last": now_s(), "ping_ok": False,
}
else:
self.clients[mac]["last"] = now_s()
if ipn: self.clients[mac]["ip_int"] = ipn
if host: self.clients[mac]["host"] = host
def release_client(self, mac: str) -> Optional[int]:
with self.lock:
old = self.clients.pop(mac, None)
return old["ip_int"] if old else None
# --- packet parsing ---
@staticmethod
def parse_options(data: bytes) -> dict[int, bytes]:
opts: dict[int, bytes] = {}
i = 240
while i < len(data):
code = data[i]
if code == 0:
i += 1; continue
if code == 255:
break
if i + 1 >= len(data):
break
L = data[i + 1]
opts[code] = data[i + 2 : i + 2 + L]
i += 2 + L
return opts
@staticmethod
def get_hostname(opts: dict[int, bytes]) -> str:
h = opts.get(12)
if not h:
return ""
try:
return h.rstrip(b"\x00").decode(errors="replace")
except Exception:
return ""
# --- packet building ---
def build_reply(self, req: bytes, dhcp_type: int, yiaddr_int: int) -> bytes:
pkt = bytearray(240)
pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0
pkt[4:8] = req[4:8] # xid
pkt[10:12] = req[10:12] # flags
pkt[16:20] = struct.pack("!I", yiaddr_int) # yiaddr
pkt[20:24] = socket.inet_aton(self.cfg.server_ip) # siaddr
pkt[28:44] = req[28:44] # chaddr
pkt[236:240] = b"\x63\x82\x53\x63" # magic cookie
o = bytearray()
o += bytes([53, 1, dhcp_type]) # message type
o += bytes([54, 4]) + socket.inet_aton(self.cfg.server_ip) # server id
o += bytes([51, 4]) + struct.pack("!I", self.cfg.lease) # lease time
o += bytes([1, 4]) + socket.inet_aton(self.cfg.netmask) # subnet mask
o += bytes([3, 4]) + socket.inet_aton(self.cfg.server_ip) # router
o += bytes([6, 4]) + socket.inet_aton(self.cfg.server_ip) # DNS
tftp = self.cfg.tftp.encode()
o += bytes([66, len(tftp)]) + tftp # TFTP server name
o += bytes([150, 4]) + socket.inet_aton(self.cfg.tftp) # TFTP server addr
o += bytes([255])
return bytes(pkt) + bytes(o)
# --- ping loop, runs in its own thread ---
def ping_loop(self, stop: threading.Event) -> None:
from concurrent.futures import ThreadPoolExecutor
pool_exec = ThreadPoolExecutor(max_workers=16)
while not stop.wait(1.0):
with self.lock:
items = [(m, c["ip_int"]) for m, c in self.clients.items()]
if not items:
continue
ips = [int2ip(i) for _, i in items]
results = list(pool_exec.map(ping_one, ips))
changed = False
with self.lock:
for (mac, _), ok in zip(items, results):
if mac in self.clients and self.clients[mac].get("ping_ok") != ok:
self.clients[mac]["ping_ok"] = ok
changed = True
if changed:
self.on_change()
# --- main server loop ---
def run(self, stop: threading.Event) -> None:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
s.bind(("0.0.0.0", 67))
except OSError as e:
self.log(f"[bold red]bind UDP/67 failed:[/] {e}")
self.log("[yellow]Another DHCP service (Tftpd32 DHCP, ICS, Windows DHCP) may be running.[/]")
stop.set()
return
while not stop.is_set():
try:
data, _ = s.recvfrom(2048)
except OSError:
continue
if len(data) < 240 or data[0] != 1:
continue
self.stats["packets"] += 1
mac = ":".join(f"{b:02x}" for b in data[28:34])
opts = self.parse_options(data)
msg = opts.get(53)
if not msg:
continue
mt = msg[0]
host = self.get_hostname(opts)
host_s = f" [{host}]" if host else ""
if mt == 1: # DISCOVER
self.stats["discovers"] += 1
ipn = self.alloc_ip(mac)
if ipn is None:
self.log(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]POOL EXHAUSTED[/]")
continue
self.touch_client(mac, ipn, host)
s.sendto(self.build_reply(data, 2, ipn), ("255.255.255.255", 68))
self.log(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}")
elif mt == 3: # REQUEST
self.stats["requests"] += 1
req_ip = opts.get(50)
with self.lock:
cached = self.clients.get(mac, {}).get("ip_int")
ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached
if ipn is None:
continue
self.touch_client(mac, ipn, host)
s.sendto(self.build_reply(data, 5, ipn), ("255.255.255.255", 68))
self.log(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}")
elif mt == 7: # RELEASE
self.stats["releases"] += 1
old = self.release_client(mac)
if old is not None:
self.log(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old)}")
elif mt == 8: # INFORM
self.log(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}")
self.on_change()
try:
s.close()
except Exception:
pass

103
src/dhcpsrv/network.py Normal file
View file

@ -0,0 +1,103 @@
"""
Network plumbing: enumerate physical NICs, set / revert IP via netsh, ping hosts.
Pure functions; no shared state. The ping loop is in `dhcp.py` because it
mutates the DHCP server's client table."""
from __future__ import annotations
import json
import os
import subprocess
from typing import Any
CREATE_NO_WINDOW = 0x08000000 if os.name == "nt" else 0
# Adapters we never show in the picker.
SKIP_DESCRIPTION = (
"VPN", "Virtual", "AnyConnect", "TAP-", "TUN-", "Bluetooth", "Loopback",
"WAN Miniport", "Hyper-V", "VMware", "VirtualBox", "WireGuard", "OpenVPN",
"Tailscale", "ZeroTier",
)
SKIP_MEDIA = ("Native 802.11", "Wireless WAN")
def _run_ps(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess:
return subprocess.run(
["powershell.exe", "-NoProfile", "-NonInteractive", "-Command", cmd],
capture_output=True, text=True, timeout=timeout,
creationflags=CREATE_NO_WINDOW,
)
def _run_netsh(args: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
return subprocess.run(
["netsh", *args],
capture_output=True, text=True, timeout=timeout,
creationflags=CREATE_NO_WINDOW,
)
def list_adapters() -> list[dict[str, Any]]:
"""Return physical wired adapters only — skip wireless / VPN / virtual."""
cmd = (
r"Get-NetAdapter | ForEach-Object {"
r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | "
r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; "
r" [pscustomobject]@{"
r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; "
r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip"
r" }} | ConvertTo-Json -Depth 3 -Compress"
)
r = _run_ps(cmd, timeout=20)
if r.returncode != 0 or not r.stdout.strip():
return []
data = json.loads(r.stdout)
if isinstance(data, dict):
data = [data]
out: list[dict[str, Any]] = []
for a in data:
if a.get("Status") in ("Disabled", "Not Present"):
continue
if a.get("Virtual"):
continue
if a.get("MediaType") in SKIP_MEDIA:
continue
haystack = ((a.get("Description") or "") + " " + (a.get("Name") or "")).lower()
if any(k.lower() in haystack for k in SKIP_DESCRIPTION):
continue
out.append(a)
out.sort(key=lambda x: x["ifIndex"])
return out
def set_static_ip(nic_name: str, ip: str, mask: str) -> None:
_run_netsh(["interface", "ipv4", "set", "address", f"name={nic_name}", "static", ip, mask])
def revert_to_dhcp(nic_name: str) -> None:
_run_netsh(["interface", "ipv4", "set", "address", f"name={nic_name}", "source=dhcp"])
_run_netsh(["interface", "ipv4", "set", "dnsservers", f"name={nic_name}", "source=dhcp"])
def ping_one(ip: str, timeout_ms: int = 600) -> bool:
"""Windows-`ping` based reachability test.
Windows' `ping` exit code is unreliable — it can return 0 with
"Destination host unreachable" or with a stale-ARP-based reply from the
local stack. The only trustworthy success marker is the `TTL=` substring
in stdout (present across locales e.g. `...time<1ms TTL=64` or
`...время<1мс TTL=64`)."""
try:
if os.name == "nt":
cmd = ["ping", "-n", "1", "-w", str(timeout_ms), ip]
else:
cmd = ["ping", "-c", "1", "-W", "1", ip]
r = subprocess.run(
cmd, capture_output=True, timeout=2, text=True,
creationflags=CREATE_NO_WINDOW,
)
out = (r.stdout or "") + (r.stderr or "")
return "TTL=" in out
except Exception:
return False

View file

@ -0,0 +1,50 @@
"""
Windows-specific bits: VT (ANSI) processing in the console, UAC self-elevation.
"""
from __future__ import annotations
import ctypes
import os
import sys
def enable_vt() -> None:
"""Enable virtual-terminal processing on the Windows console so that ESC
escape sequences from raw stdout writes are interpreted as colours / clear
instead of being printed as literal characters.
Rich enables this for its own writes; we still call it so direct
`sys.stdout.write("\\x1b[...")` works (used to clear the scrollback when
the alt screen starts)."""
if os.name != "nt":
return
try:
k = ctypes.windll.kernel32
STD_OUT, STD_ERR = -11, -12
ENABLE_VT = 0x0004
for std in (STD_OUT, STD_ERR):
h = k.GetStdHandle(std)
mode = ctypes.c_ulong()
if k.GetConsoleMode(h, ctypes.byref(mode)):
k.SetConsoleMode(h, mode.value | ENABLE_VT)
except Exception:
pass
def is_admin() -> bool:
try:
return ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
return False
def require_admin() -> None:
"""If we're not running elevated, relaunch ourselves through UAC and exit.
Works both for the PyInstaller bundle (`sys.executable` is the exe itself)
and for a plain `python src/dhcpsrv/__main__.py` run."""
if is_admin():
return
args = " ".join(f'"{a}"' for a in sys.argv)
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, args, None, 1)
sys.exit(0)

143
src/dhcpsrv/ui.py Normal file
View file

@ -0,0 +1,143 @@
"""
Rich-based full-screen TUI.
`Ui` owns the events buffer and the refresh trigger; it reads everything else
from the `DhcpServer` it's bound to."""
from __future__ import annotations
import sys
import threading
from collections import deque
from typing import Callable
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from . import __version__
from .dhcp import DhcpServer, int2ip
# Fixed-size layout slots — used to compute the clients-table fit.
HEADER_LINES = 5
EVENTS_LINES = 14
TBL_OVERHEAD = 6 # panel borders + table header + table rules
class Ui:
def __init__(self, server: DhcpServer):
self.server = server
self.console = Console(log_path=False)
self.events = deque(maxlen=200)
self.events_lock = threading.Lock()
self.refresh_evt = threading.Event()
# --- public for other modules ---
def log(self, markup: str) -> None:
"""Append an event line. Called from the DHCP server thread."""
with self.events_lock:
self.events.append(markup)
self.refresh_evt.set()
def request_refresh(self) -> None:
"""Called when something the UI cares about changed (e.g. a ping result)."""
self.refresh_evt.set()
# --- rendering ---
def _render_header(self) -> Panel:
with self.server.lock:
leased = len(self.server.clients)
st = self.server.stats
cfg = self.server.cfg
body = (
f"[bold cyan]dhcpsrv v{__version__}[/] [dim]made by engelgardt[/]\n"
f"Server: [bold]{cfg.server_ip}[/]/{cfg.netmask} "
f"Pool: [bold]{int2ip(cfg.pool[0])}{int2ip(cfg.pool[-1])}[/] "
f"Lease: [bold]{cfg.lease}s[/] "
f"TFTP: [bold]{cfg.tftp}[/]\n"
f"Leases: [bold]{leased}/{len(cfg.pool)}[/] "
f"Pkts: [dim]{st['packets']}[/] "
f"DISCOVER: [cyan]{st['discovers']}[/] "
f"REQUEST: [green]{st['requests']}[/] "
f"RELEASE: [yellow]{st['releases']}[/] "
f"[dim]Ctrl+C to stop[/]"
)
return Panel(body, border_style="cyan")
def _render_table(self) -> Table:
t = Table(expand=True, header_style="bold")
t.add_column("#", style="dim", width=3, justify="right")
t.add_column("IP", width=16)
t.add_column("Hostname", min_width=10)
t.add_column("MAC", width=19)
t.add_column("Last seen", style="dim", width=10)
t.add_column("Ping", width=6, justify="center")
with self.server.lock:
rows = sorted(self.server.clients.items(), key=lambda kv: kv[1]["ip_int"])
avail = max(1, self.console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD)
overflow = max(0, len(rows) - avail)
if overflow:
rows = rows[: avail - 1] # leave one slot for the "(+N more)" marker
if not rows:
t.add_row("", "", "(no clients yet)", "", "", "")
else:
for i, (mac, c) in enumerate(rows, 1):
ping = (Text("OK", style="bold green")
if c.get("ping_ok") else Text("--", style="bold red"))
t.add_row(
str(i),
int2ip(c["ip_int"]),
c.get("host") or "",
mac,
c.get("last", ""),
ping,
)
if overflow:
t.add_row("", "", f"[dim](+{overflow} more — enlarge the window)[/]", "", "", "")
return t
def _render_events(self) -> Panel:
with self.events_lock:
last = list(self.events)[-20:]
body = "\n".join(last) if last else "[dim](no events yet)[/]"
return Panel(body, title="Events", border_style="dim")
def _render_screen(self) -> Layout:
layout = Layout()
layout.split_column(
Layout(self._render_header(), name="hdr", size=HEADER_LINES),
Layout(Panel(self._render_table(), title="Clients", border_style="cyan"), name="tbl"),
Layout(self._render_events(), name="evt", size=EVENTS_LINES),
)
return layout
# --- main loop ---
def run(self, stop: threading.Event) -> None:
"""Run until `stop` is set. Event-driven: redraws only on real changes
or terminal resize."""
# Clear screen + scrollback so wheel-scrolling can't expose pre-launch text.
sys.stdout.write("\x1b[2J\x1b[3J\x1b[H")
sys.stdout.flush()
last_size = self.console.size
with Live(self._render_screen(), auto_refresh=False, console=self.console,
screen=True, redirect_stdout=False, redirect_stderr=False) as live:
live.refresh()
while not stop.is_set():
triggered = self.refresh_evt.wait(timeout=0.5)
if stop.is_set():
break
cur_size = self.console.size
resized = (cur_size != last_size)
if resized:
last_size = cur_size
if triggered:
self.refresh_evt.clear()
if triggered or resized:
live.update(self._render_screen(), refresh=True)

View file

@ -0,0 +1,53 @@
"""
Auto-update check.
On startup, ask GitHub for the latest release tag. If it's newer than the
local `__version__`, ask the user whether to open the download page in a
browser. Silent on any error (offline, rate-limit, etc.)."""
from __future__ import annotations
import json
import urllib.request
import webbrowser
from rich.console import Console
from rich.prompt import Confirm
from . import __version__, GITHUB_REPO
def _parse_version(s: str) -> tuple[int, int, int]:
try:
s = (s or "").strip().lstrip("v")
parts = [int(x) for x in s.split(".")[:3]]
while len(parts) < 3:
parts.append(0)
return tuple(parts) # type: ignore[return-value]
except Exception:
return (0, 0, 0)
def check_for_update(console: Console) -> None:
try:
url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest"
req = urllib.request.Request(url, headers={
"Accept": "application/vnd.github+json",
"User-Agent": f"dhcpsrv/{__version__}",
})
with urllib.request.urlopen(req, timeout=3) as r:
data = json.loads(r.read().decode("utf-8", errors="replace"))
latest = (data.get("tag_name") or "").strip()
page = data.get("html_url") or f"https://github.com/{GITHUB_REPO}/releases/latest"
if _parse_version(latest) > _parse_version(__version__):
console.rule("[bold yellow]Update available")
console.print(f"Current: [dim]v{__version__}[/] Latest: [bold green]{latest}[/]")
try:
if Confirm.ask("Open the download page in your browser?", default=True):
webbrowser.open(page)
except (EOFError, KeyboardInterrupt):
pass
console.print()
except Exception:
# Offline / API error — silent on purpose.
pass