vrcx 0.2.0-dev: BMC + SDS parallel collector

Initial commit of vrcx (Vegman Remote Collect, extended) — the BMC-only
bmccollect renamed and extended with a parallel SDS-host log branch.

- dev/prod/old repo layout
- per-host bmc/ + os/ subdirs, archives/dump_<ip>.tar.gz, outer session tarball
- SdsSession (paramiko, sudo via -S), OS_COMMAND_TABLE (lsiget, storcli, smartctl, journal, dmidecode, etc.)
- SDS IP discovery via Redfish EthernetInterfaces -> /24 ping-sweep -> arp -a
- UI shows BMC|OS dual progress per host
- CI/pyinstaller paths updated for dev/
This commit is contained in:
Engelgardt23 2026-05-18 17:34:54 +03:00
commit 4e727b669d
25 changed files with 1842 additions and 0 deletions

49
.github/ISSUE_TEMPLATE/bug_report.yml vendored Normal file
View file

@ -0,0 +1,49 @@
name: Bug report
description: Something doesn't work as expected
labels: ["bug"]
body:
- type: input
id: version
attributes:
label: Version
description: Visible in the startup banner.
placeholder: v0.1.0
validations:
required: true
- type: textarea
id: steps
attributes:
label: Steps to reproduce
placeholder: |
1. ...
2. ...
3. ...
validations:
required: true
- type: textarea
id: expected
attributes:
label: What you expected to happen
validations:
required: true
- type: textarea
id: actual
attributes:
label: What actually happened
description: Paste any error output verbatim. Screenshots are welcome.
validations:
required: true
- type: input
id: bmc_fw
attributes:
label: BMC firmware version (if relevant)
placeholder: e.g. YADRO VEGMAN Sx20 BMC Firmware v1.8r1389cd
- type: textarea
id: extra
attributes:
label: Anything else?

5
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View file

@ -0,0 +1,5 @@
blank_issues_enabled: false
contact_links:
- name: Security vulnerability
url: https://github.com/Engelgardt23/bmccollect/security/advisories/new
about: Please report security issues privately via GitHub Security Advisories — not as a public issue.

View file

@ -0,0 +1,22 @@
name: Feature request
description: Suggest a new feature or an improvement
labels: ["enhancement"]
body:
- type: textarea
id: motivation
attributes:
label: What's the use case?
validations:
required: true
- type: textarea
id: proposal
attributes:
label: Proposed solution
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternatives considered

26
.gitignore vendored Normal file
View file

@ -0,0 +1,26 @@
# PyInstaller build artefacts
build/
dist/
*.spec
# Python cache
__pycache__/
*.py[cod]
# Distribution staging folders (built per-version, attached to GitHub Releases)
portable-v*/
# Local backup of release archives
releases/
# Runtime output
out/
# Editor / OS junk
.vscode/
.idea/
.DS_Store
Thumbs.db
# Local scratch: squashfs/ISO extracts during dev inspection
.sds_inspect/

20
CHANGELOG.md Normal file
View file

@ -0,0 +1,20 @@
# Changelog
All notable changes to **bmccollect** are documented in this file.
The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and the project uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.1.0] - 2026-05-16
### Added
- Initial public release.
- Re-implementation of the original YADRO VRC tool as a properly structured Python project.
- Multi-BMC parallel collection over SSH, with the YADRO BMC CLI command set + raw shell + `cat` + `journalctl` + Redfish.
- Output structure 1:1 with VRC v1.1b — same per-host `<serial>_bmcdump_<datetime>` layout inside a single session `tar.gz`.
- Full-screen rich-based TUI: per-host progress table + rolling events panel.
- Auto-update check on startup against GitHub `/releases/latest`.
- MIT licensed.
[Unreleased]: https://github.com/Engelgardt23/bmccollect/compare/v0.1.0...HEAD
[0.1.0]: https://github.com/Engelgardt23/bmccollect/releases/tag/v0.1.0

68
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,68 @@
# Contributing
> Project layout, build, and release flow. **If you only want to use the tool — read [README](README.md) instead.**
## Repo layout
```
bmccollect/
├── .github/
│ ├── workflows/release.yml ← CI: tag-driven build + GitHub Release
│ └── ISSUE_TEMPLATE/ ← bug / feature / security routing
├── src/bmccollect/ ← package source (≤200 lines per module)
│ ├── __init__.py ← single source of truth for __version__
│ ├── __main__.py ← entry: python -m bmccollect
│ ├── app.py ← orchestration: prompts, threads, packaging
│ ├── platform_win.py ← VT enable
│ ├── update_check.py ← GitHub /releases/latest poll
│ ├── commands.py ← table of "filename → how to obtain it"
│ ├── bmc.py ← BmcSession (SSH + Redfish helper)
│ ├── collector.py ← per-host collect loop
│ ├── tarball.py ← layout, per-host & session tar.gz
│ └── ui.py ← rich-based TUI
├── bmccollect-launcher.py ← PyInstaller entry (root, absolute import)
├── pyproject.toml ← deps, packaging, dynamic version
├── CHANGELOG.md / CONTRIBUTING.md / LICENSE / README.md / SECURITY.md
└── .gitignore
```
## Run from source
```
python -m pip install rich paramiko
PYTHONPATH=src python -m bmccollect
```
## Editable install
```
python -m pip install -e .
bmccollect
```
## Build the portable .exe
```
python -m pip install pyinstaller rich paramiko
python -m PyInstaller --onefile --console --name bmccollect --paths src bmccollect-launcher.py
```
## Cut a release
1. Update `src/bmccollect/__init__.py` — bump `__version__` to `X.Y.Z`.
2. Update `CHANGELOG.md` — move items from `[Unreleased]` into a new `[X.Y.Z]` section with today's date.
3. Commit: `git commit -am "vX.Y.Z: …"`.
4. Tag: `git tag vX.Y.Z`.
5. Push: `git push && git push --tags`.
CI builds the exe and creates the GitHub Release with the zip attached.
## Where features go
| Adding... | Touch this module |
|---|---|
| A new artefact (file in the dump) | `commands.py` → one new `CommandSpec` row |
| Support for a new BMC API (e.g. IPMI) | `bmc.py` → add a method on `BmcSession`; reference it from `commands.py` with a new `kind` |
| New per-host UI column / progress detail | `ui.py``Ui._render_table` + `set_progress` / `set_summary` |
| Different output naming or layout | `tarball.py` |
| Tweaking the startup banner / prompts | `app.py``main` / `_prompt_inputs` |

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 engelgardt
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

49
README.md Normal file
View file

