dhcpsrv/dhcpsrv_app.py

455 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
dhcpsrv v1.0.0 - portable single-exe edition.
made by engelgardt
This file combines what previously lived in dhcpsrv.ps1 + dhcpsrv.py:
- admin check
- NIC selection (filters out wireless / VPN / virtual)
- static IP setting (netsh)
- DHCP server with rich live UI
- revert NIC prompt on exit
Build:
pyinstaller --onefile --uac-admin --name dhcpsrv --console dhcpsrv_app.py
"""
import os, sys, ctypes, json, subprocess, signal, socket, struct, threading, time
from collections import deque
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
# Enable VT (ANSI escape) processing on Windows console BEFORE any output.
def _enable_vt():
if os.name != "nt":
return
try:
k = ctypes.windll.kernel32
STD_OUT, STD_ERR = -11, -12
ENABLE_VT = 0x0004
for std in (STD_OUT, STD_ERR):
h = k.GetStdHandle(std)
mode = ctypes.c_ulong()
if k.GetConsoleMode(h, ctypes.byref(mode)):
k.SetConsoleMode(h, mode.value | ENABLE_VT)
except Exception:
pass
_enable_vt()
__version__ = "1.0.0"
# Fixed heights used by the Layout — used to compute the clients table fit
HEADER_LINES = 5
EVENTS_LINES = 14
TBL_OVERHEAD = 6 # panel borders + table header row + table top/bottom rules
# Hardcoded defaults — pick NIC, everything else is auto.
DEFAULT_SERVER_IP = "10.10.10.1"
DEFAULT_NETMASK = "255.255.255.0"
POOL_SIZE = 50 # addresses starting at server_ip + 1
DEFAULT_LEASE = 7200 # 2 hours
# TFTP option always = server IP
# Stats counters
stats = {"packets": 0, "discovers": 0, "requests": 0, "releases": 0}
# rich
try:
from rich.console import Console, Group
from rich.live import Live
from rich.table import Table
from rich.text import Text
from rich.panel import Panel
from rich.layout import Layout
from rich.prompt import Prompt, Confirm
except ImportError:
print("'rich' missing in the bundled build")
input("Press Enter to exit")
sys.exit(10)
console = Console(log_path=False)
# ---------- admin ----------
def is_admin():
try: return ctypes.windll.shell32.IsUserAnAdmin() != 0
except: return False
def require_admin():
if not is_admin():
# Re-launch elevated; original exits
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(f'"{a}"' for a in sys.argv), None, 1)
sys.exit(0)
# ---------- helpers ----------
CREATE_NO_WINDOW = 0x08000000
def run_ps(cmd, timeout=15):
return subprocess.run(["powershell.exe","-NoProfile","-NonInteractive","-Command",cmd],
capture_output=True, text=True, timeout=timeout,
creationflags=CREATE_NO_WINDOW)
def run_netsh(args, timeout=15):
return subprocess.run(["netsh"] + args, capture_output=True, text=True,
timeout=timeout, creationflags=CREATE_NO_WINDOW)
# ---------- NIC enumeration ----------
SKIP_DESCR = ("VPN","Virtual","AnyConnect","TAP-","TUN-","Bluetooth","Loopback",
"WAN Miniport","Hyper-V","VMware","VirtualBox","WireGuard","OpenVPN",
"Tailscale","ZeroTier")
SKIP_MEDIA = ("Native 802.11","Wireless WAN")
def list_adapters():
cmd = (r"Get-NetAdapter | ForEach-Object {"
r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | "
r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; "
r" [pscustomobject]@{"
r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; "
r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip"
r" }} | ConvertTo-Json -Depth 3 -Compress")
r = run_ps(cmd, timeout=20)
if r.returncode != 0 or not r.stdout.strip():
return []
data = json.loads(r.stdout)
if isinstance(data, dict): data = [data]
out = []
for a in data:
if a["Status"] in ("Disabled","Not Present"): continue
if a.get("Virtual"): continue
if a.get("MediaType") in SKIP_MEDIA: continue
descr = (a.get("Description") or "") + " " + (a.get("Name") or "")
if any(k.lower() in descr.lower() for k in SKIP_DESCR): continue
out.append(a)
out.sort(key=lambda x: x["ifIndex"])
return out
# ---------- DHCP server state ----------
clients = {} # mac -> {ip_int, host, last, ping_ok}
clients_lock = threading.Lock()
events = deque(maxlen=200)
events_lock = threading.Lock()
refresh_evt = threading.Event() # set whenever UI should re-render
def log_event(markup_line):
with events_lock:
events.append(markup_line)
refresh_evt.set()
# Config (filled by main)
SERVER_IP = ""
NETMASK = "255.255.255.0"
POOL = []
LEASE = 7200
TFTP = ""
def ip2int(ip): return struct.unpack("!I", socket.inet_aton(ip))[0]
def int2ip(n): return socket.inet_ntoa(struct.pack("!I", n))
def now_s(): return datetime.now().strftime("%H:%M:%S")
# ---------- ping ----------
# Windows `ping` exit code is unreliable: it can return 0 with "Destination host
# unreachable" or with a stale ARP-based "reply" from the local stack. The only
# trustworthy success marker is the "TTL=" substring in stdout (present across
# locales — e.g. "...time<1ms TTL=64" / "...время<1мс TTL=64").
def ping_one(ip, timeout_ms=600):
try:
r = subprocess.run(["ping","-n","1","-w",str(timeout_ms),ip],
capture_output=True, timeout=2, text=True,
creationflags=CREATE_NO_WINDOW)
out = (r.stdout or "") + (r.stderr or "")
return "TTL=" in out
except Exception:
return False
def ping_loop():
pool_exec = ThreadPoolExecutor(max_workers=16)
while True:
with clients_lock:
items = [(m, c["ip_int"]) for m, c in clients.items()]
changed = False
if items:
ips = [int2ip(i) for _, i in items]
res = list(pool_exec.map(ping_one, ips))
with clients_lock:
for (mac, _), ok in zip(items, res):
if mac in clients and clients[mac].get("ping_ok") != ok:
clients[mac]["ping_ok"] = ok
changed = True
if changed:
refresh_evt.set()
time.sleep(1.0)
# ---------- DHCP logic ----------
def alloc_ip(mac):
with clients_lock:
if mac in clients: return clients[mac]["ip_int"]
used = {c["ip_int"] for c in clients.values()}
for ipn in POOL:
if ipn not in used:
clients[mac] = {"ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False}
return ipn
return None
def touch_client(mac, ipn=None, host=None):
with clients_lock:
if mac not in clients:
clients[mac] = {"ip_int": ipn or 0, "host": host or "", "last": now_s(), "ping_ok": False}
else:
clients[mac]["last"] = now_s()
if ipn: clients[mac]["ip_int"] = ipn
if host: clients[mac]["host"] = host
def parse_options(data):
opts = {}; i = 240
while i < len(data):
code = data[i]
if code == 0: i += 1; continue
if code == 255: break
if i + 1 >= len(data): break
L = data[i+1]
opts[code] = data[i+2:i+2+L]
i += 2 + L
return opts
def get_hostname(opts):
h = opts.get(12)
if h:
try: return h.rstrip(b"\x00").decode(errors="replace")
except: return ""
return ""
def build_reply(req, dhcp_type, yiaddr_int):
pkt = bytearray(240)
pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0
pkt[4:8] = req[4:8]
pkt[10:12] = req[10:12]
pkt[16:20] = struct.pack("!I", yiaddr_int)
pkt[20:24] = socket.inet_aton(SERVER_IP)
pkt[28:44] = req[28:44]
pkt[236:240] = b"\x63\x82\x53\x63"
o = bytearray()
o += bytes([53,1,dhcp_type])
o += bytes([54,4]) + socket.inet_aton(SERVER_IP)
o += bytes([51,4]) + struct.pack("!I", LEASE)
o += bytes([1,4]) + socket.inet_aton(NETMASK)
o += bytes([3,4]) + socket.inet_aton(SERVER_IP)
o += bytes([6,4]) + socket.inet_aton(SERVER_IP)
tb = TFTP.encode()
o += bytes([66, len(tb)]) + tb
o += bytes([150,4]) + socket.inet_aton(TFTP)
o += bytes([255])
return bytes(pkt) + bytes(o)
# ---------- UI ----------
def render_table():
t = Table(expand=True, header_style="bold")
t.add_column("#", style="dim", width=3, justify="right")
t.add_column("IP", width=16)
t.add_column("Hostname", min_width=10)
t.add_column("MAC", width=19)
t.add_column("Last seen",style="dim", width=10)
t.add_column("Ping", width=6, justify="center")
with clients_lock:
rows = sorted(clients.items(), key=lambda kv: kv[1]["ip_int"])
# Auto-fit to available height (header + events panels are fixed-size in Layout).
avail = max(1, console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD)
overflow = max(0, len(rows) - avail)
if overflow:
rows = rows[:avail - 1] # leave one slot for the "(+N more)" marker
if not rows:
t.add_row("","","(no clients yet)","","","")
else:
for i,(mac,c) in enumerate(rows,1):
ping = Text("OK", style="bold green") if c.get("ping_ok") else Text("--", style="bold red")
t.add_row(str(i), int2ip(c["ip_int"]), c.get("host") or "", mac, c.get("last",""), ping)
if overflow:
t.add_row("", "", f"[dim](+{overflow} more — enlarge the window)[/]", "", "", "")
return t
def render_header():
with clients_lock:
leased = len(clients)
body = (f"[bold cyan]dhcpsrv v{__version__}[/] [dim]made by engelgardt[/]\n"
f"Server: [bold]{SERVER_IP}[/]/{NETMASK} "
f"Pool: [bold]{int2ip(POOL[0])}{int2ip(POOL[-1])}[/] "
f"Lease: [bold]{LEASE}s[/] "
f"TFTP: [bold]{TFTP}[/]\n"
f"Leases: [bold]{leased}/{len(POOL)}[/] "
f"Pkts: [dim]{stats['packets']}[/] "
f"DISCOVER: [cyan]{stats['discovers']}[/] "
f"REQUEST: [green]{stats['requests']}[/] "
f"RELEASE: [yellow]{stats['releases']}[/] "
f"[dim]Ctrl+C to stop[/]")
return Panel(body, border_style="cyan")
def render_events_panel():
with events_lock:
last = list(events)[-20:]
body = "\n".join(last) if last else "[dim](no events yet)[/]"
return Panel(body, title="Events", border_style="dim")
def render_screen():
layout = Layout()
layout.split_column(
Layout(render_header(), name="hdr", size=HEADER_LINES),
Layout(Panel(render_table(), title="Clients", border_style="cyan"), name="tbl"),
Layout(render_events_panel(), name="evt", size=EVENTS_LINES),
)
return layout
def ui_loop(stop):
# Pure event-driven: refresh only on real state changes or terminal resize.
# Clear screen AND scrollback so wheel-scrolling won't expose pre-launch text.
sys.stdout.write("\x1b[2J\x1b[3J\x1b[H")
sys.stdout.flush()
last_size = console.size
with Live(render_screen(), auto_refresh=False, console=console, screen=True,
redirect_stdout=False, redirect_stderr=False) as live:
live.refresh()
while not stop.is_set():
triggered = refresh_evt.wait(timeout=0.5)
if stop.is_set(): break
# Detect window resize without spamming refreshes
cur_size = console.size
resized = (cur_size != last_size)
if resized:
last_size = cur_size
if triggered:
refresh_evt.clear()
if triggered or resized:
live.update(render_screen(), refresh=True)
# ---------- main flow ----------
def select_nic():
console.rule("[bold cyan]Available adapters")
adapters = list_adapters()
if not adapters:
console.print("[red]No suitable wired adapters found.[/]")
return None
for i, a in enumerate(adapters, 1):
ip = a.get("IPv4") or ""
console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}")
while True:
s = Prompt.ask("Select adapter number").strip()
if s.isdigit() and 1 <= int(s) <= len(adapters):
return adapters[int(s)-1]
console.print("[red]Invalid selection.[/]")
def set_static_ip(nic_name, ip, mask):
console.print(f"[yellow]Setting {nic_name}{ip} / {mask} ...[/]")
run_netsh(["interface","ipv4","set","address",f"name={nic_name}","static",ip,mask])
def revert_dhcp(nic_name):
run_netsh(["interface","ipv4","set","address",f"name={nic_name}","source=dhcp"])
run_netsh(["interface","ipv4","set","dnsservers",f"name={nic_name}","source=dhcp"])
def main():
global SERVER_IP, POOL, LEASE, TFTP
require_admin()
console.print(f"[bold cyan]dhcpsrv v{__version__}[/] - portable laptop-side DHCP server")
console.print("[dim]made by engelgardt[/]")
console.print()
nic = select_nic()
if not nic: input("Press Enter to exit"); return
SERVER_IP = DEFAULT_SERVER_IP
LEASE = DEFAULT_LEASE
TFTP = SERVER_IP
server_n = ip2int(SERVER_IP)
POOL = list(range(server_n + 1, server_n + 1 + POOL_SIZE))
set_static_ip(nic["Name"], SERVER_IP, NETMASK)
log_event(f"[dim][{now_s()}][/] [bold]NIC[/] {nic['Name']}{SERVER_IP}/{NETMASK}")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
s.bind(("0.0.0.0", 67))
except OSError as e:
console.print(f"[bold red]bind UDP/67 failed:[/] {e}")
console.print("[yellow]Another DHCP service (Tftpd32, ICS, Windows DHCP) may be running.[/]")
input("Press Enter to exit"); return
stop = threading.Event()
threading.Thread(target=ping_loop, daemon=True).start()
threading.Thread(target=ui_loop, args=(stop,), daemon=True).start()
def shutdown(sig=None, frm=None):
stop.set()
console.print()
console.print(f"[dim][{now_s()}] Shutting down...[/]")
try: s.close()
except: pass
# Ask revert
try:
if Confirm.ask(f"Revert {nic['Name']} back to DHCP?", default=False):
revert_dhcp(nic["Name"])
console.print("[green]NIC reverted to DHCP[/]")
except (EOFError, KeyboardInterrupt): pass
input("Press Enter to exit")
sys.exit(0)
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
while True:
try:
data, _ = s.recvfrom(2048)
except KeyboardInterrupt:
shutdown(); return
except OSError:
continue
if len(data) < 240 or data[0] != 1: continue
stats["packets"] += 1
mac = ":".join(f"{b:02x}" for b in data[28:34])
opts = parse_options(data)
msg = opts.get(53)
if not msg: continue
mt = msg[0]
host = get_hostname(opts)
host_s = f" [{host}]" if host else ""
if mt == 1:
stats["discovers"] += 1
ipn = alloc_ip(mac)
if ipn is None:
log_event(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]POOL EXHAUSTED[/]")
continue
touch_client(mac, ipn, host)
s.sendto(build_reply(data, 2, ipn), ("255.255.255.255", 68))
log_event(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}")
elif mt == 3:
stats["requests"] += 1
req_ip = opts.get(50)
with clients_lock:
cached = clients.get(mac, {}).get("ip_int")
ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached
if ipn is None: continue
touch_client(mac, ipn, host)
s.sendto(build_reply(data, 5, ipn), ("255.255.255.255", 68))
log_event(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}")
elif mt == 7:
stats["releases"] += 1
with clients_lock:
old = clients.pop(mac, None)
if old:
log_event(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old['ip_int'])}")
elif mt == 8:
log_event(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}")
if __name__ == "__main__":
main()