Active directory dumper или пишем свой ADFind на питоне по заветам Дядюшки Боба

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0
Автор Snow
Источник https://xss.is
ПРОДОЛЖЕНИЕ ЗДЕСЬ

Дядюшка Боб, как он сам себя называет в своих трудах, – это Роберт Мартин, автор таких мировых бестселлеров как «Чистый код», «Чистая архитектура, искусство разработки программного
обеспечения » и многих других. Настоятельно рекомендую к изучению.

По роду своей деятельности часто приходится собирать информацию о сети. Паблик продукты, даже общепризнанные, для этих целей не всегда подходят поскольку их функционал недостаточен или избыточен. Ну и про качество кода я уже неоднократно тут говорил. Видимо пришло время подтвердить слова действием. Я не буду приводить весь код проекта. Цель данной статьи –показать использование принципов построения архитектуры приложения.
1.1.1 Что мы хотим получить
Наиболее часто приходится собирать информацию о:
• пользователях, в частности мы хотим знать:
– общее количество пользователей;
– список пользовательских групп;
– список администраторов домена;
– список Ынтерпрайзных админов;
– до кучи еще хотелось бы знать к каким группам принадлежит пользователь, под которым мы авторизованы в текущий момент времени.

• компьютерах, а именно
– общее количество активных хостов в домене;
– список и количество серверов;
– список и количество рабочих станций;
– список и количество контроллеров домена;
– статистику по установленным операционным системам – очень полезная штука для поиска уязвимостей, но об этом в другой раз (о поиске уязвимостей).
• список общих сетевых ресурсов – так называемых «шар», в том числе понимать какие права для обращения к ним имеются у текущего пользователя (чтение, запись). А еще мы хотим делать это в несколько потоков и быстро, или наоборот – будем вести себя очень тихо и опрашивать по одному хосту за произвольный период времени.

А еще мы хотим чтобы результаты обработки полученной информации были разложены по полочкам. Юзеры к юзерам, админы к админам, шары к шарам, рабочие станции, сервера и контроллеры домена тоже должны быть на своих местах.

1.2 Инструменты необходимые для решения задачи.

Было бы наивно полагать, что подобная задача не возникала ранее. Соответственно и методы ее решения должны быть в открытом доступе. Несколько минут гугления и вуаля – найден инструмент решающий подобную задачу. https://github.com/SecuProject/ADenum
Смотрим видео - ну вроде похоже на правду. Смотрим в код. Плачем. Еще раз открываем. Опять рыдаем крокодильими кровавыми слезами. Для перечисления недостатков этого проекта понадобится отдельная статья, поэтому мы их примем как
данность и постараемся не повторить чужих ошибок. По мере изложения я, тем или иным способом, буду указывать на них.
Ниже будут рассмотрены основные подходы к решению задачи.
1.2.1 LDAP
Открываем код проекта и видим, что получение информации о сети основано на выполнении LDAP запросов.
Открываем вики и понимаем, что LDAP – это (англ. Lightweight Directory Access Protocol –
«легковесный протокол доступа к каталогам»),
протокол прикладного уровня для доступа к службе каталогов X.500, разработанный IETF как облегчённый вариант разработанного ITU-T протокола DAP. LDAP – относительно простой протокол, использующий TCP/IP и позволяющий производить операции аутентификации (bind), поиска (search) и сравнения (compare), а также операции добавления, изменения или удаления записей. Обычно LDAP – сервер принимает входящие соединения на порт 389 по протоколам TCP или UDP. Для LDAP-сеансов, инкапсулированных в SSL, обычно используется порт 636. Опять смотрим в код, вики, тщательно гуглим и понимаем, что для выполнения LDAP запросов нам понадобится:

1. установить соединение со службой каталогов;
2. сформировать запрос используя такие магические штуковины, как ObjectToSearch и AttributeToSearch. Почему магические? Ну а как еще неподготовленный пользователь может назвать строку вида:
Bash: Скопировать в буфер обмена
object_to_search = ’(&(objectCategory=person)(objectClass=user) (userAccountControl:1.2.840.113556.1.4.803:=1048576))’
3. выполнить запрос и получить его результаты;
4. обработать результаты выполнения запроса, отбросив не валидные записи.

1.2.2 Обработка аргументов командной строки.
В коде проекта ADEnum реализована такая обработка. Только вы сначала ее там найдите. А потом попробуйте без поллитры разобраться в ее устройстве. Удачи.
Значит нам понадобится свой собственный парсер аргументов командной строки. Естественно, что писать его с нуля никто не собирается, но его код должен быть вынесен в отдельный модуль.

1.2.3 Logger
Логи - наше всё. Без них существование более-менее серьезного продукта невозможно. Поэтому логгер у нас тоже будет. Свой или допилим чужой – не важно, но он тоже будет жить в отдельном модуле, а еще научим его в многопоточность и записи в файл.

1.2.4 Модуль обработки результатов
В настоящее время, на этапе проектирования, я понятия не имею как должен выглядеть результат, поэтому воспользуемся одним из советов дядюшки Боба и отложим решение этого вопроса на самый последний момент. Мы объявим интерфейсы, но реализацию оставим пустой. То же самое касается и остальных модулей, за исключением LDAP. Там, как раз, всё просто и понятно.

2 Проектирование архитектуры.
Грамотно спроектированная архитектура экономит сотни человекочасов и, как минимум, десятки тысяч вечнозеленых бумажек. Поэтому придется потратить некоторое время на старте, чтобы потом безболезненно добавлять функционал.

Итак, начнем, конечно же, с LDAP. Поскольку он будет одной из ключевых составляющих ядра системы.

2.1 Проектирование архитектуры модуля обработки LDAP запросов

