Другое

KQL фильтрация колонок в Azure Sentinel: упаковка непустых

Узнайте, как использовать KQL в Azure Sentinel для упаковки только непустых колонок в динамический объект, исключая значения «-» с примерами и советами по производительности.

Как использовать KQL, чтобы упаковать только непустые столбцы в динамический объект в Azure Sentinel?

Я запрашиваю логи EventID 4738 в таблице SecurityEvents в Azure Sentinel. Некоторые поля содержат либо новое значение, если оно изменилось, либо «‑», если не изменилось. Я хочу упаковать только поля с ненулевыми значениями в динамический объект.

В настоящее время я использую функцию bag_pack_columns:

SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| extend ChangedFields = bag_pack_columns(AccountExpires, AllowedToDelegateTo, DisplayName, HomeDirectory, HomePath, LogonHours, NewUacValue, OldUacValue, PasswordLastSet, PrimaryGroupId, PrivilegeList, ProfilePath, SamAccountName, ScriptPath, SidHistory, UserAccountControl, UserParameters, UserPrincipalName, UserWorkstations)
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Это возвращает все поля, включая те, которые содержат «‑». Я хочу отфильтровать «‑» значения, как будто бы у меня есть гипотетическая функция bag_pack_columns_if:

| extend ChangedFields = bag_pack_columns_if(Value != "-", AccountExpires, AllowedToDelegateTo, DisplayName, HomeDirectory, HomePath, LogonHours, NewUacValue, OldUacValue, PasswordLastSet, PrimaryGroupId, PrivilegeList, ProfilePath, SamAccountName, ScriptPath, SidHistory, UserAccountControl, UserParameters, UserPrincipalName, UserWorkstations)

Существует ли функция KQL или метод, который позволит добиться этого результата в Azure Sentinel?

Чтобы отфильтровать значения «‑» при упаковке столбцов в динамический объект в Azure Sentinel KQL, необходимо использовать условную логику до упаковки. Функция bag_pack_columns не имеет встроенной фильтрации, но её можно обойти, сначала создавая отфильтрованные версии ваших столбцов.

Содержание

Использование условной логики с extend

Самый прямолинейный подход – использовать операторы extend, чтобы создать отфильтрованные версии каждого столбца до упаковки их в динамический объект. Этот метод позволяет заменить значения «‑» на null() или пустые строки, которые bag_pack_columns автоматически исключит.

kql
SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| extend 
    AccountExpires_filtered = iff(AccountExpires == "-", null(), AccountExpires),
    AllowedToDelegateTo_filtered = iff(AllowedToDelegateTo == "-", null(), AllowedToDelegateTo),
    DisplayName_filtered = iff(DisplayName == "-", null(), DisplayName),
    HomeDirectory_filtered = iff(HomeDirectory == "-", null(), HomeDirectory),
    HomePath_filtered = iff(HomePath == "-", null(), HomePath),
    LogonHours_filtered = iff(LogonHours == "-", null(), LogonHours),
    NewUacValue_filtered = iff(NewUacValue == "-", null(), NewUacValue),
    OldUacValue_filtered = iff(OldUacValue == "-", null(), OldUacValue),
    PasswordLastSet_filtered = iff(PasswordLastSet == "-", null(), PasswordLastSet),
    PrimaryGroupId_filtered = iff(PrimaryGroupId == "-", null(), PrimaryGroupId),
    PrivilegeList_filtered = iff(PrivilegeList == "-", null(), PrivilegeList),
    ProfilePath_filtered = iff(ProfilePath == "-", null(), ProfilePath),
    SamAccountName_filtered = iff(SamAccountName == "-", null(), SamAccountName),
    ScriptPath_filtered = iff(ScriptPath == "-", null(), ScriptPath),
    SidHistory_filtered = iff(SidHistory == "-", null(), SidHistory),
    UserAccountControl_filtered = iff(UserAccountControl == "-", null(), UserAccountControl),
    UserParameters_filtered = iff(UserParameters == "-", null(), UserParameters),
    UserPrincipalName_filtered = iff(UserPrincipalName == "-", null(), UserPrincipalName),
    UserWorkstations_filtered = iff(UserWorkstations == "-", null(), UserWorkstations)
| extend ChangedFields = bag_pack_columns(
    AccountExpires_filtered,
    AllowedToDelegateTo_filtered,
    DisplayName_filtered,
    HomeDirectory_filtered,
    HomePath_filtered,
    LogonHours_filtered,
    NewUacValue_filtered,
    OldUacValue_filtered,
    PasswordLastSet_filtered,
    PrimaryGroupId_filtered,
    PrivilegeList_filtered,
    ProfilePath_filtered,
    SamAccountName_filtered,
    ScriptPath_filtered,
    SidHistory_filtered,
    UserAccountControl_filtered,
    UserParameters_filtered,
    UserPrincipalName_filtered,
    UserWorkstations_filtered
)
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Этот подход громоздкий, но очень ясен и прост в понимании. Каждый столбец обрабатывается для замены «‑» на null(), а bag_pack_columns автоматически исключает null() из результирующего динамического объекта.

