НейроАгент

Клонирование документов MongoDB между контейнерами с преобразованием ID

Узнайте, как клонировать документы MongoDB между контейнерами, преобразовывая форматы ID с помощью фреймворка агрегации. Полная реализация на C# с оптимизацией производительности.

Как я могу клонировать документы MongoDB из одного контейнера в другой, обновляя поля ID в определенном формате без извлечения их в память приложения? Я использую MongoDB с C# и мне нужно преобразовывать документы с форматом ID ‘OwnerId\ContainerId:MyDocId’ при клонировании из одного ContainerId в другой. Документы уникально идентифицируются комбинацией полей OwnerId, MyDocId и ContainerId. Я хочу использовать фреймворк агрегации MongoDB для достижения этого, подобно операциям конкатенации строк в SQL.

НейроАгент

Фреймворк агрегации MongoDB для клонирования и преобразования документов

Фреймворк агрегации MongoDB обеспечивает эффективное клонирование и преобразование документов между контейнерами без извлечения данных в память приложения с помощью этапов конвейера, таких как $match, $project и $set для переформатирования полей ID и массовых операций для повышения производительности. Вы можете достичь преобразования формата ‘OwnerId\ContainerId:MyDocId’, комбинируя операторы конкатенации строк на этапе $project и используя этапы $out или $merge для записи преобразованных документов в целевой контейнер, все в рамках одного конвейера агрегации, выполняемого из C#.

Содержание

Понимание конвейера агрегации MongoDB

Фреймворк агрегации MongoDB предоставляет мощный способ обработки документов через серию этапов, каждый из которых выполняет определенное преобразование данных [источник]. Конвейер агрегации состоит из одного или нескольких этапов, которые последовательно обрабатывают документы, где выходные данные одного этапа становятся входными данными для следующего [источник].

Каждый этап выполняет операцию над входными документами - этапы могут фильтровать документы, группировать их, вычислять значения, преобразовывать поля и многое другое [источник]. Конвейер затем выполняет последовательные преобразования данных до достижения цели, позволяя разбивать сложные операции на более простые, управляемые этапы [источник].

Для клонирования и преобразования документов ключевые этапы включают:

  • $match: Фильтрует документы на основе указанных критериев
  • $project: Изменяет форму документов, включая, исключая или переименовывая поля
  • $set: Добавляет новые поля или обновляет существующие поля в документах
  • $addFields: Аналогично $set, добавляет новые поля в документы
  • $out: Записывает результаты агрегации в коллекцию
  • $merge: Записывает результаты в коллекцию, потенциально комбинируя с существующими данными

“Входными данными конвейера может быть одна коллекция, где другие могут быть объединены позже в конвейере. Конвейер затем выполняет последовательные преобразования данных до достижения цели.” [источник]

Стратегия клонирования документов

Для клонирования документов из одного контейнера в другой с преобразованием полей ID вам нужна стратегия, которая:

  1. Фильтрует исходные документы с использованием комбинации уникальных идентификаторов (OwnerId, MyDocId, ContainerId)
  2. Преобразует формат ID из исходного в целевой формат
  3. Записывает преобразованные документы в целевую коллекцию массово

Ключевое понимание заключается в том, что конвейеры агрегации MongoDB могут выполнять все эти операции в одной операции с базой данных, устраняя необходимость извлечения документов в память приложения [источник].

javascript
// Базовая структура конвейера агрегации для клонирования документов
db.sourceCollection.aggregate([
  {
    $match: {
      "OwnerId": "owner123",
      "ContainerId": "sourceContainer",
      "MyDocId": { $exists: true }
    }
  },
  {
    $project: {
      // Преобразования полей здесь
      _id: 0,
      newId: { $concat: ["$$ROOT.OwnerId", "\\$$ROOT.targetContainer", ":", "$$ROOT.MyDocId"] },
      // Сохраняем другие поля
      "OwnerId": 1,
      "MyDocId": 1,
      "ContainerId": "$$ROOT.targetContainer",
      // Копируем все остальные поля
      "field1": 1,
      "field2": 1
    }
  },
  {
    $out: "targetCollection"
  }
])