Как уже говорилось выше, у нас будет 3 подмодуля здесь. А именно:
• модуль управления соединением;
• модуль выполнения запросов;
• модуль сбора и обработки результатов;
Поехали.

2.1.1 LDAPConnection - модуль обеспечивающий соединение по LDAP и управление им.

Давайте разбираться что нам понадобится для подключения к службе каталогов. Опять, смотрим в код проекта ADEnum, гуглим, изучаем. В итоге приходим к выводу, что нам понадобятся следующие параметры:
• domainName – имя домена, необходимо для формирования строки вида username + ’@’ + domain_name, которая будет использоваться при установки соединения в качестве параметра;
• username – имя пользователя;
• password – пароль;
• ipAddress – адрес контроллера домена, поскольку служба каталогов живет там;
• useLdapWithSsl – флаг, на случай если мы решим использовать SSL соединение для подключения;

• baseDn – Distinguished Name – уникальное представление записи имени домена, ее мы получим самостоятельно. Ниже будет пример кода.;
• LdapVersion – версия протокола LDAP;

Первая мысль - передавать все эти параметры в конструктор. Идея хорошая, но тогда мы получим много веселья связанного с тем, что на данном этапе мы не знаем ничего о точном количестве параметров и их типах. Кроме того, желательно предусмотреть их возможное изменение в будущем. Мы же хотим сделать качественный продукт, а не одноразовую поделку. Поэтому все параметры мы завернем в класс и будем передавать уже экземпляр класса. А с учетом того, что мы только в начале пути, и с подобной ситуацией столкнемся неоднократно, то сюда прямо просится создание базового класса с минимальным набором полей и методов, которые будут реализованы в дочерних. Если бы разработка велась на С-подобном языке, то можно было бы смело передавать
указатель на базовый класс и не греть голову. Но у нас питон. Он, конечно, умеет в ООП, но придется несколько извратиться. Подробности ниже.

Преимущества такого решения:
• мы не зависим от количества и типов параметров;
• можно вынести единый функционал для все конфигов в абстрактные методы базового класса, и реализовать их в наследниках. В частности нам точно понадобится метод валидации конфига и метод вывода на экран;
• возможность версионирования и поддержка обратной совместимости.

Недостатки – придется писать чуть больше кода и чуть тщательнее следить за реализацией.

Подробное описание модуля конфигурации смотри ниже.

А сейчас вернемся к нашим баранам. То бишь к LDAP. Здесь и далее предполагаем, что конфиги и логгер у нас уже реализованы. Итак. Давайте попробуем разобраться с функциональностью класса, обеспечивающего соединение. Логично предположить, что это будут примерно следующие методы:
• инициализация параметров соединения;
• установка соединения;
• разрыв соединения;
• обработка ошибок.
Начнем с описания ошибок. Могут возникнуть следующие проблемы:
• Ошибка авторизации – нам дали неверные креды. Авторизоваться не удалось. Беда. Завершаем работу.
• Недоступен сервер – лежит сервер по непонятной нам причине и мы ничего сделать не можем, разве что попросить админа .
• Истекло время ожидания ответа сервера – тут очевидно.
• Другие ошибки – мы про них ничего не знаем, но предполагаем, что они могут быть. Поэтому для описания типов ошибок будем использовать перечисление.

Python: Скопировать в буфер обмена
Код:
class LdapLoginError(enum.Enum):
    NO_ERROR = 0
    INVALID_CREDENTIALS = 1
    SERVER_DOWN = 2
    OTHER_ERROR = 3
    TIMEOUT_ERROR = 4

Установка соединения.
Python: Скопировать в буфер обмена
Код:
    def is_ldap_connection_established(self) -> bool:
        DumpLogger.print_title(f'{self.title} is_ldap_connection_established')
        if self.ldap_config.ip_address is None:
            DumpLogger.print_warning('ip address not specified, a default  value will be used')
            ip_address = NetworkUtils.ger_current_host_ip_address(self.ldap_config.domain_name)
            if ip_address is None:
                DumpLogger.print_error('Unable to resolve a domain name:', self.ldap_config.domain_name)
                return False
            self.ldap_config.ip_address = ip_address
        DumpLogger.highlight_dark_blue("current ip:\t" + self.ldap_config.ip_address)

        self.__setup_ldap_connection()
        if self.connection is None:
            DumpLogger.print_error_message('invalid LDAP connection')
            return False

        return self.__is_success_ldap_login(self.ldap_config.domain_name, self.ldap_config.password,
                                            self.ldap_config.username)

Отключение.
Python: Скопировать в буфер обмена
Код:
    def disconnect(self) -> None:
        self.connection.unbind()

Конструктор класса выглядит так:
Python: Скопировать в буфер обмена
Код:
class LdapConnection:
    def __init__(self, ldap_config: LdapConfig):
        self.ldap_config = ldap_config
        self.ldap_version = VERSION3
        self.connection = ldap.initialize('ldaps://' + self.ldap_config.ip_address)
        self.title = 'LdapConnection'
А конфиг ,который мы передаем в качестве параметра выглядит так.

Python: Скопировать в буфер обмена
Код:
from _ldap import VERSION3

from source.core.ldap.network_utils import NetworkUtils
from source.utils.app_config.configs.app_config import AppConfig
from source.utils.console.console_utils import DumpLogger
from source.utils.network.network_helper import NetworkHelper


