#!/usr/bin/env python3
"""
Refresh agents/STATUS.md by scanning all profile.md and group files.

Usage:
    python scripts/refresh_status.py            # Write agents/STATUS.md
    python scripts/refresh_status.py --dry-run  # Print to stdout only

Requirements: Python 3.8+, standard library only (plus the sibling
check_agents_metadata module imported below).

Parses YAML front matter from:
  - agents/<agent dir>/profile.md  (agent_id, name, role, current_groups,
    performance_log)
  - agents/groups/grp-*.md  (group_id, name, lead, members, mission,
    active_tasks, status)

The front matter is read with regular expressions, not a YAML parser, so only
the simple shapes handled below ("key: value" lines and "key:" followed by
"- item" lines) are understood.
"""

import sys
import re
import argparse
from datetime import date
from pathlib import Path

# Enforce UTF-8 I/O on Windows (stdout/stderr may default to cp936/gbk)
for _stream in (sys.stdout, sys.stderr):
    try:
        _stream.reconfigure(encoding='utf-8')
    except (AttributeError, OSError, ValueError):
        # Best effort only: the stream may be missing, replaced by an object
        # without reconfigure(), or not reconfigurable.
        pass

# Metadata integrity checks (W16.4: import from check_agents_metadata as pre-gate)
from check_agents_metadata import (
    check_yaml_parse,
    check_rating_range,
    check_group_refs,
    check_member_refs,
    check_duplicate_ids,
)


# =============================================================================
# Shared patterns
# =============================================================================

# Wave reference such as "W10" or "w10.2"; group 1 is the numeric part.
_WAVE_RE = re.compile(r'[Ww](\d+\.\d+|\d+)')

# One "- item" line: group 1 is the indentation, group 2 the item text.
_LIST_ITEM_RE = re.compile(r'([ \t]*)-[ \t]+(\S.*)')

# A leading double- or single-quoted scalar; group 1 / group 2 is its content.
_QUOTED_RE = re.compile(
    r'"((?:[^"\\]|\\.)*)"'
    r"|'((?:[^']|'')*)'"
)

# Clause separators tried, in priority order, when shortening a long event:
# ASCII and fullwidth comma, ASCII and fullwidth semicolon, ideographic comma.
_CLAUSE_SEPARATORS = (',', '\uff0c', ';', '\uff1b', '、')


# =============================================================================
# Path resolution
# =============================================================================

def _repo_root():
    """Project root (parent of this script's directory)."""
    return Path(__file__).resolve().parent.parent


def _agents_dir():
    """Return the ``agents`` directory under the project root."""
    return _repo_root() / 'agents'


# =============================================================================
# YAML front matter helpers
# =============================================================================

def _unquote(value):
    """Return the content of a leading quoted scalar, or *value* unchanged.

    Quotes are YAML syntax rather than data, so ``"abc"`` and ``'abc'`` both
    yield ``abc``. Anything after the closing quote (for example a trailing
    comment) is dropped. Escape sequences inside the quotes are left as-is.
    """
    m = _QUOTED_RE.match(value)
    if m is None:
        return value
    inner = m.group(1) if m.group(1) is not None else m.group(2)
    return inner.strip()


def _read_fm(filepath):
    """Return front matter text between first pair of '---' lines, or None.

    Read and decode failures, and files without front matter, are reported on
    stderr and yield None.
    """
    try:
        # utf-8-sig decodes plain UTF-8 too, but also drops a leading BOM that
        # would otherwise stop the opening '---' from matching.
        text = filepath.read_text(encoding='utf-8-sig')
    except (OSError, UnicodeDecodeError) as e:
        print(f"ERROR: Cannot read {filepath}: {e}", file=sys.stderr)
        return None
    m = re.match(r'^---\s*\n(.*?)\n---', text, re.DOTALL)
    if not m:
        print(f"WARNING: No YAML front matter in {filepath}", file=sys.stderr)
        return None
    return m.group(1)


def _fm_scalar(fm, key):
    """Return value of a top-level 'key: value' line, or None.

    Only the text on the key's own line is considered: a key with nothing
    after the colon yields None rather than the content of the next line.
    Surrounding quotes are removed from the value.
    """
    # [ \t]* (not \s*) keeps the match on one line.
    m = re.search(rf'^{re.escape(key)}:[ \t]*(\S.*)$', fm, re.MULTILINE)
    if m is None:
        return None
    return _unquote(m.group(1).strip())