@ -0,0 +1,49 @@
# bmccollect
[![Latest release](https://img.shields.io/github/v/release/Engelgardt23/bmccollect)](https://github.com/Engelgardt23/bmccollect/releases/latest)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)
A portable collector of YADRO BMC diagnostic logs. Re-implementation of the original VRC tool, packaged as a maintainable Python project — same output structure expected by YADRO support, but readable source, modular layout, CI-built releases.
> **Made by engelgardt.**
---
## Download
Grab the latest release: [**releases page**](https://github.com/Engelgardt23/bmccollect/releases/latest).
The asset is `bmccollect-portable-vX.Y.Z.zip`.
## Run
1. Unzip anywhere.
2. Double-click `bmccollect.exe`.
3. Paste one or more BMC IPs (whitespace / comma / newline separated). End input with an empty line.
4. Enter username (default `admin`) and password.
5. Watch the live progress table while the tool collects each BMC in parallel.
6. When it's done you get a single `out/<DDMMYYYY_HHMMSS>/<stamp>.tar.gz` ready to send to support.
`Ctrl+C` aborts. The output folder is kept regardless — you can pack it manually if needed.
## What it collects
For each BMC: `inventory.json`, `lsinventory.json`, `sensors.log`, `sellog.log`, `bmc-state.txt`, `host-state.txt`, `bmc-net-cfg.log`, `cpuinfo`, `meminfo`, `osrelease`, `disk-usage.log`, `failed-services.log`, `top.log`, `bmc-journal_full_date.log`, journals for `obmc-console` and `obmc-yadro-vrm-setter`, a Redfish `/redfish/v1/Systems` dump, and others — see [`commands.py`](src/bmccollect/commands.py) for the full command table. Adding a new artefact is one line in that table.
## Compatibility
- Output structure mirrors VRC v1.1b — YADRO support flow is unchanged.
- Tested against `vegman-sx20` BMC firmware.
- Windows 10 / 11 host (the only place this tool runs).
## Build from source
```
python -m pip install rich paramiko pyinstaller
python -m PyInstaller --onefile --console --name bmccollect --paths src bmccollect-launcher.py
```
See [`CONTRIBUTING.md`](CONTRIBUTING.md) for the full layout and release flow.
## License
MIT — see [LICENSE](LICENSE).

32
SECURITY.md Normal file
View file

@ -0,0 +1,32 @@
# Security policy
Thanks for taking the time to look at this. This tool authenticates against
BMCs over SSH and HTTPS, runs commands as the chosen user, and writes their
output to disk — so vulnerability reports are very welcome.
## Supported versions
Only the latest tagged release on GitHub is supported. Older versions will
not get fixes; please upgrade first.
## How to report a vulnerability
**Please do not open a public issue** for security-sensitive findings.
Use GitHub's private security advisories: go to the
[Security tab](../../security/advisories/new) of this repo and click
"Report a vulnerability". GitHub will route it privately.
Please include:
- The version you tested (the startup banner is enough).
- Steps to reproduce.
- An assessment of impact.
Reports are reviewed and addressed on a best-effort basis. A fix and a public
advisory will be published once the issue is resolved. Reporters are credited
unless they prefer to stay anonymous.
## Out of scope
- Issues that require the attacker to already control the host or the BMC.
- Behaviour with explicitly broken credentials.

26
dev/pyproject.toml Normal file
View file

@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "vrcx"
description = "Vegman Remote Collect (extended) — diagnostic log collector for YADRO Vegman servers (BMC + SDS host)."
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [{ name = "engelgardt" }]
dependencies = ["rich>=13", "paramiko>=3"]
dynamic = ["version"]
[project.urls]
Homepage = "https://github.com/Engelgardt23/bmccollect"
Issues = "https://github.com/Engelgardt23/bmccollect/issues"
[project.scripts]
vrcx = "vrcx.app:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.dynamic]
version = { attr = "vrcx.__version__" }

14
dev/src/vrcx/__init__.py Normal file
View file

@ -0,0 +1,14 @@
"""
vrcx - Vegman Remote Collect (extended).
made by engelgardt
Portable diagnostic collector for YADRO Vegman servers. Connects (in parallel)
to one or more BMC hosts and, optionally, to their SDS service OS, runs a
fixed set of diagnostic commands on each side, captures the output into a
per-host {bmc,os}/ layout, and packs everything into a single tar.gz.
The single source of truth for the project version. Bump this before tagging
a release; CI reads the tag, the code reads this constant."""
__version__ = "0.2.0-dev"
GITHUB_REPO = "Engelgardt23/bmccollect"

7
dev/src/vrcx/__main__.py Normal file
View file

@ -0,0 +1,7 @@
"""Entry point for `python -m vrcx`."""
from .app import main
if __name__ == "__main__":
main()

259
dev/src/vrcx/app.py Normal file
View file

@ -0,0 +1,259 @@
"""
Application entry: collects credentials + IP list (BMC + optional SDS),
resolves SDS IPs via RedfishARP, runs the BMC and OS collectors in
parallel per host, shows a live TUI, and packs the result.
"""
from __future__ import annotations
import os
import re
import shutil
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from dataclasses import dataclass
from pathlib import Path
from rich.console import Console
from rich.prompt import Confirm, Prompt
from . import __version__
from .bmc import BmcSession
from .collector import collect_host
from .discover import discover_sds_ip
from .os_collector import collect_host_os
from .platform_win import enable_vt
from .tarball import (
session_stamp,
make_session_dir, make_per_host_dir, finalize_session,
)
from .ui import Ui
from .update_check import check_for_update
_IP_RE = re.compile(r"^(?:\d{1,3}\.){3}\d{1,3}$")
def _parse_ips(raw: str) -> list[str]:
tokens = re.split(r"[\s,;]+", (raw or "").strip())
return [t for t in tokens if _IP_RE.match(t)]
@dataclass
class Inputs:
hosts: list[str]
bmc_user: str
bmc_pass: str
collect_os: bool
sds_user: str
sds_pass: str
def _prompt_inputs(console: Console) -> Inputs | None:
console.rule("[bold cyan]Targets")
console.print("Enter one or more BMC IP addresses, separated by spaces, commas, or newlines.")
console.print("[dim](End input with an empty line.)[/]")
lines: list[str] = []
while True:
try:
line = input("> ")
except (EOFError, KeyboardInterrupt):
print()
return None
if not line.strip() and lines:
break
if not line.strip():
continue
lines.append(line)
hosts = _parse_ips(" ".join(lines))
if not hosts:
console.print("[red]No valid IP addresses entered.[/]")
return None
console.print()
bmc_user = Prompt.ask("BMC username", default="admin")
bmc_pass = Prompt.ask("BMC password (visible)")
console.print()
collect_os = Confirm.ask("Collect OS logs too?", default=False)
sds_user, sds_pass = "sds", "sds"
if collect_os:
sds_user = Prompt.ask("SDS username", default="sds")
sds_pass = Prompt.ask("SDS password (visible)", default="sds")
return Inputs(hosts=hosts, bmc_user=bmc_user, bmc_pass=bmc_pass,
collect_os=collect_os, sds_user=sds_user, sds_pass=sds_pass)
def _resolve_sds_ip(host: str, bmc_user: str, bmc_pass: str,
console: Console) -> str | None:
"""Open the BMC, try to discover the SDS IP. On failure prompt the user.
Returns the IP, or None when the user chose to skip OS collection."""
console.print(f"[dim]Resolving SDS IP for {host}...[/]")
ip: str | None = None
try:
with BmcSession(host=host, user=bmc_user, password=bmc_pass) as bmc:
ip = discover_sds_ip(bmc)
except Exception as exc:
console.print(f"[yellow] BMC {host}: discovery failed ({exc})[/]")
if ip:
console.print(f"[green] → SDS IP for {host}: {ip}[/]")
return ip
console.print(f"[yellow] SDS IP for {host} not auto-discovered.[/]")
manual = Prompt.ask(
f" Enter SDS IP for {host} (empty to skip OS for this host)",
default="",
).strip()
return manual or None
def _host_worker(host: str, bmc_user: str, bmc_pass: str,
sds_ip: str | None, sds_user: str, sds_pass: str,
session_dir: Path, ui: Ui) -> dict:
ui.set_status(host, "CONNECTING")
ui.log(f"[cyan]{host}[/] starting...")
per_host = make_per_host_dir(session_dir, host)
def bmc_progress(step: int, total: int, label: str, ok_n: int, fail_n: int) -> None:
ui.set_status(host, "COLLECTING")
ui.set_progress(host, "bmc", step, total, label, ok_n, fail_n)
ui.log(f"[cyan]{host}/bmc[/] → {label}")
def os_progress(step: int, total: int, label: str, ok_n: int, fail_n: int) -> None:
ui.set_status(host, "COLLECTING")
ui.set_progress(host, "os", step, total, label, ok_n, fail_n)
ui.log(f"[cyan]{host}/os[/] → {label}")
bmc_summary: dict = {"status": "skip", "ok": 0, "fail": 0, "total": 0, "error": "", "serial": ""}
os_summary: dict = {"status": "skip", "ok": 0, "fail": 0, "total": 0, "error": ""}
with ThreadPoolExecutor(max_workers=2) as ex:
bmc_fut: Future = ex.submit(
collect_host, host, bmc_user, bmc_pass, per_host / "bmc", bmc_progress
)
os_fut: Future | None = None
if sds_ip:
os_fut = ex.submit(
collect_host_os, sds_ip, sds_user, sds_pass,
per_host / "os", os_progress,
)
bmc_summary = bmc_fut.result()
if os_fut is not None:
os_summary = os_fut.result()
bmc_ok = bmc_summary["status"] == "ok"
os_ok = os_summary["status"] in ("ok", "skip")
total_ok = bmc_summary["ok"] + os_summary["ok"]
total_fail = bmc_summary["fail"] + os_summary["fail"]
if bmc_ok and os_ok:
ui.set_status(host, "DONE")
ui.set_summary(host, total_ok, total_fail, bmc_summary.get("serial", ""))
os_note = "" if sds_ip is None else f", OS {os_summary['ok']}/{os_summary['total']} ok"
ui.log(f"[green]{host}[/] done — BMC {bmc_summary['ok']}/{bmc_summary['total']} ok"
f"{os_note}.")
else:
ui.set_status(host, "ERROR")
err = (bmc_summary.get("error") or "") if not bmc_ok else (os_summary.get("error") or "")
ui.set_summary(host, total_ok, total_fail, bmc_summary.get("serial", ""), err[:80])
ui.log(f"[red]{host}[/] FAILED — {err}")
return {
"host": host,
"bmc": bmc_summary,
"os": os_summary,
"sds_ip": sds_ip or "",
}
def main() -> None:
enable_vt()
console = Console(log_path=False)
console.print(f"[bold cyan]vrcx v{__version__}[/] - Vegman Remote Collect (extended)")
console.print("[dim]made by engelgardt[/]")
console.print()
check_for_update(console)
inputs = _prompt_inputs(console)
if inputs is None:
input("Press Enter to exit"); return
# Resolve SDS IPs sequentially (so user prompts don't collide with workers).
sds_ips: dict[str, str | None] = {}
if inputs.collect_os:
console.rule("[bold cyan]SDS discovery")
for h in inputs.hosts:
sds_ips[h] = _resolve_sds_ip(h, inputs.bmc_user, inputs.bmc_pass, console)
enabled = {h for h, ip in sds_ips.items() if ip}
# Output anchor: next to the .exe when frozen, cwd otherwise.
if getattr(sys, "frozen", False):
anchor = Path(sys.executable).resolve().parent
else:
anchor = Path(os.getcwd())
base = anchor / "out"
stamp = session_stamp()
session = make_session_dir(base, stamp)
ui = Ui(session_label=stamp, out_path=str(session),
hosts=inputs.hosts, os_enabled=enabled)
stop = threading.Event()
ui_thread = threading.Thread(target=ui.run, args=(stop,), daemon=True)
ui_thread.start()
summaries: list[dict] = []
aborted = False
outer: Path | None = None
try:
with ThreadPoolExecutor(max_workers=min(8, max(2, len(inputs.hosts)))) as ex:
futures: list[Future] = [
ex.submit(
_host_worker,
h, inputs.bmc_user, inputs.bmc_pass,
sds_ips.get(h),
inputs.sds_user, inputs.sds_pass,
session, ui,
)
for h in inputs.hosts
]
for fut in futures:
summaries.append(fut.result())
except KeyboardInterrupt:
aborted = True
ui.log("[yellow]Aborted by user — removing the incomplete session folder...[/]")
finally:
if aborted:
shutil.rmtree(session, ignore_errors=True)
ui.log(f"[yellow]Removed:[/] {session}")
else:
with open(session / "vrc.log", "w", encoding="utf-8") as f:
for s in summaries:
bmc = s["bmc"]; os_ = s["os"]
f.write(
f"{s['host']} bmc:{bmc['status']} ok={bmc['ok']} fail={bmc['fail']} "
f"serial={bmc.get('serial','')} "
f"os:{os_['status']} ok={os_['ok']} fail={os_['fail']} "
f"sds_ip={s.get('sds_ip','')} "
f"{bmc.get('error','')} {os_.get('error','')}\n"
)
(session / "err_out.log").write_text("", encoding="utf-8")
outer = finalize_session(session)
ui.log(f"[bold green]Bundle ready:[/] {outer}")
stop.set()
ui_thread.join(timeout=2.0)
console.print()
if aborted:
console.print("[yellow]Aborted. Session folder removed.[/]")
else:
console.print(f"[bold green]Done.[/] Bundle: {outer}")
input("Press Enter to exit")
if __name__ == "__main__":
main()

193
dev/src/vrcx/bmc.py Normal file
View file

@ -0,0 +1,193 @@
"""
BMC client: SSH session (interactive YADRO CLI + raw shell) + Redfish HTTPS.
The YADRO CLI is an interactive shell `exec_command` lands us in plain `sh`
without the YADRO command set. We allocate a PTY and synchronise on a
per-command sentinel: every CLI command is sent as
<cmd>; echo '<<<END-<nonce>>>>'
The sentinel appears twice in the buffer (once as the echoed line, once as
the actual `echo` output after the command), and the text between them is the
command's output. This is robust against slow-responding commands and shell
banners.
"""
from __future__ import annotations
import json
import socket
import ssl
import time
import urllib.error
import urllib.request
import uuid
from contextlib import suppress
from typing import Optional
import paramiko
class BmcSession:
def __init__(self, host: str, user: str, password: str, ssh_port: int = 22):
self.host = host
self.user = user
self.password = password
self.ssh_port = ssh_port
self._ssh: Optional[paramiko.SSHClient] = None
self._chan: Optional[paramiko.Channel] = None
self._redfish_token: Optional[str] = None # X-Auth-Token if session auth was used
# ---------- lifecycle ----------
def open(self, timeout: int = 15) -> None:
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect(
self.host, port=self.ssh_port,
username=self.user, password=self.password,
timeout=timeout,
disabled_algorithms={"pubkeys": ["rsa-sha2-256", "rsa-sha2-512"]},
look_for_keys=False, allow_agent=False,
)
self._ssh = c
self._chan = c.invoke_shell(term="xterm", width=200, height=50)
self._drain_banner()
def close(self) -> None:
with suppress(Exception):
if self._chan: self._chan.close()
with suppress(Exception):
if self._ssh: self._ssh.close()
self._chan = None
self._ssh = None
self._redfish_token = None
def __enter__(self) -> "BmcSession":
self.open(); return self
def __exit__(self, *_exc) -> None:
self.close()
# ---------- low-level ----------
def _drain_banner(self, idle_s: float = 1.5, max_s: float = 5.0) -> None:
"""Swallow the welcome banner + first prompt that the BMC sends right after login."""
assert self._chan is not None
t0 = time.time(); last = time.time()
while time.time() - t0 < max_s:
if self._chan.recv_ready():
self._chan.recv(65536)
last = time.time()
elif time.time() - last > idle_s:
return
else:
time.sleep(0.05)
# ---------- public API ----------
def cli(self, cmd: str, timeout: float = 25.0) -> str:
"""Run `cmd` inside the interactive BMC CLI; return only the command's output."""
assert self._chan is not None
nonce = uuid.uuid4().hex[:8]
sentinel = f"<<<END-{nonce}>>>"
line = f"{cmd}; echo '{sentinel}'\n"
self._chan.send(line)
buf = ""
deadline = time.time() + timeout
while time.time() < deadline:
if self._chan.recv_ready():
buf += self._chan.recv(65536).decode(errors="replace")
if buf.count(sentinel) >= 2:
break
else:
time.sleep(0.05)
parts = buf.split(sentinel)
# parts[0]: stuff before the echoed cmd line ended with sentinel
# parts[1]: actual command output, ending right before the second sentinel
# parts[2]: prompt after the second sentinel
if len(parts) >= 3:
out = parts[1]
else:
# Sentinel didn't appear twice — return what we have for diagnostics.
out = buf
return out.lstrip("\r\n").rstrip() + "\n"
def shell(self, cmd: str, timeout: int = 30) -> str:
"""Raw shell command via SSH exec_command (no PTY, no YADRO CLI)."""
assert self._ssh is not None
_, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout, get_pty=False)
out = stdout.read().decode(errors="replace")
err = stderr.read().decode(errors="replace")
return out + (f"\n[stderr]\n{err}" if err.strip() else "")
def cat(self, path: str) -> str:
return self.shell(f"cat {path!s}")
def journal(self, unit: str) -> str:
return self.shell(f"journalctl --no-pager -u '{unit}'")
# ---------- Redfish ----------
def _redfish_open_session(self, timeout: int = 10) -> None:
"""POST /redfish/v1/SessionService/Sessions to obtain X-Auth-Token."""
url = f"https://{self.host}/redfish/v1/SessionService/Sessions"
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
body = json.dumps({"UserName": self.user, "Password": self.password}).encode()
req = urllib.request.Request(
url, data=body,
headers={"Content-Type": "application/json", "Accept": "application/json"},
)
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ctx))
with opener.open(req, timeout=timeout) as r:
self._redfish_token = r.headers.get("X-Auth-Token")
def redfish(self, endpoint: str, timeout: int = 10) -> bytes:
"""GET https://<host><endpoint>. Tries Basic auth first; on 401 retries via a session token."""
url = f"https://{self.host}{endpoint}"
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
def _do_get(auth_header: dict[str, str] | None = None) -> bytes:
req = urllib.request.Request(url, headers={
"Accept": "application/json",
**(auth_header or {}),
})
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ctx))
with opener.open(req, timeout=timeout) as r:
return r.read()
# 1) try basic
import base64
basic = base64.b64encode(f"{self.user}:{self.password}".encode()).decode()
try:
return _do_get({"Authorization": f"Basic {basic}"})
except urllib.error.HTTPError as e:
if e.code != 401:
raise
# 2) fall back to session auth
if not self._redfish_token:
self._redfish_open_session(timeout=timeout)
if not self._redfish_token:
raise RuntimeError("Redfish session auth failed (no X-Auth-Token in response)")
return _do_get({"X-Auth-Token": self._redfish_token})
# ---------- inventory helper ----------
@staticmethod
def serial_from_inventory_json(payload: str) -> str:
"""Best-effort extraction of the chassis serial from `lsinventory -j` JSON."""
try:
data = json.loads(payload)
except Exception:
return ""
if isinstance(data, dict):
for k in ("SerialNumber", "Serial", "serial_number", "serial"):
if k in data and data[k]:
return str(data[k])
ch = data.get("chassis") or data.get("Chassis")
if isinstance(ch, dict):
for k in ("SerialNumber", "Serial"):
if ch.get(k):
return str(ch[k])
return ""