class LdapConfig(AppConfig):
    def __init__(self, domain_name: str, username: str, password: str, ip_address: str, use_ldap_with_ssl: bool,
                 base_dn: str):
        super().__init__()
        self.domain_name = domain_name
        self.username = username
        self.password = password
        self.ip_address = ip_address
        self.use_ldap_with_ssl = use_ldap_with_ssl
        self.base_dn = base_dn
        self.smb_client_dialect = None
        self.ldap_version = VERSION3

    def print(self):
        DumpLogger.print_title('LDAP configuration')
        DumpLogger.print_param('domain name', self.domain_name)
        DumpLogger.print_param('username', self.username)
        DumpLogger.print_param('password', self.password)
        DumpLogger.print_param('ip address', self.ip_address)
        DumpLogger.print_param('base_dn', self.base_dn)

    def is_valid(self) -> bool:
        if not NetworkHelper.is_valid_ip_address(self.ip_address):
            DumpLogger.print_error('LDAP config. Invalid ip address', self.ip_address)
            return False
        if NetworkUtils.get_base_dn(self.domain_name) is None:
            DumpLogger.print_error('LDAP config. Invalid domain name', self.domain_name)
            return False
        return True

    def help(self):
        pass

На этом можно считать предварительную реализацию модуля
управления соединением завершенной.

2.1.2 LDAP query executor – модуль выполнения запросов.

Итак, мы научились устанавливать соединение и это, само по себе, уже замечательно. Теперь нам нужно научиться выполнять запросы. Для этого нам понадобится активное соединение и уникальное представление имени домена внутри LDAP, дальше я его буду звать
Base DN. В итоге конфиг будет у нас следующий:

Python: Скопировать в буфер обмена
Код:
class LdapQueryExecutorConfig(AppConfig):
    def __init__(self, ldap_connection: LdapConnection, base_dn: str):
        super().__init__()
        self.ldap_connection = ldap_connection
        self.base_dn = base_dn

    def print(self):
        print("LdapQueryExecutor configuration:")
        self.ldap_connection.ldap_config.print()
        pass

    def is_valid(self):
        return self.ldap_connection.ldap_config.is_valid()

    def help(self):
        pass


Все запросы выполняются через LdapConnection и задача LdapQueryExecutor’a передать ему параметры и отправить полученные результаты дальше. Не мудрствуя лукаво просто подсмотрим реализацию в ADEnum и у нас получится примерно следующее.

Python: Скопировать в буфер обмена
Код:
class LdapQueryExecutor:
    def __init__(self, config: LdapQueryExecutorConfig):
        self.ldap_connector = config.ldap_connection
        self.base_dn = config.base_dn

    def search_server_ldap(self, object_to_search: str, attributes_to_search: list) -> list:
        result_search = []
        try:
            result = self.ldap_connector.connection.search_s(self.base_dn, ldap.SCOPE_SUBTREE,
                                                             object_to_search,
                                                             attributes_to_search)
            for info in result:
                if info[0] is not None:
                    result_search.append([info[0], info[1]])
            if len(result_search) == 0:
                DumpLogger.highlight_warning("No entry found !")
        except ldap.OPERATIONS_ERROR as error:
            DumpLogger.print_error("OPERATIONS_ERROR: ", str(error))
            raise error
        except ldap.LDAPError as error:
            DumpLogger.print_error("LDAPError: ", str(error))
            raise error

        return result_search

    def search_server_ldap_pages(self, object_to_search: str, attributes_to_search: list) -> list | None:
        page_control = SimplePagedResultsControl(True, size=1000, cookie='')

        try:
            response = self.ldap_connector.connection.search_ext(self.base_dn,
                                                                 ldap.SCOPE_SUBTREE,
                                                                 object_to_search,
                                                                 attributes_to_search,
                                                                 serverctrls=[page_control])
            result = []
            pages = 0
            while True:
                pages += 1
                rtype, rdata, rmsgid, serverctrls = self.ldap_connector.connection.result3(response)
                result.extend(rdata)
                controls = [control for control in serverctrls
                            if control.controlType == SimplePagedResultsControl.controlType]
                if not controls:
                    print('The server ignores RFC 2696 control')
                    break
                if not controls[0].cookie:
                    break
                page_control.cookie = controls[0].cookie
                response = self.ldap_connector.connection.search_ext(self.base_dn,
                                                                     ldap.SCOPE_SUBTREE,
                                                                     object_to_search,
                                                                     attributes_to_search,
                                                                     serverctrls=[page_control])
                result.append(response)
            return result
        except Exception as err:
            DumpLogger.print_error('search_server_ldap_pages', str(err))
            raise err

2.1.3 LDAPDataCollector
И, наконец, самое вкусное в данном разделе – модуль сбора и обработки результатов.
Как обычно начнем с конфига. Здесь он предельно простой.

Python: Скопировать в буфер обмена
Код:
class LdapDataCollectorConfig(AppConfig):
    def __init__(self, ldap_query_executor: LdapQueryExecutor):
        super().__init__()
        self.ldap_query_executor = ldap_query_executor

    def print(self):
        DumpLogger.print_title('LdapDataCollector configuration ')
        self.ldap_query_executor.ldap_connector.ldap_config.print()

    def is_valid(self):
        return self.ldap_query_executor.ldap_connector.ldap_config.is_valid()

    def help(self):
        pass

Наблюдательный читатель сразу заметит, что у нас здесь получилась матрешка из конфигов и задаст вопрос: "а нахрена?". Отвечу просто. Порядок бьет класс. Все подробности конфигов, билдеров и иже с ниме в соответствующем разделе.
Пока же рассмотрим основной функционал данного класса. Его конструктор выглядит следующим образом:

Python: Скопировать в буфер обмена
Код:
class LdapDataCollector:
    def __init__(self, config: LdapDataCollectorConfig):
        self.query_executor = config.ldap_query_executor

        self.domain_users = list()
        self.domain_admins = list()
        self.enterprise_admins = list()
        self.domain_controllers = list()
        self.domain_trusts = list()
        self.servers = list()
        self.user_pc = list()
        self.os_versions = set()
        self.server_os_count = 0
        self.user_os_count = 0
        self.os_counter = defaultdict(list)
        self.user_groups = defaultdict(list)
        self.computers = dict()
        self.ad_organizational_units = list()
        self.ad_subnets = list()
        self.ad_groups = list()