Техники преобразования полей ID

Основная задача - преобразование формата ID из ‘OwnerId\SourceContainer:MyDocId’ в ‘OwnerId\TargetContainer:MyDocId’. MongoDB предоставляет несколько строковых операторов агрегации для достижения этой цели:

Использование $concat для конкатенации строк

Оператор $concat объединяет несколько строк вместе, аналогично конкатенации строк в SQL [источник]:

javascript
{
  $project: {
    transformedId: {
      $concat: [
        "$OwnerId",
        "\\",
        "targetContainer", // Это может быть переменной
        ":",
        "$MyDocId"
      ]
    }
  }
}

Использование переменных для преобразования контейнера

Для динамических имен контейнеров можно использовать переменные агрегации и оператор $let:

javascript
{
  $let: {
    vars: {
      targetContainer: "newContainerName"
    },
    in: {
      $concat: [
        "$$ROOT.OwnerId",
        "\\",
        "$$targetContainer",
        ":",
        "$$ROOT.MyDocId"
      ]
    }
  }
}

Сохранение структуры документа

При клонировании документов необходимо сохранять исходную структуру, обновляя определенные поля:

javascript
{
  $project: {
    // Преобразуем формат ID
    _id: { $concat: ["$$ROOT.OwnerId", "\\$$ROOT.targetContainer", ":", "$$ROOT.MyDocId"] },
    // Обновляем ContainerId до целевого значения
    ContainerId: "$$ROOT.targetContainer",
    // Копируем все остальные поля из исходного документа
    "OwnerId": 1,
    "MyDocId": 1,
    // Сохраняем вложенные объекты и массивы
    "nested.field": 1,
    "arrayField": 1
  }
}

Реализация на C# с драйвером MongoDB

Драйвер MongoDB для C# предоставляет всестороннюю поддержку операций агрегации. Вот как реализовать клонирование и преобразование документов:

Настройка конвейера агрегации

csharp
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Linq;

public class DocumentCloner
{
    private readonly IMongoCollection<BsonDocument> _sourceCollection;
    private readonly IMongoCollection<BsonDocument> _targetCollection;
    
    public DocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
    {
        _sourceCollection = database.GetCollection<BsonDocument>(sourceCollectionName);
        _targetCollection = database.GetCollection<BsonDocument>(targetCollectionName);
    }
    
    public void CloneDocumentsWithIdTransformation(
        string ownerId, 
        string sourceContainer, 
        string targetContainer)
    {
        // Определяем конвейер агрегации
        var pipeline = new List<BsonDocument>
        {
            // Находим документы для клонирования
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            
            // Преобразуем структуру документа и формат ID
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray 
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    }) 
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer },
                // Копируем все остальные поля, кроме _id
                { "otherField", 1 }
            }),
            
            // Записываем в целевую коллекцию
            new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
        };
        
        // Выполняем конвейер агрегации
        _sourceCollection.Aggregate<BsonDocument>(pipeline);
    }
}

Использование строго типизированных классов

Для лучшей типобезопасности и поддерживаемости:

csharp
[BsonIgnoreExtraElements]
public class SourceDocument
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public string Id { get; set; }
    
    public string OwnerId { get; set; }
    public string ContainerId { get; set; }
    public string MyDocId { get; set; }
    public string OtherField { get; set; }
    // Другие поля...
}

[BsonIgnoreExtraElements]
public class TargetDocument
{
    [BsonId]
    [BsonRepresentation(BsonType.String)]
    public string Id { get; set; }
    
    public string OwnerId { get; set; }
    public string ContainerId { get; set; }
    public string MyDocId { get; set; }
    public string OtherField { get; set; }
    // Другие поля...
}

