Как реализовать стандартизацию адресов в Databricks PySpark для воспроизведения функциональности этапа Standardize в DataStage для адресов, районов и имен США?
Функциональность этапа стандартизации IBM DataStage для адресов, районов и имен США может быть воспроизведена в Databricks PySpark путем реализации комплексного конвейера стандартизации адресов, который использует как встроенные преобразования PySpark, так и специализированные библиотеки проверки адресов. Этот подход объединяет возможности преобразования, характерные для DataStage, с современной облачной обработкой для достижения аналогичных результатов качества и стандартизации данных.
- Понимание функциональности этапа стандартизации DataStage
- Подход к реализации в PySpark
- Компоненты стандартизации адресов США
- Техники стандартизации районов и имен
- Стратегии оптимизации производительности
- Тестовая среда и валидация
- Рассмотрения миграции и интеграции
Понимание функциональности этапа стандартизации DataStage
Этап стандартизации IBM DataStage предназначен для преобразования нестандартизированных адресных данных в единообразный формат, соответствующий почтовым правилам и организационным стандартам. Исследования показывают, что DataStage широко используется для “сопоставления формата HIPAA с форматом конкретного продукта” и “преобразования клиентских данных из нескольких источников в целевую базу данных ORACLE, ПЛОСКИЕ ФАЙЛЫ”, что указывает на его роль в преобразовании и стандартизации данных.
Этап стандартизации обычно выполняет несколько ключевых функций:
- Разбор адресов (Address Parsing): Разделение необработанных строк адресов на компоненты (улица, город, штат, ZIP)
- Проверка адресов (Address Validation): Проверка адресов по почтовым базам данных
- Стандартизация адресов (Address Standardization): Преобразование адресов в единообразный формат
- Дополнение адресов (Address Enhancement): Добавление недостающей информации, таких как коды ZIP+4
Согласно исследованиям, аналогичную функциональность предоставляют специализированные службы проверки адресов, такие как Precisely, которые предлагают “встроенные, готовые к использованию шаблоны для разбора и стандартизации глобальных адресов”. Эти службы могут быть интегрированы в рабочие процессы PySpark для воспроизведения возможностей стандартизации DataStage.
Подход к реализации в PySpark
Для воспроизведения функциональности этапа стандартизации DataStage в PySpark необходимо создать преобразующий конвейер, который решает те же требования к качеству и стандартизации данных. Исследования упоминают Lakebridge как решение для “модернизации ETL DataStage & Informatica в Databricks” путем “перевода устаревших сопоставлений и преобразований в конвейеры PySpark или Spark SQL Databricks”.
Вот комплексный подход к реализации:
1. Настройка среды и зависимостей
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, regexp_replace, split, when, lit
from pyspark.sql.types import StringType, StructType, StructField
import re
from typing import Optional, Dict, List
# Инициализация Spark сессии
spark = SparkSession.builder \
.appName("AddressStandardization") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
2. Функция стандартизации адресов
def standardize_us_address(raw_address: str) -> Optional[Dict[str, str]]:
"""
Стандартизация адреса США путем разбора и нормализации компонентов
Воспроизводит функциональность этапа стандартизации DataStage
"""
if not raw_address or pd.isna(raw_address):
return None
# Преобразование в верхний регистр и удаление лишних пробелов
address = str(raw_address).upper().strip()
# Инициализация словаря результатов
result = {
'original_address': raw_address,
'standardized_address': '',
'street_number': '',
'street_name': '',
'street_type': '',
'apartment_number': '',
'city': '',
'state': '',
'zip_code': '',
'zip4': '',
'address_quality': '',
'is_valid': False
}
try:
# Разбор компонентов адреса
parsed = _parse_address_components(address)
result.update(parsed)
# Проверка и дополнение адреса
validated = _validate_and_enhance_address(result)
result.update(validated)
# Генерация стандартизированной строки адреса
result['standardized_address'] = _generate_standardized_address(result)
result['is_valid'] = True
except Exception as e:
result['address_quality'] = f'ОШИБКА: {str(e)}'
return result
def _parse_address_components(address: str) -> Dict[str, str]:
"""Разбор адреса на компоненты, аналогично логике разбора DataStage"""
# Удаление специальных символов и нормализация
clean_address = re.sub(r'[^\w\s]', ' ', address)
# Разделение на части
parts = clean_address.split()
# Инициализация компонентов
components = {
'street_number': '',
'street_name': '',
'street_type': '',
'apartment_number': '',
'city': '',
'state': '',
'zip_code': '',
'zip4': ''
}
# Типы улиц США для стандартизации
street_types = {'AVE', 'AVENUE', 'BLVD', 'BOULEVARD', 'CIR', 'CIRCLE',
'CT', 'COURT', 'DR', 'DRIVE', 'LN', 'LANE', 'PKWY', 'PARKWAY',
'PL', 'PLACE', 'RD', 'ROAD', 'ST', 'STREET', 'WAY', 'TRL', 'TRAIL'}
# Коды штатов США
us_states = {'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'}
# Разбор компонентов на основе общих шаблонов
if len(parts) >= 2:
# Первая часть обычно является номером улицы
if parts[0].isdigit():
components['street_number'] = parts[0]
# Поиск типа улицы
for i, part in enumerate(parts[1:], 1):
if part in street_types:
components['street_type'] = part
# Название улицы - все до типа улицы
if i > 1:
components['street_name'] = ' '.join(parts[1:i])
break
# Поиск штата (обычно 2-буквенный код)
for part in parts:
if len(part) == 2 and part in us_states:
components['state'] = part
break
# Поиск почтового индекса (5 цифр или формат 5+4)
for part in parts:
if re.match(r'^\d{5}$', part):
components['zip_code'] = part
elif re.match(r'^\d{5}-\d{4}$', part):
zip_parts = part.split('-')
components['zip_code'] = zip_parts[0]
components['zip4'] = zip_parts[1]
# Город обычно находится перед штатом или после компонентов улицы
# Это упрощенная реализация - в реальной реализации она была бы более сложной
city_parts = []
in_city_section = False
for part in parts:
if part in us_states:
in_city_section = False
elif in_city_section or (not components['street_type'] and not components['state']):
if part not in street_types and not part.isdigit() and len(part) > 2:
city_parts.append(part)
if city_parts:
components['city'] = ' '.join(city_parts)
return components
def _validate_and_enhance_address(address_data: Dict[str, str]) -> Dict[str, str]:
"""Проверка и дополнение компонентов адреса"""
validated = address_data.copy()
# Оценка качества адреса
quality_score = 0
quality_factors = []
# Проверка обязательных компонентов
if validated['street_number']:
quality_score += 20
else:
quality_factors.append('Отсутствует номер улицы')
if validated['street_name']:
quality_score += 20
else:
quality_factors.append('Отсутствует название улицы')
if validated['city']:
quality_score += 15
else:
quality_factors.append('Отсутствует город')
if validated['state']:
quality_score += 15
# Проверка кода штата
if validated['state'] not in {'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'}:
quality_score -= 10
quality_factors.append('Недействительный код штата')
else:
quality_factors.append('Отсутствует штат')
if validated['zip_code']:
quality_score += 20
# Проверка формата ZIP
if not re.match(r'^\d{5}$', validated['zip_code']):
quality_score -= 5
quality_factors.append('Недействительный формат ZIP')
else:
quality_factors.append('Отсутствует почтовый индекс')
if validated['street_type']:
quality_score += 10
# Ограничение оценки качества 100 баллами
validated['address_quality'] = f"{min(quality_score, 100)}% - {', '.join(quality_factors) if quality_factors else 'Хорошо'}"
return validated
def _generate_standardized_address(address_data: Dict[str, str]) -> str:
"""Генерация стандартизированной строки адреса из компонентов"""
parts = []
if address_data['street_number']:
parts.append(address_data['street_number'])
if address_data['street_name']:
parts.append(address_data['street_name'])
if address_data['street_type']:
parts.append(address_data['street_type'])
if address_data['apartment_number']:
parts.append(f"КВ {address_data['apartment_number']}")
address_line = ' '.join(parts) if parts else ''
# Добавление города, штата, ZIP
location_parts = []
if address_data['city']:
location_parts.append(address_data['city'])
if address_data['state']:
location_parts.append(address_data['state'])
if address_data['zip_code']:
if address_data['zip4']:
location_parts.append(f"{address_data['zip_code']}-{address_data['zip4']}")
else:
location_parts.append(address_data['zip_code'])
location_line = ', '.join(location_parts) if location_parts else ''
# Объединение строк
standardized = []
if address_line:
standardized.append(address_line)
if location_line:
standardized.append(location_line)
return '\n'.join(standardized)
Компоненты стандартизации адресов США
Процесс стандартизации адресов США включает несколько ключевых компонентов, которые работают вместе для преобразования необработанных адресных данных в единообразный формат. На основе результатов исследований, особенно от служб проверки адресов, таких как Precisely, мы можем определить основные компоненты:
1. Разбор адресов
Разбор адресов разбивает необработанные строки адресов на их составные части. Это похоже на то, как PostGrid описывает стандартизацию как “выполняемую для того, чтобы ваша почта соответствовала почтовым требованиям”.
# Регистрация UDF для стандартизации адресов
standardize_address_udf = udf(standardize_us_address, StringType())
# Пример использования в PySpark
address_df = spark.createDataFrame([
("123 Main St Apt 4B New York NY 10001",),
("456 OAK AVENUE SAN FRANCISCO CA 94102",),
("789 BROADWAY APT 5 LOS ANGELES CA 90028",)
], ["raw_address"])
# Применение стандартизации
standardized_df = address_df.withColumn(
"standardized_result",
standardize_address_udf(col("raw_address"))
)
# Извлечение компонентов в отдельные столбцы
final_df = standardized_df.select(
col("raw_address"),
col("standardized_result.*")
).drop("original_address")
final_df.show(truncate=False)
2. Проверка адресов
Проверка адресов гарантирует, что каждый компонент адреса действителен и существует в почтовой базе данных. Согласно исследованиям, это включает проверку на “местные почтовые стандарты”, как упоминается в документации API PostGrid.
def validate_address_components(address_data: Dict[str, str]) -> Dict[str, bool]:
"""Проверка отдельных компонентов адреса"""
validation_results = {
'street_valid': False,
'city_valid': False,
'state_valid': False,
'zip_valid': False,
'address_complete': False
}
# Проверка улицы (упрощенная - в реальной реализации использовалась бы почтовая база данных)
if (address_data['street_number'] and
address_data['street_name'] and
address_data['street_type']):
validation_results['street_valid'] = True
# Проверка города (упрощенная)
if address_data['city'] and len(address_data['city']) > 2:
validation_results['city_valid'] = True
# Проверка штата
us_states = {'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'}
if address_data['state'] in us_states:
validation_results['state_valid'] = True
# Проверка почтового индекса
if re.match(r'^\d{5}$', address_data['zip_code']):
validation_results['zip_valid'] = True
# Проверка, является ли адрес полным
validation_results['address_complete'] = all([
validation_results['street_valid'],
validation_results['city_valid'],
validation_results['state_valid'],
validation_results['zip_valid']
])
return validation_results
3. Дополнение адресов
Дополнение адресов добавляет недостающую информацию, такую как коды ZIP+4, информацию о округе и другие почтовые данные. Это соответствует тому, как Spectrum Global Addressing присваивает уникальные идентификаторы и предоставляет богатые метаданные.
def enhance_address_data(address_data: Dict[str, str]) -> Dict[str, str]:
"""Дополнение адреса дополнительными метаданными"""
enhanced = address_data.copy()
# Добавление информации ZIP+4 (упрощенная - в реальной реализации использовался бы почтовый API)
if enhanced['zip_code'] and not enhanced['zip4']:
# Мок-генерация ZIP+4 для демонстрации
zip4 = str(int(enhanced['zip_code']) + 1000).zfill(4)
enhanced['zip4'] = zip4
# Добавление информации об округе (упрощенная)
zip_to_county = {
'10001': 'Округ Нью-Йорк',
'94102': 'Округ Сан-Франциско',
'90028': 'Округ Лос-Анджелес'
}
if enhanced['zip_code'] in zip_to_county:
enhanced['county'] = zip_to_county[enhanced['zip_code']]
else:
enhanced['county'] = 'Неизвестно'
# Добавление классификации типа адреса
if enhanced['street_type'] in {'AVE', 'AVENUE', 'BLVD', 'BOULEVARD'}:
enhanced['address_type'] = 'Основная дорога'
elif enhanced['street_type'] in {'CT', 'COURT', 'CIR', 'CIRCLE'}:
enhanced['address_type'] = 'Жилой'
else:
enhanced['address_type'] = 'Стандартный'
return enhanced
Техники стандартизации районов и имен
Помимо стандартизации адресов, этап стандартизации DataStage также обрабатывает стандартизацию районов и имен. Эти компоненты важны для поддержания согласованности данных в крупных наборах данных.
1. Стандартизация районов
Стандартизация районов включает нормализацию географических районов, регионов и административных делений. Это особенно важно для наборов данных, которые охватывают несколько штатов или регионов.
def standardize_area(area_name: str, area_type: str) -> Dict[str, str]:
"""
Стандартизация названий географических районов
- Названия штатов в аббревиатуры
- Названия округов в стандартный формат
- Метрополитенские области в стандартное обозначение
"""
if not area_name:
return {'standardized_area': '', 'area_code': ''}
area_name = str(area_name).upper().strip()
# Сопоставление названий штатов с аббревиатурами
state_names = {
'ALABAMA': 'AL', 'ALASKA': 'AK', 'ARIZONA': 'AZ', 'ARKANSAS': 'AR',
'CALIFORNIA': 'CA', 'COLORADO': 'CO', 'CONNECTICUT': 'CT', 'DELAWARE': 'DE',
'FLORIDA': 'FL', 'GEORGIA': 'GA', 'HAWAII': 'HI', 'IDAHO': 'ID',
'ILLINOIS': 'IL', 'INDIANA': 'IN', 'IOWA': 'IA', 'KANSAS': 'KS',
'KENTUCKY': 'KY', 'LOUISIANA': 'LA', 'MAINE': 'ME', 'MARYLAND': 'MD',
'MASSACHUSETTS': 'MA', 'MICHIGAN': 'MI', 'MINNESOTA': 'MN', 'MISSISSIPPI': 'MS',
'MISSOURI': 'MO', 'MONTANA': 'MT', 'NEBRASKA': 'NE', 'NEVADA': 'NV',
'NEW HAMPSHIRE': 'NH', 'NEW JERSEY': 'NJ', 'NEW MEXICO': 'NM', 'NEW YORK': 'NY',
'NORTH CAROLINA': 'NC', 'NORTH DAKOTA': 'ND', 'OHIO': 'OH', 'OKLAHOMA': 'OK',
'OREGON': 'OR', 'PENNSYLVANIA': 'PA', 'RHODE ISLAND': 'RI', 'SOUTH CAROLINA': 'SC',
'SOUTH DAKOTA': 'SD', 'TENNESSEE': 'TN', 'TEXAS': 'TX', 'UTAH': 'UT',
'VERMONT': 'VT', 'VIRGINIA': 'VA', 'WASHINGTON': 'WA', 'WEST VIRGINIA': 'WV',
'WISCONSIN': 'WI', 'WYOMING': 'WY'
}
# Распространенные варианты названий округов
county_variations = {
'SAINT': 'ST',
'SAINTS': 'STS',
'SAINT LOUIS': 'ST LOUIS',
'DE KALB': 'DEKALB',
'DU PAGE': 'DUPAGE',
'LA SALLE': 'LASALLE'
}
# Коды метрополитенских областей
metro_areas = {
'NEW YORK': 'NYC',
'LOS ANGELES': 'LA',
'CHICAGO': 'CHI',
'HOUSTON': 'HOU',
'PHOENIX': 'PHX',
'PHILADELPHIA': 'PHL',
'SAN ANTONIO': 'SAT',
'SAN DIEGO': 'SD',
'DALLAS': 'DAL',
'SAN JOSE': 'SJ'
}
result = {'standardized_area': '', 'area_code': ''}
# Проверка, является ли это названием штата
if area_name in state_names:
result['standardized_area'] = state_names[area_name]
result['area_code'] = f'SH_{result["standardized_area"]}'
return result
# Проверка вариантов названий округов
for variation, standard in county_variations.items():
if variation in area_name:
area_name = area_name.replace(variation, standard)
# Проверка метрополитенских областей
for metro_name, metro_code in metro_areas.items():
if metro_name in area_name:
result['standardized_area'] = area_name
result['area_code'] = f'METRO_{metro_code}'
return result
# По умолчанию: возврат исходного (очищенного) названия района
result['standardized_area'] = area_name
result['area_code'] = f'AREA_{area_name.replace(" ", "_")}'
return result
2. Стандартизация имен
Стандартизация имен включает нормализацию личных имен и названий организаций в единообразный формат. Это особенно важно для клиентских данных и управления записями.
def standardize_name(name: str, name_type: str = 'PERSONAL') -> Dict[str, str]:
"""
Стандартизация имен в соответствии с указанным типом
- Личные имена: формат Имя, Отчество, Фамилия
- Названия организаций: юридический формат
"""
if not name:
return {
'standardized_name': '',
'first_name': '',
'middle_name': '',
'last_name': '',
'suffix': '',
'name_quality': 'Пусто'
}
name = str(name).strip()
result = {
'standardized_name': '',
'first_name': '',
'middle_name': '',
'last_name': '',
'suffix': '',
'name_quality': 'Хорошо'
}
if name_type == 'PERSONAL':
# Разбор компонентов личного имени
name_parts = [part.strip() for part in name.split() if part.strip()]
if len(name_parts) == 0:
result['name_quality'] = 'Пусто'
return result
# Распространенные суффиксы
suffixes = {'JR', 'SR', 'II', 'III', 'IV', 'V', 'ESQ', 'PHD', 'MD', 'DDS'}
# Проверка суффикса в конце
if name_parts[-1] in suffixes:
result['suffix'] = name_parts[-1]
name_parts = name_parts[:-1]
if len(name_parts) == 1:
result['first_name'] = name_parts[0]
result['last_name'] = name_parts[0]
elif len(name_parts) == 2:
result['first_name'] = name_parts[0]
result['last_name'] = name_parts[1]
elif len(name_parts) == 3:
result['first_name'] = name_parts[0]
result['middle_name'] = name_parts[1]
result['last_name'] = name_parts[2]
else:
# Более сложные имена - предполагаем, что первая часть - имя,
# последняя часть - фамилия, а средняя - все остальное
result['first_name'] = name_parts[0]
result['middle_name'] = ' '.join(name_parts[1:-1])
result['last_name'] = name_parts[-1]
# Генерация стандартизированного имени
name_components = []
if result['first_name']:
name_components.append(result['first_name'])
if result['middle_name']:
name_components.append(result['middle_name'])
if result['last_name']:
name_components.append(result['last_name'])
if result['suffix']:
name_components.append(result['suffix'])
result['standardized_name'] = ' '.join(name_components)
# Оценка качества
if not result['first_name'] or not result['last_name']:
result['name_quality'] = 'Неполный'
elif name_type == 'ORGANIZATION':
# Стандартизация названий организаций
# Удаление распространенных префиксов/суффиксов
org_prefixes = {'THE', 'A', 'AN', 'INC', 'INCORPORATED', 'CORP', 'CORPORATION',
'LLC', 'LIMITED LIABILITY COMPANY', 'LP', 'LTD', 'LIMITED'}
name_parts = [part.strip().upper() for part in name.split() if part.strip()]
# Удаление распространенных префиксов
while name_parts and name_parts[0] in org_prefixes:
name_parts = name_parts[1:]
# Удаление распространенных суффиксов
while name_parts and name_parts[-1] in org_prefixes:
name_parts = name_parts[:-1]
result['standardized_name'] = ' '.join(name_parts)
result['first_name'] = result['standardized_name'] # Для организации используем как единое поле
return result
Стратегии оптимизации производительности
При реализации стандартизации адресов в PySpark в крупном масштабе оптимизация производительности является критически важной. На основе результатов исследований о PySpark и лучших практиках инженерии данных, вот ключевые стратегии оптимизации:
1. Оптимизация UDF и кэширование
from functools import lru_cache
# Кэширование функций стандартизации для избежания повторных вычислений
@lru_cache(maxsize=1000)
def cached_standardize_address(raw_address: str) -> Optional[Dict[str, str]]:
"""Кэшированная версия стандартизации адреса"""
return standardize_us_address(raw_address)
# Регистрация оптимизированного UDF
optimized_standardize_udf = udf(cached_standardize_address, StringType())
# Пример использования с кэшированием
optimized_df = address_df.withColumn(
"standardized_result",
optimized_standardize_udf(col("raw_address"))
)
2. Партиционирование и параллельная обработка
# Перепартиционирование данных для оптимальной обработки
partitioned_df = address_df.repartition(100, col("state")) if 'state' in address_df.columns else address_df.repartition(100)
# Применение стандартизации параллельно
parallel_standardized_df = partitioned_df.withColumn(
"standardized_result",
optimized_standardize_udf(col("raw_address"))
)
3. Трансляция (Broadcast) для справочных данных
# Создание справочных данных для штатов, городов и т.д.
reference_data = {
'states': [('AL', 'Alabama'), ('AK', 'Alaska'), ('AZ', 'Arizona'), ('AR', 'Arkansas')],
'cities': [('NEW YORK', 'NY'), ('LOS ANGELES', 'CA'), ('CHICAGO', 'IL')]
}
# Создание DataFrame из справочных данных
states_df = spark.createDataFrame(reference_data['states'], ['state_code', 'state_name'])
cities_df = spark.createDataFrame(reference_data['cities'], ['city_name', 'state_code'])
# Кэширование справочных данных для частого доступа
states_df.cache()
cities_df.cache()
# Использование трансляции для небольших справочных наборов данных
from pyspark.sql.functions import broadcast
# Пример использования справочных данных в стандартизации
def enhanced_address_standardization_with_references(raw_address: str, states_df, cities_df):
"""Улучшенная стандартизация с использованием трансляции справочных данных"""
# Реализация использовала бы транслированные данные для лучшей проверки
pass
4. Адаптивное выполнение запросов
# Включение адаптивного выполнения запросов для лучшей производительности
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Настройка параметров памяти для больших наборов данных
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
Тестовая среда и валидация
Чтобы обеспечить точное воспроизведение функциональности этапа стандартизации DataStage в PySpark, необходима комплексная тестовая среда и валидация.
1. Юнит-тестирование для функций стандартизации
import unittest
from pyspark.sql import SparkSession
import sys
import os
# Добавление текущей директории в Python путь для импортов
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
class TestAddressStandardization(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder \
.appName("AddressStandardizationTest") \
.master("local[4]") \
.getOrCreate()
@classmethod
def tearDownClass(cls):
cls.spark.stop()
def test_address_parsing(self):
"""Тестирование функциональности разбора адреса"""
test_cases = [
("123 Main St", {
'street_number': '123',
'street_name': 'MAIN',
'street_type': 'ST',
'city': '',
'state': '',
'zip_code': ''
}),
("456 OAK AVENUE SAN FRANCISCO CA 94102", {
'street_number': '456',
'street_name': 'OAK',
'street_type': 'AVENUE',
'city': 'SAN FRANCISCO',
'state': 'CA',
'zip_code': '94102'
})
]
for raw_address, expected in test_cases:
result = standardize_us_address(raw_address)
self.assertIsNotNone(result)
self.assertEqual(result['street_number'], expected['street_number'])
self.assertEqual(result['street_name'], expected['street_name'])
self.assertEqual(result['street_type'], expected['street_type'])
def test_name_standardization(self):
"""Тестирование функциональности стандартизации имен"""
test_cases = [
("John Doe", {
'first_name': 'JOHN',
'last_name': 'DOE',
'standardized_name': 'JOHN DOE'
}),
("Mary Jane Smith PhD", {
'first_name': 'MARY',
'middle_name': 'JANE',
'last_name': 'SMITH',
'suffix': 'PHD',
'standardized_name': 'MARY JANE SMITH PHD'
})
]
for raw_name, expected in test_cases:
result = standardize_name(raw_name)
self.assertEqual(result['first_name'], expected['first_name'])
self.assertEqual(result['last_name'], expected['last_name'])
if 'suffix' in expected:
self.assertEqual(result['suffix'], expected['suffix'])
def test_area_standardization(self):
"""Тестирование функциональности стандартизации районов"""
test_cases = [
("California", {
'standardized_area': 'CA',
'area_code': 'SH_CA'
}),
("New York", {
'standardized_area': 'NY',
'area_code': 'SH_NY'
})
]
for raw_area, expected in test_cases:
result = standardize_area(raw_area, 'STATE')
self.assertEqual(result['standardized_area'], expected['standardized_area'])
self.assertEqual(result['area_code'], expected['area_code'])
if __name__ == '__main__':
unittest.main()
2. Интеграционное тестирование с PySpark
class TestPySparkIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder \
.appName("PySparkIntegrationTest") \
.master("local[4]") \
.getOrCreate()
def test_end_to_end_pipeline(self):
"""Тестирование полного конвейера стандартизации"""
# Создание тестовых данных
test_data = [
("123 Main St Apt 4B New York NY 10001", "John Smith", "New York"),
("456 OAK AVENUE SAN FRANCISCO CA 94102", "Jane Doe", "California"),
("789 BROADWAY APT 5 LOS ANGELES CA 90028", "Alice Johnson", "Los Angeles")
]
# Создание DataFrame
test_df = self.spark.createDataFrame(test_data, ["address", "name", "area"])
# Применение функций стандартизации
standardized_df = test_df.withColumn(
"address_result",
optimized_standardize_udf(col("address"))
).withColumn(
"name_result",
udf(lambda x: standardize_name(x), StringType())(col("name"))
).withColumn(
"area_result",
udf(lambda x: standardize_area(x, "STATE"), StringType())(col("area"))
)
# Извлечение результатов
result_df = standardized_df.select(
col("address"),
col("address_result.*"),
col("name_result.*"),
col("area_result.*")
).drop("original_address")
# Проверка результатов
results = result_df.collect()
# Проверка первой записи
first_record = results[0]
self.assertEqual(first_record['street_number'], '123')
self.assertEqual(first_record['first_name'], 'JOHN')
self.assertEqual(first_record['standardized_area'], 'NY')
# Подсчет общего количества записей
self.assertEqual(len(results), 3)
def test_performance_benchmark(self):
"""Тестирование производительности с большим набором данных"""
import time
# Создание большего тестового набора данных
large_data = [(f"{i} Main St", f"User {i}", "CA") for i in range(10000)]
large_df = self.spark.createDataFrame(large_data, ["address", "name", "area"])
# Бенчмарк стандартизации
start_time = time.time()
large_standardized_df = large_df.withColumn(
"address_result",
optimized_standardize_udf(col("address"))
)
large_standardized_df.count() # Запуск выполнения
end_time = time.time()
processing_time = end_time - start_time
records_per_second = 10000 / processing_time
print(f"Обработано 10,000 записей за {processing_time:.2f} секунд")
print(f"Записей в секунду: {records_per_second:.0f}")
# Порог производительности (настраивается в зависимости от вашей среды)
self.assertGreater(records_per_second, 1000)
3. Валидация качества данных
def validate_standardization_results(original_df, standardized_df):
"""Валидация качества результатов стандартизации"""
# Объединение исходных и стандартизированных данных
validation_df = original_df.join(
standardized_df,
original_df["raw_address"] == standardized_df["original_address"],
"inner"
)
# Расчет метрик качества
total_records = validation_df.count()
valid_addresses = validation_df.filter(col("is_valid") == True).count()
invalid_addresses = total_records - valid_addresses
# Распределение качества адресов
quality_distribution = validation_df.groupBy("address_quality").count().collect()
# Качество стандартизации имен
name_quality_issues = validation_df.filter(
(col("first_name") == "") | (col("last_name") == "")
).count()
# Вывод отчета валидации
print(f"Отчет валидации стандартизации")
print(f"Всего записей: {total_records}")
print(f"Действительных адресов: {valid_records} ({valid_records/total_records*100:.1f}%)")
print(f"Недействительных адресов: {invalid_records} ({invalid_records/total_records*100:.1f}%)")
print(f"Проблем с качеством имен: {name_quality_issues}")
print(f"Распределение качества:")
for row in quality_distribution:
print(f" {row['address_quality']}: {row['count']}")
return {
'total_records': total_records,
'valid_records': valid_records,
'invalid_records': invalid_records,
'name_quality_issues': name_quality_issues,
'quality_distribution': quality_distribution
}
Рассмотрения миграции и интеграции
При миграции с DataStage на PySpark для стандартизации адресов необходимо учесть несколько аспектов интеграции и миграции. Исследования упоминают Lakebridge как решение для “модернизации ETL DataStage & Informatica в Databricks”, что дает представление об этом процессе миграции.
1. Сопоставление этапа стандартизации DataStage с PySpark
class DataStageToPySparkMigration:
"""
Утилита миграции для сопоставления функциональности этапа стандартизации DataStage
с эквивалентными операциями PySpark
"""
def __init__(self, spark_session):
self.spark = spark_session
self.transformation_mapping = {
'PARSE': self._parse_transformation,
'VALIDATE': self._validate_transformation,
'STANDARDIZE': self._standardize_transformation,
'ENHANCE': self._enhance_transformation
}
def migrate_standardize_stage(self, datastage_config):
"""
Миграция конфигурации этапа стандартизации DataStage в код PySpark
"""
# Извлечение конфигурации из DataStage
input_columns = datastage_config.get('input_columns', [])
output_columns = datastage_config.get('output_columns', [])
transformations = datastage_config.get('transformations', [])
# Генерация кода PySpark
pyspark_code = self._generate_pyspark_code(
input_columns, output_columns, transformations
)
return pyspark_code
def _generate_pyspark_code(self, input_columns, output_columns, transformations):
"""Генерация эквивалентного кода преобразования PySpark"""
code_lines = [
"from pyspark.sql.functions import col, udf",
"from pyspark.sql.types import StringType, StructType",
"",
"# Определение функций стандартизации",
"def standardize_address(raw_address):",
" # Реализация здесь",
" pass",
"",
"# Регистрация UDF",
"standardize_udf = udf(standardize_address, StringType())",
""
]
# Добавление создания входного DataFrame
code_lines.append("# Входной DataFrame (предполагается, что 'input_df' доступен)")
code_lines.append("input_df = ... # Ваш входной DataFrame")
code_lines.append("")
# Добавление преобразующего конвейера
code_lines.append("# Применение преобразований стандартизации")
code_lines.append("transformed_df = input_df")
for transformation in transformations:
trans_type = transformation.get('type')
if trans_type in self.transformation_mapping:
code_lines.extend(
self.transformation_mapping[trans_type](transformation)
)
# Добавление выбора выходных столбцов
code_lines.append("")
code_lines.append("# Выбор выходных столбцов")
if output_columns:
code_lines.append("output_df = transformed_df.select(")
code_lines.extend([f" '{col}'," for col in output_columns])
code_lines.append(")")
else:
code_lines.append("output_df = transformed_df")
return '\n'.join(code_lines)
def _parse_transformation(self, transformation_config):
"""Генерация кода для преобразования разбора адреса"""
return [
" # Разбор адреса",
" parsed_df = transformed_df.withColumn(",
" 'parsed_address',",
" standardize_udf(col('address_column'))",
" )",
""
]
def _validate_transformation(self, transformation_config):
"""Генерация кода для преобразования проверки адреса"""
return [
" # Проверка адреса",
" validated_df = parsed_df.withColumn(",
" 'is_valid',",
" when(col('parsed_address').isNotNull(), True).otherwise(False)",
" )",
""
]
def _standardize_transformation(self, transformation_config):
"""Генерация кода для преобразования стандартизации адреса"""
return [
" # Стандартизация адреса",
" standardized_df = validated_df.withColumn(",
" 'standardized_address',",
" col('parsed_address')['standardized_address']",
" )",
""
]
def _enhance_transformation(self, transformation_config):
"""Генерация кода для преобразования дополнения адреса"""
return [
" # Дополнение адреса",
" enhanced_df = standardized_df.withColumn(",
" 'enhanced_address',",
" # Добавление логики дополнения здесь",
" )",
""
]
2. Интеграция с существующими конвейерами данных
class AddressStandardizationPipeline:
"""
Полный конвейер для стандартизации адресов, который интегрируется
с существующими рабочими процессами инженерии данных
"""
def __init__(self, spark_session):
self.spark = spark_session
def create_standardization_job(self, config):
"""
Создание полного задания стандартизации, которое можно планировать
и интегрировать с существующими конвейерами
"""
job_config = {
'input_path': config.get('input_path'),
'output_path': config.get('output_path'),
'input_format': config.get('input_format', 'parquet'),
'output_format': config.get('output_format', 'parquet'),
'standardization_rules': config.get('standardization_rules', {}),
'error_handling': config.get('error_handling', 'log_and_continue'),
'parallelism': config.get('parallelism', 100)
}
return self._execute_standardization_job(job_config)
def _execute_standardization_job(self, config):
"""Выполнение задания стандартизации"""
try:
# Чтение входных данных
input_df = self._read_input_data(config)
# Применение стандартизации
standardized_df = self._apply_standardization(input_df, config)
# Обработка ошибок (если есть)
processed_df = self._handle_errors(standardized_df, config)
# Запись выходных данных
self._write_output_data(processed_df, config)
return {'status': 'success', 'records_processed': processed_df.count()}
except Exception as e:
return {'status': 'error', 'error_message': str(e)}
def _read_input_data(self, config):
"""Чтение входных данных из указанного пути"""
if config['input_format'] == 'parquet':
return self.spark.read.parquet(config['input_path'])
elif config['input_format'] == 'csv':
return self.spark.read.csv(config['input_path'], header=True)
elif config['input_format'] == 'json':
return self.spark.read.json(config['input_path'])
else:
raise ValueError(f"Неподдерживаемый формат входных данных: {config['input_format']}")
def _apply_standardization(self, df, config):
"""Применение преобразований стандартизации адреса"""
# Регистрация UDF
standardize_udf = udf(
lambda x: standardize_us_address(x),
StringType()
)
name_standardize_udf = udf(
lambda x: standardize_name(x),
StringType()
)
area_standardize_udf = udf(
lambda x: standardize_area(x, "STATE"),
StringType()
)
# Применение преобразований на основе конфигурации
transformed_df = df
# Стандартизация адреса
if 'address_column' in config['standardization_rules']:
address_col = config['standardization_rules']['address_column']
transformed_df = transformed_df.withColumn(
f"{address_col}_standardized",
standardize_udf(col(address_col))
)
# Стандартизация имени
if 'name_column' in config['standardization_rules']:
name_col = config['standardization_rules']['name_column']
transformed_df = transformed_df.withColumn(
f"{name_col}_standardized",
name_standardize_udf(col(name_col))
)
# Стандартизация района
if 'area_column' in config['standardization_rules']:
area_col = config['standardization_rules']['area_column']
transformed_df = transformed_df.withColumn(
f"{area_col}_standardized",
area_standardize_udf(col(area_col))
)
return transformed_df
def _handle_errors(self, df, config):
"""Обработка ошибок во время стандартизации"""
if config['error_handling'] == 'log_and_continue':
# Добавление логики обработки ошибок
return df.withColumn(
'standardization_errors',
when(col('address_standardized').isNull(), 'Стандартизация не удалась').otherwise(None)
)
elif config['error_handling'] == 'fail_fast':
# Проверка ошибок и сбой при их обнаружении
error_count = df.filter(col('address_standardized').isNull()).count()
if error_count > 0:
raise ValueError(f"Обнаружено {error_count} записей с ошибками стандартизации")
return df
else:
return df
def _write_output_data(self, df, config):
"""Запись обработанных данных в выходной путь"""
if config['output_format'] == 'parquet':
df.write.mode('overwrite').parquet(config['output_path'])
elif config['output_format'] == 'csv':
df.write.mode('overwrite').csv(config['output_path'], header=True)
elif config['output_format'] == 'json':
df.write.mode('overwrite').json(config['output_path'])
else:
raise ValueError(f"Неподдерживаемый формат выходных данных: {config['output_format']}")
3. Интеграция мониторинга и логирования
import logging
from datetime import datetime
class StandardizationMonitoring:
"""
Мониторинг и логирование для процессов стандартизации адресов
"""
def __init__(self):
self.logger = self._setup_logging()
def _setup_logging(self):
"""Настройка конфигурации логирования"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('address_standardization.log'),
logging.StreamHandler()
]
)
return logging.getLogger('AddressStandardization')
def log_standardization_start(self, job_id, input_count):
"""Логирование начала процесса стандартизации"""
self.logger.info(
f"Начало задания стандартизации адресов {job_id} "
f"с {input_count} входными записями"
)
def log_standardization_progress(self, processed_count, total_count):
"""Логирование прогресса процесса стандартизации"""
progress_percent = (processed_count / total_count) * 100
self.logger.info(
f"Обработано {processed_count}/{total_count} записей "
f"({progress_percent:.1f}%)"
)
def log_standardization_complete(self, job_id, output_count, errors_count):
"""Логирование завершения процесса стандартизации"""
self.logger.info(
f"Завершение задания стандартизации адресов {job_id} "
f"с {output_count} выходными записями и {errors_count} ошибками"
)
def log_error(self, job_id, error_message, record_data=None):
"""Логирование ошибок во время стандартизации"""
error_log = {
'timestamp': datetime.now().isoformat(),
'job_id': job_id,
'error_message': error_message,
'record_data': record_data
}
self.logger.error(f"Ошибка стандартизации: {error_log}")
Эта комплексная реализация предоставляет полную структуру для воспроизведения функциональности этапа стандартизации DataStage в Databricks PySpark, включая стандартизацию адресов, районов и имен с оптимизацией производительности, тестированием и рассмотрениями миграции.
Источники
- Lakebridge: Модернизация ETL DataStage & Informatica в Databricks
- Проверка и стандартизация адресов - Precisely
- Массовая проверка адресов, пакетная проверка программного обеспечения и инструмента - PostGrid
- Spectrum Global Addressing: Международная проверка и валидация адресов
- API проверки адреса | API службы проверки адресов - USPS API - PostGrid
- Построение масштабируемого конвейера данных PySpark: пошаговый пример
- PySpark для инженеров данных
- Архитектура Apache Spark 101: Как работает Spark (2025)
- Как построить сквозной конвейер инженерии данных и машинного обучения с Apache Spark и PySpark
- 10 концепций PySpark, которые должен освоить каждый средний специалист по данным
Заключение
Реализация стандартизации адресов в Databricks PySpark для воспроизведения функциональности этапа стандартизации DataStage требует комплексного подхода, который объединяет разбор, проверку, стандартизацию и дополнение данных адресов, районов и имен США. Ключевые выводы из этой реализации включают:
-
Модульный дизайн: Разбиение процесса стандартизации на отдельные компоненты (разбор, проверка, стандартизация, дополнение) для поддержания организации кода и облегчения тестирования.
-
Оптимизация UDF: Использование кэширования и эффективных реализаций UDF для обработки адресов в крупном масштабе при поддержке производительности, аналогичной возможностям преобразования DataStage.
-
Интеграция справочных данных: Использование трансляции (broadcast) и кэшированных справочных наборов данных для штатов, городов и почтовых индексов для повышения точности проверки и скорости обработки.
-
Комплексное тестирование: Реализация юнит-тестов, интеграционных тестов и бенчмарков производительности для того, чтобы убедиться, что реализация PySpark соответствует или превосходит надежность и точность DataStage.
-
Стратегия миграции: Использование структурированных утилит миграции для перевода конфигураций этапа стандартизации DataStage в эквивалентный код PySpark, обеспечивая непрерывность в процессе перехода.
-
Мониторинг и логирование: Интеграция комплексных систем мониторинга и логирования для отслеживания качества стандартизации, выявления проблем и поддержания стандартов управления данными.
Реализация демонстрирует, как современная облачная инженерия данных с PySpark может эффективно воспроизводить и улучшать традиционную функциональность ETL, такую как этап стандартизации DataStage, обеспечивая масштабируемость, гибкость и поддерживаемость для рабочих процессов стандартизации адресов в крупном масштабе в средах Databricks.