Я дал claude руки и глаза... Практическое введение в Model Context Protocol (MCP)

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0

Авторство: hackeryaroslav​

Источник: xss.is​

Интро​

Всем привет, давно не писал статьи. Не было особо идей, да и ничего особого не происходило. Но на днях вышла одна технология, которая зажгла во мне искру - mcp сервера. Пошел активный форс в твиттере и просто не мог ее пропустить. Это то, что именно не хватало.

Что такое Model Context Protocol (MCP)? Теория простыми словами​

Представьте, что у вас есть ИИ – большая языковая модель (вроде ChatGPT или Claude). Она может писать тексты, отвечать на вопросы, генерировать идеи. Но что, если вам нужно, чтобы она сделала что-то конкретное с файлами на вашем компьютере? Или получила доступ к свежей информации из базы данных? А может подправить дизайн в фигме? Сама по себе модель этого сделать не может, она находится в своем цифровом пространстве текста и данных, на которых обучалась.

Вот тут-то и возникает потребность в "руках" и "глазах" для такой модели – способе взаимодействовать с внешними программами и данными. Нужен общий язык, понятный и модели, и этим внешним инструментам. Model Context Protocol как раз и предлагает такой стандартный способ общения.

И так, если упростить, MCP – это набор правил и форматов сообщений, которые позволяют языковой модели (клиенту) общаться с внешней программой или сервисом (сервером или "инструментом"). Думайте об этом как об универсальном переводчике или стандартном разъеме (вроде USB), который позволяет разным устройствам подключаться и понимать друг друга.

Проблемы интеграции ИИ до появления стандартов вроде MCP​

Раньше подключение ИИ к внешним инструментам и данным было муторной задачей из-за нескольких ключевых проблем. Во-первых, отсутствовал единый стандарт, каждая модель или платформа требует свой уникальный способ интеграции. Это вынуждало разработчиков создавать и поддерживать отдельные адаптеры для разных ИИ (например, для ChatGPT, Claude, Llama).

Во-вторых, многие решения были проприетарными и привязанными к конкретной экосистеме, как ранние версии плагинов. Инструмент, созданный для одной платформы, было трудно или невозможно использовать на другой, что ограничивало выбор для разработчиков и пользователей. Кроме того, возникала сложность как для ИИ, так и для разработчиков модели не могли легко "понять" произвольный API без специальных посредников или дообучения, а разработчикам инструментов приходилось не только создавать функционал, но и выяснять, как по-разному "объяснить" его разным моделям, обрабатывать их запросы и форматировать ответы.

Основная идея​

Вот как проходит процесс:

1. Модель спрашивает у сервера: "Здаров, Какие инструменты у тебя есть и что они умеют делать?"
2. Сервер отвечает: "У меня есть инструмент X, он делает вот это. Чтобы его использовать, мне нужны такие-то данные (например, путь к файлу или текст для анализа)". Это описание передается в стандартном формате.
3. Модель такая типо: "Збс, используй инструмент X вот с этими данными: тут данные".
4. Сервер получает запрос, выполняет нужную работу с помощью своего кода (например, запускает скрипт анализа файлов) и отправляет результат обратно модели. А модель нам в удобном формате человеку.

Выглядит очень просто и на самом деле на практике будет также. Тут у нас вытекает несколько плюсов. Первое, все говорят на одном языке. Не нужно писать отдельный код для подключения каждой новой модели к каждому новому инструменту. Второе, это конечно легко добавлять новые инструменты к существующему MCP серверу. Один и тот же инструмент может использоваться разными моделями или приложениями, поддерживающими MCP (в нашем случае это будет Claude 3.7 Sonnet).

В этой статье я представлю два мини сервера, один из которых будет выполнять довольно простые функции, а другой будет нацелен на конкретную задачу, чтобы показать реальную пользу. Первый проект будет представлять собой инструмент сбора всей информации о нашей ОС и последующим анализом ИИ. Да, в реальной жизни он не будет нести практической пользы, тем не менее, я хочу немного разнообразить статью, общей практикой и конкретной. Вторым проект будет анализ логов стиллера, ИИ будет собирать информацию о пользователях для статистики и их схожестей. Давайте начнем?

Но перед этим…​

Давайте подготовим наш ИИ - Claude.

Заходим по ссылке - https://claude.ai/download , качаем десктоп версию, так мы сможем подключать наши MCP сервера в будущем. Как скачали, повторяем шаги за скрином:

AD_4nXcOxbLG0lpcEuPcQtKVa-2-oA4ZMac1YcayrmqZFSX-VmN8NMdjCbSnMDs7pMHNOcnYRGVa08vUQcVmZRnZfp8Co33PpZZIvYmKpBWdB4kK-fflRr9aBcAWtXifTcRxrIVTngYmHA

Во вкладке Developer будет находится наш конфиг json (%APPDATA%\Claude\claude_desktop_config.json на винде и ~/Library/Application Support/Claude/claude_desktop_config.json на MacOS), там мы будем приписывать наши сервера:

AD_4nXfIaEREj1fdgR2Xf9KmpIzfmjKVipGW1IHSrjRnzCDmorh1Qf3RG6W9ej84VB2YOEAM1ZAy6vozfT2aak8WwrG0gj2uPZKy4r-WFEteU9ax72SLoLcgZ1OhEcfKlQ0Mypa_sIPH2g

