НейроАгент

Как реализовать стандартизацию DataStage в PySpark

Узнайте, как воспроизвести функциональность этапа стандартизации DataStage в Databricks PySpark для адресов США, областей и имен. Полное руководство по реализации с примерами кода и стратегиями оптимизации.

Как реализовать стандартизацию адресов в Databricks PySpark для воспроизведения функциональности этапа Standardize в DataStage для адресов, районов и имен США?

НейроАгент

Функциональность этапа стандартизации IBM DataStage для адресов, районов и имен США может быть воспроизведена в Databricks PySpark путем реализации комплексного конвейера стандартизации адресов, который использует как встроенные преобразования PySpark, так и специализированные библиотеки проверки адресов. Этот подход объединяет возможности преобразования, характерные для DataStage, с современной облачной обработкой для достижения аналогичных результатов качества и стандартизации данных.

Понимание функциональности этапа стандартизации DataStage

Этап стандартизации IBM DataStage предназначен для преобразования нестандартизированных адресных данных в единообразный формат, соответствующий почтовым правилам и организационным стандартам. Исследования показывают, что DataStage широко используется для “сопоставления формата HIPAA с форматом конкретного продукта” и “преобразования клиентских данных из нескольких источников в целевую базу данных ORACLE, ПЛОСКИЕ ФАЙЛЫ”, что указывает на его роль в преобразовании и стандартизации данных.

Этап стандартизации обычно выполняет несколько ключевых функций:

  • Разбор адресов (Address Parsing): Разделение необработанных строк адресов на компоненты (улица, город, штат, ZIP)
  • Проверка адресов (Address Validation): Проверка адресов по почтовым базам данных
  • Стандартизация адресов (Address Standardization): Преобразование адресов в единообразный формат
  • Дополнение адресов (Address Enhancement): Добавление недостающей информации, таких как коды ZIP+4

Согласно исследованиям, аналогичную функциональность предоставляют специализированные службы проверки адресов, такие как Precisely, которые предлагают “встроенные, готовые к использованию шаблоны для разбора и стандартизации глобальных адресов”. Эти службы могут быть интегрированы в рабочие процессы PySpark для воспроизведения возможностей стандартизации DataStage.

Подход к реализации в PySpark

Для воспроизведения функциональности этапа стандартизации DataStage в PySpark необходимо создать преобразующий конвейер, который решает те же требования к качеству и стандартизации данных. Исследования упоминают Lakebridge как решение для “модернизации ETL DataStage & Informatica в Databricks” путем “перевода устаревших сопоставлений и преобразований в конвейеры PySpark или Spark SQL Databricks”.

Вот комплексный подход к реализации:

1. Настройка среды и зависимостей

python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, regexp_replace, split, when, lit
from pyspark.sql.types import StringType, StructType, StructField
import re
from typing import Optional, Dict, List

# Инициализация Spark сессии
spark = SparkSession.builder \
    .appName("AddressStandardization") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

2. Функция стандартизации адресов

python
def standardize_us_address(raw_address: str) -> Optional[Dict[str, str]]:
    """
    Стандартизация адреса США путем разбора и нормализации компонентов
    Воспроизводит функциональность этапа стандартизации DataStage
    """
    if not raw_address or pd.isna(raw_address):
        return None
    
    # Преобразование в верхний регистр и удаление лишних пробелов
    address = str(raw_address).upper().strip()
    
    # Инициализация словаря результатов
    result = {
        'original_address': raw_address,
        'standardized_address': '',
        'street_number': '',
        'street_name': '',
        'street_type': '',
        'apartment_number': '',
        'city': '',
        'state': '',
        'zip_code': '',
        'zip4': '',
        'address_quality': '',
        'is_valid': False
    }
    
    try:
        # Разбор компонентов адреса
        parsed = _parse_address_components(address)
        result.update(parsed)
        
        # Проверка и дополнение адреса
        validated = _validate_and_enhance_address(result)
        result.update(validated)
        
        # Генерация стандартизированной строки адреса
        result['standardized_address'] = _generate_standardized_address(result)
        result['is_valid'] = True
        
    except Exception as e:
        result['address_quality'] = f'ОШИБКА: {str(e)}'
    
    return result

def _parse_address_components(address: str) -> Dict[str, str]:
    """Разбор адреса на компоненты, аналогично логике разбора DataStage"""
    # Удаление специальных символов и нормализация
    clean_address = re.sub(r'[^\w\s]', ' ', address)
    
    # Разделение на части
    parts = clean_address.split()
    
    # Инициализация компонентов
    components = {
        'street_number': '',
        'street_name': '',
        'street_type': '',
        'apartment_number': '',
        'city': '',
        'state': '',
        'zip_code': '',
        'zip4': ''
    }
    
    # Типы улиц США для стандартизации
    street_types = {'AVE', 'AVENUE', 'BLVD', 'BOULEVARD', 'CIR', 'CIRCLE', 
                   'CT', 'COURT', 'DR', 'DRIVE', 'LN', 'LANE', 'PKWY', 'PARKWAY',
                   'PL', 'PLACE', 'RD', 'ROAD', 'ST', 'STREET', 'WAY', 'TRL', 'TRAIL'}
    
    # Коды штатов США
    us_states = {'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
                'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
                'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
                'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
                'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'}
    
    # Разбор компонентов на основе общих шаблонов
    if len(parts) >= 2:
        # Первая часть обычно является номером улицы
        if parts[0].isdigit():
            components['street_number'] = parts[0]
        
        # Поиск типа улицы
        for i, part in enumerate(parts[1:], 1):
            if part in street_types:
                components['street_type'] = part
                # Название улицы - все до типа улицы
                if i > 1:
                    components['street_name'] = ' '.join(parts[1:i])
                break
        
        # Поиск штата (обычно 2-буквенный код)
        for part in parts:
            if len(part) == 2 and part in us_states:
                components['state'] = part
                break
        
        # Поиск почтового индекса (5 цифр или формат 5+4)
        for part in parts:
            if re.match(r'^\d{5}$', part):
                components['zip_code'] = part
            elif re.match(r'^\d{5}-\d{4}$', part):
                zip_parts = part.split('-')
                components['zip_code'] = zip_parts[0]
                components['zip4'] = zip_parts[1]
        
        # Город обычно находится перед штатом или после компонентов улицы
        # Это упрощенная реализация - в реальной реализации она была бы более сложной
        city_parts = []
        in_city_section = False
        
        for part in parts:
            if part in us_states:
                in_city_section = False
            elif in_city_section or (not components['street_type'] and not components['state']):
                if part not in street_types and not part.isdigit() and len(part) > 2:
                    city_parts.append(part)
        
        if city_parts:
            components['city'] = ' '.join(city_parts)
    
    return components