def _fm_list(fm, key):
    """Return items of a top-level YAML list (key:\\n - item1\\n - item2).

    Items may use any indentation as long as it is the same for every item.
    The list ends at the first line that is not such an item. Returns an
    empty list when the key is absent or has no items.
    """
    header = re.search(rf'^{re.escape(key)}:[ \t]*$', fm, re.MULTILINE)
    if header is None:
        return []
    items = []
    indent = None
    # [1:] skips the (empty) remainder of the header line itself.
    for line in fm[header.end():].split('\n')[1:]:
        if not items and not line.strip():
            continue  # tolerate blank lines between the key and the first item
        m = _LIST_ITEM_RE.match(line)
        if m is None or (indent is not None and m.group(1) != indent):
            break
        indent = m.group(1)
        items.append(_unquote(m.group(2).strip()))
    return items


def _fm_performance_log(fm):
    """Parse the performance_log YAML list into [{date,event,rating}, ...].

    An entry starts at a "- " line; later dash lines only start a new entry
    when they use the same indentation as the first one. The log ends at the
    next top-level key. Entries missing any of date/event/rating are skipped.
    """
    header = re.search(r'^performance_log:.*$', fm, re.MULTILINE)
    if header is None:
        return []

    blocks = []
    indent = None
    for line in fm[header.end():].split('\n')[1:]:
        # A line starting at column 0 with anything other than a dash or a
        # comment marker is the next top-level key: the log is over.
        if line[:1] not in ('', ' ', '\t', '-', '#'):
            break
        m = _LIST_ITEM_RE.match(line)
        if m and (indent is None or m.group(1) == indent):
            indent = m.group(1)
            blocks.append([m.group(2)])
        elif blocks:
            blocks[-1].append(line)

    entries = []
    for block in blocks:
        text = '\n'.join(block)
        date_m = re.search(r'^[ \t]*date:[ \t]*(\S.*)$', text, re.MULTILINE)
        event_m = re.search(r'^[ \t]*event:[ \t]*(\S.*)$', text, re.MULTILINE)
        rating_m = re.search(r'^[ \t]*rating:[ \t]*(\S+)', text, re.MULTILINE)
        if date_m and event_m and rating_m:
            entries.append({
                'date': _unquote(date_m.group(1).strip()),
                'event': _unquote(event_m.group(1).strip()),
                # Only the first token counts; quotes around it are syntax.
                'rating': rating_m.group(1).strip('"\''),
            })
    return entries


# =============================================================================
# File parsers
# =============================================================================

def parse_profile(filepath):
    """Parse a single profile.md. Returns dict or None.

    None is returned (with a warning on stderr) when the file has no readable
    front matter or lacks any of agent_id, name, role.
    """
    fm = _read_fm(filepath)
    if fm is None:
        return None

    agent_id = _fm_scalar(fm, 'agent_id')
    name = _fm_scalar(fm, 'name')
    role = _fm_scalar(fm, 'role')
    if not all([agent_id, name, role]):
        print(f"WARNING: Missing agent_id/name/role in {filepath}", file=sys.stderr)
        return None

    return {
        'agent_id': agent_id,
        'name': name,
        'role': role,
        'groups_raw': _fm_list(fm, 'current_groups'),  # raw strings from profile
        'perf_log': _fm_performance_log(fm),
    }


def parse_group(filepath):
    """Parse a single grp-*.md. Returns dict or None.

    None is returned (with a warning on stderr) when the file has no readable
    front matter or lacks any of group_id, name, lead, mission.
    """
    fm = _read_fm(filepath)
    if fm is None:
        return None

    gid = _fm_scalar(fm, 'group_id')
    name = _fm_scalar(fm, 'name')
    lead = _fm_scalar(fm, 'lead')
    mission = _fm_scalar(fm, 'mission')
    members = _fm_list(fm, 'members')
    active_tasks = _fm_list(fm, 'active_tasks')
    explicit_status = _fm_scalar(fm, 'status')
    standby = _fm_scalar(fm, 'standby')

    if not all([gid, name, lead, mission]):
        print(f"WARNING: Missing required group fields in {filepath}", file=sys.stderr)
        return None

    # Determine display status: an explicit status wins, then the standby
    # flag, then whether any active task is listed.
    if explicit_status:
        display_status = explicit_status
    elif standby and standby.lower() == 'true':
        display_status = '待命'
    elif active_tasks:
        display_status = '执行中'
    else:
        display_status = '待命'

    return {
        'group_id': gid,
        'name': name,
        'lead': lead,
        'members': members,
        'mission': mission,
        'active_tasks': active_tasks,
        'status': display_status,
    }


