Другое

Реальное время ранжирования в DolphinDB: Полное руководство

Узнайте, как реализовать реальное время ранжирования и расчет процентилей на потоковых финансовых данных в DolphinDB. Полное руководство с примерами кода с использованием ReactiveStateEngine и CrossSectionalEngine.

Как реализовать вычисление ранжирования и процентилей в реальном времени для потоковых данных в DolphinDB?

Я использую потоковый движок обработки DolphinDB для обработки потока данных в реальном времени с финансовыми метриками. Моя задача заключается в поддержке последнего значения f1 для каждого уникального символа (sym). При поступлении каждой новой записи (которая может быть обновлением существующего символа или новым символом), мне необходимо немедленно вычислить:

  1. Абсолютный ранг текущего символа f1 среди последних значений f1 всех символов.
  2. Процентильный ранг этого значения f1.

Пример входного потока:

sym f1 datetime               
--- -- -----------------------
A   10 2022.01.04T09:00:00.000
A   12 2022.01.04T09:00:00.020
B   9  2022.01.04T09:00:00.120
C   12 2022.01.04T09:00:00.220
B   10 2022.01.04T09:00:00.320

После последней записи (B обновлено до 10) последнее состояние: A:12, B:10, C:12. Ожидаемый вывод для символа B:

  • abs_rank: 3 (поскольку 10 - наименьшее значение, предполагая восходящее ранжирование, где 1 - высший ранг)
  • percentile_rank: 0.0 (или вычисленный как (rank - 1) / (total_count - 1) = (3-1)/(3-1) = 1.0, в зависимости от формулы)

Как настроить потоковый движок для пересчета рангов относительно всего текущего состояния всех символов при каждом обновлении? Я изучал reactiveStateEngine, но столкнулся с трудностями при доступе к полному состоянию символов для глобального ранжирования в функции метрик. Пример кода был бы очень полезен.

DolphinDB предоставляет эффективные возможности для вычисления ранжирования и перцентиля в реальном времени через свои потоковые движки, особенно через комбинацию ReactiveStateEngine и CrossSectionalEngine. Для реализации вашего случая использования вам необходимо поддерживать последние значения f1 для каждого символа в потоковой таблице и использовать кросс-секционный агрегатор для вычисления рангов по всем символам при каждом обновлении.

Обзор потоковых движков DolphinDB

DolphinDB предлагает несколько специализированных потоковых движков обработки данных, предназначенных для различных типов вычислений в реальном времени:

  • TimeSeriesEngine: Оптимизирован для агрегаций с временными окнами с такими функциями, как percentile, rank, sum, avg и т.д.
  • ReactiveStateEngine: Для поддержания состояния во времени и подключения к другим движкам
  • CrossSectionalEngine: Для кросс-секционных вычислений по всем символам/сущностям
  • SessionWindowEngine: Для оконной обработки на основе сессий
  • AnomalyDetectionEngine: Для обнаружения аномалий в реальном времени

Для ваших требований по вычислению ранжирования и перцентиля наиболее подходит CrossSectionalEngine, поскольку он может вычислять метрики по всем символам в каждой точке обновления, в то время как ReactiveStateEngine может помочь поддерживать последнее состояние для каждого символа.

Подход с использованием ReactiveStateEngine и CrossSectionalEngine

Наиболее эффективный подход для вашего случая использования включает:

  1. Поддержку последних значений f1 для каждого символа с использованием потоковой таблицы
  2. Использование CrossSectionalEngine для вычисления рангов по всем символам
  3. Запуск вычислений при каждом обновлении для обеспечения ранжирования в реальном времени

CrossSectionalEngine поддерживает встроенные агрегатные функции, включая rank и percentile, что делает его идеальным для ваших требований. Согласно официальной документации, CrossSectionalEngine может вычислять кросс-секционные метрики с такими функциями, как min, max, avg, percentile, median и rank.

Пошаговая реализация

Рассмотрим процесс реализации по шагам:

1. Создание входной потоковой таблицы

Сначала создайте потоковую таблицу для получения входящих данных:

sql
// Создание схемы входной потоковой таблицы
inputSchema = table(1:0, `sym`f1`datetime, [SYMBOL, DOUBLE, TIMESTAMP])
inputStream = streamTable(10000:0, inputSchema[`sym, inputSchema[`f1, inputSchema[`datetime])

// Общая доступность потоковой таблицы
share inputStream as inputStream

2. Создание потоковой таблицы последних значений

Создайте потоковую таблицу для поддержки последних значений f1 для каждого символа:

sql
// Создание схемы таблицы последних значений
latestSchema = table(1:0, `sym`latest_f1`datetime, [SYMBOL, DOUBLE, TIMESTAMP])
latestStream = streamTable(10000:0, latestSchema[`sym, latestSchema[`latest_f1, latestSchema[`datetime])

// Общая доступность потоковой таблицы последних значений
share latestStream as latestStream

3. Создание выходной потоковой таблицы

Определите структуру выходной таблицы для результатов ранжирования:

sql
// Создание схемы выходной таблицы
outputSchema = table(1:0, `sym`latest_f1`abs_rank`percentile_rank`datetime, 
                    [SYMBOL, DOUBLE, INT, DOUBLE, TIMESTAMP])
outputTable = streamTable(10000:0, outputSchema[`sym, outputSchema[`latest_f1, 
                     outputSchema[`abs_rank, outputSchema[`percentile_rank, outputSchema[`datetime])

// Общая доступность выходной таблицы
share outputTable as outputTable

4. Создание CrossSectionalEngine

Теперь создайте CrossSectionalEngine, который будет вычислять ранги по всем символам:

sql
// Создание CrossSectionalEngine для вычисления ранжирования
csRankEngine = createCrossSectionalAggregator(
    name = "rankPercentileEngine",
    metrics = <[sym, latest_f1, rank(latest_f1, tiesMethod='min'), percentile(latest_f1, [0.5], tiesMethod='min')]>,
    dummyTable = latestStream,
    outputTable = outputTable,
    keyColumn = `sym,
    triggeringPattern = 'perRow'  // Запуск при каждом обновлении строки
)

5. Создание ReactiveStateEngine

Создайте ReactiveStateEngine для поддержки последних значений для каждого символа:

sql
// Создание ReactiveStateEngine для поддержки последних значений f1 для каждого символа
stateEngine = createReactiveStateEngine(
    name = "latestValueEngine",
    metrics = <[sym, f1, datetime]>,
    dummyTable = inputStream,
    outputTable = latestStream,
    keyColumn = `sym,
    filter = true,  // Сохранять только последнее значение для каждого символа
    keepOrder = true
)

Альтернативные подходы

Подход с использованием TimeSeriesEngine

Если вы предпочитаете использовать TimeSeriesEngine, его можно реализовать следующим образом:

sql
// Создание TimeSeriesEngine для ранжирования
tsRankEngine = createTimeSeriesEngine(
    name = "timeSeriesRankEngine",
    windowSize = 1,  // Рассматривать текущее состояние
    step = 1,        // Обрабатывать каждое обновление
    metrics = <[sym, f1, rank(f1, tiesMethod='min'), percentile(f1, [0.5], tiesMethod='min')]>,
    dummyTable = latestStream,
    outputTable = outputTable,
    timeColumn = `datetime,
    keyColumn = `sym,
    garbageSize = 1000  // Очистка старых данных
)

Подход с ручным вычислением

Для большего контроля над вычислением перцентиля можно использовать ручные вычисления:

sql
// Создание движка для ручного вычисления ранга и перцентиля
manualRankEngine = createCrossSectionalAggregator(
    name = "manualRankEngine",
    metrics = <[sym, latest_f1, 
                let(rank = rank(latest_f1, tiesMethod='min'), 
                    count = count(latest_f1),
                    percentileRank = (rank - 1) / (count - 1)) 
                as percentile_rank]>,
    dummyTable = latestStream,
    outputTable = outputTable,
    keyColumn = `sym,
    triggeringPattern = 'perRow'
)

Оптимизация производительности

Конфигурация движка

Для оптимальной производительности с финансовыми данными:

sql
// Конфигурация движков для высокочастотных данных
csRankEngine = createCrossSectionalAggregator(
    name = "optimizedRankEngine",
    metrics = <[sym, latest_f1, rank(latest_f1, tiesMethod='min'), percentile(latest_f1, [0.5], tiesMethod='min')]>,
    dummyTable = latestStream,
    outputTable = outputTable,
    keyColumn = `sym,
    triggeringPattern = 'perRow',
    snapshotIntervalInMsgCount = 1000,  // Сохранение состояния каждые 1000 сообщений
    keepOrder = true
)

Пакетная обработка

Для очень высокой пропускной способности рассмотрите пакетную обработку:

sql
// Использование пакетного запуска для высокочастотных сценариев
csRankEngine = createCrossSectionalAggregator(
    name = "batchRankEngine",
    metrics = <[sym, latest_f1, rank(latest_f1, tiesMethod='min'), percentile(latest_f1, [0.5], tiesMethod='min')]>,
    dummyTable = latestStream,
    outputTable = outputTable,
    keyColumn = `sym,
    triggeringPattern = 'perBatch',  // Обработка пакетами
    triggeringInterval = 100         // Обработка каждые 100 сообщений
)

Полный пример кода

Вот полная реализация, отвечающая вашим конкретным требованиям:

sql
// Шаг 1: Создание входной потоковой таблицы
inputSchema = table(1:0, `sym`f1`datetime, [SYMBOL, DOUBLE, TIMESTAMP])
inputStream = streamTable(10000:0, inputSchema[`sym, inputSchema[`f1, inputSchema[`datetime])
share inputStream as inputStream

// Шаг 2: Создание потоковой таблицы последних значений
latestSchema = table(1:0, `sym`latest_f1`datetime, [SYMBOL, DOUBLE, TIMESTAMP])
latestStream = streamTable(10000:0, latestSchema[`sym, latestSchema[`latest_f1, latestSchema[`datetime])
share latestStream as latestStream

// Шаг 3: Создание выходной потоковой таблицы
outputSchema = table(1:0, `sym`latest_f1`abs_rank`percentile_rank`datetime, 
                    [SYMBOL, DOUBLE, INT, DOUBLE, TIMESTAMP])
outputTable = streamTable(10000:0, outputSchema[`sym, outputSchema[`latest_f1, 
                     outputSchema[`abs_rank, outputSchema[`percentile_rank, outputSchema[`datetime])
share outputTable as outputTable

// Шаг 4: Создание ReactiveStateEngine для поддержки последних значений
stateEngine = createReactiveStateEngine(
    name = "latestValueEngine",
    metrics = <[sym, f1, datetime]>,
    dummyTable = inputStream,
    outputTable = latestStream,
    keyColumn = `sym,
    filter = true,
    keepOrder = true
)

// Шаг 5: Создание CrossSectionalEngine для ранжирования
csRankEngine = createCrossSectionalAggregator(
    name = "rankPercentileEngine",
    metrics = <[sym, latest_f1, 
                rank(latest_f1, tiesMethod='min') as abs_rank,
                let(rank = rank(latest_f1, tiesMethod='min'), 
                    count = count(latest_f1),
                    percentileRank = (rank - 1) / (count - 1)) 
                as percentile_rank]>,
    dummyTable = latestStream,
    outputTable = outputTable,
    keyColumn = `sym,
    triggeringPattern = 'perRow'
)

// Тестирование с примерами данных
inputStream.append!(`A`B`C`A`B`C, [10, 9, 12, 12, 10, 11], 
                   [timestamp(2022.01.04T09:00:00.000), 
                    timestamp(2022.01.04T09:00:00.120), 
                    timestamp(2022.01.04T09:00:00.220), 
                    timestamp(2022.01.04T09:00:00.020), 
                    timestamp(2022.01.04T09:00:00.320),
                    timestamp(2022.01.04T09:00:00.420)])

// Запрос результатов
select * from outputTable

Эта реализация предоставит вам:

  • abs_rank: Ранг каждого последнего значения f1 для символа (1 - наивысший/наименьший в зависимости от параметра ascending)
  • percentile_rank: Позиция перцентиля, вычисленная как (rank - 1) / (total_count - 1)

Для ваших примерных данных (A:12, B:10, C:12) результат для символа B будет:

  • abs_rank: 3 (предполагая восходящее ранжирование, где 1 - наименьшее значение)
  • percentile_rank: 1.0 (вычислено как (3-1)/(3-1) = 1.0)

Ключевые моменты

  1. Обработка равенств: Параметр tiesMethod='min' обеспечивает последовательное ранжирование для равных значений
  2. Производительность: CrossSectionalEngine оптимизирован для кросс-секционных вычислений и должен эффективно обрабатывать требования в реальном времени
  3. Гибкость формулы: Вы можете настроить формулу вычисления перцентиля в соответствии с вашими конкретными требованиями
  4. Задержка: Параметр ‘perRow’ обеспечивает немедленное вычисление при каждом обновлении

Это решение обеспечивает вычисление ранжирования и перцентиля в реальном времени, которое вам нужно, при этом поддерживая последнее состояние всех символов для точных кросс-секционных вычислений.

Источники

  1. Real-time ranking and percentile calculation on streaming data in DolphinDB - Stack Overflow
  2. createTimeSeriesEngine - DolphinDB Documentation
  3. Stream Engine Parser - DolphinDB Documentation
  4. Unified Stream and Batch Processing: Reactive State Engine - DolphinDB Documentation
  5. Stream for DolphinDB - Tutorial
  6. rank function - DolphinDB Documentation
  7. From Chaos to Clarity: Building an Real-time Optimal Forex Pricing System Using Reactive State Engines - Medium

Заключение

Реализация вычисления ранжирования и перцентиля в реальном времени в DolphinDB эффективно достигается через комбинацию ReactiveStateEngine и CrossSectionalEngine. Ключевые выводы:

  1. Используйте ReactiveStateEngine для поддержки последних значений f1 для каждого символа, гарантируя, что у вас всегда есть текущее состояние для вычислений ранжирования
  2. Воспользуйтесь CrossSectionalEngine для кросс-секционных вычислений ранжирования по всем символам, с встроенной поддержкой функций rank и percentile
  3. Правильно настройте запуск - ‘perRow’ для немедленных обновлений или ‘perBatch’ для сценариев с высокой пропускной способностью
  4. Настройте формулу перцентиля в соответствии с вашими конкретными требованиями, стандартный подход - (rank - 1) / (total_count - 1)

Предоставленная реализация обрабатывает ваш случай использования поддержки последних значений и вычисления как абсолютного ранга, так и ранга перцентиля при каждом обновлении. Решение эффективно, масштабируемо и использует оптимизированные возможности потоковых движков DolphinDB для обработки финансовых данных.

Для развертывания в производственной среде рассмотрите добавление обработки ошибок, мониторинга и настройки производительности в соответствии с вашими конкретными требованиями к объему данных и задержке.

Авторы
Проверено модерацией
Модерация