public class TypedDocumentCloner
{
    private readonly IMongoCollection<SourceDocument> _sourceCollection;
    private readonly IMongoCollection<TargetDocument> _targetCollection;
    
    public TypedDocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
    {
        _sourceCollection = database.GetCollection<SourceDocument>(sourceCollectionName);
        _targetCollection = database.GetCollection<TargetDocument>(targetCollectionName);
    }
    
    public async Task CloneDocumentsWithIdTransformationAsync(
        string ownerId, 
        string sourceContainer, 
        string targetContainer)
    {
        var pipeline = new EmptyPipelineDefinition<SourceDocument>()
            .Match(doc => doc.OwnerId == ownerId && doc.ContainerId == sourceContainer && doc.MyDocId != null)
            .Project(doc => new TargetDocument
            {
                Id = $"{doc.OwnerId}\\{targetContainer}:{doc.MyDocId}",
                OwnerId = doc.OwnerId,
                ContainerId = targetContainer,
                MyDocId = doc.MyDocId,
                OtherField = doc.OtherField
                // Сопоставляем другие поля при необходимости
            });
        
        await _targetCollection.InsertManyAsync(
            await _sourceCollection.Aggregate(pipeline).ToListAsync());
    }
}

Массовые операции для повышения производительности

При работе с большими объемами документов оптимизация производительности становится критически важной. MongoDB предоставляет несколько стратегий для эффективных массовых операций:

Использование этапа $out

Этап $out записывает результаты агрегации в коллекцию. Это очень эффективно, так как операция выполняется на стороне сервера [источник]:

csharp
var pipeline = new List<BsonDocument>
{
    // Этапы сопоставления и преобразования...
    new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
};

_sourceCollection.Aggregate<BsonDocument>(pipeline);

Использование этапа $merge для частичных обновлений

Если нужно объединить с существующими данными вместо замены:

csharp
var pipeline = new List<BsonDocument>
{
    // Этапы сопоставления и преобразования...
    new BsonDocument("$merge", new BsonDocument
    {
        { "into", _targetCollection.CollectionNamespace.CollectionName },
        { "on", "_id" },
        { "whenMatched", "replace" },
        { "whenNotMatched", "insert" }
    })
};

_sourceCollection.Aggregate<BsonDocument>(pipeline);

Пакетная обработка для больших наборов данных

Для очень больших наборов данных рассмотрите возможность обработки пакетами:

csharp
public async Task CloneDocumentsInBatchesAsync(
    string ownerId, 
    string sourceContainer, 
    string targetContainer,
    int batchSize = 1000)
{
    var filter = Builders<BsonDocument>.Filter.And(
        Builders<BsonDocument>.Filter.Eq("OwnerId", ownerId),
        Builders<BsonDocument>.Filter.Eq("ContainerId", sourceContainer),
        Builders<BsonDocument>.Filter.Exists("MyDocId")
    );
    
    var totalDocuments = await _sourceCollection.CountDocumentsAsync(filter);
    var batches = (int)Math.Ceiling(totalDocuments / (double)batchSize);
    
    for (int i = 0; i < batches; i++)
    {
        var skip = i * batchSize;
        var pipeline = new List<BsonDocument>
        {
            new BsonDocument("$match", filter),
            new BsonDocument("$skip", skip),
            new BsonDocument("$limit", batchSize),
            new BsonDocument("$project", GetProjection(targetContainer)),
            new BsonDocument("$out", $"{_targetCollection.CollectionNamespace.CollectionName}_batch_{i}")
        };
        
        await _sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
        
        // При необходимости объединить результаты пакетов в конечную коллекцию
        var mergePipeline = new List<BsonDocument>
        {
            new BsonDocument("$merge", new BsonDocument
            {
                { "into", _targetCollection.CollectionNamespace.CollectionName },
                { "on", "_id" },
                { "whenMatched", "replace" },
                { "whenNotMatched", "insert" }
            })
        };
        
        await _targetCollection.Aggregate<BsonDocument>(mergePipeline).ToListAsync();
    }
}