Я уже успел поиграться поиграться с некоторыми серверами, которые взял из свежего списка тут: https://github.com/punkpeye/awesome-mcp-servers

Отлично, давайте перейдем к коду

Проект 1, анализ информации ОС и анализ ИИ​

Не буду уделять слишком много тут, поймем лишь концепцию работы этой штуки.

Общая логика кода:

По сути, это набор инструментов для получения различной системной информации, такой как загрузка процессора, использование памяти или данные о процессах. Для сбора этих данных активно используется библиотека psuti`. Особенностью является асинхронная работа (async/await), которая предотвращает блокировку программы во время ожидания системных вызовов. Функция handle_call_tool выступает в роли внутреннего диспетчера, направляя запросы к соответствующим функциям (fetch...) для выполнения конкретных задач. Встроенная обработка ошибок (try...except) обеспечивает стабильность, позволяя возвращать информативные сообщения об ошибках вместо аварийного завершения работы при возникновении проблем (например, при доступе к несуществующему процессу). Ничего особенного.

Роль технологии MCP:

Технология MCP, реализованная через библиотеку mcp.server, используется здесь для того, чтобы сделать эти системные инструменты доступными для внешних программ (в нашем случае ИИ). Создается объект Server (system_info_service), который слушает запросы. С помощью декораторов регистрируются две ключевые функции: handle_list_tools сообщает клиенту, какие инструменты доступны и какие параметры (inputSchema) они ожидают, а handle_call_tool принимает от клиента имя конкретного инструмента и его аргументы, выполняет соответствующее действие (вызывая нужную функцию fetch...) и упаковывает результат (или ошибку) для отправки обратно клиенту. Функция main запускает этот сервер, часто используя stdio_server для обмена данными через стандартные потоки ввода/вывода, а server.run управляет всем циклом приема mcp запросов и отправки ответов.

Полный код:

Python: Скопировать в буфер обмена
Код:
import sys
import json
import platform
import psutil
import asyncio
import socket
import time
from datetime import datetime, timedelta
from typing import Any, Optional, Dict, List


from mcp.server import Server, InitializationOptions, NotificationOptions
import mcp.types as types
import mcp.server.stdio


BYTES_TO_GB = 1024**3
BYTES_TO_MB = 1024**2


async def _fetch_cpu_load() -> Dict[str, Any]:
    try:
        load = psutil.cpu_percent(interval=None)
        return {"load_percent": load}
    except Exception as e:
        print(f"Error getting CPU load: {e}", file=sys.stderr)
        return {"error": f"Failed to get CPU load: {e}"}


async def _fetch_memory_usage() -> Dict[str, Any]:
    try:
        mem = psutil.virtual_memory()
        total_gb = round(mem.total / BYTES_TO_GB, 2)
        available_gb = round(mem.available / BYTES_TO_GB, 2)
        used_gb = round(mem.used / BYTES_TO_GB, 2)
        percent_used = round(mem.percent, 1)
        return {
            "total_gb": total_gb,
            "available_gb": available_gb,
            "used_gb": used_gb,
            "percent_used": percent_used,
        }
    except Exception as e:
        print(f"Error getting memory usage: {e}", file=sys.stderr)
        return {"error": f"Failed to get memory usage: {e}"}


async def _fetch_disk_usage(path: Optional[str] = None) -> Dict[str, Any]:
    if path is None:
        path = "/"
        if platform.system() == "Windows":
            try:
                partitions = psutil.disk_partitions()
                for p in partitions:
                    path = p.mountpoint
                    break
                else:
                    path = "C:\\"
            except Exception:
                path = "C:\\"


    try:
        usage = psutil.disk_usage(path)
        total_gb = round(usage.total / BYTES_TO_GB, 2)
        used_gb = round(usage.used / BYTES_TO_GB, 2)
        free_gb = round(usage.free / BYTES_TO_GB, 2)
        percent_used = round(usage.percent, 1)
        return {
            "path": path,
            "total_gb": total_gb,
            "used_gb": used_gb,
            "free_gb": free_gb,
            "percent_used": percent_used,
        }
    except FileNotFoundError:
        print(f"Error getting disk usage: Path not found '{path}'", file=sys.stderr)
        return {"error": f"Path not found: {path}"}
    except Exception as e:
        print(f"Error getting disk usage for '{path}': {e}", file=sys.stderr)
        return {"error": f"Failed to get disk usage for '{path}': {e}"}


async def _fetch_battery_status() -> Dict[str, Any]:
    try:
        battery = psutil.sensors_battery()
        if battery is None:
            return {"present": False, "percent": None, "charging": None, "time_left_minutes": None}


        time_left_min = None
        if battery.secsleft not in (psutil.POWER_TIME_UNKNOWN, psutil.POWER_TIME_UNLIMITED) and isinstance(battery.secsleft, (int, float)):
             time_left_min = round(battery.secsleft / 60)


        return {
            "present": True,
            "percent": round(battery.percent, 1) if battery.percent is not None else None,
            "charging": battery.power_plugged,
            "time_left_minutes": time_left_min,
        }
    except Exception as e:
        print(f"Error getting battery status: {e}", file=sys.stderr)
        return {"error": f"Failed to get battery status: {e}", "present": False}


async def _fetch_network_status() -> Dict[str, Any]:
    primary_interface = None
    primary_ip = None
    connected = False
    try:
        addresses = psutil.net_if_addrs()
        stats = psutil.net_if_stats()
        up_interfaces = {name for name, stat in stats.items() if stat.isup}
        sorted_interfaces = sorted(addresses.keys(), key=lambda x: x in up_interfaces, reverse=True)


        for interface in sorted_interfaces:
            is_up = interface in up_interfaces
            if connected and not is_up: continue


            interface_has_non_loopback_ipv4 = False
            potential_ip = None
            for snic in addresses[interface]:
                if snic.family == socket.AF_INET and snic.address and not snic.address.startswith("127."):
                    interface_has_non_loopback_ipv4 = True
                    potential_ip = snic.address
                    break


            if is_up and interface_has_non_loopback_ipv4:
                primary_interface, primary_ip, connected = interface, potential_ip, True
                break
            elif interface not in stats and interface_has_non_loopback_ipv4 and not connected:
                 primary_interface, primary_ip, connected = interface, potential_ip, True


        return {"connected": connected, "primary_interface": primary_interface, "primary_ip_address": primary_ip}
    except Exception as e:
        print(f"Error getting network status: {e}", file=sys.stderr)
        return {"error": f"Failed to get network status: {e}", "connected": False}


async def _fetch_os_info() -> Dict[str, Any]:
    try:
        return {
            "system": platform.system(),
            "release": platform.release(),
            "version_detail": platform.version(),
            "architecture": platform.machine(),
            "hostname": platform.node()
        }
    except Exception as e:
        print(f"Error getting OS info: {e}", file=sys.stderr)
        return {"error": f"Failed to get OS info: {e}"}


def _format_timedelta(delta: timedelta) -> str:
    parts = []
    total_seconds = int(delta.total_seconds())
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)


    if days > 0: parts.append(f"{days} day{'s' if days != 1 else ''}")
    if hours > 0: parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
    if minutes > 0: parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
    if not parts:
         parts.append(f"{seconds} second{'s' if seconds != 1 else ''}")


    return ", ".join(parts) if parts else "0 seconds"


async def _fetch_system_uptime() -> Dict[str, Any]:
    try:
        boot_timestamp = psutil.boot_time()
        boot_dt = datetime.fromtimestamp(boot_timestamp)
        now_timestamp = time.time()
        uptime_seconds = now_timestamp - boot_timestamp
        uptime_delta = timedelta(seconds=uptime_seconds)


        return {
            "boot_time_iso": boot_dt.isoformat(),
            "uptime_seconds": round(uptime_seconds),
            "uptime_human": _format_timedelta(uptime_delta)
        }
    except Exception as e:
        print(f"Error getting system uptime: {e}", file=sys.stderr)
        return {"error": f"Failed to get system uptime: {e}"}


async def _fetch_disk_partitions() -> Dict[str, Any]:
    partitions_list = []
    try:
        partitions = psutil.disk_partitions(all=False)
        for p in partitions:
            partitions_list.append({
                "device": p.device,
                "mountpoint": p.mountpoint,
                "fstype": p.fstype,
                "opts": p.opts
            })
        return {"partitions": partitions_list}
    except Exception as e:
        print(f"Error listing disk partitions: {e}", file=sys.stderr)
        return {"error": f"Failed to list disk partitions: {e}"}


async def _fetch_swap_usage() -> Dict[str, Any]:
    try:
        swap = psutil.swap_memory()
        return {
            "total_gb": round(swap.total / BYTES_TO_GB, 2),
            "used_gb": round(swap.used / BYTES_TO_GB, 2),
            "free_gb": round(swap.free / BYTES_TO_GB, 2),
            "percent_used": round(swap.percent, 1),
            "swapped_in_gb": round(swap.sin / BYTES_TO_GB, 2),
            "swapped_out_gb": round(swap.sout / BYTES_TO_GB, 2)
        }
    except Exception as e:
        print(f"Error getting swap usage: {e}", file=sys.stderr)
        return {"error": f"Failed to get swap usage: {e}"}


async def _fetch_cpu_details() -> Dict[str, Any]:
    try:
        freq = psutil.cpu_freq()
        return {
            "model_name": None,
            "physical_cores": psutil.cpu_count(logical=False),
            "logical_processors": psutil.cpu_count(logical=True),
            "current_frequency_mhz": round(freq.current, 1) if freq else None,
            "max_frequency_mhz": round(freq.max, 1) if freq else None
        }
    except Exception as e:
        print(f"Error getting CPU details: {e}", file=sys.stderr)
        return {"error": f"Failed to get CPU details: {e}"}


async def _fetch_process_list(name_filter: Optional[str] = None, limit: int = 50) -> Dict[str, Any]:
    process_list = []
    attrs = ['pid', 'name', 'username', 'cpu_percent', 'memory_percent']
    try:
        for proc in psutil.process_iter(attrs=attrs):
            try:
                pinfo = proc.info
                if name_filter and name_filter.lower() not in pinfo['name'].lower():
                    continue


                pinfo['cpu_percent'] = round(pinfo['cpu_percent'] or 0.0, 1)
                pinfo['memory_percent'] = round(pinfo['memory_percent'] or 0.0, 1)


                process_list.append(pinfo)


                if len(process_list) >= limit:
                    break
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
            except Exception as proc_e:
                 print(f"Error processing PID {proc.pid}: {proc_e}", file=sys.stderr)
                 continue


        return {"processes": process_list, "count": len(process_list)}
    except Exception as e:
        print(f"Error listing processes: {e}", file=sys.stderr)
        return {"error": f"Failed to list processes: {e}"}


async def _fetch_process_details(pid: int) -> Dict[str, Any]:
    try:
        proc = psutil.Process(pid)
        with proc.oneshot():
            mem_info = proc.memory_info()
            open_files_count = None
            connections_count = None


            try: open_files_count = len(proc.open_files())
            except psutil.AccessDenied: open_files_count = "Access Denied"
            except Exception: pass


            try: connections_count = len(proc.connections(kind='inet'))
            except psutil.AccessDenied: connections_count = "Access Denied"
            except Exception: pass


            details = {
                "pid": proc.pid,
                "name": proc.name(),
                "username": proc.username(),
                "status": proc.status(),
                "cpu_percent": round(proc.cpu_percent() or 0.0, 1),
                "memory_percent": round(proc.memory_percent() or 0.0, 1),
                "memory_rss_mb": round(mem_info.rss / BYTES_TO_MB, 2),
                "memory_vms_mb": round(mem_info.vms / BYTES_TO_MB, 2),
                "create_time_iso": datetime.fromtimestamp(proc.create_time()).isoformat(),
                "executable_path": proc.exe(),
                "command_line": proc.cmdline(),
                "num_threads": proc.num_threads(),
                "open_files_count": open_files_count,
                "network_connections_count": connections_count
            }
            return details
    except psutil.NoSuchProcess:
        return {"error": f"Process with PID {pid} not found."}
    except psutil.AccessDenied:
         return {"error": f"Access denied when retrieving details for PID {pid}."}
    except Exception as e:
        print(f"Error getting details for PID {pid}: {e}", file=sys.stderr)
        return {"error": f"Failed to get details for PID {pid}: {e}"}


server = Server("system_info_service")


@server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
    return [
        types.Tool(
            name="get_cpu_load",
            description="Retrieves the current overall CPU utilization percentage.",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="get_memory_usage",
            description="Retrieves current RAM (memory) usage statistics (total, available, used GB, percent used).",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="get_disk_usage",
            description="Retrieves disk usage (total, used, free GB, percent used) for a specified path or the root filesystem.",
            inputSchema={
                "type": "object",
                "properties": { "path": { "type": "string", "description": "Optional filesystem path (e.g., '/', 'C:\\'). Defaults to root/system drive." } },
                "required": [],
            },
        ),
        types.Tool(
            name="get_battery_status",
            description="Retrieves current battery status (percentage, charging status, time left). Returns nulls if no battery.",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
         types.Tool(
            name="get_network_status",
            description="Retrieves basic network status (connected boolean, primary interface name and IP address).",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="get_os_info",
            description="Retrieves basic information about the host operating system (name, version, architecture, hostname).",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="get_system_uptime",
            description="Retrieves the system boot time and calculates the total uptime.",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="list_disk_partitions",
            description="Lists all mounted disk partitions/volumes (device, mount point, filesystem type, options).",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="get_swap_usage",
            description="Retrieves swap memory usage statistics (total, used, free GB, percent used, swapped in/out).",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="get_cpu_details",
            description="Provides details about the CPU (cores, logical processors, frequency). Model name often requires external library.",
            inputSchema={"type": "object", "properties": {}, "required": []},
        ),
        types.Tool(
            name="list_processes",
            description="Lists running processes (PID, name, user, CPU%, Mem%), optionally filtered by name and limited.",
            inputSchema={
                "type": "object",
                "properties": {
                    "name_filter": { "type": "string", "description": "Optional: Only list processes whose name contains this string (case-insensitive)." },
                    "limit": { "type": "integer", "description": "Optional: Maximum number of processes to return.", "default": 50 }
                 },
                 "required": [],
            },
        ),
         types.Tool(
            name="get_process_details",
            description="Retrieves detailed information about a specific process identified by its Process ID (PID).",
            inputSchema={
                "type": "object",
                "properties": { "pid": { "type": "integer", "description": "The Process ID (PID) of the process to inspect." } },
                "required": ["pid"],
            },
        ),
    ]


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: Optional[Dict[str, Any]]
) -> List[types.TextContent]:
    result_data = {}
    arguments = arguments or {}


    try:
        match name:
            case "get_cpu_load":
                result_data = await _fetch_cpu_load()
            case "get_memory_usage":
                result_data = await _fetch_memory_usage()
            case "get_disk_usage":
                path_arg = arguments.get("path")
                result_data = await _fetch_disk_usage(path=path_arg)
            case "get_battery_status":
                result_data = await _fetch_battery_status()
            case "get_network_status":
                 result_data = await _fetch_network_status()
            case "get_os_info":
                result_data = await _fetch_os_info()
            case "get_system_uptime":
                result_data = await _fetch_system_uptime()
            case "list_disk_partitions":
                result_data = await _fetch_disk_partitions()
            case "get_swap_usage":
                result_data = await _fetch_swap_usage()
            case "get_cpu_details":
                result_data = await _fetch_cpu_details()
            case "list_processes":
                name_filter_arg = arguments.get("name_filter")
                limit_arg = arguments.get("limit", 50)
                result_data = await _fetch_process_list(name_filter=name_filter_arg, limit=limit_arg)
            case "get_process_details":
                pid_arg = arguments.get("pid")
                if pid_arg is None or not isinstance(pid_arg, int):
                    raise ValueError("Missing or invalid 'pid' argument for get_process_details")
                result_data = await _fetch_process_details(pid=pid_arg)
            case _:
                raise ValueError(f"Unsupported tool: {name}")


        if isinstance(result_data, dict) and result_data.get("error"):
            pass


        return [
            types.TextContent(type="text", text=json.dumps(result_data, indent=2))
        ]


    except ValueError as ve:
        print(f"Value error calling tool '{name}': {ve}", file=sys.stderr)
        error_info = {"error": f"Invalid input for tool '{name}': {ve}"}
        return [types.TextContent(type="text", text=json.dumps(error_info, indent=2))]
    except Exception as e:
        print(f"Internal server error calling tool '{name}': {e}", file=sys.stderr)
        error_info = {"error": f"Internal server error calling tool '{name}': {e}"}
        return [types.TextContent(type="text", text=json.dumps(error_info, indent=2))]


async def main():
    print("Starting ", file=sys.stderr)


    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        print("✅", file=sys.stderr)


        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="system_info_service",
                server_version="0.2.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())


Для того, чтобы подключить его к ИИ, нужно изменить claude_desktop_config.json:

JSON: Скопировать в буфер обмена
Код:
{
  "mcpServers": {
  "system-info-mcp": {
    "command": "python",
    "args": [
      "C:/Users/тут юз/…/test/main.py"
    ],
    "env": {}
  }
}
}

Перезайдите в Claude и в панели промпта должно появится что-то типо этого:

AD_4nXdKFFG4fi6C5b_AuZLlVzQe2KqXbRsYkKAyzMHLpZQMz_muRXkht1B5Ne8EXSxdcmzavqX9Kw_P6aHOzrmnDQoNwyjgAKleXLa7kMdO3mQMAESJiTdjgTvl-e2jNc5kxbT8kQhA2w

На этом все, попробуем потестить:

AD_4nXerkP9_C4-i5_gjmPIbsQFqWi4DaQ_u6pP-AxWGCSTR_es19DMiPQ3um3owxYRcy1njbXMF3nrIG0h_pAm2Q2nrRlnpBzBg5wGSj3_7vFloGi13q7XmodEvq1eeDcDK7mCWbpcE

И дальше идет анализ ИИ, все просто!

„Простота — непростая штука.“ Чарльз Спенсер Чаплин
Нажмите, чтобы раскрыть...

Давайте перейдем ко второму проекту.

Проект 2, анализ логов и вывод общей статистики ИИ​

Общая логика кода:

Основная задача этого кода — анализировать содержимое специфических текстовых файлов стилера(я взял типичные по типу UserInformation.txt, DomainDetects.txt, InstalledBrowsers.txt и т.д.), которые ожидаются в указанных директориях (мы дадим их ИИ). Для каждого типа файла существует своя функция-парсер (parse_user_information, parse_domain_detects и др.), использующая регулярные выражения и строковые операции для извлечения структурированной информации (извлекаем оттуда данные пользователя, ОС, IP-адрес, обнаруженные домены, установленные браузеры и пр.). Некоторые парсеры упрощены и лишь подсчитывают количество записей (например, для установленного ПО и процессов, чтобы не забить ИИ кол-вом данных). Функция analyze_directory_contents обрабатывает одну директорию, находя целевые файлы, вызывая соответствующие парсеры и обрабатывая ошибки чтения/парсинга. Затем функция aggregate_statistics собирает данные со всех обработанных директорий, агрегирует их для получения общей статистики (распределение ОС, стран, браузеров, антивирусов; средние значения; уникальные идентификаторы; активность доменов) и формирует итоговый отчет, включая информацию об ошибках.

Роль технологии MCP:

В данном коде технология сервера(снова через библиотеку mcp.server) служит интерфейсом для запуска этого анализатора логов извне. Создается сервер user_stats_service. Функция handle_list_tools, зарегистрированная через декоратор @server.list_tools(), объявляет единственный доступный инструмент — analyze_user_log_directories. Важно, что она также определяет inputSchema, точно указывая, что этот инструмент ожидает на вход список строк с путями к директориям (directories). Функция handle_call_tool (@server.call_tool()) является точкой входа для выполнения анализа: она получает от клиента имя инструмента и список директорий, проверяет их корректность, затем инициирует основной процесс анализа, последовательно вызывая analyze_directory_contentsдля каждой директории и aggregate_statistics для финального обобщения. Результат агрегации упаковывается в формат types.TextContent (в виде json строки) и отправляется обратно клиенту через MCP. Функция main с stdio_server и server.run обеспечивает запуск сервера и управление циклом обработки входящих MCP запросов и отправки ответов.

Полный код:

Python: Скопировать в буфер обмена
Код:
import os
import sys
import json
import re
from pathlib import Path
from collections import Counter, defaultdict
from typing import Any, List, Dict, Tuple

from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.types as types
from pydantic import AnyUrl
import mcp.server.stdio
import asyncio

TARGET_FILENAMES = [
    "UserInformation.txt",
    "DomainDetects.txt",
    "InstalledBrowsers.txt",
    "InstalledSoftware.txt",
    "ProcessList.txt",
]

def parse_user_information(content: str) -> Dict[str, Any]:
    data = {}
    lines = content.splitlines()
    hardware_section = False
    antivirus_section = False
    gpus = []
    avs = []

    for line in lines:
        line = line.strip()
        if not line:
            hardware_section = False
            antivirus_section = False
            continue

        parts = line.split(':', 1)
        if len(parts) == 2:
            key, value = parts[0].strip(), parts[1].strip()
            if key == "IP": data["ip_address"] = value
            elif key == "UserName": data["username"] = value
            elif key == "MachineName": data["machine_name"] = value
            elif key == "Country": data["country"] = value
            elif key == "Zip Code": data["zip_code"] = value
            elif key == "Location": data["location"] = value
            elif key == "HWID": data["hwid"] = value
            elif key == "Current Language": data["language"] = value
            elif key == "ScreenSize": data["screen_size"] = value
            elif key == "TimeZone": data["timezone"] = value
            elif key == "Operation System": data["os"] = value
            elif key == "Log date": data["log_date"] = value
            elif key == "Available KeyboardLayouts": pass
            elif key == "Hardwares": hardware_section = True
            elif key == "Anti-Viruses": antivirus_section = True
            elif hardware_section and key == "Name":
                if "Total of RAM" in value:
                    match = re.search(r"([\d.]+)\s*Mb", value)
                    if match: data["ram_mb"] = float(match.group(1))
                elif "CPU" in value:
                    data["cpu"] = value
                elif "NVIDIA" in value or "AMD" in value or "Intel(R) UHD Graphics" in value or "Radeon" in value :
                     gpus.append(value)

        elif antivirus_section and line:
            if not line.startswith("ID:") and not re.match(r"^\d+\)", line):
                avs.append(line.split(',')[0].strip())

    if gpus: data["gpus"] = list(dict.fromkeys(gpus))
    if avs: data["anti_viruses"] = list(dict.fromkeys(avs))

    return data

def parse_domain_detects(content: str) -> Dict[str, Dict[str, int]]:
    detected_domains = defaultdict(lambda: defaultdict(int))
    pattern = re.compile(r"\[([^\]]+)\]\s*([\w.-]+)\s*\((\d+)\)")

    in_pdd = False
    in_cdd = False
    current_category_prefix = ""

    for line in content.splitlines():
        line = line.strip()
        if line.startswith("PDD:"):
            in_pdd = True
            in_cdd = False
            current_category_prefix = "PDD_"
            continue
        elif line.startswith("CDD:"):
            in_pdd = False
            in_cdd = True
            current_category_prefix = "CDD_"
            continue

        if in_pdd or in_cdd:
            matches = pattern.findall(line)
            for category, domain, count_str in matches:
                try:
                    count = int(count_str)
                    domain_lower = domain.lower()
                    category_clean = category.upper().replace("0", "O").replace("1", "I")
                    detected_domains[domain_lower][category_clean] += count
                except ValueError:
                    print(f"Warning: Could not parse count in DomainDetects: {line}", file=sys.stderr)

    return {domain: dict(categories) for domain, categories in detected_domains.items()}

def parse_installed_browsers(content: str) -> List[Dict[str, str]]:
    browsers = []
    pattern = re.compile(r"^\d+\)\s*Name:\s*(?P<name>[^,]+?)(?:,\s*Path:\s*(?P<path>[^,]+?))?,\s*Version:\s*(?P<version>.+)$")
    for line in content.splitlines():
        match = pattern.match(line.strip())
        if match:
            browser_info = match.groupdict()
            browsers.append({
                "name": browser_info["name"].strip(),
                "version": browser_info["version"].strip(),
            })
    return browsers

def parse_installed_software(content: str) -> int:
    count = 0
    pattern = re.compile(r"^\d+\)")
    for line in content.splitlines():
        if pattern.match(line.strip()):
            count += 1
    return count

def parse_process_list(content: str) -> int:
    count = 0
    for line in content.splitlines():
        if line.strip().startswith("ID:"):
            count += 1
    return count

def analyze_directory_contents(directory_path: Path) -> Dict[str, Any]:
    results = {"directory": str(directory_path), "found_files": {}}
    errors = []

    for filename in TARGET_FILENAMES:
        file_path = directory_path / filename
        if file_path.is_file():
            try:
                print(f"  Processing: {file_path}", file=sys.stderr)
                content = file_path.read_text(encoding='utf-8', errors='ignore')
                results["found_files"][filename] = True

                if filename == "UserInformation.txt":
                    results.update(parse_user_information(content))
                elif filename == "DomainDetects.txt":
                    results["detected_domains"] = parse_domain_detects(content)
                elif filename == "InstalledBrowsers.txt":
                    results["installed_browsers"] = parse_installed_browsers(content)
                elif filename == "InstalledSoftware.txt":
                    results["installed_software_count"] = parse_installed_software(content)
                elif filename == "ProcessList.txt":
                    results["running_process_count"] = parse_process_list(content)

            except Exception as e:
                error_msg = f"Error parsing {file_path}: {e}"
                print(f"Warning: {error_msg}", file=sys.stderr)
                errors.append(error_msg)
                results["found_files"][filename] = False
        else:
             results["found_files"][filename] = False

    password_file_path = directory_path / "Passwords.txt"
    if password_file_path.exists():
         warning_msg = f"Security Warning: Found 'Passwords.txt' in {directory_path}. This file was NOT processed."
         print(warning_msg, file=sys.stderr)
         errors.append(warning_msg)

    results["parsing_errors"] = errors
    return results

def aggregate_statistics(all_directory_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    if not all_directory_results:
        return {"message": "No directories processed or no relevant files found."}

    summary = {
        "total_directories_scanned": len(all_directory_results),
        "directories_with_data": 0,
        "files_processed_counts": Counter(),
        "parsing_errors_count": 0,
        "security_warnings_count": 0,
    }
    aggregated_data = {
        "os_distribution": Counter(),
        "language_distribution": Counter(),
        "country_distribution": Counter(),
        "browser_distribution": Counter(),
        "browser_versions": defaultdict(Counter),
        "antivirus_distribution": Counter(),
        "domain_activity": defaultdict(lambda: defaultdict(int)),
        "domain_category_totals": Counter(),
        "avg_ram_mb": [],
        "avg_installed_software": [],
        "avg_running_processes": [],
        "unique_hwids": set(),
        "unique_machine_names": set(),
    }
    all_errors = []

    for dir_result in all_directory_results:
        has_data = any(v for k, v in dir_result.items() if k not in ["directory", "found_files", "parsing_errors"])
        if has_data:
            summary["directories_with_data"] += 1

        for filename, found in dir_result.get("found_files", {}).items():
            if found:
                 summary["files_processed_counts"][filename] += 1

        if dir_result.get("os"): aggregated_data["os_distribution"][dir_result["os"]] += 1
        if dir_result.get("language"): aggregated_data["language_distribution"][dir_result["language"]] += 1
        if dir_result.get("country"): aggregated_data["country_distribution"][dir_result["country"]] += 1
        if dir_result.get("hwid"): aggregated_data["unique_hwids"].add(dir_result["hwid"])
        if dir_result.get("machine_name"): aggregated_data["unique_machine_names"].add(dir_result["machine_name"])

        if dir_result.get("ram_mb"): aggregated_data["avg_ram_mb"].append(dir_result["ram_mb"])
        if dir_result.get("installed_software_count") is not None:
            aggregated_data["avg_installed_software"].append(dir_result["installed_software_count"])
        if dir_result.get("running_process_count") is not None:
             aggregated_data["avg_running_processes"].append(dir_result["running_process_count"])

        for av in dir_result.get("anti_viruses", []):
            aggregated_data["antivirus_distribution"][av] += 1

        for browser in dir_result.get("installed_browsers", []):
            name = browser.get("name")
            version = browser.get("version")
            if name:
                aggregated_data["browser_distribution"][name] += 1
                if version:
                    aggregated_data["browser_versions"][name][version] += 1

        for domain, categories in dir_result.get("detected_domains", {}).items():
             for category, count in categories.items():
                 aggregated_data["domain_activity"][domain][category] += count
                 aggregated_data["domain_category_totals"][category] += count

        dir_errors = dir_result.get("parsing_errors", [])
        summary["parsing_errors_count"] += len([e for e in dir_errors if "Security Warning" not in e])
        summary["security_warnings_count"] += len([e for e in dir_errors if "Security Warning" in e])
        if dir_errors:
            all_errors.extend([f"{dir_result['directory']}: {e}" for e in dir_errors])

    final_output = {"summary": summary}
    final_output["os_distribution"] = dict(aggregated_data["os_distribution"].most_common())
    final_output["language_distribution"] = dict(aggregated_data["language_distribution"].most_common())
    final_output["country_distribution"] = dict(aggregated_data["country_distribution"].most_common())
    final_output["antivirus_distribution"] = dict(aggregated_data["antivirus_distribution"].most_common())
    final_output["browser_distribution"] = dict(aggregated_data["browser_distribution"].most_common())
    final_output["browser_versions"] = {name: dict(versions.most_common(5)) for name, versions in aggregated_data["browser_versions"].items()}

    if aggregated_data["avg_ram_mb"]: final_output["average_ram_mb"] = round(sum(aggregated_data["avg_ram_mb"]) / len(aggregated_data["avg_ram_mb"]), 2)
    if aggregated_data["avg_installed_software"]: final_output["average_installed_software"] = round(sum(aggregated_data["avg_installed_software"]) / len(aggregated_data["avg_installed_software"]), 1)
    if aggregated_data["avg_running_processes"]: final_output["average_running_processes"] = round(sum(aggregated_data["avg_running_processes"]) / len(aggregated_data["avg_running_processes"]), 1)

    final_output["total_unique_hwids"] = len(aggregated_data["unique_hwids"])
    final_output["total_unique_machine_names"] = len(aggregated_data["unique_machine_names"])

    top_domains_overall = Counter({domain: sum(cats.values()) for domain, cats in aggregated_data["domain_activity"].items()}).most_common(50)
    final_output["top_domains_overall"] = dict(top_domains_overall)
    final_output["domain_category_totals"] = dict(aggregated_data["domain_category_totals"].most_common())

    final_output["parsing_errors_log"] = all_errors[:100]

    final_output["domain_activity"] = {
        domain: dict(categories) for domain, categories in aggregated_data["domain_activity"].items()
    }

    return final_output

server = Server("user_stats_service")

@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="analyze_user_log_directories",
            description="Analyzes specific user log files (UserInformation.txt, DomainDetects.txt, etc.) within specified directories to extract and aggregate user statistics.",
            inputSchema={
                "type": "object",
                "properties": {
                    "directories": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "A list of absolute or relative directory paths containing the user log files.",
                    }
                },
                "required": ["directories"],
            },
        )
    ]

@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:

    if name == "analyze_user_log_directories":
        if not arguments or "directories" not in arguments:
            raise ValueError("Missing 'directories' argument for analyze_user_log_directories tool.")

        directories = arguments["directories"]
        if not isinstance(directories, list) or not all(isinstance(d, str) for d in directories):
             raise ValueError("'directories' argument must be a list of strings.")

        all_results = []
        print(f"Received request to analyze directories: {directories}", file=sys.stderr)

        for dir_str in directories:
            dir_path = Path(dir_str)
            if not dir_path.is_dir():
                 print(f"Warning: Directory not found or is not a directory: {dir_str}", file=sys.stderr)
                 continue

            print(f"Scanning directory: {dir_path}", file=sys.stderr)
            try:
                dir_contents_results = analyze_directory_contents(dir_path)
                all_results.append(dir_contents_results)
            except PermissionError:
                print(f"Warning: Permission denied while accessing {dir_path}", file=sys.stderr)
                all_results.append({"directory": str(dir_path), "parsing_errors": [f"Permission denied accessing directory."]})
            except Exception as e:
                 print(f"Error processing directory {dir_path}: {e}", file=sys.stderr)
                 all_results.append({"directory": str(dir_path), "parsing_errors": [f"General error processing directory: {e}"]})
                 import traceback
                 traceback.print_exc(file=sys.stderr)

        print(f"Aggregation starting for {len(all_results)} directories.", file=sys.stderr)
        aggregated_stats = aggregate_statistics(all_results)
        print(f"Analysis complete. Returning aggregated statistics.", file=sys.stderr)

        return [
            types.TextContent(
                type="text",
                text=json.dumps(aggregated_stats, indent=2),
            )
        ]
    else:
        raise ValueError(f"Unsupported tool: {name}")

async def main():
    print("Starting", file=sys.stderr)

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        print("✅", file=sys.stderr)

        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="user_stats_service",
                server_version="0.2.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    asyncio.run(main())

Вернемся к json нашего клауди:

JSON: Скопировать в буфер обмена
Код:
{
  "mcpServers": {
  "user-stats-mcp": {
  "command": "python",
  "args": [
    "C:/путь/до/прекрасного/main.py"
  ],
  "env": {}
}
}
}


Пример промпта и результата:

AD_4nXcUhBVDkBJCmYVlu77_PdapcisLnrDjHz4yuF09ldMvavUpu9vS1K_C2mxOzy5XidQ1uZD6cUGr34d4U5Jhd9K2P6OPHxVGg6cxND389Xh_PJujD_OQWJ1NNwA04S7eBaaDHvQVBg


AD_4nXdwkPdiwYMvr0UcDqvZ6g8fBG2z6eQxS9FNLx30fyxUlyeL7OR043Gl27AQ77l35wUj9laJPbCJD4BK6Q_et8jfI0zDiT4XWnxDjaEbxOFb7oT5oZRAhVGnF3Ch3Hy5Toi5C3_jgg

Согласен, такое читать не очень удобно, попросим в виде таблицы:

AD_4nXcuftqL182JzaweoxjNcgoYP981YszthFux2ghZiqQCFz0NKUm4caERxTfJvrM0f0RKAPOf0Y21R-dy0U3XuARR72YylQYuymyer9S4Xnf_tX3hD19WTdkPxUPy-WIq-OChU9GyoQ

Круто, не так ли? Чуваки с гитхаба уже фигачат целые сцены в блендере (
https://twitter.com/x/status/1899460492999184534
), не трогая клавиатуру совсем, реверсят код, управляют дбшками, полностью кодят проекты и так далее. Главное придумать хорошее применение.

Остановись и прочитай! Безопасность при использовании MCP​

Но перед тем как использовать готовые, нужно обратить внимание и на их код тоже. Чуваки из твиттера уже подсуетились и выявили несколько типов атак. Расскажу тут кратко. Из-за атак вроде "Отравления Инструмента", когда в описания безобидных инструментов внедряются скрытые команды для кражи данных (ключей SSH, учетных данных) незаметно для пользователя, видящего лишь упрощенную версию. Атака Rug Pull позволяет злоумышленнику изменить описание инструмента уже после его одобрения пользователем, добавив вредоносные инструкции. Помимо этого, в среде с несколькими серверами атака Shadowing позволяет вредоносному серверу через свои описания изменять поведение ИИ при работе с доверенными инструментами, например, перенаправляя электронные письма атакующему. Комбинация этих методов делает возможными скрытые атаки и манипуляции даже через доверенные инструменты.

Заключение

Итак, в этой статье мы разобрали относительно новую технологию - mcp сервера. Мы рассмотрели Model Context Protocol как способ соединения языковых моделей с внешними программами. На двух примерах - анализаторе логов и сервисе системной информации - мы увидели, как это работает на практике. Я надеюсь что теперь вы более осведомлены о новых возможностях и найдете им полезное применение. Спасибо за ваше внимание)
 
Сверху Снизу