108
dev/src/vrcx/collector.py Normal file
View file

@ -0,0 +1,108 @@
"""
The orchestrator that ties the BMC client and the command table together.
`collect_host(host, user, password, out_dir, progress)`
Runs every command in `COMMAND_TABLE` against the given BMC and saves
each result into `out_dir/<filename>`. Calls `progress(step, total, label)`
after every step so the UI can render a progress bar.
"""
from __future__ import annotations
import socket
import traceback
from pathlib import Path
from typing import Callable
import paramiko
from .bmc import BmcSession
from .commands import COMMAND_TABLE, CommandSpec
# progress(step, total, label, ok_so_far, fail_so_far)
ProgressCb = Callable[[int, int, str, int, int], None]
def _maybe_truncate(data: bytes, max_lines: int | None) -> bytes:
"""Keep only the last `max_lines` lines, preceded by a one-line header
that records the original size. Used for huge logs."""
if not max_lines:
return data
try:
text = data.decode("utf-8", errors="replace")
except Exception:
return data
lines = text.splitlines()
if len(lines) <= max_lines:
return data
header = (f"# truncated to last {max_lines} lines "
f"(original size: {len(lines)} lines)\n")
return (header + "\n".join(lines[-max_lines:]) + "\n").encode("utf-8")
def _run_one(bmc: BmcSession, spec: CommandSpec) -> bytes:
"""Execute a single command spec and return the output bytes to write."""
if spec.kind == "ssh":
raw = bmc.cli(spec.target).encode("utf-8", errors="replace")
elif spec.kind == "shell":
raw = bmc.shell(spec.target).encode("utf-8", errors="replace")
elif spec.kind == "cat":
raw = bmc.cat(spec.target).encode("utf-8", errors="replace")
elif spec.kind == "journal":
raw = bmc.journal(spec.target).encode("utf-8", errors="replace")
elif spec.kind == "redfish":
# Redfish payloads are JSON; never truncate them as lines.
return bmc.redfish(spec.target)
else:
raise ValueError(f"unknown command kind: {spec.kind!r}")
return _maybe_truncate(raw, spec.max_lines)
def collect_host(
host: str,
user: str,
password: str,
out_dir: Path,
progress: ProgressCb | None = None,
) -> dict:
"""Collect everything from a single BMC into `out_dir`.
Returns a small summary dict useful for the UI: status, error message
(if any), serial number, count of successful / failed steps."""
out_dir.mkdir(parents=True, exist_ok=True)
total = len(COMMAND_TABLE)
ok = 0
fail = 0
serial = ""
try:
with BmcSession(host=host, user=user, password=password) as bmc:
for i, spec in enumerate(COMMAND_TABLE, 1):
if progress:
progress(i, total, spec.filename, ok, fail)
try:
data = _run_one(bmc, spec)
(out_dir / spec.filename).write_bytes(data)
if spec.filename == "inventory.json" and not serial:
serial = bmc.serial_from_inventory_json(data.decode(errors="replace"))
ok += 1
except Exception as exc:
fail += 1
err_path = out_dir / (spec.filename + ".error")
err_path.write_text(
f"# command failed\n{spec.kind}: {spec.target}\n\n"
f"{traceback.format_exception_only(type(exc), exc)[0]}",
encoding="utf-8",
)
# tick once more after the file is written so the UI sees the new totals
if progress:
progress(i, total, spec.filename, ok, fail)
return {"host": host, "status": "ok", "error": "", "serial": serial,
"ok": ok, "fail": fail, "total": total}
except (paramiko.AuthenticationException, socket.timeout, socket.gaierror, OSError) as exc:
return {"host": host, "status": "error", "error": str(exc), "serial": "",
"ok": ok, "fail": fail, "total": total}
except Exception as exc:
return {"host": host, "status": "error", "error": repr(exc), "serial": "",
"ok": ok, "fail": fail, "total": total}

