Реализация сервиса подписки и публикации событий на основе gRPC с внутренней шиной событий.
Проект состоит из двух основных компонентов:
pkg/subpub- внутренняя реализация шины событий (Publisher-Subscriber)internal/service- gRPC сервис, использующий шину событий
+----------------+ +---------------+ +----------------+
| gRPC Server | <---> | PubSubService | <---> | subpub.Bus |
+----------------+ +---------------+ +----------------+
-
Шина событий:
- Поддержка множества подписчиков на один subject
- Асинхронная доставка сообщений (медленные подписчики не блокируют других)
- Гарантированный порядок сообщений (FIFO)
- Graceful shutdown с учетом контекста
- Отсутствие утечек горутин
-
gRPC сервис:
- Подписка на события по ключу (streaming)
- Публикация событий по ключу
- Правильные gRPC статусы ошибок
- Graceful shutdown
- Конфигурирование через YAML
.
├── cmd/ # Клиент и сервер
├── config/ # Конфигурация
├── internal/ # Внутренние компоненты
│ └── service/ # gRPC сервис
├── pkg/ # Переиспользуемые пакеты
│ ├── pb/ # Сгенерированный gRPC код
│ └── subpub/ # Реализация шины событий
└── proto/ # Protobuf схемы
- Go 1.20+
- protoc (для перегенерации proto-файлов)
Пример config.yaml:
server:
address: ":50051"
shutdown_timeout: 10s- Установите зависимости:
go mod download
- Сгенерируйте gRPC код (если меняли proto-файлы):
buf generate
- Запустите сервер:
go run cmd/server/main.go
- Запустите клиент (в другом терминале):
go run cmd/client/main.go
Subscribe(SubscribeRequest) returns (stream Event)- подписка на события по ключуPublish(PublishRequest) returns (Empty)- публикация события по ключу
- Dependency Injection:
- PubSubService принимает subpub.Bus через конструктор
- Позволяет легко тестировать и заменять реализации
- Graceful Shutdown:
- Сервер корректно обрабатывает сигналы завершения
- Дает время на завершение активных соединений
- Принудительно завершает работу по таймауту
- Observer/Publisher-Subscriber:
- Основной паттерн, реализованный через subpub
- Server-Side Streaming:
- Для реализации подписки на события
stream, err := client.Subscribe(ctx, &pb.SubscribeRequest{Key: "test"})
for {
event, err := stream.Recv()
}
_, err := client.Publish(ctx, &pb.PublishRequest{
Key: "test",
Data: "message",
})go test ./...Тесты покрывают:
- Шину событий (subpub)
- gRPC сервис
Используется структурированное логирование с полями:
- Временная метка
- Уровень логирования
- Сообщение
- Контекстные поля

