Wave 8: tech-debt audits, core unit tests, CLI pipe input (W11.1-W11.7)
Some checks failed
CI / Determine matrix (push) Has been cancelled
CI / ${{ matrix.os }} / ${{ matrix.build_type }} (push) Has been cancelled

- W11.1 context_plugin audit (architect-huang): 3 findings on ABI exception
  safety, strdup null checks, dead g_max_tokens variable. Rating: B.
- W11.2 config audit (engineer-chen): identified 74-line TOML parser
  duplication between config_plugin and config_store, dual-store data
  isolation, dangling c_str() risk. Rating: C.
- W11.3 event_bus + service_registry unit tests (qa-liu): 12 cases total,
  ctest coverage 2 -> 4 targets, 100% pass.
- W11.4 CLI stdin pipe mode (engineer-zhao): isatty detection, single-shot
  inference path with exit codes 0/1/2/3.
- W11.6 scripts/refresh_status.py (engineer-li): 431-line generator that
  scans 16 profile.md + 5 group.md to regenerate STATUS.md.
- W11.7 destructive testing (qa-xu): 10 input scenarios PASS, found bin
  copy mismatch (BUG-1) plus 3 minor UX bugs for follow-up.

Verified: cmake build 0 error, ctest 4/4 pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-05-27 09:06:25 +08:00
parent 004a81db96
commit bb2e8c0220
14 changed files with 1122 additions and 18 deletions

431
scripts/refresh_status.py Normal file
View File

