Event Flow описывает асинхронную коммуникацию между микросервисами через Apache Kafka. Используется для операций, которые не требуют немедленного ответа, обеспечения eventual consistency и интеграции независимых сервисов.
Архитектура
Producer] Credential[Credential Service
Producer] Auth[Auth Service
Producer] end subgraph "Event Bus (Kafka)" TopicUser[identity.user.events
3 partitions] TopicIdentifier[identity.identifier.events
3 partitions] TopicPassword[credential.password.events
3 partitions] end subgraph "Consumers" Herald[Herald Service
Notifications] Account[Account Service
Accounts] Analytics[Analytics Pipeline
Data Warehouse] end subgraph "Transactional Outbox" OutboxIdentity[(outbox_events
Identity DB)] OutboxCredential[(outbox_events
Credential DB)] WorkerIdentity[Outbox Worker
Identity] WorkerCredential[Outbox Worker
Credential] end Identity -->|INSERT in tx| OutboxIdentity WorkerIdentity -->|Poll & Publish| OutboxIdentity WorkerIdentity -->|PUBLISH| TopicUser WorkerIdentity -->|PUBLISH| TopicIdentifier Credential -->|INSERT in tx| OutboxCredential WorkerCredential -->|Poll & Publish| OutboxCredential WorkerCredential -->|PUBLISH| TopicPassword TopicUser -.->|CONSUME| Herald TopicUser -.->|CONSUME| Account TopicUser -.->|CONSUME| Analytics TopicIdentifier -.->|CONSUME| Herald TopicPassword -.->|CONSUME| Herald style TopicUser fill:#FF6B6B style TopicIdentifier fill:#FF6B6B style TopicPassword fill:#FF6B6B
Transactional Outbox Pattern
Гарантирует at-least-once доставку событий без distributed transactions между БД и Kafka.
Архитектура Outbox
(topic, aggregate_id, event_type, payload) UoW->>PostgreSQL: COMMIT Note over Worker: Background process
polling every 100ms Worker->>OutboxTable: SELECT * FROM outbox_events
WHERE status = 'PENDING'
LIMIT 100 FOR UPDATE SKIP LOCKED OutboxTable-->>Worker: [event1, event2, ...] Worker->>Kafka: PUBLISH event1 Kafka-->>Worker: ACK Worker->>OutboxTable: UPDATE SET status='PUBLISHED' Worker->>Kafka: PUBLISH event2 Kafka-->>Worker: ACK Worker->>OutboxTable: UPDATE SET status='PUBLISHED' Kafka->>Consumer: DELIVER events Consumer->>Consumer: Process events Consumer->>Kafka: COMMIT offset
Outbox Event Schema
Database Table:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | |
Python Model:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
Use Case Integration
Пример: CreateUserUseCase:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | |
Outbox Worker
Background job (запускается в отдельном процессе):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | |
Worker Features:
- Distributed locking: SELECT FOR UPDATE SKIP LOCKED — каждый worker берет свою порцию событий
- Bulk operations: Batch INSERT/UPDATE для производительности
- Retry with backoff: Exponential backoff для failed events
- Idempotency: Kafka producer с enable.idempotence=true
- Monitoring: Prometheus metrics для latency, throughput, errors
Event Topics
identity.user.events
Event Types:
1 2 3 4 5 | |
Event Schema (user.created):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
identity.identifier.events
Event Types:
1 2 3 4 | |
Event Schema (identifier.verified):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
credential.password.events
Event Types:
1 2 3 4 | |
Event Schema (password.changed):
1 2 3 4 5 6 7 8 9 10 11 12 | |
Security Note: Никогда не публикуем пароли или хеши паролей в события!
Event Consumers
Herald Service (Notifications)
Subscriptions:
- identity.user.events → Отправка welcome email при регистрации
- identity.identifier.events → Отправка verification codes
- credential.password.events → Уведомления о смене пароля
Consumer Implementation:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | |
Account Service
Subscriptions:
- identity.user.events → Создание Account при регистрации User
Consumer Pattern (transactional):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | |
Idempotency Table:
1 2 3 4 5 6 7 8 | |
Event Schema Evolution
Versioning Strategy
Backward Compatible Changes (allowed): - Добавление новых опциональных полей - Добавление новых event types
1 2 3 4 5 6 7 8 9 | |
Breaking Changes (требуют новой версии topic): - Удаление полей - Изменение типов полей - Изменение формата значений
Решение: Создать новый topic с версией
1 2 3 4 5 6 7 8 | |
Monitoring & Observability
Metrics
Producer Metrics:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Consumer Metrics:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | |
Tracing
OpenTelemetry integration:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Dead Letter Queue (DLQ)
Pattern для failed events:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Best Practices
✅ DO
- Используй Transactional Outbox для критичных событий
- Версионируй события (добавляй
schema_versionв payload) - Реализуй idempotency на стороне consumer
- Логируй все события с trace_id для debugging
- Монитор consumer lag — алертируй при lag > threshold
- Используй Dead Letter Queue для permanently failed events
❌ DON'T
- НЕ публикуй PII в события без шифрования
- НЕ публикуй пароли/секреты даже в зашифрованном виде
- НЕ делай breaking changes без миграционного плана
- НЕ блокируй business logic ожиданием Kafka publish
- НЕ используй Kafka для request/response паттернов (используй gRPC)
Связанные страницы
- Online Data Flow — синхронные потоки данных
- Event Design Principles — принципы дизайна событий
- Backend / Event-Driven Patterns — паттерны реализации