项目需求很明确:一个支持强一致性事务操作的写入路径,同时服务于两种截然不同的读取场景。其一,是为前端应用提供毫秒级的实时状态同步;其二,是支持基于自然语言的复杂语义搜索。任何单一数据库解决方案在这种混合负载下都会捉襟见肘。
一个常见的错误是试图用单一的全能数据库来满足所有需求。例如,单独使用 CockroachDB,虽然它提供了无与伦比的分布式事务和水平扩展能力,但其全文搜索功能有限,且无法原生支持客户端的实时数据推送。反之,单独使用 Firestore,实时性极佳,但缺乏严格的事务保证和复杂关联查询的能力,不适合作为核心业务的单一事实来源 (Single Source of Truth)。
传统的 CQRS (命令查询职责分离) 模式将读写模型分离,通常使用一个数据库进行写入,另一个数据库进行读取。但这仍然不够。我们的读取需求本身就是多态的:文档模型的实时读取和向量模型的语义搜索。因此,必须将 CQRS 模式推向一个新阶段:一个写入模型,多个异构的、为特定查询场景优化的读取模型。
最终的架构决策是:
- 写入模型 (Command Side): 使用 CockroachDB 作为唯一的事实来源。所有业务逻辑的变更、创建和删除操作都必须通过它,利用其 ACID 事务保证数据的完整性和一致性。
- 读取模型 (Query Side):
- 实时文档模型: Google Firestore。用于存储非规范化的、为特定 UI 视图定制的数据。它的实时监听器能力可以极大地简化客户端开发。
- 语义搜索模型: Weaviate。一个向量数据库,用于存储从 CockroachDB 数据中生成的文本嵌入 (Embeddings),以支持自然语言搜索和相似性推荐。
这个架构的核心挑战在于数据同步。如何可靠、低延迟地将 CockroachDB 中的数据变更传播到 Firestore 和 Weaviate?答案是利用 CockroachDB 的 Change Data Capture (CDC) 功能,即 Changefeed。我们将构建一个独立的 Go 服务,称之为 “Projector”,它订阅 Changefeed,并将数据实时地“投影”到下游的多个读取数据库中。
架构概览
整个数据流可以用下面的图来表示。所有写入请求都由一个 Command Service 处理,该服务与 CockroachDB 交互。Projector 服务独立运行,它不处理任何业务请求,唯一的职责就是数据同步。
graph TD
subgraph "写入路径 (Write Path)"
A[Client/API] --> B(Command Service);
B -->|ACID Transaction| C[CockroachDB];
end
subgraph "数据同步 (Data Projection)"
C --"Changefeed (CDC)"--> D(Projector Service);
end
subgraph "读取路径 (Read Path)"
D -->|Real-time View| E[Firestore];
D -->|Vector Embedding| F[Weaviate];
G[Web/Mobile Client] -->|Real-time Listeners| E;
H[Search Service] -->|Semantic Search| F;
end
在真实项目中,这个架构的稳定性至关重要。Projector 服务的失败或延迟都可能导致数据不一致。因此,它的设计必须考虑容错、幂等性和可观测性。
核心实现:Projector 服务
我们用 Go 语言来实现这个 Projector 服务。它需要处理几件事情:
- 建立到 CockroachDB Changefeed 的持久连接。
- 解析 CDC 事件流。
- 根据事件类型(创建、更新、删除)对 Firestore 和 Weaviate 执行相应的操作。
- 优雅地处理错误和网络中断,并确保至少一次 (at-least-once) 的处理语义。
- 通过记录检查点 (checkpointing) 来支持服务重启后的断点续传。
1. 数据库表结构
假设我们在 CockroachDB 中有一个 products 表,这是我们的事实来源。
-- products.sql
CREATE TABLE products (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name STRING NOT NULL,
description STRING,
tags STRING[],
price DECIMAL(10, 2) NOT NULL,
stock_quantity INT NOT NULL,
updated_at TIMESTAMPTZ DEFAULT now() ON UPDATE now(),
created_at TIMESTAMPTZ DEFAULT now()
);
-- 为 Changefeed 启用 rangefeeds,这是必要的先决条件
ALTER TABLE products SET (kv.rangefeed.enabled = true);
2. Changefeed 创建
我们需要在 CockroachDB 中为 products 表创建一个 Changefeed。注意,我们使用了 WITH resolved 选项,它会定期发送时间戳更新,这对于监控同步延迟非常有用。format=json 和 envelope=key_only 意味着我们只在消息中获取主键,然后回查数据库获取最新状态。这种模式在频繁更新的场景下可以减少 CDC 流量,但会增加回查的延迟和负载。在我们的场景中,我们选择 envelope=wrapped 来获取完整的行数据。
-- changefeed.sql
CREATE CHANGEFEED FOR TABLE products
INTO 'kafka://[broker-address]:9092?topic_name=products_cdc'
WITH
format = json,
envelope = wrapped,
updated,
resolved = '10s';
在生产环境中,直接将 Changefeed 写入 Kafka 是更稳健的做法。它为 Projector 服务提供了一个持久化的缓冲层,解耦了数据库和消费者。但为了简化示例,我们将演示 Go 服务直接连接到 CockroachDB Changefeed 的实验性 SQL 接口。
3. Go Projector 服务代码
下面是 Projector 服务的核心代码。它使用了 database/sql 连接 CockroachDB,官方的 Firestore Go SDK 和 Weaviate Go 客户端。
项目结构:
projector/
├── go.mod
├── go.sum
├── main.go # 程序入口和配置加载
├── projector.go # 核心的 CDC 处理逻辑
└── clients/
├── firestore.go # Firestore 客户端封装
└── weaviate.go # Weaviate 客户端封装
main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/joho/godotenv"
)
func main() {
// 在生产环境中,配置应该来自环境变量或配置服务,而不是 .env 文件
if err := godotenv.Load(); err != nil {
log.Println("Warning: .env file not found, using environment variables")
}
config, err := loadConfig()
if err != nil {
log.Fatalf("FATAL: Failed to load configuration: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 初始化 Projector
p, err := NewProjector(ctx, config)
if err != nil {
log.Fatalf("FATAL: Failed to initialize projector: %v", err)
}
// 监听中断信号以实现优雅退出
go func() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
<-sigchan
log.Println("INFO: Shutdown signal received, closing projector...")
cancel()
}()
log.Println("INFO: Starting projector...")
if err := p.Run(ctx); err != nil {
// 如果 Run 返回错误,意味着发生了不可恢复的错误
log.Fatalf("FATAL: Projector run failed: %v", err)
}
log.Println("INFO: Projector shut down gracefully.")
}
// loadConfig 从环境变量中加载配置
func loadConfig() (Config, error) {
// ... 此处省略具体的配置加载代码 ...
// 需要加载 COCKROACH_DSN, FIRESTORE_PROJECT_ID, WEAVIATE_HOST, WEAVIATE_SCHEME, WEAVIATE_API_KEY 等
// 返回一个 Config struct
return Config{}, nil
}
type Config struct {
CockroachDSN string
FirestoreProjectID string
WeaviateHost string
WeaviateScheme string
WeaviateAPIKey string
// 其他配置...
}
projector.go
这是最核心的部分。它执行 EXPERIMENTAL_SQL CREATE CHANGEFEED 语句,并循环读取结果集。
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
"projector/clients"
)
// CDCEvent 代表从 CockroachDB Changefeed 接收到的事件结构
type CDCEvent struct {
Resolved *string `json:"resolved"` // Resolved timestamp
Payload []struct {
After *Product `json:"after"`
Before *Product `json:"before"`
Key []string `json:"key"` // 主键,通常是 UUID
Topic string `json:"topic"`
} `json:"payload"`
}
// Product 对应 CockroachDB products 表的结构
type Product struct {
ID string `json:"id"`
Name string `json:"name"`
Description *string `json:"description"`
Tags []string `json:"tags"`
Price float64 `json:"price,string"` // JSON 中为字符串
StockQuantity int `json:"stock_quantity"`
UpdatedAt time.Time `json:"updated_at"`
CreatedAt time.Time `json:"created_at"`
}
type Projector struct {
db *sql.DB
firestoreClient *clients.FirestoreClient
weaviateClient *clients.WeaviateClient
// ... 其他依赖
}
func NewProjector(ctx context.Context, cfg Config) (*Projector, error) {
// 连接 CockroachDB
db, err := sql.Open("pgx", cfg.CockroachDSN)
if err != nil {
return nil, fmt.Errorf("failed to connect to CockroachDB: %w", err)
}
if err := db.PingContext(ctx); err != nil {
return nil, fmt.Errorf("failed to ping CockroachDB: %w", err)
}
db.SetConnMaxLifetime(time.Minute * 5)
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(5)
// 初始化 Firestore 客户端
fsClient, err := clients.NewFirestoreClient(ctx, cfg.FirestoreProjectID)
if err != nil {
return nil, fmt.Errorf("failed to create Firestore client: %w", err)
}
// 初始化 Weaviate 客户端
wvClient, err := clients.NewWeaviateClient(cfg.WeaviateHost, cfg.WeaviateScheme, cfg.WeaviateAPIKey)
if err != nil {
return nil, fmt.Errorf("failed to create Weaviate client: %w", err)
}
return &Projector{
db: db,
firestoreClient: fsClient,
weaviateClient: wvClient,
}, nil
}
func (p *Projector) Run(ctx context.Context) error {
defer p.db.Close()
defer p.firestoreClient.Close()
// 这里的 SQL 是直接连接 Changefeed 的方式。
// 在生产中,更推荐的模式是 `CREATE CHANGEFEED ... INTO 'kafka://...'`,
// 然后让这个服务作为 Kafka 消费者。
const changefeedQuery = `EXPERIMENTAL_SQL CREATE CHANGEFEED FOR TABLE products WITH format = json, envelope = wrapped`
rows, err := p.db.QueryContext(ctx, changefeedQuery)
if err != nil {
return fmt.Errorf("failed to start changefeed: %w", err)
}
defer rows.Close()
log.Println("INFO: Changefeed connection established. Waiting for events...")
for rows.Next() {
var topic string
var key, value []byte
if err := rows.Scan(&topic, &key, &value); err != nil {
log.Printf("ERROR: Failed to scan changefeed row: %v. Retrying connection might be necessary.", err)
// 简单的重试逻辑,生产环境需要更复杂的 backoff策略
time.Sleep(5 * time.Second)
continue
}
// `value` 为空表示删除事件
if len(value) == 0 {
if err := p.handleDelete(ctx, key); err != nil {
log.Printf("ERROR: Failed to handle delete event for key %s: %v", string(key), err)
}
continue
}
var event Product
if err := json.Unmarshal(value, &event); err != nil {
log.Printf("ERROR: Failed to unmarshal CDC event value: %v. Raw value: %s", err, string(value))
continue
}
if err := p.handleUpsert(ctx, &event); err != nil {
log.Printf("ERROR: Failed to handle upsert event for product %s: %v", event.ID, err)
}
}
if err := rows.Err(); err != nil {
// 如果循环因为 context canceled 退出,这里的 err 会是 context.Canceled
if ctx.Err() != nil {
return nil // 正常关闭
}
return fmt.Errorf("changefeed query loop exited with error: %w", err)
}
return nil
}
// handleDelete 处理删除事件
func (p *Projector) handleDelete(ctx context.Context, key []byte) error {
var primaryKey []string
if err := json.Unmarshal(key, &primaryKey); err != nil {
return fmt.Errorf("failed to unmarshal delete key: %w", err)
}
if len(primaryKey) == 0 {
return fmt.Errorf("empty key received for delete event")
}
productID := primaryKey[0]
log.Printf("INFO: Handling delete for product ID: %s", productID)
// 并行删除,任何一个失败都应记录日志,但不应阻塞另一个
errChan := make(chan error, 2)
go func() {
errChan <- p.firestoreClient.DeleteProduct(ctx, productID)
}()
go func() {
errChan <- p.weaviateClient.DeleteProduct(ctx, productID)
}()
var combinedErr error
for i := 0; i < 2; i++ {
if err := <-errChan; err != nil {
// 在生产中,我们会使用更复杂的错误聚合库
if combinedErr == nil {
combinedErr = err
} else {
combinedErr = fmt.Errorf("%v; %w", combinedErr, err)
}
}
}
return combinedErr
}
// handleUpsert 处理创建和更新事件
func (p *Projector) handleUpsert(ctx context.Context, product *Product) error {
log.Printf("INFO: Handling upsert for product ID: %s, Name: %s", product.ID, product.Name)
// 这里的坑在于,我们需要一个文本嵌入模型来为 Weaviate 生成向量。
// 在真实项目中,这里会调用一个独立的 embedding 服务。
// 为简化,我们假设 Weaviate 配置了 text2vec-transformers 模块自动生成。
textToEmbed := fmt.Sprintf("Product name: %s. Description: %s. Tags: %v", product.Name, *product.Description, product.Tags)
errChan := make(chan error, 2)
go func() {
errChan <- p.firestoreClient.UpsertProduct(ctx, product)
}()
go func() {
// Weaviate 需要的是一个属性 map
properties := map[string]interface{}{
"name": product.Name,
"description": *product.Description,
"price": product.Price,
"stock_quantity": product.StockQuantity,
"source_id": product.ID, // 保存原始 ID 以便关联
"text_for_embedding": textToEmbed,
}
errChan <- p.weaviateClient.UpsertProduct(ctx, product.ID, properties)
}()
var combinedErr error
for i := 0; i < 2; i++ {
if err := <-errChan; err != nil {
if combinedErr == nil {
combinedErr = err
} else {
combinedErr = fmt.Errorf("%v; %w", combinedErr, err)
}
}
}
return combinedErr
}
clients/firestore.go 和 clients/weaviate.go
这两个文件封装了与下游数据库交互的逻辑。它们应该包含具体的 Set/Delete/Update 操作。例如,firestore.go 看起来会是这样:
package clients
import (
"context"
"fmt"
"cloud.google.com/go/firestore"
)
type FirestoreClient struct {
client *firestore.Client
}
func NewFirestoreClient(ctx context.Context, projectID string) (*FirestoreClient, error) {
client, err := firestore.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("firestore.NewClient: %w", err)
}
return &FirestoreClient{client: client}, nil
}
// UpsertProduct 将产品数据写入 Firestore
func (c *FirestoreClient) UpsertProduct(ctx context.Context, product *main.Product) error {
// 在 Firestore 中,我们通常会为了查询而对数据进行反范式化。
// 这里直接使用 product 结构作为示例。
collection := c.client.Collection("products_view")
_, err := collection.Doc(product.ID).Set(ctx, product)
return err
}
// DeleteProduct 从 Firestore 删除产品
func (c *FirestoreClient) DeleteProduct(ctx context.Context, productID string) error {
_, err := c.client.Collection("products_view").Doc(productID).Delete(ctx)
return err
}
func (c *FirestoreClient) Close() {
c.client.Close()
}
weaviate.go 的实现会类似,使用 Weaviate 的 Go 客户端来执行 upsert 和 delete 操作。一个关键点是 Weaviate 中的对象 ID 最好使用 CockroachDB 的 UUID,以保持一致性,方便删除和更新。
架构的扩展性与局限性
这种架构的优势是显而易见的。写入路径和读取路径被完全解耦,可以独立扩展。如果写入量增大,可以为 CockroachDB 增加节点。如果实时读取的用户增多,Firestore 可以自动处理。如果语义搜索请求量大,可以为 Weaviate 增加副本。Projector 服务本身是无状态的,也可以水平扩展(当使用 Kafka 作为中间层并按 key 分区时)。
然而,这个架构也引入了它自身的复杂性。
最终一致性延迟: 从数据写入 CockroachDB 到它出现在 Firestore 和 Weaviate 之间,存在一个可观测的延迟。这个延迟由 Changefeed 的
resolved间隔、Projector 的处理速度以及下游数据库的写入性能共同决定。对于某些对一致性要求极高的场景,需要在客户端层面处理这种延迟,例如在写入后短时间内从 CockroachDB 直接读取,或者在 UI 上明确告知用户“数据正在同步中”。操作复杂性: 维护三个不同的数据库系统外加一个自定义的同步服务,对运维团队提出了更高的要求。你需要为每个组件建立独立的监控、告警和备份恢复策略。Projector 服务的日志和指标(如处理延迟、错误率、CDC 事件队列长度)是诊断问题的生命线。
Schema 演进: 当 CockroachDB 中的
products表结构发生变化时(例如增加一个字段),需要同步更新 Projector 服务中的Product结构体,以及 Firestore 和 Weaviate 中的数据模型。这个过程需要仔细协调,以避免数据丢失或处理错误。一个常见的错误是只更新了应用代码,却忘了更新数据管道。回填 (Backfilling) 问题: 当为已有的海量数据表启用这个架构,或者增加一个新的读取模型时,需要一个“回填”机制。即,需要一次性地将 CockroachDB 中的存量数据全部转换并加载到新的读取模型中。这通常通过一个独立的批处理任务完成,在任务执行期间,CDC 流需要被正确处理以避免数据覆盖。
这个架构并非万能药。它适用于那些写入一致性要求高,且读取模式复杂多样的系统。对于简单的 CRUD 应用,它的复杂性可能会超过其带来的好处。但在处理复杂的、面向未来的系统时,这种拥抱多态持久化和 CQRS 的思想,是构建高伸缩性、高性能服务的关键所在。