# =============================================================================
# Agent status classification
# =============================================================================

def _classify(perf_log):
    """
    Determine agent status and contribution from perf_log.

    Only the last entry is inspected.

    Returns (status, contribution_text, w_number):
        status       -- 'working' if the last rating is 'ongoing'
                        (case-insensitive), otherwise 'idle'
        contribution -- shortened event description
        w_number     -- extracted W number (e.g. 'W10.2') or ''
    """
    if not perf_log:
        return 'idle', '', ''
    last = perf_log[-1]
    status = 'working' if last['rating'].lower() == 'ongoing' else 'idle'
    wave = _WAVE_RE.search(last['event'])
    w_num = f'W{wave.group(1)}' if wave else ''
    return status, _shorten_event(last['event']), w_num


def _shorten_event(text, max_len=72):
    """Compress an event string into a one-line description.

    A leading W number is kept as a prefix, a leading "完成:" marker is
    dropped, the text is cut at the first '。', and anything still longer
    than *max_len* is cut at a clause separator in the second half of the
    allowed length, or hard-truncated with '...' when there is none.
    """
    text = text.strip().strip('"').strip("'")

    # Preserve W prefix, dropping the punctuation that separates it from the
    # description (ASCII/fullwidth colon, hyphen, en/em dash, whitespace).
    w_prefix = ''
    w_match = re.match(r'([Ww]\d+\.?\d*)', text)
    if w_match:
        w_prefix = w_match.group(1)
        text = text[w_match.end():]
        text = re.sub(r'^[:\uff1a\-\u2013\u2014\s]+', '', text)

    # Strip "完成:" (ASCII or fullwidth colon)
    text = re.sub(r'^完成[:\uff1a]\s*', '', text)

    # Truncate at sentence-ending period
    if '。' in text:
        text = text.split('。')[0]

    # If too long, break at a natural separator
    if len(text) > max_len:
        for sep in _CLAUSE_SEPARATORS:
            idx = text[:max_len].rfind(sep)
            if idx > max_len // 2:
                text = text[:idx]
                break
        else:
            # max() keeps the slice sane for very small max_len values.
            text = text[:max(max_len - 3, 0)] + '...'

    text = text.strip()
    if w_prefix:
        # strip() avoids a trailing space when nothing but the prefix is left.
        return f'{w_prefix} {text}'.strip()
    return text


# =============================================================================
# Group membership supplement
# =============================================================================

def _supplement_groups(profiles, groups):
    """
    For each agent, compute the union of profile current_groups and
    group memberships (so the '当前小组' column is complete even when
    profiles haven't been synced).

    Returns a dict: agent_id -> comma-separated group_id string
    ('--' when the agent belongs to no group).
    """
    # profile-level groups (strip annotations in ASCII or fullwidth parens)
    profile_groups = {}
    for p in profiles:
        cleaned = set()
        for raw in p['groups_raw']:
            gid = re.sub(r'\s*[(\uff08].*[)\uff09]', '', raw).strip()
            if gid:
                cleaned.add(gid)
        profile_groups[p['agent_id']] = cleaned

    # group-level reverse lookup; members that match no known profile are ignored
    group_membership = {p['agent_id']: set() for p in profiles}
    for g in groups:
        for member in g['members']:
            if member in group_membership:
                group_membership[member].add(g['group_id'])

    # union
    result = {}
    for p in profiles:
        aid = p['agent_id']
        union = profile_groups.get(aid, set()) | group_membership.get(aid, set())
        result[aid] = ', '.join(sorted(union)) if union else '--'
    return result


# =============================================================================
# Wave aggregation
# =============================================================================