84
dev/src/vrcx/commands.py Normal file
View file

@ -0,0 +1,84 @@
"""
Mapping of "output file name" "how to obtain it from the BMC".
Adding a new artefact = adding one line below. The collector iterates this
table per host and writes each entry into the per-host dump folder.
The file names mirror what the original YADRO VRC tool produces, so the
support flow doesn't change.
`kind`:
"ssh" : run a command in the YADRO BMC CLI (interactive shell over SSH)
"shell" : run a raw shell command on the BMC (via SSH exec_command, bash)
"cat" : `cat <path>` over SSH capture a file verbatim
"journal" : `journalctl --no-pager -u <unit>` for a systemd unit
"redfish" : GET an HTTPS endpoint on the BMC (Redfish API)
`max_lines`:
Cap on the number of lines kept in the output. When the raw response is
bigger, only the **last** `max_lines` lines are written, with a single
"# truncated to last N (original was M lines)" header line on top.
Used to keep huge logs (sellog, journal) small and the run fast the
original VRC pulled ~30 000 lines of sellog by default; 5 000 is plenty
for triage and dramatically reduces transfer time."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class CommandSpec:
filename: str
kind: str # one of: ssh / shell / cat / journal / redfish
target: str # CLI command / shell command / path / unit / endpoint
max_lines: int | None = None
# Initial subset — the same artefacts the original VRC produces. Each line is
# a self-contained collector instruction. Easy to add/remove without touching
# the rest of the code.
#
# Note on YADRO CLI: many top-level words (`bmc`, `host`, `bmc info`, `host
# power`) are **menus**, not leaf commands — typing them just prints their
# sub-command listing. To get actual data you have to go all the way down
# (e.g. `bmc info version`, `host power status`).
COMMAND_TABLE: list[CommandSpec] = [
# --- BMC state & version ---
CommandSpec("bmc-state.txt", "ssh", "bmc info version"),
CommandSpec("chassis-state.txt", "shell", "obmcutil chassisstate || true"),
CommandSpec("host-state.txt", "ssh", "host power status"),
CommandSpec("bmc_ver&power_status.log", "ssh", "bmc info version; bmc info uptime; host power status"),
CommandSpec("hostnamectl.log", "shell", "hostnamectl"),
CommandSpec("uptime.log", "shell", "uptime"),
# --- BMC inventory & sensors ---
CommandSpec("inventory.json", "ssh", "lsinventory -j"),
CommandSpec("lsinventory.json", "ssh", "lsinventory -j"),
CommandSpec("inventory.log", "ssh", "health logs show inventory"),
CommandSpec("sensors.log", "ssh", "health logs show sensors", max_lines=5000),
CommandSpec("sellog.log", "ssh", "health logs show sellog", max_lines=5000),
# --- BMC config & users ---
CommandSpec("bmc-users.txt", "ssh", "user list"),
CommandSpec("bmc-net-cfg.log", "shell", "ip addr; echo ---; ip route; echo ---; ip link"),
CommandSpec("bmc-service-settings.log", "ssh", "bmc services list"),
CommandSpec("fan-settings.log", "ssh", "bmc cooling show"),
# --- Raw OS files / journals ---
CommandSpec("cpuinfo", "cat", "/proc/cpuinfo"),
CommandSpec("meminfo", "cat", "/proc/meminfo"),
CommandSpec("osrelease", "cat", "/etc/os-release"),
CommandSpec("ipaddr.log", "shell", "ip -4 -j addr"),
CommandSpec("iplink.log", "shell", "ip -j link"),
CommandSpec("disk-usage.log", "shell", "df -h"),
CommandSpec("failed-services.log", "shell", "systemctl --failed --no-pager"),
CommandSpec("top.log", "shell", "top -b -n1", max_lines=100),
CommandSpec("fw-printenv.log", "shell", "fw_printenv 2>&1 || true"),
CommandSpec("audit.log", "cat", "/var/log/audit/audit.log", max_lines=5000),
CommandSpec("bmc-journal_full_date.log", "shell", "journalctl --no-pager --since '7 days ago'", max_lines=5000),
CommandSpec("obmc-console.log", "journal", "obmc-console@*.service", max_lines=5000),
CommandSpec("obmc-yadro-vrm-setter.log", "journal", "obmc-yadro-vrm-setter.service", max_lines=5000),
# --- Redfish ---
CommandSpec("redfish.json", "redfish", "/redfish/v1/Systems"),
]

153
dev/src/vrcx/discover.py Normal file
View file

@ -0,0 +1,153 @@
"""
SDS-host discovery: figure out the DHCP-assigned IP of the SDS service OS
by asking the BMC for its host NIC's MAC and then looking that MAC up in
the laptop's local ARP table (warming it with a /24 ping-sweep first).
"""
from __future__ import annotations
import json
import re
import socket
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from .bmc import BmcSession
_HEX = re.compile(r"[^0-9a-f]")
def _norm_mac(mac: str) -> str:
"""Strip separators and lowercase. AA:BB:cc-dd-ee-ff → aabbccddeeff."""
return _HEX.sub("", (mac or "").lower())
# ----- Redfish: enumerate host NIC MACs ----------------------------------
def _redfish_json(bmc: BmcSession, endpoint: str) -> dict | None:
try:
return json.loads(bmc.redfish(endpoint).decode("utf-8", errors="replace"))
except Exception:
return None
def redfish_host_mac(bmc: BmcSession) -> list[str]:
"""Return normalized MACs of every host-NIC interface the BMC reports."""
macs: list[str] = []
systems = _redfish_json(bmc, "/redfish/v1/Systems") or {}
for member in systems.get("Members", []) or []:
sys_id = (member.get("@odata.id") or "").rstrip("/")
if not sys_id:
continue
sys_doc = _redfish_json(bmc, sys_id) or {}
eth_coll_id = ((sys_doc.get("EthernetInterfaces") or {})
.get("@odata.id") or "").rstrip("/")
if not eth_coll_id:
continue
eth_coll = _redfish_json(bmc, eth_coll_id) or {}
for eth_member in eth_coll.get("Members", []) or []:
eth_path = (eth_member.get("@odata.id") or "").rstrip("/")
if not eth_path:
continue
eth = _redfish_json(bmc, eth_path) or {}
mac = _norm_mac(eth.get("MACAddress") or "")
if mac and len(mac) == 12 and mac not in macs:
macs.append(mac)
return macs
# ----- Local network: find our IP that routes to BMC ---------------------
def local_ip_for(target_ip: str) -> str | None:
"""Return the local interface IP the OS would use to reach `target_ip`."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(1.0)
s.connect((target_ip, 1))
return s.getsockname()[0]
except OSError:
return None
# ----- ARP table population & lookup -------------------------------------
def _ping_one(ip: str) -> None:
is_win = sys.platform.startswith("win")
flag = "-n" if is_win else "-c"
wait_flag = "-w" if is_win else "-W"
wait_val = "300" if is_win else "1" # ms on Windows, seconds elsewhere
try:
subprocess.run(
["ping", flag, "1", wait_flag, wait_val, ip],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2,
)
except Exception:
pass
def ping_sweep_24(local_ip: str, max_workers: int = 64) -> None:
"""Fire one ping at every host in the local /24 to populate the ARP cache."""
parts = local_ip.split(".")
if len(parts) != 4:
return
base = ".".join(parts[:3])
ips = [f"{base}.{i}" for i in range(1, 255) if f"{base}.{i}" != local_ip]
with ThreadPoolExecutor(max_workers=max_workers) as ex:
list(ex.map(_ping_one, ips))
_ARP_LINE = re.compile(
r"(?P<ip>\d+\.\d+\.\d+\.\d+)\s+(?P<mac>[0-9a-fA-F][0-9a-fA-F:-]{15,})"
)
def arp_table() -> list[tuple[str, str]]:
"""Parse `arp -a` and return (ip, normalized_mac) pairs."""
try:
proc = subprocess.run(
["arp", "-a"], capture_output=True, text=True, timeout=5
)
except (FileNotFoundError, subprocess.TimeoutExpired):
return []
out: list[tuple[str, str]] = []
for line in proc.stdout.splitlines():
m = _ARP_LINE.search(line)
if not m:
continue
mac = _norm_mac(m.group("mac"))
if len(mac) == 12:
out.append((m.group("ip"), mac))
return out
def arp_lookup(macs: list[str]) -> str | None:
"""First IP in `arp -a` whose MAC matches any of the given (normalized) MACs."""
target = set(macs)
for ip, mac in arp_table():
if mac in target:
return ip
return None
# ----- Orchestrator ------------------------------------------------------
def discover_sds_ip(bmc: BmcSession, do_sweep: bool = True) -> str | None:
"""Best-effort: BMC → host MACs → ARP. Returns SDS IP or None."""
macs = redfish_host_mac(bmc)
if not macs:
return None
hit = arp_lookup(macs)
if hit:
return hit
if not do_sweep:
return None
local = local_ip_for(bmc.host)
if not local:
return None
ping_sweep_24(local)
time.sleep(1.5)
return arp_lookup(macs)