Обработка ошибок и валидация

Реализация правильной обработки ошибок критически важна для производственных сред:

csharp
public async Task<bool> CloneDocumentsWithValidationAsync(
    string ownerId, 
    string sourceContainer, 
    string targetContainer)
{
    try
    {
        // Валидация входных параметров
        if (string.IsNullOrWhiteSpace(ownerId) || 
            string.IsNullOrWhiteSpace(sourceContainer) || 
            string.IsNullOrWhiteSpace(targetContainer))
        {
            throw new ArgumentException("Идентификатор владельца, исходный и целевой контейнеры должны быть указаны.");
        }
        
        // Проверка существования целевой коллекции
        var collectionExists = await _targetCollection.Database
            .ListCollectionNames()
            .AnyAsync(name => name == _targetCollection.CollectionNamespace.CollectionName);
        
        if (!collectionExists)
        {
            await _targetCollection.Database.CreateCollectionAsync(_targetCollection.CollectionNamespace.CollectionName);
        }
        
        var pipeline = new List<BsonDocument>
        {
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray 
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    }) 
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer }
            }),
            
            new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
        };
        
        var result = await _sourceCollection.Aggregate<BsonDocument>(pipeline).FirstOrDefaultAsync();
        
        return result != null;
    }
    catch (MongoException ex)
    {
        // Логирование ошибок, специфичных для MongoDB
        Console.WriteLine($"Ошибка MongoDB: {ex.Message}");
        throw;
    }
    catch (Exception ex)
    {
        // Логирование общих ошибок
        Console.WriteLine($"Общая ошибка: {ex.Message}");
        throw;
    }
}

Полный пример решения

Вот полный, готовый к производству пример, который демонстрирует все концепции:

csharp
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class DocumentMigrator
{
    private readonly IMongoDatabase _database;
    private readonly string _sourceCollectionName;
    private readonly string _targetCollectionName;
    
    public DocumentMigrator(IMongoClient client, string databaseName, 
                          string sourceCollectionName, string targetCollectionName)
    {
        _database = client.GetDatabase(databaseName);
        _sourceCollectionName = sourceCollectionName;
        _targetCollectionName = targetCollectionName;
    }
    
    /// <summary>
    /// Клонирует документы из исходного в целевой контейнер с преобразованием ID
    /// </summary>
    public async Task CloneDocumentsAsync(
        string ownerId, 
        string sourceContainer, 
        string targetContainer,
        bool replaceExisting = false)
    {
        var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
        var targetCollection = _database.GetCollection<BsonDocument>(_targetCollectionName);
        
        // Построение конвейера агрегации
        var pipeline = new List<BsonDocument>
        {
            // Фильтрация документов для клонирования
            new BsonDocument("$match", new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer },
                { "MyDocId", new BsonDocument("$exists", true) }
            }),
            
            // Преобразование структуры документа
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", new BsonDocument("$concat", new BsonArray 
                    {
                        "$OwnerId",
                        "\\",
                        targetContainer,
                        ":",
                        "$MyDocId"
                    }) 
                },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", targetContainer },
                // Динамическое копирование всех остальных полей
                { "additionalFields", new BsonDocument("$objectToArray", "$$ROOT") }
            }),
            
            // Уплощение дополнительных полей
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", 1 },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", 1 },
                { "additionalFields", new BsonDocument("$filter", new BsonDocument
                    {
                        { "input", "$additionalFields" },
                        { "as", "field" },
                        { "cond", new BsonDocument("$not", new BsonArray 
                            {
                                "$in", new BsonArray { "$$field.k", "_id", "OwnerId", "MyDocId", "ContainerId" }
                            })
                        }
                    })
                }
            }),
            
            // Разворачивание дополнительных полей
            new BsonDocument("$unwind", "$additionalFields"),
            
            // Преобразование полей обратно в документ
            new BsonDocument("$replaceRoot", new BsonDocument
            {
                { "newRoot", new BsonDocument
                    {
                        { "_id", "$_id" },
                        { "OwnerId", "$OwnerId" },
                        { "MyDocId", "$MyDocId" },
                        { "ContainerId", "$ContainerId" },
                        { "$$ROOT.additionalFields.k", "$$ROOT.additionalFields.v" }
                    }
                }
            }),
            
            // Удаление временного поля
            new BsonDocument("$project", new BsonDocument
            {
                { "_id", 1 },
                { "OwnerId", 1 },
                { "MyDocId", 1 },
                { "ContainerId", 1 },
                { "additionalFields", 0 }
            })
        };
        
        // Выбор этапа вывода на основе необходимости замены существующих данных
        if (replaceExisting)
        {
            pipeline.Add(new BsonDocument("$out", _targetCollectionName));
        }
        else
        {
            pipeline.Add(new BsonDocument("$merge", new BsonDocument
            {
                { "into", _targetCollectionName },
                { "on", "_id" },
                { "whenMatched", "replace" },
                { "whenNotMatched", "insert" }
            }));
        }
        
        // Выполнение конвейера
        await sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
    }
    
    /// <summary>
    /// Проверка успешности преобразования
    /// </summary>
    public async Task<bool> ValidateTransformationAsync(
        string ownerId, 
        string sourceContainer, 
        string targetContainer,
        int sampleSize = 10)
    {
        var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
        var targetCollection = _database.GetCollection<BsonDocument>(_targetCollectionName);
        
        // Получение выборки документов из источника
        var sourceDocs = await sourceCollection
            .Find(new BsonDocument
            {
                { "OwnerId", ownerId },
                { "ContainerId", sourceContainer }
            })
            .Limit(sampleSize)
            .ToListAsync();
        
        foreach (var sourceDoc in sourceDocs)
        {
            var expectedId = $"{sourceDoc["OwnerId"]}\\{targetContainer}:{sourceDoc["MyDocId"]}";
            var targetDoc = await targetCollection
                .Find(new BsonDocument { { "_id", expectedId } })
                .FirstOrDefaultAsync();
            
            if (targetDoc == null)
            {
                return false;
            }
            
            // Проверка соответствия других полей
            foreach (var element in sourceDoc.Elements)
            {
                if (element.Name != "_id" && element.Name != "ContainerId")
                {
                    if (!targetDoc.Contains(element.Name) || 
                        !BsonDocumentComparer.Equal(element.Value, targetDoc[element.Name]))
                    {
                        return false;
                    }
                }
            }
        }
        
        return true;
    }
}

Источники

  1. Конвейер агрегации MongoDB - Руководство по базе данных
  2. Агрегация MongoDB: учебник с примерами и упражнениями | Studio 3T
  3. Обновления с использованием конвейера агрегации - Руководство по базе данных
  4. Конвейер агрегации MongoDB | MongoDB
  5. Операции агрегации - Руководство по базе данных
  6. Как использовать агрегации в MongoDB | DigitalOcean
  7. Фреймворк агрегации MongoDB: как упростить сложную логику в этапы

Заключение

  • Фреймворк агрегации MongoDB обеспечивает эффективное клонирование документов без извлечения данных в память приложения, обрабатывая преобразования на стороне сервера через этапы конвейера
  • Преобразование полей ID достигается с использованием операторов конкатенации строк, таких как $concat, на этапе $project, что позволяет динамически переформировать шаблоны ‘OwnerId\ContainerId:MyDocId’
  • Реализация на C# использует поддержку агрегации драйвера MongoDB для программного построения конвейеров, обеспечивая типобезопасность и производительность для массовых операций
  • Массовые операции оптимизированы с использованием этапов $out или $merge для прямой записи преобразованных документов в целевые коллекции, устраняя необходимость в отдельных операциях вставки
  • Готовые к производству решения должны включать правильную обработку ошибок, валидацию и пакетную обработку для больших наборов данных для обеспечения надежной миграции документов между контейнерами