Как я могу клонировать документы MongoDB из одного контейнера в другой, обновляя поля ID в определенном формате без извлечения их в память приложения? Я использую MongoDB с C# и мне нужно преобразовывать документы с форматом ID ‘OwnerId\ContainerId:MyDocId’ при клонировании из одного ContainerId в другой. Документы уникально идентифицируются комбинацией полей OwnerId, MyDocId и ContainerId. Я хочу использовать фреймворк агрегации MongoDB для достижения этого, подобно операциям конкатенации строк в SQL.
Фреймворк агрегации MongoDB для клонирования и преобразования документов
Фреймворк агрегации MongoDB обеспечивает эффективное клонирование и преобразование документов между контейнерами без извлечения данных в память приложения с помощью этапов конвейера, таких как $match, $project и $set для переформатирования полей ID и массовых операций для повышения производительности. Вы можете достичь преобразования формата ‘OwnerId\ContainerId:MyDocId’, комбинируя операторы конкатенации строк на этапе $project и используя этапы $out или $merge для записи преобразованных документов в целевой контейнер, все в рамках одного конвейера агрегации, выполняемого из C#.
Содержание
- Понимание конвейера агрегации MongoDB
- Стратегия клонирования документов
- Техники преобразования полей ID
- Реализация на C# с драйвером MongoDB
- Массовые операции для повышения производительности
- Обработка ошибок и валидация
- Полный пример решения
Понимание конвейера агрегации MongoDB
Фреймворк агрегации MongoDB предоставляет мощный способ обработки документов через серию этапов, каждый из которых выполняет определенное преобразование данных [источник]. Конвейер агрегации состоит из одного или нескольких этапов, которые последовательно обрабатывают документы, где выходные данные одного этапа становятся входными данными для следующего [источник].
Каждый этап выполняет операцию над входными документами - этапы могут фильтровать документы, группировать их, вычислять значения, преобразовывать поля и многое другое [источник]. Конвейер затем выполняет последовательные преобразования данных до достижения цели, позволяя разбивать сложные операции на более простые, управляемые этапы [источник].
Для клонирования и преобразования документов ключевые этапы включают:
- $match: Фильтрует документы на основе указанных критериев
- $project: Изменяет форму документов, включая, исключая или переименовывая поля
- $set: Добавляет новые поля или обновляет существующие поля в документах
- $addFields: Аналогично $set, добавляет новые поля в документы
- $out: Записывает результаты агрегации в коллекцию
- $merge: Записывает результаты в коллекцию, потенциально комбинируя с существующими данными
“Входными данными конвейера может быть одна коллекция, где другие могут быть объединены позже в конвейере. Конвейер затем выполняет последовательные преобразования данных до достижения цели.” [источник]
Стратегия клонирования документов
Для клонирования документов из одного контейнера в другой с преобразованием полей ID вам нужна стратегия, которая:
- Фильтрует исходные документы с использованием комбинации уникальных идентификаторов (OwnerId, MyDocId, ContainerId)
- Преобразует формат ID из исходного в целевой формат
- Записывает преобразованные документы в целевую коллекцию массово
Ключевое понимание заключается в том, что конвейеры агрегации MongoDB могут выполнять все эти операции в одной операции с базой данных, устраняя необходимость извлечения документов в память приложения [источник].
// Базовая структура конвейера агрегации для клонирования документов
db.sourceCollection.aggregate([
{
$match: {
"OwnerId": "owner123",
"ContainerId": "sourceContainer",
"MyDocId": { $exists: true }
}
},
{
$project: {
// Преобразования полей здесь
_id: 0,
newId: { $concat: ["$$ROOT.OwnerId", "\\$$ROOT.targetContainer", ":", "$$ROOT.MyDocId"] },
// Сохраняем другие поля
"OwnerId": 1,
"MyDocId": 1,
"ContainerId": "$$ROOT.targetContainer",
// Копируем все остальные поля
"field1": 1,
"field2": 1
}
},
{
$out: "targetCollection"
}
])
Техники преобразования полей ID
Основная задача - преобразование формата ID из ‘OwnerId\SourceContainer:MyDocId’ в ‘OwnerId\TargetContainer:MyDocId’. MongoDB предоставляет несколько строковых операторов агрегации для достижения этой цели:
Использование $concat для конкатенации строк
Оператор $concat объединяет несколько строк вместе, аналогично конкатенации строк в SQL [источник]:
{
$project: {
transformedId: {
$concat: [
"$OwnerId",
"\\",
"targetContainer", // Это может быть переменной
":",
"$MyDocId"
]
}
}
}
Использование переменных для преобразования контейнера
Для динамических имен контейнеров можно использовать переменные агрегации и оператор $let:
{
$let: {
vars: {
targetContainer: "newContainerName"
},
in: {
$concat: [
"$$ROOT.OwnerId",
"\\",
"$$targetContainer",
":",
"$$ROOT.MyDocId"
]
}
}
}
Сохранение структуры документа
При клонировании документов необходимо сохранять исходную структуру, обновляя определенные поля:
{
$project: {
// Преобразуем формат ID
_id: { $concat: ["$$ROOT.OwnerId", "\\$$ROOT.targetContainer", ":", "$$ROOT.MyDocId"] },
// Обновляем ContainerId до целевого значения
ContainerId: "$$ROOT.targetContainer",
// Копируем все остальные поля из исходного документа
"OwnerId": 1,
"MyDocId": 1,
// Сохраняем вложенные объекты и массивы
"nested.field": 1,
"arrayField": 1
}
}
Реализация на C# с драйвером MongoDB
Драйвер MongoDB для C# предоставляет всестороннюю поддержку операций агрегации. Вот как реализовать клонирование и преобразование документов:
Настройка конвейера агрегации
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
public class DocumentCloner
{
private readonly IMongoCollection<BsonDocument> _sourceCollection;
private readonly IMongoCollection<BsonDocument> _targetCollection;
public DocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
{
_sourceCollection = database.GetCollection<BsonDocument>(sourceCollectionName);
_targetCollection = database.GetCollection<BsonDocument>(targetCollectionName);
}
public void CloneDocumentsWithIdTransformation(
string ownerId,
string sourceContainer,
string targetContainer)
{
// Определяем конвейер агрегации
var pipeline = new List<BsonDocument>
{
// Находим документы для клонирования
new BsonDocument("$match", new BsonDocument
{
{ "OwnerId", ownerId },
{ "ContainerId", sourceContainer },
{ "MyDocId", new BsonDocument("$exists", true) }
}),
// Преобразуем структуру документа и формат ID
new BsonDocument("$project", new BsonDocument
{
{ "_id", new BsonDocument("$concat", new BsonArray
{
"$OwnerId",
"\\",
targetContainer,
":",
"$MyDocId"
})
},
{ "OwnerId", 1 },
{ "MyDocId", 1 },
{ "ContainerId", targetContainer },
// Копируем все остальные поля, кроме _id
{ "otherField", 1 }
}),
// Записываем в целевую коллекцию
new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
};
// Выполняем конвейер агрегации
_sourceCollection.Aggregate<BsonDocument>(pipeline);
}
}
Использование строго типизированных классов
Для лучшей типобезопасности и поддерживаемости:
[BsonIgnoreExtraElements]
public class SourceDocument
{
[BsonId]
[BsonRepresentation(BsonType.String)]
public string Id { get; set; }
public string OwnerId { get; set; }
public string ContainerId { get; set; }
public string MyDocId { get; set; }
public string OtherField { get; set; }
// Другие поля...
}
[BsonIgnoreExtraElements]
public class TargetDocument
{
[BsonId]
[BsonRepresentation(BsonType.String)]
public string Id { get; set; }
public string OwnerId { get; set; }
public string ContainerId { get; set; }
public string MyDocId { get; set; }
public string OtherField { get; set; }
// Другие поля...
}
public class TypedDocumentCloner
{
private readonly IMongoCollection<SourceDocument> _sourceCollection;
private readonly IMongoCollection<TargetDocument> _targetCollection;
public TypedDocumentCloner(IMongoDatabase database, string sourceCollectionName, string targetCollectionName)
{
_sourceCollection = database.GetCollection<SourceDocument>(sourceCollectionName);
_targetCollection = database.GetCollection<TargetDocument>(targetCollectionName);
}
public async Task CloneDocumentsWithIdTransformationAsync(
string ownerId,
string sourceContainer,
string targetContainer)
{
var pipeline = new EmptyPipelineDefinition<SourceDocument>()
.Match(doc => doc.OwnerId == ownerId && doc.ContainerId == sourceContainer && doc.MyDocId != null)
.Project(doc => new TargetDocument
{
Id = $"{doc.OwnerId}\\{targetContainer}:{doc.MyDocId}",
OwnerId = doc.OwnerId,
ContainerId = targetContainer,
MyDocId = doc.MyDocId,
OtherField = doc.OtherField
// Сопоставляем другие поля при необходимости
});
await _targetCollection.InsertManyAsync(
await _sourceCollection.Aggregate(pipeline).ToListAsync());
}
}
Массовые операции для повышения производительности
При работе с большими объемами документов оптимизация производительности становится критически важной. MongoDB предоставляет несколько стратегий для эффективных массовых операций:
Использование этапа $out
Этап $out записывает результаты агрегации в коллекцию. Это очень эффективно, так как операция выполняется на стороне сервера [источник]:
var pipeline = new List<BsonDocument>
{
// Этапы сопоставления и преобразования...
new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
};
_sourceCollection.Aggregate<BsonDocument>(pipeline);
Использование этапа $merge для частичных обновлений
Если нужно объединить с существующими данными вместо замены:
var pipeline = new List<BsonDocument>
{
// Этапы сопоставления и преобразования...
new BsonDocument("$merge", new BsonDocument
{
{ "into", _targetCollection.CollectionNamespace.CollectionName },
{ "on", "_id" },
{ "whenMatched", "replace" },
{ "whenNotMatched", "insert" }
})
};
_sourceCollection.Aggregate<BsonDocument>(pipeline);
Пакетная обработка для больших наборов данных
Для очень больших наборов данных рассмотрите возможность обработки пакетами:
public async Task CloneDocumentsInBatchesAsync(
string ownerId,
string sourceContainer,
string targetContainer,
int batchSize = 1000)
{
var filter = Builders<BsonDocument>.Filter.And(
Builders<BsonDocument>.Filter.Eq("OwnerId", ownerId),
Builders<BsonDocument>.Filter.Eq("ContainerId", sourceContainer),
Builders<BsonDocument>.Filter.Exists("MyDocId")
);
var totalDocuments = await _sourceCollection.CountDocumentsAsync(filter);
var batches = (int)Math.Ceiling(totalDocuments / (double)batchSize);
for (int i = 0; i < batches; i++)
{
var skip = i * batchSize;
var pipeline = new List<BsonDocument>
{
new BsonDocument("$match", filter),
new BsonDocument("$skip", skip),
new BsonDocument("$limit", batchSize),
new BsonDocument("$project", GetProjection(targetContainer)),
new BsonDocument("$out", $"{_targetCollection.CollectionNamespace.CollectionName}_batch_{i}")
};
await _sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
// При необходимости объединить результаты пакетов в конечную коллекцию
var mergePipeline = new List<BsonDocument>
{
new BsonDocument("$merge", new BsonDocument
{
{ "into", _targetCollection.CollectionNamespace.CollectionName },
{ "on", "_id" },
{ "whenMatched", "replace" },
{ "whenNotMatched", "insert" }
})
};
await _targetCollection.Aggregate<BsonDocument>(mergePipeline).ToListAsync();
}
}
Обработка ошибок и валидация
Реализация правильной обработки ошибок критически важна для производственных сред:
public async Task<bool> CloneDocumentsWithValidationAsync(
string ownerId,
string sourceContainer,
string targetContainer)
{
try
{
// Валидация входных параметров
if (string.IsNullOrWhiteSpace(ownerId) ||
string.IsNullOrWhiteSpace(sourceContainer) ||
string.IsNullOrWhiteSpace(targetContainer))
{
throw new ArgumentException("Идентификатор владельца, исходный и целевой контейнеры должны быть указаны.");
}
// Проверка существования целевой коллекции
var collectionExists = await _targetCollection.Database
.ListCollectionNames()
.AnyAsync(name => name == _targetCollection.CollectionNamespace.CollectionName);
if (!collectionExists)
{
await _targetCollection.Database.CreateCollectionAsync(_targetCollection.CollectionNamespace.CollectionName);
}
var pipeline = new List<BsonDocument>
{
new BsonDocument("$match", new BsonDocument
{
{ "OwnerId", ownerId },
{ "ContainerId", sourceContainer },
{ "MyDocId", new BsonDocument("$exists", true) }
}),
new BsonDocument("$project", new BsonDocument
{
{ "_id", new BsonDocument("$concat", new BsonArray
{
"$OwnerId",
"\\",
targetContainer,
":",
"$MyDocId"
})
},
{ "OwnerId", 1 },
{ "MyDocId", 1 },
{ "ContainerId", targetContainer }
}),
new BsonDocument("$out", _targetCollection.CollectionNamespace.CollectionName)
};
var result = await _sourceCollection.Aggregate<BsonDocument>(pipeline).FirstOrDefaultAsync();
return result != null;
}
catch (MongoException ex)
{
// Логирование ошибок, специфичных для MongoDB
Console.WriteLine($"Ошибка MongoDB: {ex.Message}");
throw;
}
catch (Exception ex)
{
// Логирование общих ошибок
Console.WriteLine($"Общая ошибка: {ex.Message}");
throw;
}
}
Полный пример решения
Вот полный, готовый к производству пример, который демонстрирует все концепции:
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
public class DocumentMigrator
{
private readonly IMongoDatabase _database;
private readonly string _sourceCollectionName;
private readonly string _targetCollectionName;
public DocumentMigrator(IMongoClient client, string databaseName,
string sourceCollectionName, string targetCollectionName)
{
_database = client.GetDatabase(databaseName);
_sourceCollectionName = sourceCollectionName;
_targetCollectionName = targetCollectionName;
}
/// <summary>
/// Клонирует документы из исходного в целевой контейнер с преобразованием ID
/// </summary>
public async Task CloneDocumentsAsync(
string ownerId,
string sourceContainer,
string targetContainer,
bool replaceExisting = false)
{
var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
var targetCollection = _database.GetCollection<BsonDocument>(_targetCollectionName);
// Построение конвейера агрегации
var pipeline = new List<BsonDocument>
{
// Фильтрация документов для клонирования
new BsonDocument("$match", new BsonDocument
{
{ "OwnerId", ownerId },
{ "ContainerId", sourceContainer },
{ "MyDocId", new BsonDocument("$exists", true) }
}),
// Преобразование структуры документа
new BsonDocument("$project", new BsonDocument
{
{ "_id", new BsonDocument("$concat", new BsonArray
{
"$OwnerId",
"\\",
targetContainer,
":",
"$MyDocId"
})
},
{ "OwnerId", 1 },
{ "MyDocId", 1 },
{ "ContainerId", targetContainer },
// Динамическое копирование всех остальных полей
{ "additionalFields", new BsonDocument("$objectToArray", "$$ROOT") }
}),
// Уплощение дополнительных полей
new BsonDocument("$project", new BsonDocument
{
{ "_id", 1 },
{ "OwnerId", 1 },
{ "MyDocId", 1 },
{ "ContainerId", 1 },
{ "additionalFields", new BsonDocument("$filter", new BsonDocument
{
{ "input", "$additionalFields" },
{ "as", "field" },
{ "cond", new BsonDocument("$not", new BsonArray
{
"$in", new BsonArray { "$$field.k", "_id", "OwnerId", "MyDocId", "ContainerId" }
})
}
})
}
}),
// Разворачивание дополнительных полей
new BsonDocument("$unwind", "$additionalFields"),
// Преобразование полей обратно в документ
new BsonDocument("$replaceRoot", new BsonDocument
{
{ "newRoot", new BsonDocument
{
{ "_id", "$_id" },
{ "OwnerId", "$OwnerId" },
{ "MyDocId", "$MyDocId" },
{ "ContainerId", "$ContainerId" },
{ "$$ROOT.additionalFields.k", "$$ROOT.additionalFields.v" }
}
}
}),
// Удаление временного поля
new BsonDocument("$project", new BsonDocument
{
{ "_id", 1 },
{ "OwnerId", 1 },
{ "MyDocId", 1 },
{ "ContainerId", 1 },
{ "additionalFields", 0 }
})
};
// Выбор этапа вывода на основе необходимости замены существующих данных
if (replaceExisting)
{
pipeline.Add(new BsonDocument("$out", _targetCollectionName));
}
else
{
pipeline.Add(new BsonDocument("$merge", new BsonDocument
{
{ "into", _targetCollectionName },
{ "on", "_id" },
{ "whenMatched", "replace" },
{ "whenNotMatched", "insert" }
}));
}
// Выполнение конвейера
await sourceCollection.Aggregate<BsonDocument>(pipeline).ToListAsync();
}
/// <summary>
/// Проверка успешности преобразования
/// </summary>
public async Task<bool> ValidateTransformationAsync(
string ownerId,
string sourceContainer,
string targetContainer,
int sampleSize = 10)
{
var sourceCollection = _database.GetCollection<BsonDocument>(_sourceCollectionName);
var targetCollection = _database.GetCollection<BsonDocument>(_targetCollectionName);
// Получение выборки документов из источника
var sourceDocs = await sourceCollection
.Find(new BsonDocument
{
{ "OwnerId", ownerId },
{ "ContainerId", sourceContainer }
})
.Limit(sampleSize)
.ToListAsync();
foreach (var sourceDoc in sourceDocs)
{
var expectedId = $"{sourceDoc["OwnerId"]}\\{targetContainer}:{sourceDoc["MyDocId"]}";
var targetDoc = await targetCollection
.Find(new BsonDocument { { "_id", expectedId } })
.FirstOrDefaultAsync();
if (targetDoc == null)
{
return false;
}
// Проверка соответствия других полей
foreach (var element in sourceDoc.Elements)
{
if (element.Name != "_id" && element.Name != "ContainerId")
{
if (!targetDoc.Contains(element.Name) ||
!BsonDocumentComparer.Equal(element.Value, targetDoc[element.Name]))
{
return false;
}
}
}
}
return true;
}
}
Источники
- Конвейер агрегации MongoDB - Руководство по базе данных
- Агрегация MongoDB: учебник с примерами и упражнениями | Studio 3T
- Обновления с использованием конвейера агрегации - Руководство по базе данных
- Конвейер агрегации MongoDB | MongoDB
- Операции агрегации - Руководство по базе данных
- Как использовать агрегации в MongoDB | DigitalOcean
- Фреймворк агрегации MongoDB: как упростить сложную логику в этапы
Заключение
- Фреймворк агрегации MongoDB обеспечивает эффективное клонирование документов без извлечения данных в память приложения, обрабатывая преобразования на стороне сервера через этапы конвейера
- Преобразование полей ID достигается с использованием операторов конкатенации строк, таких как
$concat, на этапе$project, что позволяет динамически переформировать шаблоны ‘OwnerId\ContainerId:MyDocId’ - Реализация на C# использует поддержку агрегации драйвера MongoDB для программного построения конвейеров, обеспечивая типобезопасность и производительность для массовых операций
- Массовые операции оптимизированы с использованием этапов
$outили$mergeдля прямой записи преобразованных документов в целевые коллекции, устраняя необходимость в отдельных операциях вставки - Готовые к производству решения должны включать правильную обработку ошибок, валидацию и пакетную обработку для больших наборов данных для обеспечения надежной миграции документов между контейнерами