Compare commits
No commits in common. "main" and "v1.1.0" have entirely different histories.
25 changed files with 497 additions and 1470 deletions
56
.github/ISSUE_TEMPLATE/bug_report.yml
vendored
56
.github/ISSUE_TEMPLATE/bug_report.yml
vendored
|
|
@ -1,56 +0,0 @@
|
|||
name: Bug report
|
||||
description: Something doesn't work as expected
|
||||
labels: ["bug"]
|
||||
body:
|
||||
- type: input
|
||||
id: version
|
||||
attributes:
|
||||
label: Version
|
||||
description: Visible in the startup banner.
|
||||
placeholder: v1.1.0
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: steps
|
||||
attributes:
|
||||
label: Steps to reproduce
|
||||
placeholder: |
|
||||
1. ...
|
||||
2. ...
|
||||
3. ...
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: expected
|
||||
attributes:
|
||||
label: What you expected to happen
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: actual
|
||||
attributes:
|
||||
label: What actually happened
|
||||
description: Paste any error output verbatim. Screenshots are welcome.
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: input
|
||||
id: os
|
||||
attributes:
|
||||
label: Windows version
|
||||
placeholder: e.g. Windows 11 24H2
|
||||
|
||||
- type: input
|
||||
id: network
|
||||
attributes:
|
||||
label: Network setup (if relevant)
|
||||
placeholder: e.g. USB Realtek NIC to a server's BMC port via an 8-port switch
|
||||
|
||||
- type: textarea
|
||||
id: extra
|
||||
attributes:
|
||||
label: Anything else?
|
||||
description: Workarounds tried, related links, etc.
|
||||
5
.github/ISSUE_TEMPLATE/config.yml
vendored
5
.github/ISSUE_TEMPLATE/config.yml
vendored
|
|
@ -1,5 +0,0 @@
|
|||
blank_issues_enabled: false
|
||||
contact_links:
|
||||
- name: Security vulnerability
|
||||
url: https://github.com/Engelgardt23/dhcpsrv/security/advisories/new
|
||||
about: Please report security issues privately via GitHub Security Advisories — not as a public issue.
|
||||
23
.github/ISSUE_TEMPLATE/feature_request.yml
vendored
23
.github/ISSUE_TEMPLATE/feature_request.yml
vendored
|
|
@ -1,23 +0,0 @@
|
|||
name: Feature request
|
||||
description: Suggest a new feature or an improvement
|
||||
labels: ["enhancement"]
|
||||
body:
|
||||
- type: textarea
|
||||
id: motivation
|
||||
attributes:
|
||||
label: What's the use case?
|
||||
description: What are you trying to do, and why is the current behavior not enough?
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: proposal
|
||||
attributes:
|
||||
label: Proposed solution
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: alternatives
|
||||
attributes:
|
||||
label: Alternatives considered
|
||||
80
.github/workflows/release.yml
vendored
80
.github/workflows/release.yml
vendored
|
|
@ -1,80 +0,0 @@
|
|||
name: Release
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- 'v*.*.*'
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: windows-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.12'
|
||||
|
||||
- name: Install build dependencies
|
||||
run: python -m pip install --upgrade pip pyinstaller rich
|
||||
|
||||
- name: Resolve version from tag
|
||||
id: ver
|
||||
shell: bash
|
||||
run: echo "version=${GITHUB_REF_NAME#v}" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Build executable
|
||||
run: python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv --icon assets/icon.ico --paths src dhcpsrv-launcher.py
|
||||
|
||||
- name: Package portable folder
|
||||
shell: pwsh
|
||||
run: |
|
||||
$ver = '${{ steps.ver.outputs.version }}'
|
||||
$folder = "dhcpsrv-v$ver"
|
||||
New-Item -ItemType Directory -Path $folder | Out-Null
|
||||
Copy-Item dist/dhcpsrv.exe $folder/
|
||||
@"
|
||||
dhcpsrv v$ver - portable edition
|
||||
made by engelgardt
|
||||
|
||||
Minimal laptop-side DHCP server for storage/server engineers.
|
||||
|
||||
USAGE
|
||||
Double-click dhcpsrv.exe.
|
||||
Accept the UAC prompt (admin needed to bind UDP/67 and reconfigure the NIC).
|
||||
Pick the NIC plugged into your server/switch - that's the only question.
|
||||
Press Ctrl+C to stop.
|
||||
|
||||
DEFAULTS
|
||||
Server IP : 10.10.10.1/24
|
||||
Pool : 10.10.10.2 .. 10.10.10.51 (50 addresses)
|
||||
Lease : 7200 seconds (2 hours)
|
||||
TFTP opt : server IP
|
||||
|
||||
NOTES
|
||||
- Nothing is installed. Delete the folder to remove.
|
||||
- UAC prompt appears every time (no scheduled-task shortcut in portable mode).
|
||||
- If Tftpd32 has its DHCP module enabled, disable it - UDP/67 is then taken.
|
||||
"@ | Out-File -FilePath "$folder/README.txt" -Encoding UTF8
|
||||
Compress-Archive -Path $folder -DestinationPath "dhcpsrv-portable-v$ver.zip"
|
||||
|
||||
- name: Generate SHA-256 checksum
|
||||
shell: pwsh
|
||||
run: |
|
||||
$ver = '${{ steps.ver.outputs.version }}'
|
||||
$zip = "dhcpsrv-portable-v$ver.zip"
|
||||
$hash = (Get-FileHash -Algorithm SHA256 $zip).Hash.ToLower()
|
||||
"$hash $zip" | Out-File -FilePath "$zip.sha256" -Encoding ASCII -NoNewline
|
||||
Get-Content "$zip.sha256"
|
||||
|
||||
- name: Create release
|
||||
uses: softprops/action-gh-release@v2
|
||||
with:
|
||||
files: |
|
||||
dhcpsrv-portable-v${{ steps.ver.outputs.version }}.zip
|
||||
dhcpsrv-portable-v${{ steps.ver.outputs.version }}.zip.sha256
|
||||
generate_release_notes: true
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
|
|
@ -7,10 +7,7 @@ dist/
|
|||
__pycache__/
|
||||
*.py[cod]
|
||||
|
||||
# Local build cache (prod/test/old portable folders per version)
|
||||
builds/
|
||||
|
||||
# Legacy staging folders (kept for compatibility with old checkouts)
|
||||
# Distribution staging folders (built per-version, attached to GitHub Releases)
|
||||
portable-v*/
|
||||
|
||||
# Local backup of release archives (kept locally for history, not in repo)
|
||||
|
|
|
|||
61
CHANGELOG.md
61
CHANGELOG.md
|
|
@ -1,61 +0,0 @@
|
|||
# Changelog
|
||||
|
||||
All notable changes to **dhcpsrv** are documented in this file.
|
||||
|
||||
The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and the project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [1.2.1] - 2026-05-18
|
||||
### Changed
|
||||
- The `Update available (vX.Y.Z)` hint in the header is now a clickable hyperlink that opens the GitHub releases page (OSC 8 terminal hyperlink). Modern terminals (Windows Terminal, VS Code, WezTerm, most Linux/macOS terminals) render it as a link — `Ctrl+Click` to follow. Older consoles show the plain text, so nothing breaks.
|
||||
- Russian tagline tightened: dropped the `для инженера` phrase, the wording was carried over from an earlier draft and felt out of place.
|
||||
|
||||
## [1.2.0] - 2026-05-18
|
||||
### Added
|
||||
- Russian UI translation. On first launch the application asks which language to use (`1) English`, `2) Русский`) and writes the answer to a fresh `config.ini` next to `dhcpsrv.exe`. To change the language later, edit `language = en` / `language = ru` in that file — the comment at the top of the file explains how, in both languages.
|
||||
- `config.ini` becomes the future home for other settable defaults (pool, lease, server IP) — currently only `[General] language` is consumed.
|
||||
- Bilingual `README.ru.md` linked from the main `README.md`.
|
||||
|
||||
## [1.1.3] - 2026-05-17
|
||||
### Changed
|
||||
- Update check no longer interrupts startup with an interactive prompt. If a newer release is available, the header line now shows a quiet `update available (vX.Y.Z)` hint right-aligned in dim grey — no key press required.
|
||||
### Fixed
|
||||
- Bumped `__version__` from `1.1.1` to `1.1.3` after the v1.1.2 release packaged a binary that still self-reported as v1.1.1 (the source constant was not bumped before tagging). The source is now once again the single source of truth.
|
||||
|
||||
## [1.1.2] - 2026-05-17
|
||||
### Changed
|
||||
- Dropped the `made by engelgardt` line from the startup banner too — author credit lives in the README only.
|
||||
### Added
|
||||
- Embedded application icon in the exe (via PyInstaller `--icon assets/icon.ico`).
|
||||
|
||||
## [1.1.1] - 2026-05-16
|
||||
### Changed
|
||||
- The persistent header panel no longer prints the `made by engelgardt` line. Author credit moves to the one-off startup banner and the README only — the always-on UI stays tighter.
|
||||
|
||||
## [1.1.0] - 2026-05-16
|
||||
### Added
|
||||
- Auto-update check on startup. Polls GitHub `/releases/latest` with a 3-second timeout. If a newer version is available, prints a yellow notice and offers to open the download page in your browser. Silent on offline / API errors.
|
||||
|
||||
## [1.0.0] - 2026-05-16
|
||||
### Added
|
||||
- First public release on GitHub.
|
||||
- Single portable `.exe` (~12 MB) — no Python required on target machines.
|
||||
- Full-screen TUI built on `rich`: header with server config + live counters (Leases / Pkts / DISCOVER / REQUEST / RELEASE), clients table (`#`, IP, Hostname, MAC, Last seen, Ping), scrolling events panel.
|
||||
- Hardcoded sensible defaults: server `10.10.10.1/24`, pool `10.10.10.2..10.10.10.51` (50 addresses), lease `7200 s`, TFTP option (66/150) = server IP.
|
||||
- Only one prompt at startup: NIC selection.
|
||||
- Adapter filter — only physical wired NICs appear in the picker (no Wi-Fi, VPN, virtual, Hyper-V, VMware, VirtualBox, TAP/TUN, WireGuard, OpenVPN, Tailscale, ZeroTier, Bluetooth, Loopback, WAN Miniport).
|
||||
- Reliable ping check via the `TTL=` substring in `ping` output — a real BMC reboot is reflected as red `--`.
|
||||
- Pure event-driven UI refresh — no flicker on resize or while idle.
|
||||
- Auto-fit clients table to terminal height (`(+N more — enlarge the window)` marker on overflow).
|
||||
- Scrollback cleared on startup so mouse-wheel doesn't expose pre-launch text.
|
||||
- MIT licensed.
|
||||
|
||||
[Unreleased]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.2.1...HEAD
|
||||
[1.2.1]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.2.0...v1.2.1
|
||||
[1.2.0]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.1.3...v1.2.0
|
||||
[1.1.3]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.1.2...v1.1.3
|
||||
[1.1.2]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.1.1...v1.1.2
|
||||
[1.1.1]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.1.0...v1.1.1
|
||||
[1.1.0]: https://github.com/Engelgardt23/dhcpsrv/compare/v1.0.0...v1.1.0
|
||||
[1.0.0]: https://github.com/Engelgardt23/dhcpsrv/releases/tag/v1.0.0
|
||||
|
|
@ -1,79 +0,0 @@
|
|||
# Contributing
|
||||
|
||||
> Project layout, build, and release flow. **If you only want to use the tool — read [README](README.md) instead.**
|
||||
|
||||
## Repo layout
|
||||
|
||||
```
|
||||
dhcpsrv/
|
||||
├── .github/
|
||||
│ ├── workflows/release.yml ← CI: tag-driven build + GitHub Release
|
||||
│ └── ISSUE_TEMPLATE/ ← bug / feature / security routing
|
||||
├── src/dhcpsrv/ ← package source (≤200 lines per module)
|
||||
│ ├── __init__.py ← single source of truth for __version__
|
||||
│ ├── __main__.py ← entry: python -m dhcpsrv
|
||||
│ ├── app.py ← main flow, wires everything
|
||||
│ ├── platform_win.py ← VT enable + UAC self-elevate
|
||||
│ ├── update_check.py ← GitHub /releases/latest poll
|
||||
│ ├── network.py ← list_adapters, netsh, ping (no shared state)
|
||||
│ ├── dhcp.py ← DhcpConfig, DhcpServer, packet parse/build
|
||||
│ └── ui.py ← rich-based full-screen TUI
|
||||
├── pyproject.toml ← deps, packaging, dynamic version
|
||||
├── CHANGELOG.md ← Keep a Changelog format, newest first
|
||||
├── CONTRIBUTING.md ← this file
|
||||
├── LICENSE / README.md / SECURITY.md
|
||||
└── .gitignore
|
||||
```
|
||||
|
||||
## Run from source (no exe)
|
||||
|
||||
```
|
||||
python -m pip install rich
|
||||
python -m dhcpsrv
|
||||
```
|
||||
|
||||
`python -m dhcpsrv` finds `src/dhcpsrv/__main__.py` because the package lives under `src/`. You'll need administrator privileges for UDP/67 and `netsh` — the tool self-elevates via UAC.
|
||||
|
||||
## Editable install (development)
|
||||
|
||||
```
|
||||
python -m pip install -e .
|
||||
dhcpsrv
|
||||
```
|
||||
|
||||
`-e .` makes the entry-point `dhcpsrv` available on PATH; edits in `src/dhcpsrv/` take effect immediately.
|
||||
|
||||
## Build the portable .exe
|
||||
|
||||
```
|
||||
python -m pip install pyinstaller rich
|
||||
python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv --paths src dhcpsrv-launcher.py
|
||||
```
|
||||
|
||||
`dhcpsrv-launcher.py` (at repo root) is the PyInstaller entry — it does an
|
||||
*absolute* import (`from dhcpsrv.app import main`) which is needed when
|
||||
PyInstaller runs the bundled script as a standalone module. The `--paths src`
|
||||
flag tells PyInstaller where to find the `dhcpsrv` package itself. Output:
|
||||
`dist/dhcpsrv.exe`.
|
||||
|
||||
## Cut a release
|
||||
|
||||
1. Update `src/dhcpsrv/__init__.py` — bump `__version__` to `X.Y.Z`.
|
||||
2. Update `CHANGELOG.md` — move items from `[Unreleased]` into a new `[X.Y.Z]` section with today's date.
|
||||
3. Commit: `git commit -am "vX.Y.Z: …"`.
|
||||
4. Tag: `git tag vX.Y.Z`.
|
||||
5. Push: `git push && git push --tags`.
|
||||
|
||||
That's it. GitHub Actions picks up the tag, builds the exe, writes the SHA-256, and creates the GitHub Release with the zip attached.
|
||||
|
||||
## Where features go
|
||||
|
||||
| Adding... | Touch this module |
|
||||
|---|---|
|
||||
| A new DHCP option in the reply | `dhcp.py` → `DhcpServer.build_reply` |
|
||||
| A new adapter filter | `network.py` → `SKIP_DESCRIPTION` / `SKIP_MEDIA` / `list_adapters` |
|
||||
| A new column in the clients table | `ui.py` → `Ui._render_table` |
|
||||
| Something shown in the header | `ui.py` → `Ui._render_header` |
|
||||
| A startup check or banner line | `app.py` → `main()` |
|
||||
| A change to UAC / VT logic | `platform_win.py` |
|
||||
| Tweaking the GitHub update-check UX | `update_check.py` |
|
||||
|
|
@ -3,8 +3,6 @@
|
|||
[](https://github.com/Engelgardt23/dhcpsrv/releases/latest)
|
||||
[](LICENSE)
|
||||
|
||||
🇺🇸 English | [🇷🇺 Русский](README.ru.md)
|
||||
|
||||
A tiny portable **DHCP server** for the laptop of a storage/server engineer.
|
||||
One double-click — pick a NIC — done. Live table of clients, ping status, packet counters. No install, no Python required on the target machine.
|
||||
|
||||
|
|
@ -45,8 +43,8 @@ The asset is `dhcpsrv-portable-vX.Y.Z.zip` (~12 MB).
|
|||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
┌─ Clients ───────────────────────────────────────────────────────────────┐
|
||||
│ # │ IP │ Hostname │ MAC │ Last seen │ Ping │
|
||||
│ 1 │ 10.10.10.2 │ server-01 │ a0:c5:f2:13:57:46 │ 17:42:18 │ OK │
|
||||
│ 2 │ 10.10.10.3 │ server-02 │ 70:b3:d5:11:22:33 │ 17:42:21 │ -- │
|
||||
│ 1 │ 10.10.10.2 │ vegman-r120 │ a0:c5:f2:13:57:46 │ 17:42:18 │ OK │
|
||||
│ 2 │ 10.10.10.3 │ vegman-s220 │ 70:b3:d5:11:22:33 │ 17:42:21 │ -- │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
┌─ Events ────────────────────────────────────────────────────────────────┐
|
||||
│ [17:42:18] DISCOVER a0:c5:f2:13:57:46 → OFFER 10.10.10.2 │
|
||||
|
|
@ -56,7 +54,7 @@ The asset is `dhcpsrv-portable-vX.Y.Z.zip` (~12 MB).
|
|||
|
||||
## Typical scenarios
|
||||
|
||||
- **Server with shared LOM** — one cable into the BMC/host port, BMC and the host OS both get IPs from this DHCP.
|
||||
- **VEGMAN with shared LOM** — one cable into the BMC/host port, BMC and the host OS both get IPs from this DHCP.
|
||||
- **8-port switch** — laptop on one port, up to 7 servers on the rest; the 50-address pool covers everyone.
|
||||
- **Direct cable into a dedicated Mgmt port** — single client (the BMC).
|
||||
|
||||
|
|
|
|||
99
README.ru.md
99
README.ru.md
|
|
@ -1,99 +0,0 @@
|
|||
# dhcpsrv
|
||||
|
||||
[](https://github.com/Engelgardt23/dhcpsrv/releases/latest)
|
||||
[](LICENSE)
|
||||
|
||||
[🇺🇸 English](README.md) | 🇷🇺 Русский
|
||||
|
||||
Маленький портативный **DHCP-сервер** для ноутбука инженера хранения / серверов.
|
||||
Двойной клик — выбрал сетевую — готово. Живая таблица клиентов, статус ping, счётчики пакетов. Ничего не устанавливается, Python на целевой машине не нужен.
|
||||
|
||||
Сделан под сценарий «воткнул кабель, увидел как BMC получил IP» во время прошивки, восстановления и бенчмарков.
|
||||
|
||||
> **Автор: engelgardt.**
|
||||
|
||||
---
|
||||
|
||||
## Скачать
|
||||
|
||||
Последний релиз: [**страница релизов**](https://github.com/Engelgardt23/dhcpsrv/releases/latest).
|
||||
Архив `dhcpsrv-portable-vX.Y.Z.zip` (~12 МБ).
|
||||
|
||||
## Запуск
|
||||
|
||||
1. Распакуй куда угодно.
|
||||
2. Двойной клик по `dhcpsrv.exe`.
|
||||
3. **При первом запуске** программа спросит язык интерфейса (1 — English, 2 — Русский). Ответ запишется в `config.ini` рядом с exe — потом можно поменять руками.
|
||||
4. Подтверди UAC (admin нужен, чтобы занять UDP/67 и переключить адаптер на статический IP).
|
||||
5. Выбери сетевой адаптер, воткнутый в твой сервер или коммутатор — это единственный вопрос.
|
||||
6. `Ctrl+C` — стоп. Спросит, вернуть ли адаптер обратно в DHCP.
|
||||
|
||||
## Настройки по умолчанию (никаких других вопросов)
|
||||
|
||||
| Параметр | Значение |
|
||||
|---|---|
|
||||
| IP сервера | `10.10.10.1/24` |
|
||||
| Пул | `10.10.10.2 .. 10.10.10.51` (50 адресов) |
|
||||
| Lease | `7200 с` (2 часа — переживёт долгий стресс-тест) |
|
||||
| Опция TFTP | IP сервера (BMC сразу увидит твой Tftpd32) |
|
||||
|
||||
## Что на экране
|
||||
|
||||
```
|
||||
┌─ dhcpsrv v1.2.0 ────────────────────────────────────────────────────────┐
|
||||
│ Сервер: 10.10.10.1/255.255.255.0 Пул: 10.10.10.2–10.10.10.51 … │
|
||||
│ Аренды: 3/50 Пакетов: 47 DISCOVER: 12 REQUEST: 11 RELEASE: 0 │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
┌─ Клиенты ───────────────────────────────────────────────────────────────┐
|
||||
│ # │ IP │ Имя хоста │ MAC │ Последний │ Пинг │
|
||||
│ 1 │ 10.10.10.2 │ server-01 │ a0:c5:f2:13:57:46 │ 17:42:18 │ OK │
|
||||
│ 2 │ 10.10.10.3 │ server-02 │ 70:b3:d5:11:22:33 │ 17:42:21 │ -- │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
┌─ События ───────────────────────────────────────────────────────────────┐
|
||||
│ [17:42:18] DISCOVER a0:c5:f2:13:57:46 → OFFER 10.10.10.2 │
|
||||
│ [17:42:18] REQUEST a0:c5:f2:13:57:46 → ACK 10.10.10.2 │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Типичные сценарии
|
||||
|
||||
- **Сервер с общим LOM** — один кабель в порт BMC/host, и BMC и хост-ОС получают IP с этого DHCP.
|
||||
- **8-портовый свитч** — ноут на одном порту, до 7 серверов на остальных; пул из 50 адресов покрывает всех.
|
||||
- **Прямой кабель в выделенный Mgmt-порт** — один клиент (BMC).
|
||||
|
||||
## Совместимость
|
||||
|
||||
- Windows 10 / 11.
|
||||
- В списке адаптеров отфильтровано всё лишнее (Wi-Fi, Cisco AnyConnect, Hyper-V, VMware, VirtualBox, TAP/TUN, WireGuard, OpenVPN, Tailscale, ZeroTier).
|
||||
- Имена адаптеров с пробелами или не-ASCII экранируются корректно для `netsh`.
|
||||
|
||||
## Конфиг
|
||||
|
||||
При первом запуске рядом с `dhcpsrv.exe` появится `config.ini`:
|
||||
|
||||
```ini
|
||||
# Чтобы сменить язык интерфейса, измените 'language' ниже.
|
||||
# Допустимые значения: en, ru
|
||||
[General]
|
||||
language = ru
|
||||
```
|
||||
|
||||
В будущих релизах сюда же переедут пул, lease и IP сервера — пока что переменная одна.
|
||||
|
||||
## Заметки
|
||||
|
||||
- На машине ничего не устанавливается. Удалить — стереть папку.
|
||||
- UAC спросит каждый раз. (Если хочешь убрать на *своей* машине — пропусти `dhcpsrv.exe` через Scheduled Task с галкой «Run with highest privileges» и запускай через `schtasks /run /tn dhcpsrv`.)
|
||||
- Если в Tftpd32 включён модуль DHCP — отключи, UDP/67 окажется занят.
|
||||
- Lease по умолчанию 7200 с; клиент продлевает в середине срока. Если нужно «навсегда», исходник поддерживает `2147483647` — пересобери из исходников.
|
||||
|
||||
## Сборка из исходников
|
||||
|
||||
```
|
||||
python -m pip install rich pyinstaller
|
||||
python -m PyInstaller --onefile --uac-admin --console --name dhcpsrv dhcpsrv-launcher.py
|
||||
```
|
||||
|
||||
## Лицензия
|
||||
|
||||
MIT — см. [LICENSE](LICENSE).
|
||||
35
SECURITY.md
35
SECURITY.md
|
|
@ -1,35 +0,0 @@
|
|||
# Security policy
|
||||
|
||||
Thanks for taking the time to look at this. Even small tools can introduce real
|
||||
risk — this one binds a privileged UDP port and reconfigures a network adapter
|
||||
on the host — so vulnerability reports are very welcome.
|
||||
|
||||
## Supported versions
|
||||
|
||||
Only the latest tagged release on GitHub is supported. Older versions will not
|
||||
get fixes; please upgrade first.
|
||||
|
||||
## How to report a vulnerability
|
||||
|
||||
**Please do not open a public issue** for security-sensitive findings.
|
||||
|
||||
Use GitHub's private security advisories: go to the
|
||||
[Security tab](../../security/advisories/new) of this repo and click
|
||||
"Report a vulnerability". GitHub will route it privately.
|
||||
|
||||
Please include:
|
||||
- The version you tested (the startup banner is enough).
|
||||
- Steps to reproduce, ideally with a packet capture or a short script.
|
||||
- An assessment of impact (LAN-only? remote? admin needed? etc.).
|
||||
|
||||
Reports are reviewed and addressed on a best-effort basis. A fix and a public
|
||||
advisory will be published once the issue is resolved. Reporters are credited
|
||||
unless they prefer to stay anonymous.
|
||||
|
||||
## Out of scope
|
||||
|
||||
- DoS by flooding the DHCP server on the local link (it's a small tool meant
|
||||
for direct-cable / single-switch use; flooding your own laptop is your call).
|
||||
- Behavior when run **without** administrator privileges (the tool refuses to
|
||||
start in that case anyway).
|
||||
- Issues that require the attacker to already control the user's machine.
|
||||
BIN
assets/icon.ico
BIN
assets/icon.ico
Binary file not shown.
|
Before Width: | Height: | Size: 48 KiB |
|
|
@ -1,13 +0,0 @@
|
|||
"""
|
||||
PyInstaller entry point — sits at the repo root and uses an *absolute* import
|
||||
so the bundled exe doesn't need relative-import resolution at runtime.
|
||||
|
||||
For dev work without an install use `python -m dhcpsrv` instead (that path
|
||||
goes through `src/dhcpsrv/__main__.py` and relative imports work).
|
||||
"""
|
||||
|
||||
from dhcpsrv.app import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
493
dhcpsrv_app.py
Normal file
493
dhcpsrv_app.py
Normal file
|
|
@ -0,0 +1,493 @@
|
|||
"""
|
||||
dhcpsrv v1.1.0 - portable single-exe edition.
|
||||
made by engelgardt
|
||||
|
||||
This file combines what previously lived in dhcpsrv.ps1 + dhcpsrv.py:
|
||||
- admin check
|
||||
- NIC selection (filters out wireless / VPN / virtual)
|
||||
- static IP setting (netsh)
|
||||
- DHCP server with rich live UI
|
||||
- revert NIC prompt on exit
|
||||
|
||||
Build:
|
||||
pyinstaller --onefile --uac-admin --name dhcpsrv --console dhcpsrv_app.py
|
||||
"""
|
||||
|
||||
import os, sys, ctypes, json, subprocess, signal, socket, struct, threading, time
|
||||
import urllib.request, webbrowser
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
|
||||
# Enable VT (ANSI escape) processing on Windows console BEFORE any output.
|
||||
def _enable_vt():
|
||||
if os.name != "nt":
|
||||
return
|
||||
try:
|
||||
k = ctypes.windll.kernel32
|
||||
STD_OUT, STD_ERR = -11, -12
|
||||
ENABLE_VT = 0x0004
|
||||
for std in (STD_OUT, STD_ERR):
|
||||
h = k.GetStdHandle(std)
|
||||
mode = ctypes.c_ulong()
|
||||
if k.GetConsoleMode(h, ctypes.byref(mode)):
|
||||
k.SetConsoleMode(h, mode.value | ENABLE_VT)
|
||||
except Exception:
|
||||
pass
|
||||
_enable_vt()
|
||||
|
||||
__version__ = "1.1.0"
|
||||
GITHUB_REPO = "Engelgardt23/dhcpsrv"
|
||||
|
||||
# Fixed heights used by the Layout — used to compute the clients table fit
|
||||
HEADER_LINES = 5
|
||||
EVENTS_LINES = 14
|
||||
TBL_OVERHEAD = 6 # panel borders + table header row + table top/bottom rules
|
||||
|
||||
# Hardcoded defaults — pick NIC, everything else is auto.
|
||||
DEFAULT_SERVER_IP = "10.10.10.1"
|
||||
DEFAULT_NETMASK = "255.255.255.0"
|
||||
POOL_SIZE = 50 # addresses starting at server_ip + 1
|
||||
DEFAULT_LEASE = 7200 # 2 hours
|
||||
# TFTP option always = server IP
|
||||
|
||||
# Stats counters
|
||||
stats = {"packets": 0, "discovers": 0, "requests": 0, "releases": 0}
|
||||
|
||||
# rich
|
||||
try:
|
||||
from rich.console import Console, Group
|
||||
from rich.live import Live
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
from rich.panel import Panel
|
||||
from rich.layout import Layout
|
||||
from rich.prompt import Prompt, Confirm
|
||||
except ImportError:
|
||||
print("'rich' missing in the bundled build")
|
||||
input("Press Enter to exit")
|
||||
sys.exit(10)
|
||||
|
||||
console = Console(log_path=False)
|
||||
|
||||
|
||||
# ---------- admin ----------
|
||||
def is_admin():
|
||||
try: return ctypes.windll.shell32.IsUserAnAdmin() != 0
|
||||
except: return False
|
||||
|
||||
def require_admin():
|
||||
if not is_admin():
|
||||
# Re-launch elevated; original exits
|
||||
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(f'"{a}"' for a in sys.argv), None, 1)
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
# ---------- update check ----------
|
||||
def _parse_version(s):
|
||||
try:
|
||||
s = (s or "").strip().lstrip("v")
|
||||
return tuple(int(x) for x in s.split(".")[:3])
|
||||
except Exception:
|
||||
return (0, 0, 0)
|
||||
|
||||
def check_for_update():
|
||||
"""Query GitHub for the latest release. If newer than __version__, prompt to open the page."""
|
||||
try:
|
||||
url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest"
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": f"dhcpsrv/{__version__}",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=3) as r:
|
||||
data = json.loads(r.read().decode("utf-8", errors="replace"))
|
||||
latest = (data.get("tag_name") or "").strip()
|
||||
page = data.get("html_url") or f"https://github.com/{GITHUB_REPO}/releases/latest"
|
||||
if _parse_version(latest) > _parse_version(__version__):
|
||||
console.rule("[bold yellow]Update available")
|
||||
console.print(f"Current: [dim]v{__version__}[/] Latest: [bold green]{latest}[/]")
|
||||
try:
|
||||
if Confirm.ask("Open the download page in your browser?", default=True):
|
||||
webbrowser.open(page)
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
pass
|
||||
console.print()
|
||||
except Exception:
|
||||
# Offline / GitHub rate-limit / API error — skip silently.
|
||||
pass
|
||||
|
||||
|
||||
# ---------- helpers ----------
|
||||
CREATE_NO_WINDOW = 0x08000000
|
||||
def run_ps(cmd, timeout=15):
|
||||
return subprocess.run(["powershell.exe","-NoProfile","-NonInteractive","-Command",cmd],
|
||||
capture_output=True, text=True, timeout=timeout,
|
||||
creationflags=CREATE_NO_WINDOW)
|
||||
|
||||
def run_netsh(args, timeout=15):
|
||||
return subprocess.run(["netsh"] + args, capture_output=True, text=True,
|
||||
timeout=timeout, creationflags=CREATE_NO_WINDOW)
|
||||
|
||||
|
||||
# ---------- NIC enumeration ----------
|
||||
SKIP_DESCR = ("VPN","Virtual","AnyConnect","TAP-","TUN-","Bluetooth","Loopback",
|
||||
"WAN Miniport","Hyper-V","VMware","VirtualBox","WireGuard","OpenVPN",
|
||||
"Tailscale","ZeroTier")
|
||||
SKIP_MEDIA = ("Native 802.11","Wireless WAN")
|
||||
|
||||
def list_adapters():
|
||||
cmd = (r"Get-NetAdapter | ForEach-Object {"
|
||||
r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | "
|
||||
r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; "
|
||||
r" [pscustomobject]@{"
|
||||
r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; "
|
||||
r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip"
|
||||
r" }} | ConvertTo-Json -Depth 3 -Compress")
|
||||
r = run_ps(cmd, timeout=20)
|
||||
if r.returncode != 0 or not r.stdout.strip():
|
||||
return []
|
||||
data = json.loads(r.stdout)
|
||||
if isinstance(data, dict): data = [data]
|
||||
out = []
|
||||
for a in data:
|
||||
if a["Status"] in ("Disabled","Not Present"): continue
|
||||
if a.get("Virtual"): continue
|
||||
if a.get("MediaType") in SKIP_MEDIA: continue
|
||||
descr = (a.get("Description") or "") + " " + (a.get("Name") or "")
|
||||
if any(k.lower() in descr.lower() for k in SKIP_DESCR): continue
|
||||
out.append(a)
|
||||
out.sort(key=lambda x: x["ifIndex"])
|
||||
return out
|
||||
|
||||
|
||||
# ---------- DHCP server state ----------
|
||||
clients = {} # mac -> {ip_int, host, last, ping_ok}
|
||||
clients_lock = threading.Lock()
|
||||
events = deque(maxlen=200)
|
||||
events_lock = threading.Lock()
|
||||
refresh_evt = threading.Event() # set whenever UI should re-render
|
||||
|
||||
def log_event(markup_line):
|
||||
with events_lock:
|
||||
events.append(markup_line)
|
||||
refresh_evt.set()
|
||||
|
||||
# Config (filled by main)
|
||||
SERVER_IP = ""
|
||||
NETMASK = "255.255.255.0"
|
||||
POOL = []
|
||||
LEASE = 7200
|
||||
TFTP = ""
|
||||
|
||||
def ip2int(ip): return struct.unpack("!I", socket.inet_aton(ip))[0]
|
||||
def int2ip(n): return socket.inet_ntoa(struct.pack("!I", n))
|
||||
def now_s(): return datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
|
||||
# ---------- ping ----------
|
||||
# Windows `ping` exit code is unreliable: it can return 0 with "Destination host
|
||||
# unreachable" or with a stale ARP-based "reply" from the local stack. The only
|
||||
# trustworthy success marker is the "TTL=" substring in stdout (present across
|
||||
# locales — e.g. "...time<1ms TTL=64" / "...время<1мс TTL=64").
|
||||
def ping_one(ip, timeout_ms=600):
|
||||
try:
|
||||
r = subprocess.run(["ping","-n","1","-w",str(timeout_ms),ip],
|
||||
capture_output=True, timeout=2, text=True,
|
||||
creationflags=CREATE_NO_WINDOW)
|
||||
out = (r.stdout or "") + (r.stderr or "")
|
||||
return "TTL=" in out
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def ping_loop():
|
||||
pool_exec = ThreadPoolExecutor(max_workers=16)
|
||||
while True:
|
||||
with clients_lock:
|
||||
items = [(m, c["ip_int"]) for m, c in clients.items()]
|
||||
changed = False
|
||||
if items:
|
||||
ips = [int2ip(i) for _, i in items]
|
||||
res = list(pool_exec.map(ping_one, ips))
|
||||
with clients_lock:
|
||||
for (mac, _), ok in zip(items, res):
|
||||
if mac in clients and clients[mac].get("ping_ok") != ok:
|
||||
clients[mac]["ping_ok"] = ok
|
||||
changed = True
|
||||
if changed:
|
||||
refresh_evt.set()
|
||||
time.sleep(1.0)
|
||||
|
||||
|
||||
# ---------- DHCP logic ----------
|
||||
def alloc_ip(mac):
|
||||
with clients_lock:
|
||||
if mac in clients: return clients[mac]["ip_int"]
|
||||
used = {c["ip_int"] for c in clients.values()}
|
||||
for ipn in POOL:
|
||||
if ipn not in used:
|
||||
clients[mac] = {"ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False}
|
||||
return ipn
|
||||
return None
|
||||
|
||||
def touch_client(mac, ipn=None, host=None):
|
||||
with clients_lock:
|
||||
if mac not in clients:
|
||||
clients[mac] = {"ip_int": ipn or 0, "host": host or "", "last": now_s(), "ping_ok": False}
|
||||
else:
|
||||
clients[mac]["last"] = now_s()
|
||||
if ipn: clients[mac]["ip_int"] = ipn
|
||||
if host: clients[mac]["host"] = host
|
||||
|
||||
def parse_options(data):
|
||||
opts = {}; i = 240
|
||||
while i < len(data):
|
||||
code = data[i]
|
||||
if code == 0: i += 1; continue
|
||||
if code == 255: break
|
||||
if i + 1 >= len(data): break
|
||||
L = data[i+1]
|
||||
opts[code] = data[i+2:i+2+L]
|
||||
i += 2 + L
|
||||
return opts
|
||||
|
||||
def get_hostname(opts):
|
||||
h = opts.get(12)
|
||||
if h:
|
||||
try: return h.rstrip(b"\x00").decode(errors="replace")
|
||||
except: return ""
|
||||
return ""
|
||||
|
||||
def build_reply(req, dhcp_type, yiaddr_int):
|
||||
pkt = bytearray(240)
|
||||
pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0
|
||||
pkt[4:8] = req[4:8]
|
||||
pkt[10:12] = req[10:12]
|
||||
pkt[16:20] = struct.pack("!I", yiaddr_int)
|
||||
pkt[20:24] = socket.inet_aton(SERVER_IP)
|
||||
pkt[28:44] = req[28:44]
|
||||
pkt[236:240] = b"\x63\x82\x53\x63"
|
||||
o = bytearray()
|
||||
o += bytes([53,1,dhcp_type])
|
||||
o += bytes([54,4]) + socket.inet_aton(SERVER_IP)
|
||||
o += bytes([51,4]) + struct.pack("!I", LEASE)
|
||||
o += bytes([1,4]) + socket.inet_aton(NETMASK)
|
||||
o += bytes([3,4]) + socket.inet_aton(SERVER_IP)
|
||||
o += bytes([6,4]) + socket.inet_aton(SERVER_IP)
|
||||
tb = TFTP.encode()
|
||||
o += bytes([66, len(tb)]) + tb
|
||||
o += bytes([150,4]) + socket.inet_aton(TFTP)
|
||||
o += bytes([255])
|
||||
return bytes(pkt) + bytes(o)
|
||||
|
||||
|
||||
# ---------- UI ----------
|
||||
def render_table():
|
||||
t = Table(expand=True, header_style="bold")
|
||||
t.add_column("#", style="dim", width=3, justify="right")
|
||||
t.add_column("IP", width=16)
|
||||
t.add_column("Hostname", min_width=10)
|
||||
t.add_column("MAC", width=19)
|
||||
t.add_column("Last seen",style="dim", width=10)
|
||||
t.add_column("Ping", width=6, justify="center")
|
||||
with clients_lock:
|
||||
rows = sorted(clients.items(), key=lambda kv: kv[1]["ip_int"])
|
||||
|
||||
# Auto-fit to available height (header + events panels are fixed-size in Layout).
|
||||
avail = max(1, console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD)
|
||||
overflow = max(0, len(rows) - avail)
|
||||
if overflow:
|
||||
rows = rows[:avail - 1] # leave one slot for the "(+N more)" marker
|
||||
|
||||
if not rows:
|
||||
t.add_row("—","—","(no clients yet)","—","—","—")
|
||||
else:
|
||||
for i,(mac,c) in enumerate(rows,1):
|
||||
ping = Text("OK", style="bold green") if c.get("ping_ok") else Text("--", style="bold red")
|
||||
t.add_row(str(i), int2ip(c["ip_int"]), c.get("host") or "—", mac, c.get("last","—"), ping)
|
||||
if overflow:
|
||||
t.add_row("…", "", f"[dim](+{overflow} more — enlarge the window)[/]", "", "", "")
|
||||
return t
|
||||
|
||||
def render_header():
|
||||
with clients_lock:
|
||||
leased = len(clients)
|
||||
body = (f"[bold cyan]dhcpsrv v{__version__}[/] [dim]made by engelgardt[/]\n"
|
||||
f"Server: [bold]{SERVER_IP}[/]/{NETMASK} "
|
||||
f"Pool: [bold]{int2ip(POOL[0])}–{int2ip(POOL[-1])}[/] "
|
||||
f"Lease: [bold]{LEASE}s[/] "
|
||||
f"TFTP: [bold]{TFTP}[/]\n"
|
||||
f"Leases: [bold]{leased}/{len(POOL)}[/] "
|
||||
f"Pkts: [dim]{stats['packets']}[/] "
|
||||
f"DISCOVER: [cyan]{stats['discovers']}[/] "
|
||||
f"REQUEST: [green]{stats['requests']}[/] "
|
||||
f"RELEASE: [yellow]{stats['releases']}[/] "
|
||||
f"[dim]Ctrl+C to stop[/]")
|
||||
return Panel(body, border_style="cyan")
|
||||
|
||||
def render_events_panel():
|
||||
with events_lock:
|
||||
last = list(events)[-20:]
|
||||
body = "\n".join(last) if last else "[dim](no events yet)[/]"
|
||||
return Panel(body, title="Events", border_style="dim")
|
||||
|
||||
def render_screen():
|
||||
layout = Layout()
|
||||
layout.split_column(
|
||||
Layout(render_header(), name="hdr", size=HEADER_LINES),
|
||||
Layout(Panel(render_table(), title="Clients", border_style="cyan"), name="tbl"),
|
||||
Layout(render_events_panel(), name="evt", size=EVENTS_LINES),
|
||||
)
|
||||
return layout
|
||||
|
||||
def ui_loop(stop):
|
||||
# Pure event-driven: refresh only on real state changes or terminal resize.
|
||||
# Clear screen AND scrollback so wheel-scrolling won't expose pre-launch text.
|
||||
sys.stdout.write("\x1b[2J\x1b[3J\x1b[H")
|
||||
sys.stdout.flush()
|
||||
|
||||
last_size = console.size
|
||||
with Live(render_screen(), auto_refresh=False, console=console, screen=True,
|
||||
redirect_stdout=False, redirect_stderr=False) as live:
|
||||
live.refresh()
|
||||
while not stop.is_set():
|
||||
triggered = refresh_evt.wait(timeout=0.5)
|
||||
if stop.is_set(): break
|
||||
|
||||
# Detect window resize without spamming refreshes
|
||||
cur_size = console.size
|
||||
resized = (cur_size != last_size)
|
||||
if resized:
|
||||
last_size = cur_size
|
||||
|
||||
if triggered:
|
||||
refresh_evt.clear()
|
||||
|
||||
if triggered or resized:
|
||||
live.update(render_screen(), refresh=True)
|
||||
|
||||
|
||||
# ---------- main flow ----------
|
||||
def select_nic():
|
||||
console.rule("[bold cyan]Available adapters")
|
||||
adapters = list_adapters()
|
||||
if not adapters:
|
||||
console.print("[red]No suitable wired adapters found.[/]")
|
||||
return None
|
||||
for i, a in enumerate(adapters, 1):
|
||||
ip = a.get("IPv4") or "—"
|
||||
console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}")
|
||||
while True:
|
||||
s = Prompt.ask("Select adapter number").strip()
|
||||
if s.isdigit() and 1 <= int(s) <= len(adapters):
|
||||
return adapters[int(s)-1]
|
||||
console.print("[red]Invalid selection.[/]")
|
||||
|
||||
def set_static_ip(nic_name, ip, mask):
|
||||
console.print(f"[yellow]Setting {nic_name} → {ip} / {mask} ...[/]")
|
||||
run_netsh(["interface","ipv4","set","address",f"name={nic_name}","static",ip,mask])
|
||||
|
||||
def revert_dhcp(nic_name):
|
||||
run_netsh(["interface","ipv4","set","address",f"name={nic_name}","source=dhcp"])
|
||||
run_netsh(["interface","ipv4","set","dnsservers",f"name={nic_name}","source=dhcp"])
|
||||
|
||||
def main():
|
||||
global SERVER_IP, POOL, LEASE, TFTP
|
||||
require_admin()
|
||||
|
||||
console.print(f"[bold cyan]dhcpsrv v{__version__}[/] - portable laptop-side DHCP server")
|
||||
console.print("[dim]made by engelgardt[/]")
|
||||
console.print()
|
||||
|
||||
check_for_update()
|
||||
|
||||
nic = select_nic()
|
||||
if not nic: input("Press Enter to exit"); return
|
||||
|
||||
SERVER_IP = DEFAULT_SERVER_IP
|
||||
LEASE = DEFAULT_LEASE
|
||||
TFTP = SERVER_IP
|
||||
server_n = ip2int(SERVER_IP)
|
||||
POOL = list(range(server_n + 1, server_n + 1 + POOL_SIZE))
|
||||
|
||||
set_static_ip(nic["Name"], SERVER_IP, NETMASK)
|
||||
log_event(f"[dim][{now_s()}][/] [bold]NIC[/] {nic['Name']} → {SERVER_IP}/{NETMASK}")
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
try:
|
||||
s.bind(("0.0.0.0", 67))
|
||||
except OSError as e:
|
||||
console.print(f"[bold red]bind UDP/67 failed:[/] {e}")
|
||||
console.print("[yellow]Another DHCP service (Tftpd32, ICS, Windows DHCP) may be running.[/]")
|
||||
input("Press Enter to exit"); return
|
||||
|
||||
stop = threading.Event()
|
||||
threading.Thread(target=ping_loop, daemon=True).start()
|
||||
threading.Thread(target=ui_loop, args=(stop,), daemon=True).start()
|
||||
|
||||
def shutdown(sig=None, frm=None):
|
||||
stop.set()
|
||||
console.print()
|
||||
console.print(f"[dim][{now_s()}] Shutting down...[/]")
|
||||
try: s.close()
|
||||
except: pass
|
||||
# Ask revert
|
||||
try:
|
||||
if Confirm.ask(f"Revert {nic['Name']} back to DHCP?", default=False):
|
||||
revert_dhcp(nic["Name"])
|
||||
console.print("[green]NIC reverted to DHCP[/]")
|
||||
except (EOFError, KeyboardInterrupt): pass
|
||||
input("Press Enter to exit")
|
||||
sys.exit(0)
|
||||
signal.signal(signal.SIGINT, shutdown)
|
||||
signal.signal(signal.SIGTERM, shutdown)
|
||||
|
||||
while True:
|
||||
try:
|
||||
data, _ = s.recvfrom(2048)
|
||||
except KeyboardInterrupt:
|
||||
shutdown(); return
|
||||
except OSError:
|
||||
continue
|
||||
if len(data) < 240 or data[0] != 1: continue
|
||||
stats["packets"] += 1
|
||||
mac = ":".join(f"{b:02x}" for b in data[28:34])
|
||||
opts = parse_options(data)
|
||||
msg = opts.get(53)
|
||||
if not msg: continue
|
||||
mt = msg[0]
|
||||
host = get_hostname(opts)
|
||||
host_s = f" [{host}]" if host else ""
|
||||
|
||||
if mt == 1:
|
||||
stats["discovers"] += 1
|
||||
ipn = alloc_ip(mac)
|
||||
if ipn is None:
|
||||
log_event(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]POOL EXHAUSTED[/]")
|
||||
continue
|
||||
touch_client(mac, ipn, host)
|
||||
s.sendto(build_reply(data, 2, ipn), ("255.255.255.255", 68))
|
||||
log_event(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}")
|
||||
elif mt == 3:
|
||||
stats["requests"] += 1
|
||||
req_ip = opts.get(50)
|
||||
with clients_lock:
|
||||
cached = clients.get(mac, {}).get("ip_int")
|
||||
ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached
|
||||
if ipn is None: continue
|
||||
touch_client(mac, ipn, host)
|
||||
s.sendto(build_reply(data, 5, ipn), ("255.255.255.255", 68))
|
||||
log_event(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}")
|
||||
elif mt == 7:
|
||||
stats["releases"] += 1
|
||||
with clients_lock:
|
||||
old = clients.pop(mac, None)
|
||||
if old:
|
||||
log_event(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old['ip_int'])}")
|
||||
elif mt == 8:
|
||||
log_event(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
[build-system]
|
||||
requires = ["setuptools>=68"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "dhcpsrv"
|
||||
description = "Portable laptop-side DHCP server for storage/server engineers."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
license = { text = "MIT" }
|
||||
authors = [{ name = "engelgardt" }]
|
||||
dependencies = ["rich>=13"]
|
||||
dynamic = ["version"]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/Engelgardt23/dhcpsrv"
|
||||
Issues = "https://github.com/Engelgardt23/dhcpsrv/issues"
|
||||
|
||||
[project.scripts]
|
||||
dhcpsrv = "dhcpsrv.app:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = { attr = "dhcpsrv.__version__" }
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
"""
|
||||
dhcpsrv - portable laptop-side DHCP server for storage/server engineers.
|
||||
made by engelgardt
|
||||
|
||||
The single source of truth for the project version. Bump this before tagging
|
||||
a release; CI reads the tag, the code reads this constant.
|
||||
"""
|
||||
|
||||
__version__ = "1.2.2"
|
||||
GITHUB_REPO = "engel/dhcpsrv" # на Forgejo (git.engelgardt23.ru)
|
||||
|
|
@ -1,11 +0,0 @@
|
|||
"""Entry point for `python -m dhcpsrv` from a checked-out / installed package.
|
||||
|
||||
The PyInstaller-bundled exe uses `dhcpsrv-launcher.py` at the repo root instead,
|
||||
because PyInstaller runs the bundled script as a standalone module — relative
|
||||
imports fail there."""
|
||||
|
||||
from .app import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,109 +0,0 @@
|
|||
"""
|
||||
Application entry: wires admin check, update check, NIC selection, the DHCP
|
||||
server, the UI and the ping loop together."""
|
||||
|
||||
from __future__ import annotations
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from rich.console import Console
|
||||
from rich.prompt import Confirm, Prompt
|
||||
from rich.table import Table
|
||||
|
||||
from . import __version__, GITHUB_REPO
|
||||
from .platform_win import enable_vt, require_admin
|
||||
from .update_check import check_for_update
|
||||
from .network import list_adapters, set_static_ip, revert_to_dhcp
|
||||
from .dhcp import DhcpConfig, DhcpServer, now_s
|
||||
from .ui import Ui
|
||||
from .config import load_config
|
||||
from .i18n import set_language, t
|
||||
|
||||
|
||||
def _select_nic(console: Console) -> dict | None:
|
||||
console.rule(f"[bold cyan]{t('available_adapters')}")
|
||||
adapters = list_adapters()
|
||||
if not adapters:
|
||||
console.print(f"[red]{t('no_adapters')}[/]")
|
||||
return None
|
||||
for i, a in enumerate(adapters, 1):
|
||||
ip = a.get("IPv4") or "—"
|
||||
console.print(f" {i}) [{a['Status']}] {a['Name']} ({a['Description']}) {ip}")
|
||||
while True:
|
||||
s = Prompt.ask(t("select_adapter")).strip()
|
||||
if s.isdigit() and 1 <= int(s) <= len(adapters):
|
||||
return adapters[int(s) - 1]
|
||||
console.print(f"[red]{t('invalid_selection')}[/]")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
enable_vt()
|
||||
|
||||
# Language prompt (writes config.ini on first run) happens BEFORE admin
|
||||
# elevation so the user does not have to answer it twice after the UAC
|
||||
# bounce.
|
||||
cfg_data = load_config()
|
||||
set_language(cfg_data["language"])
|
||||
|
||||
require_admin()
|
||||
|
||||
console = Console(log_path=False)
|
||||
|
||||
title = f"[bold cyan]dhcpsrv v{__version__}[/] {t('tagline')}"
|
||||
latest = check_for_update()
|
||||
if latest:
|
||||
release_url = f"https://git.engelgardt23.ru/{GITHUB_REPO}/releases/latest"
|
||||
notice = t("update_available", tag=latest)
|
||||
header = Table.grid(expand=True)
|
||||
header.add_column(justify="left", ratio=1)
|
||||
header.add_column(justify="right")
|
||||
header.add_row(title, f"[dim][link={release_url}]{notice}[/link][/]")
|
||||
console.print(header)
|
||||
else:
|
||||
console.print(title)
|
||||
console.print()
|
||||
|
||||
nic = _select_nic(console)
|
||||
if not nic:
|
||||
input(t("press_enter")); return
|
||||
|
||||
cfg = DhcpConfig.with_defaults()
|
||||
console.print(f"[yellow]{t('setting_nic', name=nic['Name'], ip=cfg.server_ip, mask=cfg.netmask)}[/]")
|
||||
set_static_ip(nic["Name"], cfg.server_ip, cfg.netmask)
|
||||
|
||||
# Wire server <-> ui through callbacks so neither imports the other.
|
||||
ui = Ui.__new__(Ui) # forward declaration so server callbacks can capture it
|
||||
server = DhcpServer(cfg=cfg, log=lambda m: ui.log(m), on_change=lambda: ui.request_refresh())
|
||||
Ui.__init__(ui, server) # finish UI init now that server exists
|
||||
|
||||
stop = threading.Event()
|
||||
threading.Thread(target=server.run, args=(stop,), daemon=True).start()
|
||||
threading.Thread(target=server.ping_loop, args=(stop,), daemon=True).start()
|
||||
|
||||
def shutdown(sig=None, frm=None):
|
||||
stop.set()
|
||||
try:
|
||||
print()
|
||||
print(t("shutting_down", ts=now_s()))
|
||||
try:
|
||||
if Confirm.ask(t("revert_nic", name=nic["Name"]), default=False):
|
||||
revert_to_dhcp(nic["Name"])
|
||||
print(t("nic_reverted"))
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
pass
|
||||
finally:
|
||||
input(t("press_enter"))
|
||||
sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGINT, shutdown)
|
||||
signal.signal(signal.SIGTERM, shutdown)
|
||||
|
||||
try:
|
||||
ui.run(stop)
|
||||
except KeyboardInterrupt:
|
||||
shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -1,94 +0,0 @@
|
|||
"""
|
||||
config.ini handling next to the executable.
|
||||
|
||||
On first run the file does not exist — we ask the user which language to use
|
||||
and write the answer alongside the exe. On every subsequent run we just read
|
||||
it. The .ini has a leading bilingual comment explaining how to change values
|
||||
by editing the file directly.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import configparser
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
SUPPORTED_LANGS = ("en", "ru")
|
||||
DEFAULT_LANG = "en"
|
||||
|
||||
CONFIG_HEADER = """\
|
||||
# ---------------------------------------------------------------------------
|
||||
# dhcpsrv configuration
|
||||
#
|
||||
# To change the interface language, edit the 'language' value below.
|
||||
# Valid values: en, ru
|
||||
#
|
||||
# Чтобы сменить язык интерфейса, измените значение 'language' ниже.
|
||||
# Допустимые значения: en, ru
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def app_dir() -> Path:
|
||||
"""Directory holding the running executable (or source folder when run via python)."""
|
||||
if getattr(sys, "frozen", False):
|
||||
return Path(sys.executable).resolve().parent
|
||||
return Path(__file__).resolve().parent
|
||||
|
||||
|
||||
def config_path() -> Path:
|
||||
return app_dir() / "config.ini"
|
||||
|
||||
|
||||
def _ask_language() -> str:
|
||||
"""First-run prompt. Stdin is always available in console apps, no Rich here
|
||||
yet (we run before the main console is set up)."""
|
||||
print()
|
||||
print("Select language / Выберите язык:")
|
||||
print(" 1) English")
|
||||
print(" 2) Русский")
|
||||
while True:
|
||||
try:
|
||||
choice = input("> ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return DEFAULT_LANG
|
||||
if choice == "1":
|
||||
return "en"
|
||||
if choice == "2":
|
||||
return "ru"
|
||||
print("Please enter 1 or 2 / Введите 1 или 2")
|
||||
|
||||
|
||||
def _write_config(lang: str) -> None:
|
||||
path = config_path()
|
||||
cp = configparser.ConfigParser()
|
||||
cp["General"] = {"language": lang}
|
||||
with path.open("w", encoding="utf-8") as f:
|
||||
f.write(CONFIG_HEADER)
|
||||
cp.write(f)
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
"""Return the active configuration dict. Side-effect: creates config.ini on
|
||||
first run after prompting the user."""
|
||||
path = config_path()
|
||||
if not path.exists():
|
||||
lang = _ask_language()
|
||||
try:
|
||||
_write_config(lang)
|
||||
except OSError:
|
||||
# read-only location — fall back to in-memory default
|
||||
pass
|
||||
return {"language": lang}
|
||||
|
||||
cp = configparser.ConfigParser()
|
||||
try:
|
||||
cp.read(path, encoding="utf-8")
|
||||
except (configparser.Error, OSError):
|
||||
return {"language": DEFAULT_LANG}
|
||||
|
||||
lang = (cp.get("General", "language", fallback=DEFAULT_LANG) or DEFAULT_LANG).strip().lower()
|
||||
if lang not in SUPPORTED_LANGS:
|
||||
lang = DEFAULT_LANG
|
||||
return {"language": lang}
|
||||
|
|
@ -1,236 +0,0 @@
|
|||
"""
|
||||
DHCP server: packet parsing/building, the lease table, and the main socket
|
||||
loop. Pure logic — UI rendering and ping live in their own modules.
|
||||
|
||||
The `DhcpServer` instance owns the runtime state (clients, stats). The UI
|
||||
holds a reference to it and reads it whenever it renders."""
|
||||
|
||||
from __future__ import annotations
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Callable, Optional
|
||||
|
||||
from .network import ping_one
|
||||
from .i18n import t as _t
|
||||
|
||||
|
||||
# ---------- helpers ----------
|
||||
def ip2int(ip: str) -> int:
|
||||
return struct.unpack("!I", socket.inet_aton(ip))[0]
|
||||
|
||||
|
||||
def int2ip(n: int) -> str:
|
||||
return socket.inet_ntoa(struct.pack("!I", n))
|
||||
|
||||
|
||||
def now_s() -> str:
|
||||
return datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
|
||||
# ---------- config ----------
|
||||
@dataclass
|
||||
class DhcpConfig:
|
||||
server_ip: str
|
||||
netmask: str
|
||||
pool: list[int] # list of int IPs to hand out
|
||||
lease: int # seconds
|
||||
tftp: str # value for options 66 / 150
|
||||
|
||||
@classmethod
|
||||
def with_defaults(cls, server_ip: str = "10.10.10.1",
|
||||
netmask: str = "255.255.255.0",
|
||||
pool_size: int = 50,
|
||||
lease: int = 7200) -> "DhcpConfig":
|
||||
n = ip2int(server_ip)
|
||||
pool = list(range(n + 1, n + 1 + pool_size))
|
||||
return cls(server_ip=server_ip, netmask=netmask, pool=pool,
|
||||
lease=lease, tftp=server_ip)
|
||||
|
||||
|
||||
# ---------- server ----------
|
||||
@dataclass
|
||||
class DhcpServer:
|
||||
cfg: DhcpConfig
|
||||
log: Callable[[str], None] # how to push an event line into the UI
|
||||
on_change: Callable[[], None] # called whenever something the UI cares about changed
|
||||
|
||||
clients: dict[str, dict] = field(default_factory=dict)
|
||||
lock: threading.Lock = field(default_factory=threading.Lock)
|
||||
stats: dict[str, int] = field(default_factory=lambda: {
|
||||
"packets": 0, "discovers": 0, "requests": 0, "releases": 0,
|
||||
})
|
||||
|
||||
# --- lease allocation ---
|
||||
def alloc_ip(self, mac: str) -> Optional[int]:
|
||||
with self.lock:
|
||||
if mac in self.clients:
|
||||
return self.clients[mac]["ip_int"]
|
||||
used = {c["ip_int"] for c in self.clients.values()}
|
||||
for ipn in self.cfg.pool:
|
||||
if ipn not in used:
|
||||
self.clients[mac] = {
|
||||
"ip_int": ipn, "host": "", "last": now_s(), "ping_ok": False,
|
||||
}
|
||||
return ipn
|
||||
return None
|
||||
|
||||
def touch_client(self, mac: str, ipn: Optional[int] = None, host: Optional[str] = None) -> None:
|
||||
with self.lock:
|
||||
if mac not in self.clients:
|
||||
self.clients[mac] = {
|
||||
"ip_int": ipn or 0, "host": host or "",
|
||||
"last": now_s(), "ping_ok": False,
|
||||
}
|
||||
else:
|
||||
self.clients[mac]["last"] = now_s()
|
||||
if ipn: self.clients[mac]["ip_int"] = ipn
|
||||
if host: self.clients[mac]["host"] = host
|
||||
|
||||
def release_client(self, mac: str) -> Optional[int]:
|
||||
with self.lock:
|
||||
old = self.clients.pop(mac, None)
|
||||
return old["ip_int"] if old else None
|
||||
|
||||
# --- packet parsing ---
|
||||
@staticmethod
|
||||
def parse_options(data: bytes) -> dict[int, bytes]:
|
||||
opts: dict[int, bytes] = {}
|
||||
i = 240
|
||||
while i < len(data):
|
||||
code = data[i]
|
||||
if code == 0:
|
||||
i += 1; continue
|
||||
if code == 255:
|
||||
break
|
||||
if i + 1 >= len(data):
|
||||
break
|
||||
L = data[i + 1]
|
||||
opts[code] = data[i + 2 : i + 2 + L]
|
||||
i += 2 + L
|
||||
return opts
|
||||
|
||||
@staticmethod
|
||||
def get_hostname(opts: dict[int, bytes]) -> str:
|
||||
h = opts.get(12)
|
||||
if not h:
|
||||
return ""
|
||||
try:
|
||||
return h.rstrip(b"\x00").decode(errors="replace")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
# --- packet building ---
|
||||
def build_reply(self, req: bytes, dhcp_type: int, yiaddr_int: int) -> bytes:
|
||||
pkt = bytearray(240)
|
||||
pkt[0] = 2; pkt[1] = 1; pkt[2] = 6; pkt[3] = 0
|
||||
pkt[4:8] = req[4:8] # xid
|
||||
pkt[10:12] = req[10:12] # flags
|
||||
pkt[16:20] = struct.pack("!I", yiaddr_int) # yiaddr
|
||||
pkt[20:24] = socket.inet_aton(self.cfg.server_ip) # siaddr
|
||||
pkt[28:44] = req[28:44] # chaddr
|
||||
pkt[236:240] = b"\x63\x82\x53\x63" # magic cookie
|
||||
|
||||
o = bytearray()
|
||||
o += bytes([53, 1, dhcp_type]) # message type
|
||||
o += bytes([54, 4]) + socket.inet_aton(self.cfg.server_ip) # server id
|
||||
o += bytes([51, 4]) + struct.pack("!I", self.cfg.lease) # lease time
|
||||
o += bytes([1, 4]) + socket.inet_aton(self.cfg.netmask) # subnet mask
|
||||
o += bytes([3, 4]) + socket.inet_aton(self.cfg.server_ip) # router
|
||||
o += bytes([6, 4]) + socket.inet_aton(self.cfg.server_ip) # DNS
|
||||
tftp = self.cfg.tftp.encode()
|
||||
o += bytes([66, len(tftp)]) + tftp # TFTP server name
|
||||
o += bytes([150, 4]) + socket.inet_aton(self.cfg.tftp) # TFTP server addr
|
||||
o += bytes([255])
|
||||
return bytes(pkt) + bytes(o)
|
||||
|
||||
# --- ping loop, runs in its own thread ---
|
||||
def ping_loop(self, stop: threading.Event) -> None:
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
pool_exec = ThreadPoolExecutor(max_workers=16)
|
||||
while not stop.wait(1.0):
|
||||
with self.lock:
|
||||
items = [(m, c["ip_int"]) for m, c in self.clients.items()]
|
||||
if not items:
|
||||
continue
|
||||
ips = [int2ip(i) for _, i in items]
|
||||
results = list(pool_exec.map(ping_one, ips))
|
||||
changed = False
|
||||
with self.lock:
|
||||
for (mac, _), ok in zip(items, results):
|
||||
if mac in self.clients and self.clients[mac].get("ping_ok") != ok:
|
||||
self.clients[mac]["ping_ok"] = ok
|
||||
changed = True
|
||||
if changed:
|
||||
self.on_change()
|
||||
|
||||
# --- main server loop ---
|
||||
def run(self, stop: threading.Event) -> None:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
try:
|
||||
s.bind(("0.0.0.0", 67))
|
||||
except OSError as e:
|
||||
self.log(f"[bold red]{_t('bind_failed')}[/] {e}")
|
||||
self.log(f"[yellow]{_t('bind_hint')}[/]")
|
||||
stop.set()
|
||||
return
|
||||
|
||||
while not stop.is_set():
|
||||
try:
|
||||
data, _ = s.recvfrom(2048)
|
||||
except OSError:
|
||||
continue
|
||||
if len(data) < 240 or data[0] != 1:
|
||||
continue
|
||||
self.stats["packets"] += 1
|
||||
mac = ":".join(f"{b:02x}" for b in data[28:34])
|
||||
opts = self.parse_options(data)
|
||||
msg = opts.get(53)
|
||||
if not msg:
|
||||
continue
|
||||
mt = msg[0]
|
||||
host = self.get_hostname(opts)
|
||||
host_s = f" [{host}]" if host else ""
|
||||
|
||||
if mt == 1: # DISCOVER
|
||||
self.stats["discovers"] += 1
|
||||
ipn = self.alloc_ip(mac)
|
||||
if ipn is None:
|
||||
self.log(f"[dim][{now_s()}][/] [red]DISCOVER[/] {mac} → [red]{_t('pool_exhausted')}[/]")
|
||||
continue
|
||||
self.touch_client(mac, ipn, host)
|
||||
s.sendto(self.build_reply(data, 2, ipn), ("255.255.255.255", 68))
|
||||
self.log(f"[dim][{now_s()}][/] [cyan]DISCOVER[/] {mac}{host_s} → OFFER {int2ip(ipn)}")
|
||||
|
||||
elif mt == 3: # REQUEST
|
||||
self.stats["requests"] += 1
|
||||
req_ip = opts.get(50)
|
||||
with self.lock:
|
||||
cached = self.clients.get(mac, {}).get("ip_int")
|
||||
ipn = struct.unpack("!I", req_ip)[0] if req_ip else cached
|
||||
if ipn is None:
|
||||
continue
|
||||
self.touch_client(mac, ipn, host)
|
||||
s.sendto(self.build_reply(data, 5, ipn), ("255.255.255.255", 68))
|
||||
self.log(f"[dim][{now_s()}][/] [green]REQUEST[/] {mac}{host_s} → ACK {int2ip(ipn)}")
|
||||
|
||||
elif mt == 7: # RELEASE
|
||||
self.stats["releases"] += 1
|
||||
old = self.release_client(mac)
|
||||
if old is not None:
|
||||
self.log(f"[dim][{now_s()}][/] [yellow]RELEASE[/] {mac} → freed {int2ip(old)}")
|
||||
|
||||
elif mt == 8: # INFORM
|
||||
self.log(f"[dim][{now_s()}][/] [blue]INFORM[/] {mac}{host_s}")
|
||||
|
||||
self.on_change()
|
||||
|
||||
try:
|
||||
s.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
|
@ -1,115 +0,0 @@
|
|||
"""
|
||||
Tiny in-memory translation table. We do not need .po/.mo machinery for a
|
||||
two-language CLI tool — a flat dict per language is enough.
|
||||
|
||||
Usage:
|
||||
from .i18n import t, set_language
|
||||
set_language("ru")
|
||||
print(t("no_adapters"))
|
||||
|
||||
`t(key, **params)` performs `.format(**params)` on the returned string, so
|
||||
placeholders work the same way as f-strings.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
_lang = "en"
|
||||
|
||||
STRINGS: dict[str, dict[str, str]] = {
|
||||
"en": {
|
||||
# app.py / startup
|
||||
"tagline": "- portable laptop-side DHCP server",
|
||||
"update_available": "Update available ({tag})",
|
||||
"available_adapters": "Available adapters",
|
||||
"no_adapters": "No suitable wired adapters found.",
|
||||
"select_adapter": "Select adapter number",
|
||||
"invalid_selection": "Invalid selection.",
|
||||
"press_enter": "Press Enter to exit",
|
||||
"setting_nic": "Setting {name} → {ip} / {mask} ...",
|
||||
"shutting_down": "[{ts}] Shutting down...",
|
||||
"revert_nic": "Revert {name} back to DHCP?",
|
||||
"nic_reverted": "NIC reverted to DHCP",
|
||||
|
||||
# dhcp.py / server messages
|
||||
"bind_failed": "bind UDP/67 failed:",
|
||||
"bind_hint": "Another DHCP service (Tftpd32 DHCP, ICS, Windows DHCP) may be running.",
|
||||
"pool_exhausted": "POOL EXHAUSTED",
|
||||
|
||||
# ui.py / header & panels
|
||||
"panel_server": "Server",
|
||||
"panel_pool": "Pool",
|
||||
"panel_lease": "Lease",
|
||||
"panel_tftp": "TFTP",
|
||||
"panel_leases": "Leases",
|
||||
"panel_pkts": "Pkts",
|
||||
"panel_ctrlc": "Ctrl+C to stop",
|
||||
"col_ip": "IP",
|
||||
"col_host": "Hostname",
|
||||
"col_mac": "MAC",
|
||||
"col_last": "Last seen",
|
||||
"col_ping": "Ping",
|
||||
"no_clients": "(no clients yet)",
|
||||
"more_clients": "(+{n} more — enlarge the window)",
|
||||
"events_title": "Events",
|
||||
"clients_title": "Clients",
|
||||
"no_events": "(no events yet)",
|
||||
},
|
||||
"ru": {
|
||||
"tagline": "— портативный DHCP-сервер",
|
||||
"update_available": "Доступно обновление ({tag})",
|
||||
"available_adapters": "Доступные адаптеры",
|
||||
"no_adapters": "Подходящие проводные адаптеры не найдены.",
|
||||
"select_adapter": "Введите номер адаптера",
|
||||
"invalid_selection": "Неверный выбор.",
|
||||
"press_enter": "Нажмите Enter для выхода",
|
||||
"setting_nic": "Назначаю {name} → {ip} / {mask} ...",
|
||||
"shutting_down": "[{ts}] Завершение работы...",
|
||||
"revert_nic": "Вернуть {name} обратно в режим DHCP?",
|
||||
"nic_reverted": "Адаптер возвращён в режим DHCP",
|
||||
|
||||
"bind_failed": "не удалось занять UDP/67:",
|
||||
"bind_hint": "Возможно, уже запущен другой DHCP (модуль DHCP в Tftpd32, ICS, Windows DHCP).",
|
||||
"pool_exhausted": "ПУЛ ИСЧЕРПАН",
|
||||
|
||||
"panel_server": "Сервер",
|
||||
"panel_pool": "Пул",
|
||||
"panel_lease": "Аренда",
|
||||
"panel_tftp": "TFTP",
|
||||
"panel_leases": "Аренды",
|
||||
"panel_pkts": "Пакетов",
|
||||
"panel_ctrlc": "Ctrl+C — выход",
|
||||
"col_ip": "IP",
|
||||
"col_host": "Имя хоста",
|
||||
"col_mac": "MAC",
|
||||
"col_last": "Последний",
|
||||
"col_ping": "Пинг",
|
||||
"no_clients": "(клиентов пока нет)",
|
||||
"more_clients": "(ещё +{n} — увеличьте окно)",
|
||||
"events_title": "События",
|
||||
"clients_title": "Клиенты",
|
||||
"no_events": "(пока пусто)",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def set_language(lang: str) -> None:
|
||||
global _lang
|
||||
if lang in STRINGS:
|
||||
_lang = lang
|
||||
|
||||
|
||||
def language() -> str:
|
||||
return _lang
|
||||
|
||||
|
||||
def t(key: str, **params) -> str:
|
||||
"""Translate `key` for the active language; fall back to English if a key is
|
||||
missing in the chosen language. Apply `.format(**params)` for placeholders."""
|
||||
s = STRINGS.get(_lang, {}).get(key) or STRINGS["en"].get(key, key)
|
||||
if params:
|
||||
try:
|
||||
return s.format(**params)
|
||||
except (KeyError, IndexError):
|
||||
return s
|
||||
return s
|
||||
|
|
@ -1,103 +0,0 @@
|
|||
"""
|
||||
Network plumbing: enumerate physical NICs, set / revert IP via netsh, ping hosts.
|
||||
|
||||
Pure functions; no shared state. The ping loop is in `dhcp.py` because it
|
||||
mutates the DHCP server's client table."""
|
||||
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Any
|
||||
|
||||
CREATE_NO_WINDOW = 0x08000000 if os.name == "nt" else 0
|
||||
|
||||
# Adapters we never show in the picker.
|
||||
SKIP_DESCRIPTION = (
|
||||
"VPN", "Virtual", "AnyConnect", "TAP-", "TUN-", "Bluetooth", "Loopback",
|
||||
"WAN Miniport", "Hyper-V", "VMware", "VirtualBox", "WireGuard", "OpenVPN",
|
||||
"Tailscale", "ZeroTier",
|
||||
)
|
||||
SKIP_MEDIA = ("Native 802.11", "Wireless WAN")
|
||||
|
||||
|
||||
def _run_ps(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(
|
||||
["powershell.exe", "-NoProfile", "-NonInteractive", "-Command", cmd],
|
||||
capture_output=True, text=True, timeout=timeout,
|
||||
creationflags=CREATE_NO_WINDOW,
|
||||
)
|
||||
|
||||
|
||||
def _run_netsh(args: list[str], timeout: int = 15) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(
|
||||
["netsh", *args],
|
||||
capture_output=True, text=True, timeout=timeout,
|
||||
creationflags=CREATE_NO_WINDOW,
|
||||
)
|
||||
|
||||
|
||||
def list_adapters() -> list[dict[str, Any]]:
|
||||
"""Return physical wired adapters only — skip wireless / VPN / virtual."""
|
||||
cmd = (
|
||||
r"Get-NetAdapter | ForEach-Object {"
|
||||
r" $ip = (Get-NetIPAddress -InterfaceIndex $_.ifIndex -AddressFamily IPv4 -ErrorAction SilentlyContinue | "
|
||||
r" Where-Object PrefixOrigin -ne 'WellKnown' | Select-Object -ExpandProperty IPAddress) -join ','; "
|
||||
r" [pscustomobject]@{"
|
||||
r" Name=$_.Name; Description=$_.InterfaceDescription; Status=$_.Status; "
|
||||
r" Virtual=[bool]$_.Virtual; MediaType=$_.MediaType; ifIndex=$_.ifIndex; IPv4=$ip"
|
||||
r" }} | ConvertTo-Json -Depth 3 -Compress"
|
||||
)
|
||||
r = _run_ps(cmd, timeout=20)
|
||||
if r.returncode != 0 or not r.stdout.strip():
|
||||
return []
|
||||
data = json.loads(r.stdout)
|
||||
if isinstance(data, dict):
|
||||
data = [data]
|
||||
|
||||
out: list[dict[str, Any]] = []
|
||||
for a in data:
|
||||
if a.get("Status") in ("Disabled", "Not Present"):
|
||||
continue
|
||||
if a.get("Virtual"):
|
||||
continue
|
||||
if a.get("MediaType") in SKIP_MEDIA:
|
||||
continue
|
||||
haystack = ((a.get("Description") or "") + " " + (a.get("Name") or "")).lower()
|
||||
if any(k.lower() in haystack for k in SKIP_DESCRIPTION):
|
||||
continue
|
||||
out.append(a)
|
||||
out.sort(key=lambda x: x["ifIndex"])
|
||||
return out
|
||||
|
||||
|
||||
def set_static_ip(nic_name: str, ip: str, mask: str) -> None:
|
||||
_run_netsh(["interface", "ipv4", "set", "address", f"name={nic_name}", "static", ip, mask])
|
||||
|
||||
|
||||
def revert_to_dhcp(nic_name: str) -> None:
|
||||
_run_netsh(["interface", "ipv4", "set", "address", f"name={nic_name}", "source=dhcp"])
|
||||
_run_netsh(["interface", "ipv4", "set", "dnsservers", f"name={nic_name}", "source=dhcp"])
|
||||
|
||||
|
||||
def ping_one(ip: str, timeout_ms: int = 600) -> bool:
|
||||
"""Windows-`ping` based reachability test.
|
||||
|
||||
Windows' `ping` exit code is unreliable — it can return 0 with
|
||||
"Destination host unreachable" or with a stale-ARP-based reply from the
|
||||
local stack. The only trustworthy success marker is the `TTL=` substring
|
||||
in stdout (present across locales — e.g. `...time<1ms TTL=64` or
|
||||
`...время<1мс TTL=64`)."""
|
||||
try:
|
||||
if os.name == "nt":
|
||||
cmd = ["ping", "-n", "1", "-w", str(timeout_ms), ip]
|
||||
else:
|
||||
cmd = ["ping", "-c", "1", "-W", "1", ip]
|
||||
r = subprocess.run(
|
||||
cmd, capture_output=True, timeout=2, text=True,
|
||||
creationflags=CREATE_NO_WINDOW,
|
||||
)
|
||||
out = (r.stdout or "") + (r.stderr or "")
|
||||
return "TTL=" in out
|
||||
except Exception:
|
||||
return False
|
||||
|
|
@ -1,50 +0,0 @@
|
|||
"""
|
||||
Windows-specific bits: VT (ANSI) processing in the console, UAC self-elevation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import ctypes
|
||||
import os
|
||||
import sys
|
||||
|
||||
|
||||
def enable_vt() -> None:
|
||||
"""Enable virtual-terminal processing on the Windows console so that ESC
|
||||
escape sequences from raw stdout writes are interpreted as colours / clear
|
||||
instead of being printed as literal characters.
|
||||
|
||||
Rich enables this for its own writes; we still call it so direct
|
||||
`sys.stdout.write("\\x1b[...")` works (used to clear the scrollback when
|
||||
the alt screen starts)."""
|
||||
if os.name != "nt":
|
||||
return
|
||||
try:
|
||||
k = ctypes.windll.kernel32
|
||||
STD_OUT, STD_ERR = -11, -12
|
||||
ENABLE_VT = 0x0004
|
||||
for std in (STD_OUT, STD_ERR):
|
||||
h = k.GetStdHandle(std)
|
||||
mode = ctypes.c_ulong()
|
||||
if k.GetConsoleMode(h, ctypes.byref(mode)):
|
||||
k.SetConsoleMode(h, mode.value | ENABLE_VT)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def is_admin() -> bool:
|
||||
try:
|
||||
return ctypes.windll.shell32.IsUserAnAdmin() != 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def require_admin() -> None:
|
||||
"""If we're not running elevated, relaunch ourselves through UAC and exit.
|
||||
|
||||
Works both for the PyInstaller bundle (`sys.executable` is the exe itself)
|
||||
and for a plain `python src/dhcpsrv/__main__.py` run."""
|
||||
if is_admin():
|
||||
return
|
||||
args = " ".join(f'"{a}"' for a in sys.argv)
|
||||
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, args, None, 1)
|
||||
sys.exit(0)
|
||||
|
|
@ -1,144 +0,0 @@
|
|||
"""
|
||||
Rich-based full-screen TUI.
|
||||
|
||||
`Ui` owns the events buffer and the refresh trigger; it reads everything else
|
||||
from the `DhcpServer` it's bound to."""
|
||||
|
||||
from __future__ import annotations
|
||||
import sys
|
||||
import threading
|
||||
from collections import deque
|
||||
from typing import Callable
|
||||
|
||||
from rich.console import Console
|
||||
from rich.layout import Layout
|
||||
from rich.live import Live
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
|
||||
from . import __version__
|
||||
from .dhcp import DhcpServer, int2ip
|
||||
from .i18n import t as _t
|
||||
|
||||
|
||||
# Fixed-size layout slots — used to compute the clients-table fit.
|
||||
HEADER_LINES = 5
|
||||
EVENTS_LINES = 14
|
||||
TBL_OVERHEAD = 6 # panel borders + table header + table rules
|
||||
|
||||
|
||||
class Ui:
|
||||
def __init__(self, server: DhcpServer):
|
||||
self.server = server
|
||||
self.console = Console(log_path=False)
|
||||
self.events = deque(maxlen=200)
|
||||
self.events_lock = threading.Lock()
|
||||
self.refresh_evt = threading.Event()
|
||||
|
||||
# --- public for other modules ---
|
||||
def log(self, markup: str) -> None:
|
||||
"""Append an event line. Called from the DHCP server thread."""
|
||||
with self.events_lock:
|
||||
self.events.append(markup)
|
||||
self.refresh_evt.set()
|
||||
|
||||
def request_refresh(self) -> None:
|
||||
"""Called when something the UI cares about changed (e.g. a ping result)."""
|
||||
self.refresh_evt.set()
|
||||
|
||||
# --- rendering ---
|
||||
def _render_header(self) -> Panel:
|
||||
with self.server.lock:
|
||||
leased = len(self.server.clients)
|
||||
st = self.server.stats
|
||||
cfg = self.server.cfg
|
||||
body = (
|
||||
f"[bold cyan]dhcpsrv v{__version__}[/]\n"
|
||||
f"{_t('panel_server')}: [bold]{cfg.server_ip}[/]/{cfg.netmask} "
|
||||
f"{_t('panel_pool')}: [bold]{int2ip(cfg.pool[0])}–{int2ip(cfg.pool[-1])}[/] "
|
||||
f"{_t('panel_lease')}: [bold]{cfg.lease}s[/] "
|
||||
f"{_t('panel_tftp')}: [bold]{cfg.tftp}[/]\n"
|
||||
f"{_t('panel_leases')}: [bold]{leased}/{len(cfg.pool)}[/] "
|
||||
f"{_t('panel_pkts')}: [dim]{st['packets']}[/] "
|
||||
f"DISCOVER: [cyan]{st['discovers']}[/] "
|
||||
f"REQUEST: [green]{st['requests']}[/] "
|
||||
f"RELEASE: [yellow]{st['releases']}[/] "
|
||||
f"[dim]{_t('panel_ctrlc')}[/]"
|
||||
)
|
||||
return Panel(body, border_style="cyan")
|
||||
|
||||
def _render_table(self) -> Table:
|
||||
tbl = Table(expand=True, header_style="bold")
|
||||
tbl.add_column("#", style="dim", width=3, justify="right")
|
||||
tbl.add_column(_t("col_ip"), width=16)
|
||||
tbl.add_column(_t("col_host"), min_width=10)
|
||||
tbl.add_column(_t("col_mac"), width=19)
|
||||
tbl.add_column(_t("col_last"), style="dim", width=10)
|
||||
tbl.add_column(_t("col_ping"), width=6, justify="center")
|
||||
|
||||
with self.server.lock:
|
||||
rows = sorted(self.server.clients.items(), key=lambda kv: kv[1]["ip_int"])
|
||||
|
||||
avail = max(1, self.console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD)
|
||||
overflow = max(0, len(rows) - avail)
|
||||
if overflow:
|
||||
rows = rows[: avail - 1] # leave one slot for the "(+N more)" marker
|
||||
|
||||
if not rows:
|
||||
tbl.add_row("—", "—", _t("no_clients"), "—", "—", "—")
|
||||
else:
|
||||
for i, (mac, c) in enumerate(rows, 1):
|
||||
ping = (Text("OK", style="bold green")
|
||||
if c.get("ping_ok") else Text("--", style="bold red"))
|
||||
tbl.add_row(
|
||||
str(i),
|
||||
int2ip(c["ip_int"]),
|
||||
c.get("host") or "—",
|
||||
mac,
|
||||
c.get("last", "—"),
|
||||
ping,
|
||||
)
|
||||
if overflow:
|
||||
tbl.add_row("…", "", f"[dim]{_t('more_clients', n=overflow)}[/]", "", "", "")
|
||||
return tbl
|
||||
|
||||
def _render_events(self) -> Panel:
|
||||
with self.events_lock:
|
||||
last = list(self.events)[-20:]
|
||||
body = "\n".join(last) if last else f"[dim]{_t('no_events')}[/]"
|
||||
return Panel(body, title=_t("events_title"), border_style="dim")
|
||||
|
||||
def _render_screen(self) -> Layout:
|
||||
layout = Layout()
|
||||
layout.split_column(
|
||||
Layout(self._render_header(), name="hdr", size=HEADER_LINES),
|
||||
Layout(Panel(self._render_table(), title=_t("clients_title"), border_style="cyan"), name="tbl"),
|
||||
Layout(self._render_events(), name="evt", size=EVENTS_LINES),
|
||||
)
|
||||
return layout
|
||||
|
||||
# --- main loop ---
|
||||
def run(self, stop: threading.Event) -> None:
|
||||
"""Run until `stop` is set. Event-driven: redraws only on real changes
|
||||
or terminal resize."""
|
||||
# Clear screen + scrollback so wheel-scrolling can't expose pre-launch text.
|
||||
sys.stdout.write("\x1b[2J\x1b[3J\x1b[H")
|
||||
sys.stdout.flush()
|
||||
|
||||
last_size = self.console.size
|
||||
with Live(self._render_screen(), auto_refresh=False, console=self.console,
|
||||
screen=True, redirect_stdout=False, redirect_stderr=False) as live:
|
||||
live.refresh()
|
||||
while not stop.is_set():
|
||||
triggered = self.refresh_evt.wait(timeout=0.5)
|
||||
if stop.is_set():
|
||||
break
|
||||
cur_size = self.console.size
|
||||
resized = (cur_size != last_size)
|
||||
if resized:
|
||||
last_size = cur_size
|
||||
if triggered:
|
||||
self.refresh_evt.clear()
|
||||
if triggered or resized:
|
||||
live.update(self._render_screen(), refresh=True)
|
||||
|
|
@ -1,43 +0,0 @@
|
|||
"""
|
||||
Auto-update check.
|
||||
|
||||
On startup, ask GitHub for the latest release tag. If it's newer than the
|
||||
local `__version__`, return the tag string so the caller can show a quiet
|
||||
hint in the header. Silent on any error (offline, rate-limit, etc.).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import urllib.request
|
||||
|
||||
from . import __version__, GITHUB_REPO
|
||||
|
||||
|
||||
def _parse_version(s: str) -> tuple[int, int, int]:
|
||||
try:
|
||||
s = (s or "").strip().lstrip("v")
|
||||
parts = [int(x) for x in s.split(".")[:3]]
|
||||
while len(parts) < 3:
|
||||
parts.append(0)
|
||||
return tuple(parts) # type: ignore[return-value]
|
||||
except Exception:
|
||||
return (0, 0, 0)
|
||||
|
||||
|
||||
def check_for_update() -> str | None:
|
||||
"""Return the latest release tag (e.g. 'v1.2.0') if it is newer than the
|
||||
currently running version. Returns None when up to date, offline, or on
|
||||
any error — the caller decides how (or whether) to render the hint."""
|
||||
try:
|
||||
url = f"https://git.engelgardt23.ru/api/v1/repos/{GITHUB_REPO}/releases/latest"
|
||||
req = urllib.request.Request(url, headers={
|
||||
"User-Agent": f"dhcpsrv/{__version__}",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=3) as r:
|
||||
data = json.loads(r.read().decode("utf-8", errors="replace"))
|
||||
latest = (data.get("tag_name") or "").strip()
|
||||
if latest and _parse_version(latest) > _parse_version(__version__):
|
||||
return latest
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
# Regenerate assets/icon.ico for this project.
|
||||
# Edit $Text / colors below and re-run.
|
||||
|
||||
param(
|
||||
[string]$Text = "DHCP",
|
||||
[int[]]$ColorFrom = @(80, 160, 255),
|
||||
[int[]]$ColorTo = @(20, 60, 140),
|
||||
[string]$OutPath = "$PSScriptRoot\..\assets\icon.ico"
|
||||
)
|
||||
|
||||
Add-Type -AssemblyName System.Drawing
|
||||
|
||||
$outDir = Split-Path $OutPath -Parent
|
||||
if (-not (Test-Path $outDir)) { New-Item -ItemType Directory -Path $outDir -Force | Out-Null }
|
||||
|
||||
$bgFrom = [System.Drawing.Color]::FromArgb(255, $ColorFrom[0], $ColorFrom[1], $ColorFrom[2])
|
||||
$bgTo = [System.Drawing.Color]::FromArgb(255, $ColorTo[0], $ColorTo[1], $ColorTo[2])
|
||||
|
||||
$sizes = 256, 128, 64, 48, 32, 16
|
||||
$pngs = @()
|
||||
foreach ($s in $sizes) {
|
||||
$bmp = New-Object System.Drawing.Bitmap $s, $s
|
||||
$g = [System.Drawing.Graphics]::FromImage($bmp)
|
||||
$g.SmoothingMode = 'AntiAlias'
|
||||
$g.TextRenderingHint = 'AntiAliasGridFit'
|
||||
|
||||
$path = New-Object System.Drawing.Drawing2D.GraphicsPath
|
||||
$r = $s - 2
|
||||
$path.AddEllipse(1, 1, $r, $r)
|
||||
$brush = New-Object System.Drawing.Drawing2D.PathGradientBrush($path)
|
||||
$brush.CenterColor = $bgFrom
|
||||
$brush.SurroundColors = @($bgTo)
|
||||
$g.FillEllipse($brush, 1, 1, $r, $r)
|
||||
|
||||
$pen = New-Object System.Drawing.Pen ([System.Drawing.Color]::FromArgb(60, 0, 0, 0)), ([float]($s/64))
|
||||
$g.DrawEllipse($pen, 1, 1, $r, $r)
|
||||
|
||||
$fontSize = [float]($s * 0.32)
|
||||
$font = New-Object System.Drawing.Font "Segoe UI Black", $fontSize, ([System.Drawing.FontStyle]::Bold), ([System.Drawing.GraphicsUnit]::Pixel)
|
||||
$sf = New-Object System.Drawing.StringFormat
|
||||
$sf.Alignment = 'Center'; $sf.LineAlignment = 'Center'
|
||||
$shadow = New-Object System.Drawing.SolidBrush ([System.Drawing.Color]::FromArgb(80, 0, 0, 0))
|
||||
$g.DrawString($Text, $font, $shadow, [float]($s/2 + $s*0.02), [float]($s/2 + $s*0.02), $sf)
|
||||
$g.DrawString($Text, $font, [System.Drawing.Brushes]::White, [float]($s/2), [float]($s/2), $sf)
|
||||
$g.Dispose()
|
||||
|
||||
$ms = New-Object System.IO.MemoryStream
|
||||
$bmp.Save($ms, [System.Drawing.Imaging.ImageFormat]::Png)
|
||||
$pngs += ,@{ size = $s; bytes = $ms.ToArray() }
|
||||
$ms.Dispose(); $bmp.Dispose()
|
||||
}
|
||||
|
||||
$fs = [System.IO.File]::Create($OutPath)
|
||||
$bw = New-Object System.IO.BinaryWriter $fs
|
||||
$bw.Write([uint16]0)
|
||||
$bw.Write([uint16]1)
|
||||
$bw.Write([uint16]$pngs.Count)
|
||||
$offset = 6 + 16 * $pngs.Count
|
||||
foreach ($p in $pngs) {
|
||||
$w = if ($p.size -ge 256) { 0 } else { $p.size }
|
||||
$bw.Write([byte]$w); $bw.Write([byte]$w)
|
||||
$bw.Write([byte]0); $bw.Write([byte]0)
|
||||
$bw.Write([uint16]1); $bw.Write([uint16]32)
|
||||
$bw.Write([uint32]$p.bytes.Length); $bw.Write([uint32]$offset)
|
||||
$offset += $p.bytes.Length
|
||||
}
|
||||
foreach ($p in $pngs) { $bw.Write($p.bytes) }
|
||||
$bw.Close(); $fs.Close()
|
||||
Write-Host "Wrote $OutPath"
|
||||
Loading…
Reference in a new issue