def _collect_waves(profiles):
    """Collect unique W numbers from all profiles. Returns (sorted_list, max).

    Numbers are normalised to an upper-case 'W' so 'w10' and 'W10' count
    once. *max* is the last element of the sorted list, or 'N/A' when no
    W number was found.
    """
    seen = set()
    for p in profiles:
        for entry in p['perf_log']:
            for m in _WAVE_RE.finditer(entry['event']):
                seen.add(f'W{m.group(1)}')

    def _key(w):
        major, _, minor = w[1:].partition('.')
        # The label itself is the last tie-breaker so that e.g. 'W10' and
        # 'W10.0' always sort the same way regardless of set iteration order.
        return (int(major), int(minor) if minor else 0, w)

    ordered = sorted(seen, key=_key)
    return ordered, (ordered[-1] if ordered else 'N/A')


# =============================================================================
# STATUS.md generator
# =============================================================================

def _md_cell(value):
    """Return *value* as text that is safe inside a Markdown table cell.

    A pipe character would end the cell, so every pipe that is not already
    preceded by a backslash gets one.
    """
    return re.sub(r'(?<!\\)\|', r'\\|', str(value))


def _md_row(cells):
    """Join already-escaped cell texts into one Markdown table row."""
    return '| ' + ' | '.join(cells) + ' |'


def generate_status_md(profiles, groups):
    """Build the complete STATUS.md content string.

    Args:
        profiles: list of dicts as returned by parse_profile().
        groups: list of dicts as returned by parse_group().

    Returns:
        The Markdown document (agent table, group table, wave progress),
        ending with a single newline. The "last updated" line uses today's
        local date.
    """
    today = date.today().isoformat()
    n_agents = len(profiles)
    n_groups = len(groups)

    # Supplement group memberships
    group_col = _supplement_groups(profiles, groups)

    # Name lookup (agent_id -> display name)
    name_map = {p['agent_id']: p['name'] for p in profiles}

    lines = []
    lines.append('# dstalk 实时编制状态')
    lines.append('')
    lines.append(f'> **最后更新**: {today}')
    lines.append(f'> **数据来源**: 由 `scripts/refresh_status.py` 自动扫描全部 {n_agents} 个 `agents/*/profile.md` + {n_groups} 个 `agents/groups/*.md` 生成。')
    lines.append('')

    # ---- Table 1 ----
    lines.append(f'## 表 1:员工状态({n_agents} 人)')
    lines.append('')
    lines.append('| Agent ID | 姓名 | 角色 | 最近一次贡献 | perf_log | 当前小组 | 状态 |')
    lines.append('|---|---|---|---|---|---|---|')
    for p in profiles:
        status, desc, _w = _classify(p['perf_log'])
        lines.append(_md_row([
            _md_cell(p['agent_id']),
            _md_cell(p['name']),
            _md_cell(p['role']),
            _md_cell(desc if desc else '--'),
            str(len(p['perf_log'])),
            _md_cell(group_col.get(p['agent_id'], '--')),
            status,
        ]))
    lines.append('')
    lines.append('> **状态判定规则**: 基于 `performance_log` 最后一条的 `rating`——`ongoing` 视为 `working`,其余 (`A/A+/B/completed/done/success/good`) 视为 `idle`。')
    lines.append('')

    # ---- Table 2 ----
    lines.append(f'## 表 2:工作组状态({n_groups} 组)')
    lines.append('')
    lines.append('| group_id | 名称 | lead | members | mission | active_tasks | 状态 |')
    lines.append('|---|---|---|---|---|---|---|')
    for g in groups:
        # Show display names where the id is a known agent, the raw id otherwise.
        lead_name = name_map.get(g['lead'], g['lead'])
        member_names = ', '.join(name_map.get(m, m) for m in g['members'])
        tasks = ', '.join(g['active_tasks']) if g['active_tasks'] else '--'
        lines.append(_md_row([
            _md_cell(g['group_id']),
            _md_cell(g['name']),
            _md_cell(lead_name),
            _md_cell(member_names),
            _md_cell(g['mission']),
            _md_cell(tasks),
            _md_cell(g['status']),
        ]))
    lines.append('')
    lines.append('> **成员列来源**: 以 `agents/groups/*.md` 为准(部分成员 profile 未同步更新 `current_groups`)。')
    lines.append('')

    # ---- Wave Progress ----
    lines.append('## Wave 进度')
    lines.append('')
    all_waves, max_w = _collect_waves(profiles)
    lines.append(f'**已完成高水位**: {max_w}(基于 {n_agents} 份 profile.md 的 performance_log 聚合)')
    lines.append('')
    if all_waves:
        lines.append(f'**已发现 Wave 编号**: {", ".join(all_waves)}')
        lines.append('')

    return '\n'.join(lines) + '\n'