def _validate_and_enhance_address(address_data: Dict[str, str]) -> Dict[str, str]:
    """Проверка и дополнение компонентов адреса"""
    validated = address_data.copy()
    
    # Оценка качества адреса
    quality_score = 0
    quality_factors = []
    
    # Проверка обязательных компонентов
    if validated['street_number']:
        quality_score += 20
    else:
        quality_factors.append('Отсутствует номер улицы')
    
    if validated['street_name']:
        quality_score += 20
    else:
        quality_factors.append('Отсутствует название улицы')
    
    if validated['city']:
        quality_score += 15
    else:
        quality_factors.append('Отсутствует город')
    
    if validated['state']:
        quality_score += 15
        # Проверка кода штата
        if validated['state'] not in {'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
                                     'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
                                     'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
                                     'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
                                     'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'}:
            quality_score -= 10
            quality_factors.append('Недействительный код штата')
    else:
        quality_factors.append('Отсутствует штат')
    
    if validated['zip_code']:
        quality_score += 20
        # Проверка формата ZIP
        if not re.match(r'^\d{5}$', validated['zip_code']):
            quality_score -= 5
            quality_factors.append('Недействительный формат ZIP')
    else:
        quality_factors.append('Отсутствует почтовый индекс')
    
    if validated['street_type']:
        quality_score += 10
    
    # Ограничение оценки качества 100 баллами
    validated['address_quality'] = f"{min(quality_score, 100)}% - {', '.join(quality_factors) if quality_factors else 'Хорошо'}"
    
    return validated

def _generate_standardized_address(address_data: Dict[str, str]) -> str:
    """Генерация стандартизированной строки адреса из компонентов"""
    parts = []
    
    if address_data['street_number']:
        parts.append(address_data['street_number'])
    
    if address_data['street_name']:
        parts.append(address_data['street_name'])
    
    if address_data['street_type']:
        parts.append(address_data['street_type'])
    
    if address_data['apartment_number']:
        parts.append(f"КВ {address_data['apartment_number']}")
    
    address_line = ' '.join(parts) if parts else ''
    
    # Добавление города, штата, ZIP
    location_parts = []
    if address_data['city']:
        location_parts.append(address_data['city'])
    
    if address_data['state']:
        location_parts.append(address_data['state'])
    
    if address_data['zip_code']:
        if address_data['zip4']:
            location_parts.append(f"{address_data['zip_code']}-{address_data['zip4']}")
        else:
            location_parts.append(address_data['zip_code'])
    
    location_line = ', '.join(location_parts) if location_parts else ''
    
    # Объединение строк
    standardized = []
    if address_line:
        standardized.append(address_line)
    if location_line:
        standardized.append(location_line)
    
    return '\n'.join(standardized)

Компоненты стандартизации адресов США

Процесс стандартизации адресов США включает несколько ключевых компонентов, которые работают вместе для преобразования необработанных адресных данных в единообразный формат. На основе результатов исследований, особенно от служб проверки адресов, таких как Precisely, мы можем определить основные компоненты:

1. Разбор адресов

Разбор адресов разбивает необработанные строки адресов на их составные части. Это похоже на то, как PostGrid описывает стандартизацию как “выполняемую для того, чтобы ваша почта соответствовала почтовым требованиям”.

python
# Регистрация UDF для стандартизации адресов
standardize_address_udf = udf(standardize_us_address, StringType())

# Пример использования в PySpark
address_df = spark.createDataFrame([
    ("123 Main St Apt 4B New York NY 10001",),
    ("456 OAK AVENUE SAN FRANCISCO CA 94102",),
    ("789 BROADWAY APT 5 LOS ANGELES CA 90028",)
], ["raw_address"])

# Применение стандартизации
standardized_df = address_df.withColumn(
    "standardized_result", 
    standardize_address_udf(col("raw_address"))
)

# Извлечение компонентов в отдельные столбцы
final_df = standardized_df.select(
    col("raw_address"),
    col("standardized_result.*")
).drop("original_address")

final_df.show(truncate=False)

2. Проверка адресов

Проверка адресов гарантирует, что каждый компонент адреса действителен и существует в почтовой базе данных. Согласно исследованиям, это включает проверку на “местные почтовые стандарты”, как упоминается в документации API PostGrid.