Фактически – это некий контейнер, аккумулирующий в себе результаты всех запросов. ниже приведен список открытых методов и реализация некоторых из них. Реализацию остальных оставлю на откуп любопытствующим. Там ничего сложного нет.

Python: Скопировать в буфер обмена
Код:
    def get_domain_admins(self) -> list:
        DumpLogger.print_title('get_domain_admins')
        object_to_search = '(&(objectCategory=user)(adminCount=1))'
        result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search=["*"])
        for info in result:
            if not self.__is_valid_data(info):
                continue
            res, name, sAMAccountName = self.__get_full_user_information(info)
            if not self.__is_valid_query_result(name, res):
                continue
            self.domain_admins.append(res)
        DumpLogger.print_success('Done...')
        return self.domain_admins

    def get_enterprise_admins(self) -> list:
        # your code here
        return self.enterprise_admins

    def get_user_groups(self, username: str = '') -> list:
        DumpLogger.print_title('get_user_groups')
        # your code here
        DumpLogger.print_success('Done...')
        return self.user_groups[username]

 
    def get_domain_controllers(self) -> list:
        DumpLogger.print_title('get_domain_controllers')
        object_to_search = '(&(objectCategory=computer)(userAccountControl:1.2.840.113556.1.4.803:=8192))'
        attributes_to_search = ["dNSHostName", "operatingSystem", "operatingSystemVersion"]
        result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search)
        self.domain_controllers = self.__get_computers_info(result, is_os_version_needed=True)
        DumpLogger.print_success('Done...')
        return self.domain_controllers

    def get_domain_trusts(self) -> list:
        DumpLogger.print_title('get_domain_trusts')
        # your ode here
        DumpLogger.print_success('Done...')
        return self.domain_trusts

    def get_domain_computers_full_info(self) -> None:
        DumpLogger.print_title('get domain computers full info')
        object_to_search = '(&(objectCategory=computer))'
        attributes_to_search = ["dNSHostName", "operatingSystem", "operatingSystemVersion"]
        result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search)
        try:
            for info in result:
                if not self.__is_valid_data(info):
                    continue
                try:
                    self.__get_computer_full_info(info)
                except Exception as err:
                    DumpLogger.print_error_message(str(err))
        except Exception as err:
            DumpLogger.print_error_message(str(err))
        DumpLogger.print_success('Done...')

    def get_domain_users(self) -> list:
        DumpLogger.print_title('get domain users')
        # your code here
        DumpLogger.print_success('Done...')
        return self.domain_users

    def get_ad_organizational_unit(self) -> list:
        DumpLogger.print_title('get AD organizational units')
        object_to_search = '(&(objectcategory=organizationalUnit))'
        attributes_to_search = ['*']
        result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search)
        for info in result:
            if not self.__is_valid_data(info):
                continue
            self.ad_organizational_units.append(info[0])
        DumpLogger.print_success('Done...')
        return self.ad_organizational_units

    def get_ad_subnets(self) -> list:
        DumpLogger.print_title('get domain subnets')
        # your code here
        DumpLogger.print_success('Done...')
        return self.ad_subnets

    def get_ad_groups(self) -> list:
        DumpLogger.print_title('get AD groups')
       # your code here
        return self.ad_groups



2.2 Выводы.
Мы разделили взаимодействие с LDAP на несколько модулей. Каждый из которых представляет самостоятельную единицу. Кроме того удалось инкапсулировать всю «магию» запросов. Для сравнения в том же ADFind для выполнения запроса необходимо помнить или
постоянно держать под рукой магические строки, например:

Bash: Скопировать в буфер обмена
Код:
 adfind.exe  -f "(objectcategory=person)" > ad_users.txt
 adfind.exe  -f "objectcategory=computer" > ad_computers.txt
 adfind.exe -f "(objectcategory=organizationalUnit)" >  ad_ous.txt
 adfind.exe -sc trustdmp > ad_trusts.txt
 adfind.exe -subnets -f (objectCategory=subnet)>ad_subnets.txt
 adfind.exe  -f "(objectcategory=group)" > ad_groups.txt
 adfind.exe -gcb -sc trustdmp > trustdmp.txt

В нашей реализации же все выглядит несколько проще.

Python: Скопировать в буфер обмена
Код:
        query_executor = LdapQueryExecutor(query_executor_config)
        data_collector_config = ConfigFactory.create_data_collector_config(query_executor)
        data_collector = LdapDataCollector(data_collector_config)
        domain_admins = data_collector.get_domain_admins()
        subnets = data_collector.get_ad_subnets()
        trusts = data_collector.get_domain_trusts()


3 Режимы работы приложения
Теперь поговорим о том, как грамотно организовать работу приложения. То есть сделать его максимально простым в использовании, удобным для расширения функционала.
Поехали.
По сути мы хотим снимать дампы сети, включающие в себя различные наборы параметров. Иногда требуется максимально полная информация о сети, включающая в себя списки юзеров, хостов, подсетей, групп и еще много чего. Также может потребоваться мини-
мальный набор данных, включающий лишь количество хостов, пользователей и список групп, участником (мембером) которых является текущий пользователь, от имени которого выполняются запросы.

Я рассмотрю два типа дампов, в проекте их несколько больше, но мы же изучаем архитектуру, а не проект.

3.1 Dump
В этом режиме мы собираем необходимый минимум информации о сети как-то:
• список пользователей;
• список доменных админов;
• список ЫнтЫрпрайзных админов;
• список контроллеров домена;
• список трастов;
• список серверов;
• список рабочих станций;
• статистику по операционным системам;