# =============================================================================
# Main
# =============================================================================


def main():
    """Command-line entry point: validate metadata, then build STATUS.md.

    Steps:
      1. Run the five check_agents_metadata checks against the agents
         directory. Each check is called with that directory and is expected
         to yield (severity, filepath, message) tuples; findings whose
         severity is 'error' abort the run, 'warn' findings are only reported.
      2. Parse every agents/<dir>/profile.md (skipping hidden directories and
         groups/, audits/, mailroom/) and every agents/groups/grp-*.md.
      3. Write agents/STATUS.md, or print it to stdout with --dry-run.

    All diagnostics go to stderr. Always terminates through sys.exit():
    status 0 on success; status 1 when the agents directory is missing, a
    metadata check reports an error, no valid profile is found, or STATUS.md
    cannot be written.
    """
    parser = argparse.ArgumentParser(
        description='Refresh agents/STATUS.md from profile.md and group files.'
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help='Print output to stdout without writing STATUS.md'
    )
    args = parser.parse_args()

    agents_dir = _agents_dir()
    if not agents_dir.is_dir():
        print(f'ERROR: agents/ directory not found at {agents_dir}', file=sys.stderr)
        sys.exit(1)

    # ---- Metadata integrity pre-check (W16.4) ----
    check_suites = [
        ('C1', 'YAML parse', check_yaml_parse),
        ('C2', 'rating range', check_rating_range),
        ('C3', 'group refs', check_group_refs),
        ('C4', 'member refs', check_member_refs),
        ('C5', 'duplicate IDs', check_duplicate_ids),
    ]
    all_findings = []
    for code, label, fn in check_suites:
        all_findings.extend((code, label, finding) for finding in fn(agents_dir))

    errors = [f for f in all_findings if f[2][0] == 'error']
    warnings = [f for f in all_findings if f[2][0] == 'warn']

    # Errors first, then warnings, then the one-line verdict.
    for code, _label, (_sev, filepath, msg) in errors:
        print(f'[{code}] ERROR: {filepath}: {msg}', file=sys.stderr)
    for code, _label, (_sev, filepath, msg) in warnings:
        print(f'[{code}] WARN: {filepath}: {msg}', file=sys.stderr)

    if errors:
        print(f'\nMetadata check FAILED: {len(errors)} errors, {len(warnings)} warnings. '
              f'Fix errors before generating STATUS.md.', file=sys.stderr)
        sys.exit(1)
    if warnings:
        print(f'Metadata check: {len(warnings)} warning(s) found. '
              f'Proceeding with STATUS.md generation.', file=sys.stderr)
    else:
        print(f'OK: All {len(check_suites)} metadata checks passed.', file=sys.stderr)

    # ---- Scan profiles ----
    skipped_dirs = ('groups', 'audits', 'mailroom')
    profiles = []
    for child in sorted(agents_dir.iterdir()):
        if not child.is_dir() or child.name.startswith('.') or child.name in skipped_dirs:
            continue
        pf = child / 'profile.md'
        if pf.is_file():
            parsed = parse_profile(pf)
            if parsed:
                profiles.append(parsed)

    if not profiles:
        print('ERROR: No valid profile.md files found', file=sys.stderr)
        sys.exit(1)

    # ---- Scan groups ----
    groups = []
    groups_dir = agents_dir / 'groups'
    if groups_dir.is_dir():
        for gf in sorted(groups_dir.glob('grp-*.md')):
            parsed = parse_group(gf)
            if parsed:
                groups.append(parsed)

    # ---- Generate ----
    output = generate_status_md(profiles, groups)

    if args.dry_run:
        # write(), not print(): the document already ends with a newline, and
        # the preview should match what would be written to STATUS.md.
        sys.stdout.write(output)
    else:
        status_path = agents_dir / 'STATUS.md'
        try:
            status_path.write_text(output, encoding='utf-8')
        except OSError as e:
            print(f'ERROR: Cannot write {status_path}: {e}', file=sys.stderr)
            sys.exit(1)
        print(f'Written: {status_path} ({len(profiles)} agents, {len(groups)} groups)',
              file=sys.stderr)

    sys.exit(0)


if __name__ == '__main__':
    main()