python
def validate_address_components(address_data: Dict[str, str]) -> Dict[str, bool]:
    """Проверка отдельных компонентов адреса"""
    validation_results = {
        'street_valid': False,
        'city_valid': False,
        'state_valid': False,
        'zip_valid': False,
        'address_complete': False
    }
    
    # Проверка улицы (упрощенная - в реальной реализации использовалась бы почтовая база данных)
    if (address_data['street_number'] and 
        address_data['street_name'] and 
        address_data['street_type']):
        validation_results['street_valid'] = True
    
    # Проверка города (упрощенная)
    if address_data['city'] and len(address_data['city']) > 2:
        validation_results['city_valid'] = True
    
    # Проверка штата
    us_states = {'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
                'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
                'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
                'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
                'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'}
    
    if address_data['state'] in us_states:
        validation_results['state_valid'] = True
    
    # Проверка почтового индекса
    if re.match(r'^\d{5}$', address_data['zip_code']):
        validation_results['zip_valid'] = True
    
    # Проверка, является ли адрес полным
    validation_results['address_complete'] = all([
        validation_results['street_valid'],
        validation_results['city_valid'],
        validation_results['state_valid'],
        validation_results['zip_valid']
    ])
    
    return validation_results

3. Дополнение адресов

Дополнение адресов добавляет недостающую информацию, такую как коды ZIP+4, информацию о округе и другие почтовые данные. Это соответствует тому, как Spectrum Global Addressing присваивает уникальные идентификаторы и предоставляет богатые метаданные.

python
def enhance_address_data(address_data: Dict[str, str]) -> Dict[str, str]:
    """Дополнение адреса дополнительными метаданными"""
    enhanced = address_data.copy()
    
    # Добавление информации ZIP+4 (упрощенная - в реальной реализации использовался бы почтовый API)
    if enhanced['zip_code'] and not enhanced['zip4']:
        # Мок-генерация ZIP+4 для демонстрации
        zip4 = str(int(enhanced['zip_code']) + 1000).zfill(4)
        enhanced['zip4'] = zip4
    
    # Добавление информации об округе (упрощенная)
    zip_to_county = {
        '10001': 'Округ Нью-Йорк',
        '94102': 'Округ Сан-Франциско',
        '90028': 'Округ Лос-Анджелес'
    }
    
    if enhanced['zip_code'] in zip_to_county:
        enhanced['county'] = zip_to_county[enhanced['zip_code']]
    else:
        enhanced['county'] = 'Неизвестно'
    
    # Добавление классификации типа адреса
    if enhanced['street_type'] in {'AVE', 'AVENUE', 'BLVD', 'BOULEVARD'}:
        enhanced['address_type'] = 'Основная дорога'
    elif enhanced['street_type'] in {'CT', 'COURT', 'CIR', 'CIRCLE'}:
        enhanced['address_type'] = 'Жилой'
    else:
        enhanced['address_type'] = 'Стандартный'
    
    return enhanced

Техники стандартизации районов и имен

Помимо стандартизации адресов, этап стандартизации DataStage также обрабатывает стандартизацию районов и имен. Эти компоненты важны для поддержания согласованности данных в крупных наборах данных.

1. Стандартизация районов

Стандартизация районов включает нормализацию географических районов, регионов и административных делений. Это особенно важно для наборов данных, которые охватывают несколько штатов или регионов.

python
def standardize_area(area_name: str, area_type: str) -> Dict[str, str]:
    """
    Стандартизация названий географических районов
    - Названия штатов в аббревиатуры
    - Названия округов в стандартный формат
    - Метрополитенские области в стандартное обозначение
    """
    if not area_name:
        return {'standardized_area': '', 'area_code': ''}
    
    area_name = str(area_name).upper().strip()
    
    # Сопоставление названий штатов с аббревиатурами
    state_names = {
        'ALABAMA': 'AL', 'ALASKA': 'AK', 'ARIZONA': 'AZ', 'ARKANSAS': 'AR',
        'CALIFORNIA': 'CA', 'COLORADO': 'CO', 'CONNECTICUT': 'CT', 'DELAWARE': 'DE',
        'FLORIDA': 'FL', 'GEORGIA': 'GA', 'HAWAII': 'HI', 'IDAHO': 'ID',
        'ILLINOIS': 'IL', 'INDIANA': 'IN', 'IOWA': 'IA', 'KANSAS': 'KS',
        'KENTUCKY': 'KY', 'LOUISIANA': 'LA', 'MAINE': 'ME', 'MARYLAND': 'MD',
        'MASSACHUSETTS': 'MA', 'MICHIGAN': 'MI', 'MINNESOTA': 'MN', 'MISSISSIPPI': 'MS',
        'MISSOURI': 'MO', 'MONTANA': 'MT', 'NEBRASKA': 'NE', 'NEVADA': 'NV',
        'NEW HAMPSHIRE': 'NH', 'NEW JERSEY': 'NJ', 'NEW MEXICO': 'NM', 'NEW YORK': 'NY',
        'NORTH CAROLINA': 'NC', 'NORTH DAKOTA': 'ND', 'OHIO': 'OH', 'OKLAHOMA': 'OK',
        'OREGON': 'OR', 'PENNSYLVANIA': 'PA', 'RHODE ISLAND': 'RI', 'SOUTH CAROLINA': 'SC',
        'SOUTH DAKOTA': 'SD', 'TENNESSEE': 'TN', 'TEXAS': 'TX', 'UTAH': 'UT',
        'VERMONT': 'VT', 'VIRGINIA': 'VA', 'WASHINGTON': 'WA', 'WEST VIRGINIA': 'WV',
        'WISCONSIN': 'WI', 'WYOMING': 'WY'
    }
    
    # Распространенные варианты названий округов
    county_variations = {
        'SAINT': 'ST',
        'SAINTS': 'STS',
        'SAINT LOUIS': 'ST LOUIS',
        'DE KALB': 'DEKALB',
        'DU PAGE': 'DUPAGE',
        'LA SALLE': 'LASALLE'
    }
    
    # Коды метрополитенских областей
    metro_areas = {
        'NEW YORK': 'NYC',
        'LOS ANGELES': 'LA',
        'CHICAGO': 'CHI',
        'HOUSTON': 'HOU',
        'PHOENIX': 'PHX',
        'PHILADELPHIA': 'PHL',
        'SAN ANTONIO': 'SAT',
        'SAN DIEGO': 'SD',
        'DALLAS': 'DAL',
        'SAN JOSE': 'SJ'
    }
    
    result = {'standardized_area': '', 'area_code': ''}
    
    # Проверка, является ли это названием штата
    if area_name in state_names:
        result['standardized_area'] = state_names[area_name]
        result['area_code'] = f'SH_{result["standardized_area"]}'
        return result
    
    # Проверка вариантов названий округов
    for variation, standard in county_variations.items():
        if variation in area_name:
            area_name = area_name.replace(variation, standard)
    
    # Проверка метрополитенских областей
    for metro_name, metro_code in metro_areas.items():
        if metro_name in area_name:
            result['standardized_area'] = area_name
            result['area_code'] = f'METRO_{metro_code}'
            return result
    
    # По умолчанию: возврат исходного (очищенного) названия района
    result['standardized_area'] = area_name
    result['area_code'] = f'AREA_{area_name.replace(" ", "_")}'
    
    return result