View file

@ -0,0 +1,139 @@
"""
Orchestrator for the SDS-host side. Mirrors `collector.py` (BMC side), but
runs against an SdsSession and uses OS_COMMAND_TABLE.
`collect_host_os(host, user, password, out_dir, progress)`
Runs every entry in OS_COMMAND_TABLE against the SDS host and writes
each result into `out_dir/<filename>`. Calls `progress(step, total,
label, ok_so_far, fail_so_far)` after every step.
"""
from __future__ import annotations
import socket
import traceback
import uuid
from pathlib import Path
from typing import Callable
import paramiko
from .sds import SdsSession
from .os_commands import OS_COMMAND_TABLE, OsCommandSpec
ProgressCb = Callable[[int, int, str, int, int], None]
def _maybe_truncate(data: bytes, max_lines: int | None) -> bytes:
if not max_lines:
return data
text = data.decode("utf-8", errors="replace")
lines = text.splitlines()
if len(lines) <= max_lines:
return data
header = (f"# truncated to last {max_lines} lines "
f"(original size: {len(lines)} lines)\n")
return (header + "\n".join(lines[-max_lines:]) + "\n").encode("utf-8")
def _run_lsiget(sess: SdsSession, spec: OsCommandSpec, out_path: Path) -> None:
"""Run lsigetlinux.sh on the host, SFTP-pull its output tarball back."""
work_id = uuid.uuid4().hex[:12]
tmp_dir = f"/tmp/vrcx_lsiget_{work_id}"
script = spec.target
setup_and_run = (
f"mkdir -p {tmp_dir} && chmod 777 {tmp_dir} && cd {tmp_dir} && "
f"{script} -Q -B -P && "
f"chmod -R a+rX {tmp_dir}"
)
log = sess.sudo(setup_and_run, timeout=1800)
# LSICAPTUREFILES.TXT holds the output filename (one or more lines —
# take the last non-empty one).
listing = sess.shell(f"cat {tmp_dir}/LSICAPTUREFILES.TXT 2>/dev/null || true")
filename = next(
(ln.strip() for ln in reversed(listing.splitlines()) if ln.strip()),
"",
)
if not filename:
# Cleanup before raising
try:
sess.sudo(f"rm -rf {tmp_dir}", timeout=60)
except Exception:
pass
raise RuntimeError(
"lsiget: LSICAPTUREFILES.TXT is empty; last log lines:\n"
+ "\n".join(log.splitlines()[-20:])
)
remote = filename if filename.startswith("/") else f"{tmp_dir}/{filename}"
try:
sess.download(remote, out_path)
finally:
try:
sess.sudo(f"rm -rf {tmp_dir}", timeout=60)
except Exception:
pass
def _run_one(sess: SdsSession, spec: OsCommandSpec, out_dir: Path) -> None:
"""Execute one spec and write its artefact under `out_dir`."""
if spec.kind == "lsiget":
_run_lsiget(sess, spec, out_dir / spec.filename)
return
if spec.kind == "shell":
raw = sess.shell(spec.target)
elif spec.kind == "sudo":
raw = sess.sudo(spec.target)
elif spec.kind == "cat":
raw = sess.cat(spec.target)
elif spec.kind == "journal":
raw = sess.journal(spec.target)
else:
raise ValueError(f"unknown OS command kind: {spec.kind!r}")
data = _maybe_truncate(raw.encode("utf-8", errors="replace"), spec.max_lines)
(out_dir / spec.filename).write_bytes(data)
def collect_host_os(
host: str,
user: str,
password: str,
out_dir: Path,
progress: ProgressCb | None = None,
) -> dict:
"""Collect the SDS-side bundle for one host into `out_dir`."""
out_dir.mkdir(parents=True, exist_ok=True)
total = len(OS_COMMAND_TABLE)
ok = 0
fail = 0
try:
with SdsSession(host=host, user=user, password=password) as sess:
for i, spec in enumerate(OS_COMMAND_TABLE, 1):
if progress:
progress(i, total, spec.filename, ok, fail)
try:
_run_one(sess, spec, out_dir)
ok += 1
except Exception as exc:
fail += 1
(out_dir / (spec.filename + ".error")).write_text(
f"# command failed\n{spec.kind}: {spec.target}\n\n"
f"{traceback.format_exception_only(type(exc), exc)[0]}",
encoding="utf-8",
)
if progress:
progress(i, total, spec.filename, ok, fail)
return {"host": host, "status": "ok", "error": "",
"ok": ok, "fail": fail, "total": total}
except (paramiko.AuthenticationException, socket.timeout,
socket.gaierror, OSError) as exc:
return {"host": host, "status": "error", "error": str(exc),
"ok": ok, "fail": fail, "total": total}
except Exception as exc:
return {"host": host, "status": "error", "error": repr(exc),
"ok": ok, "fail": fail, "total": total}