3.2 FullDump
в этом режиме мы хотим собрать всю доступную информацию о сети, дополнив изначальный дамп:
• списком групп;
• подразделений (OU, organization units);
• подсетей;
• групп, участником которых является текущий пользователь, под которым мы выполняем запросы;
• общим количеством хостов;
• общим количеством пользователей.

3.3 Проектирование архитектуры.

Хорошо, сейчас у нас два режима работы. Мы можем их относительно безболезненно захардкодить и забыть. Но мы же ЫнтЫрпрайз пишем, поэтому нам просто необходимо заложить возможность расширения.

Давайте рассуждать вслух. Что мы имеем. У нас есть модуль, который соберет всю информацию и отдаст ее в виде списка коллекций. Только сам по себе он не является конечным продуктом. Конечный продукт должен представлять собой некий завершенный результат, с которым можно проводить некие манипуляции. Например, вывести на экран, сохранить в файл, записать в базу. Воот. У нас уже прорисовывается каркас класса AbstractProduct и его абстрактные
же методы. Ниже приведен его код.

Python: Скопировать в буфер обмена
Код:
class AbstractProduct:
    def __init__(self):
        self.is_valid = True
        pass

    @abstractmethod
    def print_results(self):
        pass

    @abstractmethod
    def save(self, app_config: AppConfig):
        pass

Обратите внимание, у нас снова возник конфиг. Причем не какой-то конкретный, а базовый класс. В православных плюсах я бы сказал, что мы передаем указатель на базовый класс. Да здравствует полиморфизм. Сейчас я не знаю что у меня будет за продукт, какие параметры ему передадут в метод save, но мне это и не особо важно. Конфиг это знает, он и скажет нам что делать с результатом.

Идем дальше. Если заглянуть в бессмертное произведение «Банды четырех», то можно понять, что у нас явно уже проглядывается паттерн «Строитель». Давайте вспомним что это, обратившись к первоисточнику.

Строитель — это порождающий паттерн проектирования, который позволяет создавать сложные объекты пошагово. Строитель даёт возможность использовать один и тот же код строительства для получения разных представлений объектов. (Определение взято из книги замечательного автора Александра Швеца. К сожалению сейчас его сайт refactoring.guru недоступен из РФ по понятным причинам и это еще один повод ебануть лишний раз пендосов. Но об этом не здесь и не сейчас. )

Так вот. Паттерн строитель позволяет компоновать продукт нужным образом. У нас есть класс LdapDataCollector, который умеет собирать всю необходимую информацию. Но для каждого продукта нам необходим различный набор этих данных. Наиболее близ-
кое сравнение – комплектации автомобилей. Мы можем приобрести бомж-комплектацию на палке с веслами. А можем доплатить и кайфовать от максимальной комплектации. В основе лежит один и тот же автомобиль, но начинка у него разная. Не буду тут дублировать текст книги. Она легко гуглится, но если прямо очень нужно, то могу поделиться. Диаграммы классов мне тоже рисовать лень, у того же Швеца все максимально подробно описано.

Вернемся к нашим баранам (дампам).

3.3.1 Products
Продуктами, которые производит строитель у нас будут обычный дамп и полный. Ниже приведен код их конструкторов и реализация абстрактных методов базового класса AbstractProduct.

Python: Скопировать в буфер обмена
Код:
class Dump(AbstractProduct):
    def __init__(self):
        super().__init__()
        self.domain_users = list()
        self.domain_admins = list()
        self.enterprise_admins = list()
        self.domain_controllers = list()
        self.domain_trusts = list()
        self.servers = list()
        self.user_pc = list()
        self.os_versions = set()
        self.server_os_count = 0
        self.user_os_count = 0
        self.os_counter = dict()
        self.computers = dict()

    def print_results(self):
        DumpLogger.print_title('Dump. print_results')

        self.print_domain_admins()
        self.print_enterprise_admins()
        self.print_domain_controllers()
        self.print_domain_computers()

    def save(self, app_config: DumpConfig):
        DumpLogger.print_title(f'Dump saving results...')
        self.save_domain_users(app_config)
        self.save_domain_admins(app_config)
        self.save_enterprise_admins(app_config)
        self.save_servers(app_config)
        self.save_users_pc(app_config)
        self.save_os_statistic(app_config)
        DumpLogger.print_success('Done...')

FullDump
Поскольку FullDump является расширенной версией обычного дампа, то мы его просто унаследуем от него. В итоге получится примерно следующее.

Python: Скопировать в буфер обмена
Код:
class Fulldump(Dump):
    def __init__(self):
        super().__init__()
        self.ad_organizational_units = list()
        self.ad_subnets = list()
        self.ad_groups = list()
        self.user_groups = list()
        self.users_count = 0
        self.computers_count = 0

    def print_results(self):
        super().print_results()
        self.print_domain_groups()
        self.print_domain_subnets()
        self.print_organizational_units()

        # DumpLogger.print_param('found', self.)

    def save(self, app_config: FulldumpConfig):
        super().save(app_config)
        self.save_organizational_unit(app_config)
        self.save_ad_groups(app_config)
        self.save_subnets(app_config)
        title = "Fastdump"
        filename = app_config.fast_dump_filename
        FileHelper.append_title_to_file(filename, title)

        FileHelper.append_to_file(filename, f'found {self.users_count} users; ')
        FileHelper.append_to_file(filename, f'found {self.computers_count} computers; ')
        FileHelper.append_to_file(filename, f'current user is member of ')
        FileHelper.save_list_to_file(self.user_groups, filename, 'user groups')

3.3.2 Builders

Теперь нам нужны строители, которые будут выпускать наши продукты. Еще нам понадобится фабрика, которая на основании переданных параметров будет собирать самих строителей. Начнем, пожалуй, с нее. В основе каждого строителя у нас будет лежать наш неизменный LdapDataCollector. Здесь же нам понадобится набор параметров для каждого продукта. Их передаем в виде конфига. Получится примерно следующее:

Python: Скопировать в буфер обмена
Код:
class BuilderFactory:
    def __init__(self):
        pass

    @staticmethod
    def create_fulldump_builder(app_config: FulldumpConfig, data_collector: LdapDataCollector) -> AbstractBuilder:
        return FulldumpBuilder(app_config, data_collector)

    @staticmethod
    def create_minidump_builder(app_config: MinidumpConfig, data_collector: LdapDataCollector) -> AbstractBuilder:
        return MinidumpBuilder(app_config, data_collector)

У фабрики всего два статических метода, но по мере расширения продукта она будет ими обрастать.
Теперь рассмотрим самих строителей. Для удобства введем еще один класс строителя и назовем его MiniDump. Он и будет выполнять роль сборщика минимального дампа.
Итого у нас имеется 4 класса строителей:
AbstractBuilder – родительский абстрактный класс, без реализации каких либо методов. Ему в конструктор мы отдадим LdapDataCollector, который будет доступен всем его потомкам. У него всего два абстрактных метода. build_product и setup_incomplete_product.
пытается собрать изделие. Второй устраняет косяки первого и стучит наверх об ошибках.

Python: Скопировать в буфер обмена
Код:
class AbstractBuilder:
    def __init__(self, data_collector: LdapDataCollector):
        self.data_collector = data_collector
        self.is_build_completed = True
        self.error_message = ''

    @abstractmethod
    def build_product(self) -> AbstractProduct:
        pass

    @abstractmethod
    def setup_incomplete_product(self, err, error_message):
        pass

• DumpBuilder – это уже базовый класс для строителей дампов. Я пока даже не знаю какие еще продукты буду выпускать, но очевидно, что дампы необходимо вынести в отдельную категорию. Его реализация примерно такая вышла. То есть мы передаем на вход параметры продукта. Собираем его внутри закрытых методов и возвращаем уже готовый к дальнейшему использованию объект.

Python: Скопировать в буфер обмена
Код:
class DumpBuilder(AbstractBuilder):
    def __init__(self, data_collector: LdapDataCollector, app_config: DumpConfig, mode: ProgramMode):
        super().__init__(data_collector)
        self.app_config = app_config
        self.program_mode = mode
        self._result = AbstractProduct()

    @abstractmethod
    def build_product(self) -> Dump():
        pass

    def setup_incomplete_product(self, err, error_message):
        DumpLogger.print_error_message(self.error_message)
        self.is_build_completed = False

    def _is_data_collected(self) -> bool:
        self.__find_domain_users()
        self.__find_domain_admins()
        self.__find_enterprise_admins()
        self.__find_domain_controllers()
        self.__find_domain_trusts()
        self.__find_domain_computers()
        if not self.is_build_completed:
            DumpLogger.print_error('The data collecting for a dump mode failed with error', self.error_message)
            return False

        self._result.servers = self.data_collector.servers
        self._result.user_pc = self.data_collector.user_pc
        self._result.server_os_count = self.data_collector.server_os_count
        self._result.user_os_count = self.data_collector.user_os_count
        self._result.os_counter = self.data_collector.os_counter
        self.is_build_completed = True
        return True

• MiniDumpBuilder – смотри описание выше.

Python: Скопировать в буфер обмена
Код:
class MinidumpBuilder(DumpBuilder):
    def __init__(self, app_config: MinidumpConfig, data_collector: LdapDataCollector):
        super().__init__(data_collector, app_config, ProgramMode.MINI_DUMP)
        self.app_config = app_config
        self._result = Minidump()

    def build_product(self) -> AbstractProduct | None:
        if not self._is_data_collected():
            DumpLogger.print_error('MinidumpBuilder error',
                                     'failed to collect basic information about the network')
            self.is_build_completed = False
            return None
        self.is_build_completed = True
        DumpLogger.highlight_green('Done...')
        return self._result

Как легко заметить, реализация заняла всего несколько строк. Посмотрим что получится со строителем фуллдампа.

Python: Скопировать в буфер обмена
Код:
class FulldumpBuilder(DumpBuilder):
    def __init__(self, app_config: FulldumpConfig, data_collector: LdapDataCollector):
        super().__init__(data_collector, app_config, ProgramMode.FULL_DUMP)
        self.app_config = app_config
        self._result = Fulldump()

    def build_product(self) -> AbstractProduct | None:
        DumpLogger.print_title('FULLDUMP BUILDER build product')
        try:
            if not self._is_data_collected():
                DumpLogger.print_error('FulldumpBuilder error',
                                         'failed to collect basic information about the network')
                return None
            if not self.__collect_fulldump_data():
                self.error_message = 'FulldumpBuilder error. Failed to collect basic information about the network'
                DumpLogger.print_error_message(self.error_message)
                return None
            self.is_build_completed = True
            DumpLogger.highlight_green('Done...')
            return self._result
        except Exception as err:
            DumpLogger.print_error('Error in building a full network dump', str(err))
            raise err

    def __collect_fulldump_data(self) -> bool:
        self.__find_ad_ou()
        self.__find_ad_subnets()
        self.__find_ad_groups()
        self._result.computers_count = len(self._result.servers) + len(self._result.user_pc)
        self._result.users_count = len(self.data_collector.get_domain_users())

        self._result.user_groups = self.data_collector.get_user_groups()

        if not self.is_build_completed:
            return False

        self.is_build_completed = True
        DumpLogger.highlight_green('Done...')
        return True

    def __find_ad_groups(self):
        try:
            self._result.ad_groups = self.data_collector.get_ad_groups()
        except Exception as err:
            self.error_message = f'get auth mechanism failed with error: {str(err)} '
            self.setup_incomplete_product(err, self.error_message)

    def __find_ad_subnets(self):
        try:
            self._result.ad_subnets = self.data_collector.get_ad_subnets()
        except Exception as err:
            self.error_message = f'get ad subnets failed with error: {str(err)} '
            self.setup_incomplete_product(err, self.error_message)

    def __find_ad_ou(self):
        DumpLogger.print_title('FULLDUMP BUILDER __collect_fulldump_data')
        try:
            self._result.ad_organizational_units = self.data_collector.get_ad_organizational_unit()
        except Exception as err:
            self.error_message = f'get ad ou failed with error: {str(err)} '
            self.setup_incomplete_product(err, self.error_message)