2. Стандартизация имен

Стандартизация имен включает нормализацию личных имен и названий организаций в единообразный формат. Это особенно важно для клиентских данных и управления записями.

python
def standardize_name(name: str, name_type: str = 'PERSONAL') -> Dict[str, str]:
    """
    Стандартизация имен в соответствии с указанным типом
    - Личные имена: формат Имя, Отчество, Фамилия
    - Названия организаций: юридический формат
    """
    if not name:
        return {
            'standardized_name': '',
            'first_name': '',
            'middle_name': '',
            'last_name': '',
            'suffix': '',
            'name_quality': 'Пусто'
        }
    
    name = str(name).strip()
    
    result = {
        'standardized_name': '',
        'first_name': '',
        'middle_name': '',
        'last_name': '',
        'suffix': '',
        'name_quality': 'Хорошо'
    }
    
    if name_type == 'PERSONAL':
        # Разбор компонентов личного имени
        name_parts = [part.strip() for part in name.split() if part.strip()]
        
        if len(name_parts) == 0:
            result['name_quality'] = 'Пусто'
            return result
        
        # Распространенные суффиксы
        suffixes = {'JR', 'SR', 'II', 'III', 'IV', 'V', 'ESQ', 'PHD', 'MD', 'DDS'}
        
        # Проверка суффикса в конце
        if name_parts[-1] in suffixes:
            result['suffix'] = name_parts[-1]
            name_parts = name_parts[:-1]
        
        if len(name_parts) == 1:
            result['first_name'] = name_parts[0]
            result['last_name'] = name_parts[0]
        elif len(name_parts) == 2:
            result['first_name'] = name_parts[0]
            result['last_name'] = name_parts[1]
        elif len(name_parts) == 3:
            result['first_name'] = name_parts[0]
            result['middle_name'] = name_parts[1]
            result['last_name'] = name_parts[2]
        else:
            # Более сложные имена - предполагаем, что первая часть - имя,
            # последняя часть - фамилия, а средняя - все остальное
            result['first_name'] = name_parts[0]
            result['middle_name'] = ' '.join(name_parts[1:-1])
            result['last_name'] = name_parts[-1]
        
        # Генерация стандартизированного имени
        name_components = []
        if result['first_name']:
            name_components.append(result['first_name'])
        if result['middle_name']:
            name_components.append(result['middle_name'])
        if result['last_name']:
            name_components.append(result['last_name'])
        if result['suffix']:
            name_components.append(result['suffix'])
        
        result['standardized_name'] = ' '.join(name_components)
        
        # Оценка качества
        if not result['first_name'] or not result['last_name']:
            result['name_quality'] = 'Неполный'
    
    elif name_type == 'ORGANIZATION':
        # Стандартизация названий организаций
        # Удаление распространенных префиксов/суффиксов
        org_prefixes = {'THE', 'A', 'AN', 'INC', 'INCORPORATED', 'CORP', 'CORPORATION',
                       'LLC', 'LIMITED LIABILITY COMPANY', 'LP', 'LTD', 'LIMITED'}
        
        name_parts = [part.strip().upper() for part in name.split() if part.strip()]
        
        # Удаление распространенных префиксов
        while name_parts and name_parts[0] in org_prefixes:
            name_parts = name_parts[1:]
        
        # Удаление распространенных суффиксов
        while name_parts and name_parts[-1] in org_prefixes:
            name_parts = name_parts[:-1]
        
        result['standardized_name'] = ' '.join(name_parts)
        result['first_name'] = result['standardized_name']  # Для организации используем как единое поле
    
    return result

Стратегии оптимизации производительности

При реализации стандартизации адресов в PySpark в крупном масштабе оптимизация производительности является критически важной. На основе результатов исследований о PySpark и лучших практиках инженерии данных, вот ключевые стратегии оптимизации:

1. Оптимизация UDF и кэширование