View file

@ -0,0 +1,73 @@
"""
Mapping of "output file name" "how to obtain it from the SDS host".
Adding a new artefact = adding one line below. The OS collector iterates
this table per host and writes each entry into the per-host `os/` folder.
`kind`:
"shell" : run a plain shell command (no sudo)
"sudo" : run a shell command via `sudo -S` (password piped in)
"cat" : `cat <path>` (no sudo see "sudo" if root is required)
"journal" : `journalctl --no-pager -u <unit>`
"lsiget" : special run /opt/yadro/tools/LsiGet/lsigetlinux.sh and
SFTP-pull the resulting tarball back. `target` is the
absolute path to the lsiget script on the host.
`max_lines`:
Cap on the number of lines kept in the output. When the raw response is
bigger, only the last `max_lines` lines are written, with a single
"# truncated …" header line. Used to keep huge logs (journal, messages)
small and the run fast.
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class OsCommandSpec:
filename: str
kind: str # shell | sudo | cat | journal | lsiget
target: str # cmd / path / unit / script
max_lines: int | None = None
# Minimum «support-ready» set. Order is roughly cheap-first so the live
# progress feels responsive; the few really heavy items (lsiget, smartctl
# across all drives) run last.
OS_COMMAND_TABLE: list[OsCommandSpec] = [
# --- identity & versions ---
OsCommandSpec("uname.log", "shell", "uname -a"),
OsCommandSpec("os-release.log", "cat", "/etc/os-release"),
OsCommandSpec("hostnamectl.log", "shell", "hostnamectl"),
OsCommandSpec("uptime.log", "shell", "uptime"),
# --- networking ---
OsCommandSpec("ip.log", "shell", "ip -d a; echo ---; ip r; echo ---; ip l"),
# --- block / pci inventory ---
OsCommandSpec("lsblk.log", "shell", "lsblk -O"),
OsCommandSpec("lspci.log", "shell", "lspci -vvv -nn"),
# --- system info (sudo) ---
OsCommandSpec("dmidecode.log", "sudo", "dmidecode"),
OsCommandSpec("dmesg.log", "sudo", "dmesg -T"),
# --- journal & logs (sudo, truncated) ---
OsCommandSpec("journal-boot.log", "sudo", "journalctl --no-pager -b", max_lines=5000),
OsCommandSpec("messages.log", "sudo", "tail -n 5000 /var/log/messages 2>/dev/null || true", max_lines=5000),
OsCommandSpec("failed-services.log", "shell", "systemctl --failed --no-pager"),
# --- storage / firmware tools ---
OsCommandSpec("nvme-list.log", "sudo", "nvme list && echo --- && nvme list-subsys -v"),
OsCommandSpec("storcli-show-all.log", "sudo",
"/opt/yadro/tools/inventory-management-utility/bundle-fw/tools/MegaRAID/storcli/storcli64 /call show all"),
# --- heavy: SMART for every block device ---
OsCommandSpec("smartctl-all.log", "sudo",
"for d in $(lsblk -dn -o NAME); do echo === /dev/$d ===; smartctl -x /dev/$d; echo; done"),
# --- heavy: full LsiGet bundle (SFTP-pulled, not stdout) ---
OsCommandSpec("lsiget.tar.gz", "lsiget",
"/opt/yadro/tools/LsiGet/lsigetlinux.sh"),
]

View file

@ -0,0 +1,21 @@
"""Windows-specific helpers: VT (ANSI) processing in the console."""
from __future__ import annotations
import ctypes
import os
def enable_vt() -> None:
if os.name != "nt":
return
try:
k = ctypes.windll.kernel32
STD_OUT, STD_ERR = -11, -12
ENABLE_VT = 0x0004
for std in (STD_OUT, STD_ERR):
h = k.GetStdHandle(std)
mode = ctypes.c_ulong()
if k.GetConsoleMode(h, ctypes.byref(mode)):
k.SetConsoleMode(h, mode.value | ENABLE_VT)
except Exception:
pass

118
dev/src/vrcx/sds.py Normal file
View file

@ -0,0 +1,118 @@
"""
SDS host client: plain SSH session over paramiko, plus a sudo helper that
funnels the password through stdin (echo PASS | sudo -S -p '' ...).
Unlike the BMC client this is a normal bash environment no interactive
YADRO CLI, no PTY sentinel dance. exec_command is enough for everything.
"""
from __future__ import annotations
import shlex
import socket
from contextlib import suppress
from pathlib import Path
from typing import Optional
import paramiko
class SdsSession:
def __init__(self, host: str, user: str = "sds", password: str = "sds",
ssh_port: int = 22):
self.host = host
self.user = user
self.password = password
self.ssh_port = ssh_port
self._ssh: Optional[paramiko.SSHClient] = None
self._sftp: Optional[paramiko.SFTPClient] = None
# ---------- lifecycle ----------
def open(self, timeout: int = 15) -> None:
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect(
self.host, port=self.ssh_port,
username=self.user, password=self.password,
timeout=timeout,
look_for_keys=False, allow_agent=False,
)
self._ssh = c
def close(self) -> None:
with suppress(Exception):
if self._sftp: self._sftp.close()
with suppress(Exception):
if self._ssh: self._ssh.close()
self._sftp = None
self._ssh = None
def __enter__(self) -> "SdsSession":
self.open(); return self
def __exit__(self, *_exc) -> None:
self.close()
# ---------- low-level ----------
def _exec(self, cmd: str, timeout: int = 60) -> str:
"""Run `cmd` via exec_command, return stdout + stderr appended."""
assert self._ssh is not None
_, stdout, stderr = self._ssh.exec_command(cmd, timeout=timeout, get_pty=False)
out = stdout.read().decode(errors="replace")
err = stderr.read().decode(errors="replace")
return out + (f"\n[stderr]\n{err}" if err.strip() else "")
# ---------- public API ----------
def shell(self, cmd: str, timeout: int = 60) -> str:
"""Run a plain shell command, no sudo."""
return self._exec(cmd, timeout=timeout)
def sudo(self, cmd: str, timeout: int = 600) -> str:
"""Run `cmd` as root by piping the user's password into `sudo -S`.
The inner command runs through `bash -c` so shell features
(pipelines, redirects, globs) work as expected. `sudo -p ''`
suppresses the password prompt so it doesn't leak into stdout.
"""
wrapped = f"echo {shlex.quote(self.password)} | " \
f"sudo -S -p '' bash -c {shlex.quote(cmd)} 2>&1"
return self._exec(wrapped, timeout=timeout)
def cat(self, path: str, timeout: int = 60) -> str:
return self._exec(f"cat {shlex.quote(path)}", timeout=timeout)
def journal(self, unit: str, since: str | None = None,
timeout: int = 120) -> str:
parts = ["journalctl --no-pager"]
if unit:
parts += ["-u", shlex.quote(unit)]
if since:
parts += ["--since", shlex.quote(since)]
return self._exec(" ".join(parts), timeout=timeout)
# ---------- file transfer ----------
def _ensure_sftp(self) -> paramiko.SFTPClient:
if self._sftp is None:
assert self._ssh is not None
self._sftp = self._ssh.open_sftp()
return self._sftp
def download(self, remote_path: str, local_path: Path) -> None:
"""SFTP-get `remote_path` to `local_path` (local parent must exist)."""
sftp = self._ensure_sftp()
local_path.parent.mkdir(parents=True, exist_ok=True)
sftp.get(remote_path, str(local_path))
def remote_exists(self, remote_path: str) -> bool:
sftp = self._ensure_sftp()
try:
sftp.stat(remote_path)
return True
except (FileNotFoundError, IOError):
return False
# Re-export the standard SSH/socket exceptions so callers can catch them
# without importing paramiko directly.
SSHException = paramiko.SSHException
AuthenticationError = paramiko.AuthenticationException
SocketError = (socket.timeout, socket.gaierror, OSError)

87
dev/src/vrcx/tarball.py Normal file
View file

@ -0,0 +1,87 @@
"""
Output layout & tarball packaging.
Layout under `<repo>/out/`:
out/
<DDMMYYYY_HHMMSS>/
<bmc_ip>/
bmc/ collected from the BMC
os/ collected from the SDS host (optional)
<bmc_ip_2>/
bmc/
os/
archives/
dump_<bmc_ip>.tar.gz
dump_<bmc_ip_2>.tar.gz
vrc.log
err_out.log
<DDMMYYYY_HHMMSS>.tar.gz (one-click bundle for support)
"""
from __future__ import annotations
import re
import tarfile
from datetime import datetime
from pathlib import Path
def session_stamp(now: datetime | None = None) -> str:
return (now or datetime.now()).strftime("%d%m%Y_%H%M%S")
_SAFE = re.compile(r"[^A-Za-z0-9._-]+")
def safe_id(s: str) -> str:
"""File-system safe identifier — used for the per-host folder name."""
s = (s or "").strip() or "unknown"
return _SAFE.sub("_", s)
def make_session_dir(base: Path, stamp: str) -> Path:
d = base / stamp
d.mkdir(parents=True, exist_ok=True)
return d
def make_per_host_dir(session_dir: Path, host: str) -> Path:
"""Per-host folder named after the BMC IP. Always contains bmc/ and os/."""
d = session_dir / safe_id(host)
(d / "bmc").mkdir(parents=True, exist_ok=True)
(d / "os").mkdir(parents=True, exist_ok=True)
return d
def make_archives_dir(session_dir: Path) -> Path:
d = session_dir / "archives"
d.mkdir(parents=True, exist_ok=True)
return d
def per_host_archive_name(host: str) -> str:
return f"dump_{safe_id(host)}.tar.gz"
def tar_directory(src_dir: Path, dst_tar: Path) -> None:
"""Create `dst_tar` (.tar.gz) containing `src_dir` (and its tree). The
archive's top-level entry is the directory itself."""
with tarfile.open(dst_tar, "w:gz") as tf:
tf.add(src_dir, arcname=src_dir.name)
def finalize_session(session_dir: Path) -> Path:
"""For each per-host directory, write archives/dump_<host>.tar.gz and drop
the unpacked folder. Then pack the whole session into <session>.tar.gz."""
archives = make_archives_dir(session_dir)
for child in list(session_dir.iterdir()):
if not child.is_dir() or child.name == "archives":
continue
tar_directory(child, archives / per_host_archive_name(child.name))
for p in sorted(child.rglob("*"), reverse=True):
if p.is_file(): p.unlink(missing_ok=True)
elif p.is_dir(): p.rmdir()
child.rmdir()
outer = session_dir.parent / f"{session_dir.name}.tar.gz"
tar_directory(session_dir, outer)
return outer