Код получился компактным, читаемым. Все по фэн-шую.


4 Конфиги, конфигураторы
Выше мы неоднократно использовали класс конфига в качестве параметров. Пришло время уделить ему должное внимание.

Параметры приложения можно передавать либо через командную строку, если их не очень много. Если же у нас получилась мультиварка, то параметров может быть очень много и тогда их следует вынести в конфигурационный файл, который мы будем читать, парсить и работать уже с его данными. Но об этом тоже не здесь и не сейчас. Будем передавать как простые смертные через командную строку. Я сначала хотел описать еще и парсер аргументов, но статья итак вышла уже достаточно объемной. Ограничимся самими конфигами и конфигураторами.

Тут вот какая штука получается. Из одних и тех же параметров мы можем собрать конфиги для различных режимов работы приложения. Если у нас один-два режима, то можно просто захардкодить параметры и не греть голову. Это не наш случай, поэтому закатываем рукава и поехали.

4.1 Конфиги дампов
Ну поскольку у нас получилось 4 строителя, то и конфигов для них будет столько же. Для каждого свой.

4.1.1 AppConfig
Абстрактный класс с «чисто виртуальными методами», да простят меня адепты питона.

Python: Скопировать в буфер обмена
Код:
class AppConfig:
    def __init__(self):
        pass

    @abstractmethod
    def print(self):
        pass

    @abstractmethod
    def is_valid(self) -> bool:
        pass

    @abstractmethod
    def help(self):
        pass

4.1.2 DumpConfig
Здесь у нас уже будут храниться вполне конкретные параметры, такие как:
• каталог сохранения данных для текущего режима работы;
• каталог сохранения отсортированных юзеров;
• каталог сохранения отсортированных рабочих станций;
• каталог сохранения отсортированных серверов;
• имена файлов для сохранения списков админов и прочей нечисти. По коду поймете в общем.

Python: Скопировать в буфер обмена
Код:
from source.utils.app_config.configs.app_config import AppConfig
from source.utils.console.console_utils import DumpLogger


class DumpConfig(AppConfig):
    def __init__(self):
        super().__init__()
        self.current_mode_out_dir = ''
        self.sorted_users_dir = ''
        self.user_pc_filename = ''
        self.server_os_filename = ''
        self.domain_users_filename = ''
        self.enterprise_admins_filename = ''
        self.domain_admins_filename = ''
        self.sorted_computers_dir = ''

    def print(self):
        DumpLogger.print_param('current mode out dir', self.current_mode_out_dir)
        DumpLogger.print_param('sorted users dir', self.sorted_users_dir)
        DumpLogger.print_param('users PC file', self.user_pc_filename)
        DumpLogger.print_param('server OS file', self.server_os_filename)
        DumpLogger.print_param('domain users file', self.domain_users_filename)
        DumpLogger.print_param('enterprise admins file', self.enterprise_admins_filename)
        DumpLogger.print_param('domain admins file', self.domain_admins_filename)
        DumpLogger.print_param('sorted computers dir', self.sorted_computers_dir)

    def is_valid(self) -> bool:
        return super().is_valid()

    def help(self):
        super().help()

4.1.3 MiniDumpConfig
Python: Скопировать в буфер обмена
Код:
class MinidumpConfig(DumpConfig):
    def __init__(self):
        super().__init__()
        self.domain_users_filename = 'domain_users.txt'

    def print(self):
        DumpLogger.print_title('Minidump configuration')
        super().print()
        DumpLogger.print_param('domain users file', self.domain_users_filename)


    def is_valid(self) -> bool:
        return True#todo: implement this

    def help(self):
        super().help()
        DumpLogger.print_title('Minidump configuration. See README.md for more information')

4.1.4 FullDumpConfig
Python: Скопировать в буфер обмена
Код:
class FulldumpConfig(DumpConfig):
    def __init__(self):
        super().__init__()
        self.subnets_filename = ''
        self.groups_filename = ''
        self.organizational_unit_filename = ''
        self.fast_dump_filename = ''

    def print(self):
        DumpLogger.print_title('Fulldump configuration')
        super().print()
        DumpLogger.print_param('subnets file', self.subnets_filename)
        DumpLogger.print_param('domain groups file', self.groups_filename)
        DumpLogger.print_param('domain groups file', self.organizational_unit_filename)

    def is_valid(self) -> bool:
        return super().is_valid()

    def help(self):
        super().help()

4.2 Конфигураторы
Как вы могли заметить, в самих конфигах значения не заданы. Можно, конечно, все передать через параметры, но это прошлый век. Поэтому мы будем сочинять козу на лисапеде – то бишь конфигураторы.
Поехали.
Основная задача конфигуратора передать в конфиг корректные параметры. Чем мы и займемся.
Действуем по тому же принципу. Даешь каждому конфигу по конфигуратору.


4.2.1 AppConfigurator


Абстрактный базовый класс, принимающий на вход некие общие параметры и пустой конфиг, который он будет этими параметрами заполнять.