python
from functools import lru_cache

# Кэширование функций стандартизации для избежания повторных вычислений
@lru_cache(maxsize=1000)
def cached_standardize_address(raw_address: str) -> Optional[Dict[str, str]]:
    """Кэшированная версия стандартизации адреса"""
    return standardize_us_address(raw_address)

# Регистрация оптимизированного UDF
optimized_standardize_udf = udf(cached_standardize_address, StringType())

# Пример использования с кэшированием
optimized_df = address_df.withColumn(
    "standardized_result", 
    optimized_standardize_udf(col("raw_address"))
)

2. Партиционирование и параллельная обработка

python
# Перепартиционирование данных для оптимальной обработки
partitioned_df = address_df.repartition(100, col("state")) if 'state' in address_df.columns else address_df.repartition(100)

# Применение стандартизации параллельно
parallel_standardized_df = partitioned_df.withColumn(
    "standardized_result", 
    optimized_standardize_udf(col("raw_address"))
)

3. Трансляция (Broadcast) для справочных данных

python
# Создание справочных данных для штатов, городов и т.д.
reference_data = {
    'states': [('AL', 'Alabama'), ('AK', 'Alaska'), ('AZ', 'Arizona'), ('AR', 'Arkansas')],
    'cities': [('NEW YORK', 'NY'), ('LOS ANGELES', 'CA'), ('CHICAGO', 'IL')]
}

# Создание DataFrame из справочных данных
states_df = spark.createDataFrame(reference_data['states'], ['state_code', 'state_name'])
cities_df = spark.createDataFrame(reference_data['cities'], ['city_name', 'state_code'])

# Кэширование справочных данных для частого доступа
states_df.cache()
cities_df.cache()

# Использование трансляции для небольших справочных наборов данных
from pyspark.sql.functions import broadcast

# Пример использования справочных данных в стандартизации
def enhanced_address_standardization_with_references(raw_address: str, states_df, cities_df):
    """Улучшенная стандартизация с использованием трансляции справочных данных"""
    # Реализация использовала бы транслированные данные для лучшей проверки
    pass

4. Адаптивное выполнение запросов

python
# Включение адаптивного выполнения запросов для лучшей производительности
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Настройка параметров памяти для больших наборов данных
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.sql.shuffle.partitions", "200")

Тестовая среда и валидация

Чтобы обеспечить точное воспроизведение функциональности этапа стандартизации DataStage в PySpark, необходима комплексная тестовая среда и валидация.

1. Юнит-тестирование для функций стандартизации

python
import unittest
from pyspark.sql import SparkSession
import sys
import os

# Добавление текущей директории в Python путь для импортов
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