@@ -0,0 +1,431 @@
#!/usr/bin/env python3
"""
Refresh agents/STATUS.md by scanning all profile.md and group files.
Usage:
python scripts/refresh_status.py # Write agents/STATUS.md
python scripts/refresh_status.py --dry-run # Print to stdout only
Requirements: Python 3.8+, standard library only.
Parses YAML front matter from:
- agents/<id>/profile.md (agent_id, name, role, current_groups, performance_log)
- agents/groups/grp-*.md (group_id, name, lead, members, mission, active_tasks, status)
"""
import sys
import re
import argparse
from datetime import date
from pathlib import Path
# Enforce UTF-8 I/O on Windows (stdout/stderr may default to cp936/gbk)
for _stream in (sys.stdout, sys.stderr):
try:
_stream.reconfigure(encoding='utf-8')
except Exception:
pass
# =============================================================================
# Path resolution
# =============================================================================
def _repo_root():
"""Project root (parent of this script's directory)."""
return Path(__file__).resolve().parent.parent
def _agents_dir():
return _repo_root() / 'agents'
# =============================================================================
# YAML front matter helpers
# =============================================================================
def _read_fm(filepath):
"""Return front matter text between first pair of '---' lines, or None."""
try:
text = filepath.read_text(encoding='utf-8')
except (OSError, UnicodeDecodeError) as e:
print(f"ERROR: Cannot read {filepath}: {e}", file=sys.stderr)
return None
m = re.match(r'^---\s*\n(.*?)\n---', text, re.DOTALL)
if not m:
print(f"WARNING: No YAML front matter in {filepath}", file=sys.stderr)
return None
return m.group(1)
def _fm_scalar(fm, key):
"""Return value of a top-level 'key: value' line."""
m = re.search(rf'^{key}:\s*(.+)$', fm, re.MULTILINE)
return m.group(1).strip() if m else None
def _fm_list(fm, key):
"""Return items of a top-level YAML list (key:\\n - item1\\n - item2)."""
section = re.search(rf'^{key}:\s*\n((?: - .+\n?)*)', fm, re.MULTILINE)
if not section:
return []
items = []
for line in section.group(1).split('\n'):
m = re.match(r' - (.+)', line)
if m:
items.append(m.group(1).strip())
return items
def _fm_performance_log(fm):
"""Parse the performance_log YAML list into [{date,event,rating}, ...]."""
entries = []
log_match = re.search(r'^performance_log:', fm, re.MULTILINE)
if not log_match:
return entries
log_section = fm[log_match.start():]
# Each entry starts with " - date:" (indent 2, dash)
blocks = re.split(r'\n - ', log_section)
# blocks[0] = "performance_log:" header; blocks[1:] = "date:...", "event:...", ...
for block in blocks[1:]:
date_m = re.search(r'^\s*date:\s*(.+)$', block, re.MULTILINE)
event_m = re.search(r'^\s*event:\s*["\']?([^"\'\n]+)', block, re.MULTILINE)
rating_m = re.search(r'^\s*rating:\s*(\S+)', block, re.MULTILINE)
if date_m and event_m and rating_m:
entries.append({
'date': date_m.group(1).strip(),
'event': event_m.group(1).strip(),
'rating': rating_m.group(1).strip(),
})
return entries
# =============================================================================
# File parsers
# =============================================================================
def parse_profile(filepath):
"""Parse a single profile.md. Returns dict or None."""
fm = _read_fm(filepath)
if fm is None:
return None
agent_id = _fm_scalar(fm, 'agent_id')
name = _fm_scalar(fm, 'name')
role = _fm_scalar(fm, 'role')
if not all([agent_id, name, role]):
print(f"WARNING: Missing agent_id/name/role in {filepath}", file=sys.stderr)
return None
groups = _fm_list(fm, 'current_groups')
perf_log = _fm_performance_log(fm)
return {
'agent_id': agent_id,
'name': name,
'role': role,
'groups_raw': groups, # raw strings from profile
'perf_log': perf_log,
}
def parse_group(filepath):
"""Parse a single grp-*.md. Returns dict or None."""
fm = _read_fm(filepath)
if fm is None:
return None
gid = _fm_scalar(fm, 'group_id')
name = _fm_scalar(fm, 'name')
lead = _fm_scalar(fm, 'lead')
mission = _fm_scalar(fm, 'mission')
members = _fm_list(fm, 'members')
active_tasks = _fm_list(fm, 'active_tasks')
explicit_status = _fm_scalar(fm, 'status')
standby = _fm_scalar(fm, 'standby')
if not all([gid, name, lead, mission]):
print(f"WARNING: Missing required group fields in {filepath}", file=sys.stderr)
return None
# Determine display status
if explicit_status:
display_status = explicit_status
elif standby and standby.lower() == 'true':
display_status = '待命'
elif active_tasks:
display_status = '执行中'
else:
display_status = '待命'
return {
'group_id': gid,
'name': name,
'lead': lead,
'members': members,
'mission': mission,
'active_tasks': active_tasks,
'status': display_status,
}
# =============================================================================
# Agent status classification
# =============================================================================
def _classify(perf_log):
"""
Determine agent status and contribution from perf_log.
Returns (status, contribution_text, w_number):
status -- 'working' | 'idle'
contribution -- shortened event description
w_number -- extracted W number (e.g. 'W10.2') or ''
"""
if not perf_log:
return 'idle', '', ''
last = perf_log[-1]
status = 'working' if last['rating'].lower() == 'ongoing' else 'idle'
w_match = re.search(r'[Ww](\d+\.\d+|\d+)', last['event'])
w_num = f'W{w_match.group(1)}' if w_match else ''
desc = _shorten_event(last['event'])
return status, desc, w_num
def _shorten_event(text, max_len=72):
"""Compress an event string into a one-line description."""
text = text.strip().strip('"').strip("'")
# Preserve W prefix
w_prefix = ''
w_match = re.match(r'([Ww]\d+\.?\d*)', text)
if w_match:
w_prefix = w_match.group(1)
text = text[w_match.end():]
text = re.sub(r'^[:\-\s]+', '', text)
# Strip "完成:"
text = re.sub(r'^完成[:]\s*', '', text)
# Truncate at sentence-ending period
if '' in text:
text = text.split('')[0]
# If too long, break at a natural separator
if len(text) > max_len:
for sep in ['', ',', '', ';', '']:
idx = text[:max_len].rfind(sep)
if idx > max_len // 2:
text = text[:idx]
break
else:
text = text[:max_len - 3] + '...'
text = text.strip()
if w_prefix:
return f'{w_prefix} {text}'
return text
# =============================================================================
# Group membership supplement
# =============================================================================
def _supplement_groups(profiles, groups):
"""
For each agent, compute the union of profile current_groups and group
memberships (so the '当前小组' column is complete even when profiles
haven't been synced).
Returns a dict: agent_id -> comma-separated group_id string.
"""
# profile-level groups (strip annotations in parens)
profile_groups = {}
for p in profiles:
cleaned = []
for g in p['groups_raw']:
gid = re.sub(r'\s*\(.*\)', '', g).strip()
if gid:
cleaned.append(gid)
profile_groups[p['agent_id']] = set(cleaned)
# group-level reverse lookup
group_membership = {p['agent_id']: set() for p in profiles}
for g in groups:
for m in g['members']:
if m in group_membership:
group_membership[m].add(g['group_id'])
# union
result = {}
for p in profiles:
aid = p['agent_id']
union = profile_groups.get(aid, set()) | group_membership.get(aid, set())
result[aid] = ', '.join(sorted(union)) if union else '--'
return result
# =============================================================================
# Wave aggregation
# =============================================================================
def _collect_waves(profiles):
"""Collect unique W numbers from all profiles. Returns (sorted_list, max)."""
seen = set()
for p in profiles:
for entry in p['perf_log']:
for m in re.finditer(r'[Ww](\d+\.\d+|\d+)', entry['event']):
seen.add(m.group(0))
def _key(w):
parts = re.match(r'[Ww](\d+)\.?(\d*)', w)
major = int(parts.group(1)) if parts else 0
minor = int(parts.group(2)) if parts and parts.group(2) else 0
return (major, minor)
ordered = sorted(seen, key=_key)
return ordered, ordered[-1] if ordered else 'N/A'
# =============================================================================
# STATUS.md generator
# =============================================================================
def generate_status_md(profiles, groups):
"""Build the complete STATUS.md content string."""
today = date.today().isoformat()
n_agents = len(profiles)
n_groups = len(groups)
# Supplement group memberships
group_col = _supplement_groups(profiles, groups)
# Name lookup
name_map = {p['agent_id']: p['name'] for p in profiles}
lines = []
lines.append('# dstalk 实时编制状态')
lines.append('')
lines.append(f'> **最后更新**: {today}')
lines.append(f'> **数据来源**: 由 `scripts/refresh_status.py` 自动扫描全部 {n_agents} 个 `agents/*/profile.md` + {n_groups} 个 `agents/groups/*.md` 生成。')
lines.append('')
# ---- Table 1 ----
lines.append(f'## 表 1员工状态{n_agents} 人)')
lines.append('')
lines.append('| Agent ID | 姓名 | 角色 | 最近一次贡献 | perf_log | 当前小组 | 状态 |')
lines.append('|---|---|---|---|---|---|---|')
for p in profiles:
status, desc, _w = _classify(p['perf_log'])
contrib = desc if desc else '--'
cnt = str(len(p['perf_log']))
groups_str = group_col.get(p['agent_id'], '--')
status_str = 'working' if status == 'working' else 'idle'
lines.append(
f'| {p["agent_id"]} | {p["name"]} | {p["role"]} | '
f'{contrib} | {cnt} | {groups_str} | {status_str} |'
)
lines.append('')
lines.append('> **状态判定规则**: 基于 `performance_log` 最后一条的 `rating`——`ongoing` 视为 `working`,其余 (`A/A+/B/completed/done/success/good`) 视为 `idle`。')
lines.append('')
# ---- Table 2 ----
lines.append(f'## 表 2工作组状态{n_groups} 组)')
lines.append('')
lines.append('| group_id | 名称 | lead | members | mission | active_tasks | 状态 |')
lines.append('|---|---|---|---|---|---|---|')
for g in groups:
lead_name = name_map.get(g['lead'], g['lead'])
member_names = ', '.join(name_map.get(m, m) for m in g['members'])
tasks = ', '.join(g['active_tasks']) if g['active_tasks'] else '--'
lines.append(
f'| {g["group_id"]} | {g["name"]} | {lead_name} | {member_names} | '
f'{g["mission"]} | {tasks} | {g["status"]} |'
)
lines.append('')
lines.append('> **成员列来源**: 以 `agents/groups/*.md` 为准(部分成员 profile 未同步更新 `current_groups`)。')
lines.append('')
# ---- Wave Progress ----
lines.append('## Wave 进度')
lines.append('')
all_waves, max_w = _collect_waves(profiles)
lines.append(f'**已完成高水位**: {max_w}(基于 {n_agents} 份 profile.md 的 performance_log 聚合)')
lines.append('')
if all_waves:
lines.append(f'**已发现 Wave 编号**: {", ".join(all_waves)}')
lines.append('')
return '\n'.join(lines) + '\n'
# =============================================================================
# Main
# =============================================================================
def main():
parser = argparse.ArgumentParser(
description='Refresh agents/STATUS.md from profile.md and group files.'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Print output to stdout without writing STATUS.md'
)
args = parser.parse_args()
agents_dir = _agents_dir()
if not agents_dir.is_dir():
print(f'ERROR: agents/ directory not found at {agents_dir}', file=sys.stderr)
sys.exit(1)
# ---- Scan profiles ----
profiles = []
for child in sorted(agents_dir.iterdir()):
if not child.is_dir() or child.name.startswith('.') or child.name == 'groups':
continue
pf = child / 'profile.md'
if pf.is_file():
parsed = parse_profile(pf)
if parsed:
profiles.append(parsed)
if not profiles:
print('ERROR: No valid profile.md files found', file=sys.stderr)
sys.exit(1)
# ---- Scan groups ----
groups = []
groups_dir = agents_dir / 'groups'
if groups_dir.is_dir():
for gf in sorted(groups_dir.glob('grp-*.md')):
parsed = parse_group(gf)
if parsed:
groups.append(parsed)
# ---- Generate ----
output = generate_status_md(profiles, groups)
if args.dry_run:
print(output)
else:
status_path = agents_dir / 'STATUS.md'
status_path.write_text(output, encoding='utf-8')
print(f'Written: {status_path} ({len(profiles)} agents, {len(groups)} groups)',
file=sys.stderr)
sys.exit(0)
if __name__ == '__main__':
main()