208
dev/src/vrcx/ui.py Normal file
View file

@ -0,0 +1,208 @@
"""
Rich-based TUI:
vrcx v0.2.0 made by engelgardt
Session: 16052026_124500 out/16052026_124500/...
Hosts
# │ Host │ Status │ Step │
1 10.7.160.183 COLLECTING BMC 12/30 | OS 5/15
2 10.7.160.184 DONE BMC 30/30 | OS 15/15
Events
[12:45:01] 10.7.160.183 connected
[12:45:02] 10.7.160.183 inventory.json
"""
from __future__ import annotations
import sys
import threading
from collections import deque
from datetime import datetime
from typing import Any
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from . import __version__
HEADER_LINES = 4
EVENTS_LINES = 14
TBL_OVERHEAD = 6
_STATUS_STYLE = {
"PENDING": "dim",
"CONNECTING": "yellow",
"COLLECTING": "cyan",
"PACKING": "blue",
"DONE": "bold green",
"ERROR": "bold red",
}
def now_s() -> str:
return datetime.now().strftime("%H:%M:%S")
def _empty_row(os_enabled: bool) -> dict[str, Any]:
return {
"status": "PENDING",
"bmc_step": 0, "bmc_total": 0, "bmc_label": "",
"bmc_ok": 0, "bmc_fail": 0,
"os_enabled": os_enabled,
"os_step": 0, "os_total": 0, "os_label": "",
"os_ok": 0, "os_fail": 0,
"error": "", "serial": "",
}
class Ui:
def __init__(self, session_label: str, out_path: str, hosts: list[str],
os_enabled: set[str] | None = None):
self.session_label = session_label
self.out_path = out_path
self.console = Console(log_path=False)
self.events = deque(maxlen=300)
self.events_lock = threading.Lock()
self.refresh_evt = threading.Event()
self._os_enabled = set(os_enabled or [])
self.rows: dict[str, dict[str, Any]] = {
h: _empty_row(h in self._os_enabled) for h in hosts
}
self.rows_lock = threading.Lock()
# --- API for the worker threads ---
def log(self, line: str) -> None:
with self.events_lock:
self.events.append(f"[dim][{now_s()}][/] {line}")
self.refresh_evt.set()
def set_status(self, host: str, status: str) -> None:
with self.rows_lock:
if host in self.rows:
self.rows[host]["status"] = status
self.refresh_evt.set()
def set_progress(self, host: str, side: str, step: int, total: int,
label: str, ok: int | None = None,
fail: int | None = None) -> None:
"""`side` is 'bmc' or 'os'."""
prefix = "bmc_" if side == "bmc" else "os_"
with self.rows_lock:
if host in self.rows:
r = self.rows[host]
r[prefix + "step"] = step
r[prefix + "total"] = total
r[prefix + "label"] = label
if ok is not None: r[prefix + "ok"] = ok
if fail is not None: r[prefix + "fail"] = fail
self.refresh_evt.set()
def set_summary(self, host: str, ok: int, fail: int, serial: str = "",
error: str = "") -> None:
with self.rows_lock:
if host in self.rows:
r = self.rows[host]
# `ok` / `fail` here are aggregate across both sides
r["serial"] = serial
r["error"] = error
self.refresh_evt.set()
# --- rendering ---
def _render_header(self) -> Panel:
body = (
f"[bold cyan]vrcx v{__version__}[/]\n"
f"Session: [bold]{self.session_label}[/] Output: [dim]{self.out_path}[/]\n"
f"[bold yellow]Ctrl+C[/] — abort and delete this session folder."
)
return Panel(body, border_style="cyan")
@staticmethod
def _side_cell(prefix: str, label: str, r: dict) -> str:
total = r[prefix + "total"]
if not total:
return f"{label}"
return f"{label} {r[prefix + 'step']}/{total} {r[prefix + 'label']}"
def _render_table(self) -> Table:
t = Table(expand=True, header_style="bold")
t.add_column("#", style="dim", width=3, justify="right")
t.add_column("Host", width=18)
t.add_column("Status", width=12)
t.add_column("Step", overflow="ellipsis")
t.add_column("OK/Fail", width=10, justify="right")
t.add_column("Serial", width=14)
t.add_column("Note", overflow="ellipsis")
with self.rows_lock:
items = list(self.rows.items())
avail = max(1, self.console.size.height - HEADER_LINES - EVENTS_LINES - TBL_OVERHEAD)
overflow = max(0, len(items) - avail)
if overflow:
items = items[: avail - 1]
for i, (host, r) in enumerate(items, 1):
style = _STATUS_STYLE.get(r["status"], "")
bmc_cell = self._side_cell("bmc_", "BMC", r)
if r["os_enabled"]:
step_cell = f"{bmc_cell} | {self._side_cell('os_', 'OS', r)}"
else:
step_cell = bmc_cell
ok_total = r["bmc_ok"] + r["os_ok"]
fail_total = r["bmc_fail"] + r["os_fail"]
note = r.get("error") or ""
t.add_row(
str(i),
host,
Text(r["status"], style=style),
step_cell,
f"{ok_total}/{fail_total}",
r.get("serial") or "",
note,
)
if overflow:
t.add_row("", "", "", f"(+{overflow} more — enlarge window)", "", "", "")
return t
def _render_events(self) -> Panel:
with self.events_lock:
last = list(self.events)[-20:]
body = "\n".join(last) if last else "[dim](no events yet)[/]"
return Panel(body, title="Events", border_style="dim")
def _render_screen(self) -> Layout:
layout = Layout()
layout.split_column(
Layout(self._render_header(), name="hdr", size=HEADER_LINES),
Layout(Panel(self._render_table(), title="Hosts", border_style="cyan"), name="tbl"),
Layout(self._render_events(), name="evt", size=EVENTS_LINES),
)
return layout
# --- main loop ---
def run(self, stop: threading.Event) -> None:
sys.stdout.write("\x1b[2J\x1b[3J\x1b[H")
sys.stdout.flush()
last_size = self.console.size
with Live(self._render_screen(), auto_refresh=False, console=self.console,
screen=True, redirect_stdout=False, redirect_stderr=False) as live:
live.refresh()
while not stop.is_set():
triggered = self.refresh_evt.wait(timeout=0.5)
if stop.is_set():
break
cur_size = self.console.size
resized = (cur_size != last_size)
if resized:
last_size = cur_size
if triggered:
self.refresh_evt.clear()
if triggered or resized:
live.update(self._render_screen(), refresh=True)