class TestAddressStandardization(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .appName("AddressStandardizationTest") \
            .master("local[4]") \
            .getOrCreate()
    
    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
    
    def test_address_parsing(self):
        """Тестирование функциональности разбора адреса"""
        test_cases = [
            ("123 Main St", {
                'street_number': '123',
                'street_name': 'MAIN',
                'street_type': 'ST',
                'city': '',
                'state': '',
                'zip_code': ''
            }),
            ("456 OAK AVENUE SAN FRANCISCO CA 94102", {
                'street_number': '456',
                'street_name': 'OAK',
                'street_type': 'AVENUE',
                'city': 'SAN FRANCISCO',
                'state': 'CA',
                'zip_code': '94102'
            })
        ]
        
        for raw_address, expected in test_cases:
            result = standardize_us_address(raw_address)
            self.assertIsNotNone(result)
            self.assertEqual(result['street_number'], expected['street_number'])
            self.assertEqual(result['street_name'], expected['street_name'])
            self.assertEqual(result['street_type'], expected['street_type'])
    
    def test_name_standardization(self):
        """Тестирование функциональности стандартизации имен"""
        test_cases = [
            ("John Doe", {
                'first_name': 'JOHN',
                'last_name': 'DOE',
                'standardized_name': 'JOHN DOE'
            }),
            ("Mary Jane Smith PhD", {
                'first_name': 'MARY',
                'middle_name': 'JANE',
                'last_name': 'SMITH',
                'suffix': 'PHD',
                'standardized_name': 'MARY JANE SMITH PHD'
            })
        ]
        
        for raw_name, expected in test_cases:
            result = standardize_name(raw_name)
            self.assertEqual(result['first_name'], expected['first_name'])
            self.assertEqual(result['last_name'], expected['last_name'])
            if 'suffix' in expected:
                self.assertEqual(result['suffix'], expected['suffix'])
    
    def test_area_standardization(self):
        """Тестирование функциональности стандартизации районов"""
        test_cases = [
            ("California", {
                'standardized_area': 'CA',
                'area_code': 'SH_CA'
            }),
            ("New York", {
                'standardized_area': 'NY',
                'area_code': 'SH_NY'
            })
        ]
        
        for raw_area, expected in test_cases:
            result = standardize_area(raw_area, 'STATE')
            self.assertEqual(result['standardized_area'], expected['standardized_area'])
            self.assertEqual(result['area_code'], expected['area_code'])

if __name__ == '__main__':
    unittest.main()

2. Интеграционное тестирование с PySpark

python
class TestPySparkIntegration(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .appName("PySparkIntegrationTest") \
            .master("local[4]") \
            .getOrCreate()
    
    def test_end_to_end_pipeline(self):
        """Тестирование полного конвейера стандартизации"""
        # Создание тестовых данных
        test_data = [
            ("123 Main St Apt 4B New York NY 10001", "John Smith", "New York"),
            ("456 OAK AVENUE SAN FRANCISCO CA 94102", "Jane Doe", "California"),
            ("789 BROADWAY APT 5 LOS ANGELES CA 90028", "Alice Johnson", "Los Angeles")
        ]
        
        # Создание DataFrame
        test_df = self.spark.createDataFrame(test_data, ["address", "name", "area"])
        
        # Применение функций стандартизации
        standardized_df = test_df.withColumn(
            "address_result", 
            optimized_standardize_udf(col("address"))
        ).withColumn(
            "name_result", 
            udf(lambda x: standardize_name(x), StringType())(col("name"))
        ).withColumn(
            "area_result", 
            udf(lambda x: standardize_area(x, "STATE"), StringType())(col("area"))
        )
        
        # Извлечение результатов
        result_df = standardized_df.select(
            col("address"),
            col("address_result.*"),
            col("name_result.*"),
            col("area_result.*")
        ).drop("original_address")
        
        # Проверка результатов
        results = result_df.collect()
        
        # Проверка первой записи
        first_record = results[0]
        self.assertEqual(first_record['street_number'], '123')
        self.assertEqual(first_record['first_name'], 'JOHN')
        self.assertEqual(first_record['standardized_area'], 'NY')
        
        # Подсчет общего количества записей
        self.assertEqual(len(results), 3)
    
    def test_performance_benchmark(self):
        """Тестирование производительности с большим набором данных"""
        import time
        
        # Создание большего тестового набора данных
        large_data = [(f"{i} Main St", f"User {i}", "CA") for i in range(10000)]
        large_df = self.spark.createDataFrame(large_data, ["address", "name", "area"])
        
        # Бенчмарк стандартизации
        start_time = time.time()
        
        large_standardized_df = large_df.withColumn(
            "address_result", 
            optimized_standardize_udf(col("address"))
        )
        
        large_standardized_df.count()  # Запуск выполнения
        end_time = time.time()
        
        processing_time = end_time - start_time
        records_per_second = 10000 / processing_time
        
        print(f"Обработано 10,000 записей за {processing_time:.2f} секунд")
        print(f"Записей в секунду: {records_per_second:.0f}")
        
        # Порог производительности (настраивается в зависимости от вашей среды)
        self.assertGreater(records_per_second, 1000)

3. Валидация качества данных

python
def validate_standardization_results(original_df, standardized_df):
    """Валидация качества результатов стандартизации"""
    # Объединение исходных и стандартизированных данных
    validation_df = original_df.join(
        standardized_df, 
        original_df["raw_address"] == standardized_df["original_address"],
        "inner"
    )
    
    # Расчет метрик качества
    total_records = validation_df.count()
    valid_addresses = validation_df.filter(col("is_valid") == True).count()
    invalid_addresses = total_records - valid_addresses
    
    # Распределение качества адресов
    quality_distribution = validation_df.groupBy("address_quality").count().collect()
    
    # Качество стандартизации имен
    name_quality_issues = validation_df.filter(
        (col("first_name") == "") | (col("last_name") == "")
    ).count()
    
    # Вывод отчета валидации
    print(f"Отчет валидации стандартизации")
    print(f"Всего записей: {total_records}")
    print(f"Действительных адресов: {valid_records} ({valid_records/total_records*100:.1f}%)")
    print(f"Недействительных адресов: {invalid_records} ({invalid_records/total_records*100:.1f}%)")
    print(f"Проблем с качеством имен: {name_quality_issues}")
    print(f"Распределение качества:")
    for row in quality_distribution:
        print(f"  {row['address_quality']}: {row['count']}")
    
    return {
        'total_records': total_records,
        'valid_records': valid_records,
        'invalid_records': invalid_records,
        'name_quality_issues': name_quality_issues,
        'quality_distribution': quality_distribution
    }

Рассмотрения миграции и интеграции

При миграции с DataStage на PySpark для стандартизации адресов необходимо учесть несколько аспектов интеграции и миграции. Исследования упоминают Lakebridge как решение для “модернизации ETL DataStage & Informatica в Databricks”, что дает представление об этом процессе миграции.

1. Сопоставление этапа стандартизации DataStage с PySpark

python
class DataStageToPySparkMigration:
    """
    Утилита миграции для сопоставления функциональности этапа стандартизации DataStage
    с эквивалентными операциями PySpark
    """
    
    def __init__(self, spark_session):
        self.spark = spark_session
        self.transformation_mapping = {
            'PARSE': self._parse_transformation,
            'VALIDATE': self._validate_transformation,
            'STANDARDIZE': self._standardize_transformation,
            'ENHANCE': self._enhance_transformation
        }
    
    def migrate_standardize_stage(self, datastage_config):
        """
        Миграция конфигурации этапа стандартизации DataStage в код PySpark
        """
        # Извлечение конфигурации из DataStage
        input_columns = datastage_config.get('input_columns', [])
        output_columns = datastage_config.get('output_columns', [])
        transformations = datastage_config.get('transformations', [])
        
        # Генерация кода PySpark
        pyspark_code = self._generate_pyspark_code(
            input_columns, output_columns, transformations
        )
        
        return pyspark_code
    
    def _generate_pyspark_code(self, input_columns, output_columns, transformations):
        """Генерация эквивалентного кода преобразования PySpark"""
        code_lines = [
            "from pyspark.sql.functions import col, udf",
            "from pyspark.sql.types import StringType, StructType",
            "",
            "# Определение функций стандартизации",
            "def standardize_address(raw_address):",
            "    # Реализация здесь",
            "    pass",
            "",
            "# Регистрация UDF",
            "standardize_udf = udf(standardize_address, StringType())",
            ""
        ]
        
        # Добавление создания входного DataFrame
        code_lines.append("# Входной DataFrame (предполагается, что 'input_df' доступен)")
        code_lines.append("input_df = ...  # Ваш входной DataFrame")
        code_lines.append("")
        
        # Добавление преобразующего конвейера
        code_lines.append("# Применение преобразований стандартизации")
        code_lines.append("transformed_df = input_df")
        
        for transformation in transformations:
            trans_type = transformation.get('type')
            if trans_type in self.transformation_mapping:
                code_lines.extend(
                    self.transformation_mapping[trans_type](transformation)
                )
        
        # Добавление выбора выходных столбцов
        code_lines.append("")
        code_lines.append("# Выбор выходных столбцов")
        if output_columns:
            code_lines.append("output_df = transformed_df.select(")
            code_lines.extend([f"    '{col}'," for col in output_columns])
            code_lines.append(")")
        else:
            code_lines.append("output_df = transformed_df")
        
        return '\n'.join(code_lines)
    
    def _parse_transformation(self, transformation_config):
        """Генерация кода для преобразования разбора адреса"""
        return [
            "    # Разбор адреса",
            "    parsed_df = transformed_df.withColumn(",
            "        'parsed_address',",
            "        standardize_udf(col('address_column'))",
            "    )",
            ""
        ]
    
    def _validate_transformation(self, transformation_config):
        """Генерация кода для преобразования проверки адреса"""
        return [
            "    # Проверка адреса",
            "    validated_df = parsed_df.withColumn(",
            "        'is_valid',",
            "        when(col('parsed_address').isNotNull(), True).otherwise(False)",
            "    )",
            ""
        ]
    
    def _standardize_transformation(self, transformation_config):
        """Генерация кода для преобразования стандартизации адреса"""
        return [
            "    # Стандартизация адреса",
            "    standardized_df = validated_df.withColumn(",
            "        'standardized_address',",
            "        col('parsed_address')['standardized_address']",
            "    )",
            ""
        ]
    
    def _enhance_transformation(self, transformation_config):
        """Генерация кода для преобразования дополнения адреса"""
        return [
            "    # Дополнение адреса",
            "    enhanced_df = standardized_df.withColumn(",
            "        'enhanced_address',",
            "        # Добавление логики дополнения здесь",
            "    )",
            ""
        ]

2. Интеграция с существующими конвейерами данных

python
class AddressStandardizationPipeline:
    """
    Полный конвейер для стандартизации адресов, который интегрируется
    с существующими рабочими процессами инженерии данных
    """
    
    def __init__(self, spark_session):
        self.spark = spark_session
    
    def create_standardization_job(self, config):
        """
        Создание полного задания стандартизации, которое можно планировать
        и интегрировать с существующими конвейерами
        """
        job_config = {
            'input_path': config.get('input_path'),
            'output_path': config.get('output_path'),
            'input_format': config.get('input_format', 'parquet'),
            'output_format': config.get('output_format', 'parquet'),
            'standardization_rules': config.get('standardization_rules', {}),
            'error_handling': config.get('error_handling', 'log_and_continue'),
            'parallelism': config.get('parallelism', 100)
        }
        
        return self._execute_standardization_job(job_config)
    
    def _execute_standardization_job(self, config):
        """Выполнение задания стандартизации"""
        try:
            # Чтение входных данных
            input_df = self._read_input_data(config)
            
            # Применение стандартизации
            standardized_df = self._apply_standardization(input_df, config)
            
            # Обработка ошибок (если есть)
            processed_df = self._handle_errors(standardized_df, config)
            
            # Запись выходных данных
            self._write_output_data(processed_df, config)
            
            return {'status': 'success', 'records_processed': processed_df.count()}
            
        except Exception as e:
            return {'status': 'error', 'error_message': str(e)}
    
    def _read_input_data(self, config):
        """Чтение входных данных из указанного пути"""
        if config['input_format'] == 'parquet':
            return self.spark.read.parquet(config['input_path'])
        elif config['input_format'] == 'csv':
            return self.spark.read.csv(config['input_path'], header=True)
        elif config['input_format'] == 'json':
            return self.spark.read.json(config['input_path'])
        else:
            raise ValueError(f"Неподдерживаемый формат входных данных: {config['input_format']}")
    
    def _apply_standardization(self, df, config):
        """Применение преобразований стандартизации адреса"""
        # Регистрация UDF
        standardize_udf = udf(
            lambda x: standardize_us_address(x),
            StringType()
        )
        
        name_standardize_udf = udf(
            lambda x: standardize_name(x),
            StringType()
        )
        
        area_standardize_udf = udf(
            lambda x: standardize_area(x, "STATE"),
            StringType()
        )
        
        # Применение преобразований на основе конфигурации
        transformed_df = df
        
        # Стандартизация адреса
        if 'address_column' in config['standardization_rules']:
            address_col = config['standardization_rules']['address_column']
            transformed_df = transformed_df.withColumn(
                f"{address_col}_standardized",
                standardize_udf(col(address_col))
            )
        
        # Стандартизация имени
        if 'name_column' in config['standardization_rules']:
            name_col = config['standardization_rules']['name_column']
            transformed_df = transformed_df.withColumn(
                f"{name_col}_standardized",
                name_standardize_udf(col(name_col))
            )
        
        # Стандартизация района
        if 'area_column' in config['standardization_rules']:
            area_col = config['standardization_rules']['area_column']
            transformed_df = transformed_df.withColumn(
                f"{area_col}_standardized",
                area_standardize_udf(col(area_col))
            )
        
        return transformed_df
    
    def _handle_errors(self, df, config):
        """Обработка ошибок во время стандартизации"""
        if config['error_handling'] == 'log_and_continue':
            # Добавление логики обработки ошибок
            return df.withColumn(
                'standardization_errors',
                when(col('address_standardized').isNull(), 'Стандартизация не удалась').otherwise(None)
            )
        elif config['error_handling'] == 'fail_fast':
            # Проверка ошибок и сбой при их обнаружении
            error_count = df.filter(col('address_standardized').isNull()).count()
            if error_count > 0:
                raise ValueError(f"Обнаружено {error_count} записей с ошибками стандартизации")
            return df
        else:
            return df
    
    def _write_output_data(self, df, config):
        """Запись обработанных данных в выходной путь"""
        if config['output_format'] == 'parquet':
            df.write.mode('overwrite').parquet(config['output_path'])
        elif config['output_format'] == 'csv':
            df.write.mode('overwrite').csv(config['output_path'], header=True)
        elif config['output_format'] == 'json':
            df.write.mode('overwrite').json(config['output_path'])
        else:
            raise ValueError(f"Неподдерживаемый формат выходных данных: {config['output_format']}")

3. Интеграция мониторинга и логирования

python
import logging
from datetime import datetime

class StandardizationMonitoring:
    """
    Мониторинг и логирование для процессов стандартизации адресов
    """
    
    def __init__(self):
        self.logger = self._setup_logging()
    
    def _setup_logging(self):
        """Настройка конфигурации логирования"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('address_standardization.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger('AddressStandardization')
    
    def log_standardization_start(self, job_id, input_count):
        """Логирование начала процесса стандартизации"""
        self.logger.info(
            f"Начало задания стандартизации адресов {job_id} "
            f"с {input_count} входными записями"
        )
    
    def log_standardization_progress(self, processed_count, total_count):
        """Логирование прогресса процесса стандартизации"""
        progress_percent = (processed_count / total_count) * 100
        self.logger.info(
            f"Обработано {processed_count}/{total_count} записей "
            f"({progress_percent:.1f}%)"
        )
    
    def log_standardization_complete(self, job_id, output_count, errors_count):
        """Логирование завершения процесса стандартизации"""
        self.logger.info(
            f"Завершение задания стандартизации адресов {job_id} "
            f"с {output_count} выходными записями и {errors_count} ошибками"
        )
    
    def log_error(self, job_id, error_message, record_data=None):
        """Логирование ошибок во время стандартизации"""
        error_log = {
            'timestamp': datetime.now().isoformat(),
            'job_id': job_id,
            'error_message': error_message,
            'record_data': record_data
        }
        self.logger.error(f"Ошибка стандартизации: {error_log}")

Эта комплексная реализация предоставляет полную структуру для воспроизведения функциональности этапа стандартизации DataStage в Databricks PySpark, включая стандартизацию адресов, районов и имен с оптимизацией производительности, тестированием и рассмотрениями миграции.

Источники

  1. Lakebridge: Модернизация ETL DataStage & Informatica в Databricks
  2. Проверка и стандартизация адресов - Precisely
  3. Массовая проверка адресов, пакетная проверка программного обеспечения и инструмента - PostGrid
  4. Spectrum Global Addressing: Международная проверка и валидация адресов
  5. API проверки адреса | API службы проверки адресов - USPS API - PostGrid
  6. Построение масштабируемого конвейера данных PySpark: пошаговый пример
  7. PySpark для инженеров данных
  8. Архитектура Apache Spark 101: Как работает Spark (2025)
  9. Как построить сквозной конвейер инженерии данных и машинного обучения с Apache Spark и PySpark
  10. 10 концепций PySpark, которые должен освоить каждый средний специалист по данным

Заключение

Реализация стандартизации адресов в Databricks PySpark для воспроизведения функциональности этапа стандартизации DataStage требует комплексного подхода, который объединяет разбор, проверку, стандартизацию и дополнение данных адресов, районов и имен США. Ключевые выводы из этой реализации включают:

  1. Модульный дизайн: Разбиение процесса стандартизации на отдельные компоненты (разбор, проверка, стандартизация, дополнение) для поддержания организации кода и облегчения тестирования.

  2. Оптимизация UDF: Использование кэширования и эффективных реализаций UDF для обработки адресов в крупном масштабе при поддержке производительности, аналогичной возможностям преобразования DataStage.

  3. Интеграция справочных данных: Использование трансляции (broadcast) и кэшированных справочных наборов данных для штатов, городов и почтовых индексов для повышения точности проверки и скорости обработки.

  4. Комплексное тестирование: Реализация юнит-тестов, интеграционных тестов и бенчмарков производительности для того, чтобы убедиться, что реализация PySpark соответствует или превосходит надежность и точность DataStage.

  5. Стратегия миграции: Использование структурированных утилит миграции для перевода конфигураций этапа стандартизации DataStage в эквивалентный код PySpark, обеспечивая непрерывность в процессе перехода.

  6. Мониторинг и логирование: Интеграция комплексных систем мониторинга и логирования для отслеживания качества стандартизации, выявления проблем и поддержания стандартов управления данными.

Реализация демонстрирует, как современная облачная инженерия данных с PySpark может эффективно воспроизводить и улучшать традиционную функциональность ETL, такую как этап стандартизации DataStage, обеспечивая масштабируемость, гибкость и поддерживаемость для рабочих процессов стандартизации адресов в крупном масштабе в средах Databricks.