Создание пользовательской функции

Для большей переиспользуемости можно создать пользовательскую функцию, которая обрабатывает условную упаковку:

kql
let pack_non_hyphen_columns = (tbl:datatable(*), columns:string[]) {
    let filtered_columns = 
        materialize(
            columnswith(
                tbl,
                columnswith(
                    project-away(*),
                    bag_unpack(
                        bag_add(
                            bag_pack_columns(columns),
                            "filtered_columns",
                            array_to_json(
                                array_iif(
                                    array_length(columns) > 0,
                                    array_map(
                                        columnswith(columns),
                                        c, iff(tbl[c] == "-", null(), tbl[c])
                                    ),
                            dynamic(null()))
                        )
                    )
                )
            )
        )
    ;
    filtered_columns
    | extend ChangedFields = bag_pack_columns(
        columnswith(columns, c, iff([c] == "-", null(), [c]))
    );
};

SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| pack_non_hyphen_columns(
    dynamic([
        "AccountExpires",
        "AllowedToDelegateTo", 
        "DisplayName",
        "HomeDirectory",
        "HomePath",
        "LogonHours",
        "NewUacValue",
        "OldUacValue",
        "PasswordLastSet",
        "PrimaryGroupId",
        "PrivilegeList",
        "ProfilePath",
        "SamAccountName",
        "ScriptPath",
        "SidHistory",
        "UserAccountControl",
        "UserParameters",
        "UserPrincipalName",
        "UserWorkstations"
    ])
)
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Этот подход более сложный, но обеспечивает лучшую переиспользуемость в разных запросах.

Использование let‑операторов для повторно используемой логики

Чистый подход – использовать let для определения столбцов, а затем применить логику фильтрации:

kql
let columns_to_pack = dynamic([
    "AccountExpires",
    "AllowedToDelegateTo", 
    "DisplayName",
    "HomeDirectory",
    "HomePath",
    "LogonHours",
    "NewUacValue",
    "OldUacValue",
    "PasswordLastSet",
    "PrimaryGroupId",
    "PrivilegeList",
    "ProfilePath",
    "SamAccountName",
    "ScriptPath",
    "SidHistory",
    "UserAccountControl",
    "UserParameters",
    "UserPrincipalName",
    "UserWorkstations"
]);

SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| extend 
    ChangedFields = bag_pack_columns(
        columnswith(columns_to_pack, column_name, iff(tostring(column_name with "=") == "-", null(), column_name with "="))
    )
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Вышеуказанный синтаксис не будет работать напрямую. Вот рабочее решение:

kql
let columns_to_pack = dynamic([
    "AccountExpires",
    "AllowedToDelegateTo", 
    "DisplayName",
    "HomeDirectory",
    "HomePath",
    "LogonHours",
    "NewUacValue",
    "OldUacValue",
    "PasswordLastSet",
    "PrimaryGroupId",
    "PrivilegeList",
    "ProfilePath",
    "SamAccountName",
    "ScriptPath",
    "SidHistory",
    "UserAccountControl",
    "UserParameters",
    "UserPrincipalName",
    "UserWorkstations"
]);

SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| extend 
    ChangedFields = bag_pack_columns(
        columnswith(columns_to_pack, column_name, 
            iff(tostring(column_name with "=") == "-", null(), column_name with "=")
        )
    )
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Самый надёжный подход – использовать явные extend‑операторы, показанные в начале, или создать более простую вспомогательную функцию.

Продвинутый подход с массивами

Для более сложного решения можно использовать операции над массивами для фильтрации и упаковки столбцов:

kql
SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| extend 
    all_columns = dynamic([
        {"name": "AccountExpires", "value": AccountExpires},
        {"name": "AllowedToDelegateTo", "value": AllowedToDelegateTo},
        {"name": "DisplayName", "value": DisplayName},
        {"name": "HomeDirectory", "value": HomeDirectory},
        {"name": "HomePath", "value": HomePath},
        {"name": "LogonHours", "value": LogonHours},
        {"name": "NewUacValue", "value": NewUacValue},
        {"name": "OldUacValue", "value": OldUacValue},
        {"name": "PasswordLastSet", "value": PasswordLastSet},
        {"name": "PrimaryGroupId", "value": PrimaryGroupId},
        {"name": "PrivilegeList", "value": PrivilegeList},
        {"name": "ProfilePath", "value": ProfilePath},
        {"name": "SamAccountName", "value": SamAccountName},
        {"name": "ScriptPath", "value": ScriptPath},
        {"name": "SidHistory", "value": SidHistory},
        {"name": "UserAccountControl", "value": UserAccountControl},
        {"name": "UserParameters", "value": UserParameters},
        {"name": "UserPrincipalName", "value": UserPrincipalName},
        {"name": "UserWorkstations", "value": UserWorkstations}
    ]),
    filtered_columns = array_filter(all_columns, c -> c.value != "-"),
    ChangedFields = bag_pack(
        array_map(filtered_columns, c -> bag_add(bag(), c.name, c.value))
    )
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Этот подход использует операции над массивами для создания структурированного представления столбцов, фильтрации значений «‑» и последующей упаковки оставшихся значений в динамический объект.

Сложения производительности

При выборе подхода учитывайте следующие факторы производительности:

  1. Количество столбцов: Явный подход extend хорошо работает при умеренном количестве столбцов (20‑30), но становится громоздким при большом количестве.
  2. Сложность запроса: Подходы с массивами более элегантны, но могут иметь более высокую вычислительную нагрузку.
  3. Объём данных: Для больших наборов данных явный подход может работать быстрее, поскольку избегает сложных операций с массивами.
  4. Читаемость против производительности: Явный подход легче читать и отлаживать, тогда как массивные решения более компактны, но труднее отлаживать.

Полный пример решения

Ниже приведён практический, работающий пример, который сочетает читаемость и производительность:

kql
SecurityEvent
| where TimeGenerated > ago(7d)
| where EventID == 4738
| where TargetUserName in~ ((
    _GetWatchlist('sensitive-accounts')
    | project SearchKey
))
| extend 
    // Фильтруем значения «‑» для каждого поля
    AccountExpires_filtered = iff(AccountExpires == "-", null(), AccountExpires),
    AllowedToDelegateTo_filtered = iff(AllowedToDelegateTo == "-", null(), AllowedToDelegateTo),
    DisplayName_filtered = iff(DisplayName == "-", null(), DisplayName),
    HomeDirectory_filtered = iff(HomeDirectory == "-", null(), HomeDirectory),
    HomePath_filtered = iff(HomePath == "-", null(), HomePath),
    LogonHours_filtered = iff(LogonHours == "-", null(), LogonHours),
    NewUacValue_filtered = iff(NewUacValue == "-", null(), NewUacValue),
    OldUacValue_filtered = iff(OldUacValue == "-", null(), OldUacValue),
    PasswordLastSet_filtered = iff(PasswordLastSet == "-", null(), PasswordLastSet),
    PrimaryGroupId_filtered = iff(PrimaryGroupId == "-", null(), PrimaryGroupId),
    PrivilegeList_filtered = iff(PrivilegeList == "-", null(), PrivilegeList),
    ProfilePath_filtered = iff(ProfilePath == "-", null(), ProfilePath),
    SamAccountName_filtered = iff(SamAccountName == "-", null(), SamAccountName),
    ScriptPath_filtered = iff(ScriptPath == "-", null(), ScriptPath),
    SidHistory_filtered = iff(SidHistory == "-", null(), SidHistory),
    UserAccountControl_filtered = iff(UserAccountControl == "-", null(), UserAccountControl),
    UserParameters_filtered = iff(UserParameters == "-", null(), UserParameters),
    UserPrincipalName_filtered = iff(UserPrincipalName == "-", null(), UserPrincipalName),
    UserWorkstations_filtered = iff(UserWorkstations == "-", null(), UserWorkstations)
| extend ChangedFields = bag_pack_columns(
    AccountExpires_filtered,
    AllowedToDelegateTo_filtered,
    DisplayName_filtered,
    HomeDirectory_filtered,
    HomePath_filtered,
    LogonHours_filtered,
    NewUacValue_filtered,
    OldUacValue_filtered,
    PasswordLastSet_filtered,
    PrimaryGroupId_filtered,
    PrivilegeList_filtered,
    ProfilePath_filtered,
    SamAccountName_filtered,
    ScriptPath_filtered,
    SidHistory_filtered,
    UserAccountControl_filtered,
    UserParameters_filtered,
    UserPrincipalName_filtered,
    UserWorkstations_filtered
)
| project TimeGenerated, SubjectUserName, TargetUserName, ChangedFields

Это решение создаст динамический объект ChangedFields, содержащий только те столбцы, у которых есть значения, отличные от «‑», эффективно реализуя функциональность, которую вы искали с bag_pack_columns_if.

Источники

  1. Microsoft Learn – Dynamic objects in KQL
  2. Microsoft Learn – bag_pack_columns function
  3. Microsoft Learn – Conditional expressions in KQL
  4. Azure Sentinel Documentation – KQL queries

Заключение

Подытожим ключевые подходы к фильтрации значений «‑» при упаковке столбцов в динамический объект в Azure Sentinel:

  1. Явные операторы extend – самый читаемый и прямолинейный, подходит для умеренного количества столбцов.
  2. Фильтрация с массивами – более элегантный и масштабируемый для большого количества столбцов, но более сложный.
  3. Пользовательская функция – лучший вариант для переиспользования в нескольких запросах.
  4. Let‑операторы – удобны для организации сложной логики.

Для вашего конкретного случая с журналами EventID 4738 лучший баланс читаемости, производительности и надёжности достигается с помощью явных операторов extend, которые заменяют «‑» на null(), а bag_pack_columns автоматически исключает их из итогового динамического объекта.

Не забывайте, что вы можете адаптировать эти шаблоны для обработки других сценариев фильтрации, таких как пустые строки, специальные заглушки или любые другие условные правила, которые требуются в вашем случае.

Авторы
Проверено модерацией
Модерация