View file

@ -0,0 +1,47 @@
"""Auto-update check on startup. Same pattern as in dhcpsrv/netswitch."""
from __future__ import annotations
import json
import urllib.request
import webbrowser
from rich.console import Console
from rich.prompt import Confirm
from . import __version__, GITHUB_REPO
def _parse_version(s: str) -> tuple[int, int, int]:
try:
s = (s or "").strip().lstrip("v")
parts = [int(x) for x in s.split(".")[:3]]
while len(parts) < 3:
parts.append(0)
return tuple(parts) # type: ignore[return-value]
except Exception:
return (0, 0, 0)
def check_for_update(console: Console) -> None:
try:
url = f"https://api.github.com/repos/{GITHUB_REPO}/releases/latest"
req = urllib.request.Request(url, headers={
"Accept": "application/vnd.github+json",
"User-Agent": f"vrcx/{__version__}",
})
with urllib.request.urlopen(req, timeout=3) as r:
data = json.loads(r.read().decode("utf-8", errors="replace"))
latest = (data.get("tag_name") or "").strip()
page = data.get("html_url") or f"https://github.com/{GITHUB_REPO}/releases/latest"
if _parse_version(latest) > _parse_version(__version__):
console.rule("[bold yellow]Update available")
console.print(f"Current: [dim]v{__version__}[/] Latest: [bold green]{latest}[/]")
try:
if Confirm.ask("Open the download page in your browser?", default=True):
webbrowser.open(page)
except (EOFError, KeyboardInterrupt):
pass
console.print()
except Exception:
pass

13
dev/vrcx-launcher.py Normal file
View file

@ -0,0 +1,13 @@
"""
PyInstaller entry point sits at the repo root and uses an *absolute* import
so the bundled exe doesn't need relative-import resolution at runtime.
For dev work without an install use `python -m vrcx` instead (that path
goes through `src/vrcx/__main__.py` and relative imports work).
"""
from vrcx.app import main
if __name__ == "__main__":
main()