Python: Скопировать в буфер обмена
Код:
class AppConfigurator:
    def __init__(self, domain_name: str, current_mode_name, out_dir: str = 'evil-corp'):
        self.out_dir = out_dir
        self._root_out_dir = ''
        self._current_mode_out_dir = ''
        self._domain_name = domain_name
        self._current_mode_name = current_mode_name

    @abstractmethod
    def setup(self):
        pass

    @abstractmethod
    def create_out_dirs(self):
        self.create_root_dir()
        self.create_current_mode_out_dir()

    def create_root_dir(self):
        if not os.path.exists(self.out_dir):
            os.mkdir(self.out_dir)

    def create_current_mode_out_dir(self):
        corp_dir = self._domain_name.replace('.', '_')
        tmp = os.path.join(self.out_dir, corp_dir)
        if not os.path.exists(tmp):
            os.mkdir(tmp)
        self._current_mode_out_dir = os.path.join(tmp, self._current_mode_name)
        if not os.path.exists(self._current_mode_out_dir):
            os.mkdir(self._current_mode_out_dir)

4.2.2 DumpConfigurator

Python: Скопировать в буфер обмена
Код:
class DumpConfigurator(AppConfigurator):
    def __init__(self, dump_config: DumpConfig, domain_name: str, current_mode: str, out_dir: str = 'evil-corp'):
        super().__init__(domain_name, current_mode, out_dir)
        self.config = dump_config

    def setup(self):
        super().setup()
        self.create_out_dirs()
        self.config.domain_admins_filename = os.path.join(self.config.sorted_users_dir, 'domain_admins.txt')
        self.config.enterprise_admins_filename = os.path.join(self.config.sorted_users_dir, 'enterprise_admins.txt')
        self.config.domain_users_filename = os.path.join(self.config.sorted_users_dir, 'domain_users.txt')
        self.config.server_os_filename = os.path.join(self.config.sorted_computers_dir, 'servers.txt')
        self.config.user_pc_filename = os.path.join(self.config.sorted_computers_dir, 'user_pc.txt')

    def create_out_dirs(self):
        super().create_out_dirs()
        self.config.current_mode_out_dir = self._current_mode_out_dir
        if not os.path.exists(self._current_mode_out_dir):
            os.mkdir(self._current_mode_out_dir)

        self.config.sorted_users_dir = os.path.join(self._current_mode_out_dir, 'sorted_users')
        if not os.path.exists(self.config.sorted_users_dir):
            os.mkdir(self.config.sorted_users_dir)

        self.config.sorted_computers_dir = os.path.join(self._current_mode_out_dir, 'sorted_computers')
        if not os.path.exists(self.config.sorted_computers_dir):
            os.mkdir(self.config.sorted_computers_dir)

4.2.3 MinidumpConfigurator


Python: Скопировать в буфер обмена
Код:
class MinidumpConfigurator(DumpConfigurator):
    def __init__(self, minidump_config: MinidumpConfig, domain_name: str, out_dir: str = 'evil-corp'):
        super().__init__(minidump_config, domain_name, 'minidump', out_dir)

    def setup(self):
        super().setup()

    def create_out_dirs(self):
        super().create_out_dirs()


4.2.4 FulldumpConfigurator

Python: Скопировать в буфер обмена
Код:
import os.path

from source.utils.app_config.configs.app_mode_configs.fulldump_config import FulldumpConfig
from source.utils.app_config.configurators.dump_configurator import DumpConfigurator
from source.utils.console.console_utils import DumpLogger


class FulldumpConfigurator(DumpConfigurator):
    def __init__(self, fulldump_config: FulldumpConfig, domain_name: str, out_dir: str = 'evil-corp'):
        super().__init__(fulldump_config, domain_name, 'fulldump', out_dir)

    def setup(self):
        super().setup()
        self.config.organizational_unit_filename = \
            os.path.join(self.config.current_mode_out_dir, 'organizational_unit.txt')
        self.config.subnets_filename = os.path.join(self.config.current_mode_out_dir, 'subnets.txt')
        self.config.groups_filename = os.path.join(self.config.current_mode_out_dir, 'groups.txt')
        self.config.fast_dump_filename = os.path.join(self.config.current_mode_out_dir, 'fastdump.txt')

    def create_out_dirs(self):
        super().create_out_dirs()
        DumpLogger.print_title('FulldumpConfigurator create_out_dirs')
        pass

4.3 Выводы
Вся работа у нас теперь сводится к созданию конфига, передаче его строителю, сборке продукта и печати результатов. и итоговый код, для запуска одного из режимов у нас будет при-
мерно такой.

Python: Скопировать в буфер обмена
Код:
    def run_fulldump_mode(self, fulldump_config, data_collector) -> bool:
        try:
            DumpLogger.print_title('run_fulldump_mode started')
            fulldump_configurator = \
                ConfiguratorsFactory.create_fulldump_configurator(fulldump_config, self.domain_name, self.out_dir)
            fulldump_configurator.setup()
            fulldump_builder = BuilderFactory.create_fulldump_builder(fulldump_config, data_collector)
            fulldump = fulldump_builder.build_product()
            self.result = fulldump
            self.result.print_results()
            self.result.save(fulldump_config)
            return True
        except Exception as error:
            DumpLogger.print_error('Oh, sorry, something broke, but we\'re already working on it', str(error))
            return False

Эта статья предназначена, в первую очередь, для начинающих разработчиков. Но возможно более опытные коллеги смогут что-то для себя почерпнуть или указать на недостаки.

Я намерено не привожу здесь код проекта целиком. Но для демонстрации результата прикреплю пару скринов с результатами работы.

1718989369551.png



1718989413492.png

1718989462087.png



1718989514050.png



1718989561140.png



1718989599332.png




1718989634366.png
 
Сверху Снизу