构建基于 CockroachDB Firestore 与 Weaviate 的多态读模型 CQRS 架构


项目需求很明确:一个支持强一致性事务操作的写入路径,同时服务于两种截然不同的读取场景。其一,是为前端应用提供毫秒级的实时状态同步;其二,是支持基于自然语言的复杂语义搜索。任何单一数据库解决方案在这种混合负载下都会捉襟见肘。

一个常见的错误是试图用单一的全能数据库来满足所有需求。例如,单独使用 CockroachDB,虽然它提供了无与伦比的分布式事务和水平扩展能力,但其全文搜索功能有限,且无法原生支持客户端的实时数据推送。反之,单独使用 Firestore,实时性极佳,但缺乏严格的事务保证和复杂关联查询的能力,不适合作为核心业务的单一事实来源 (Single Source of Truth)。

传统的 CQRS (命令查询职责分离) 模式将读写模型分离,通常使用一个数据库进行写入,另一个数据库进行读取。但这仍然不够。我们的读取需求本身就是多态的:文档模型的实时读取和向量模型的语义搜索。因此,必须将 CQRS 模式推向一个新阶段:一个写入模型,多个异构的、为特定查询场景优化的读取模型。

最终的架构决策是:

  • 写入模型 (Command Side): 使用 CockroachDB 作为唯一的事实来源。所有业务逻辑的变更、创建和删除操作都必须通过它,利用其 ACID 事务保证数据的完整性和一致性。
  • 读取模型 (Query Side):
    1. 实时文档模型: Google Firestore。用于存储非规范化的、为特定 UI 视图定制的数据。它的实时监听器能力可以极大地简化客户端开发。
    2. 语义搜索模型: Weaviate。一个向量数据库,用于存储从 CockroachDB 数据中生成的文本嵌入 (Embeddings),以支持自然语言搜索和相似性推荐。

这个架构的核心挑战在于数据同步。如何可靠、低延迟地将 CockroachDB 中的数据变更传播到 Firestore 和 Weaviate?答案是利用 CockroachDB 的 Change Data Capture (CDC) 功能,即 Changefeed。我们将构建一个独立的 Go 服务,称之为 “Projector”,它订阅 Changefeed,并将数据实时地“投影”到下游的多个读取数据库中。

架构概览

整个数据流可以用下面的图来表示。所有写入请求都由一个 Command Service 处理,该服务与 CockroachDB 交互。Projector 服务独立运行,它不处理任何业务请求,唯一的职责就是数据同步。

graph TD
    subgraph "写入路径 (Write Path)"
        A[Client/API] --> B(Command Service);
        B -->|ACID Transaction| C[CockroachDB];
    end

    subgraph "数据同步 (Data Projection)"
        C --"Changefeed (CDC)"--> D(Projector Service);
    end

    subgraph "读取路径 (Read Path)"
        D -->|Real-time View| E[Firestore];
        D -->|Vector Embedding| F[Weaviate];
        G[Web/Mobile Client] -->|Real-time Listeners| E;
        H[Search Service] -->|Semantic Search| F;
    end

在真实项目中,这个架构的稳定性至关重要。Projector 服务的失败或延迟都可能导致数据不一致。因此,它的设计必须考虑容错、幂等性和可观测性。

核心实现:Projector 服务

我们用 Go 语言来实现这个 Projector 服务。它需要处理几件事情:

  1. 建立到 CockroachDB Changefeed 的持久连接。
  2. 解析 CDC 事件流。
  3. 根据事件类型(创建、更新、删除)对 Firestore 和 Weaviate 执行相应的操作。
  4. 优雅地处理错误和网络中断,并确保至少一次 (at-least-once) 的处理语义。
  5. 通过记录检查点 (checkpointing) 来支持服务重启后的断点续传。

1. 数据库表结构

假设我们在 CockroachDB 中有一个 products 表,这是我们的事实来源。

-- products.sql
CREATE TABLE products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name STRING NOT NULL,
    description STRING,
    tags STRING[],
    price DECIMAL(10, 2) NOT NULL,
    stock_quantity INT NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now() ON UPDATE now(),
    created_at TIMESTAMPTZ DEFAULT now()
);

-- 为 Changefeed 启用 rangefeeds,这是必要的先决条件
ALTER TABLE products SET (kv.rangefeed.enabled = true);

2. Changefeed 创建

我们需要在 CockroachDB 中为 products 表创建一个 Changefeed。注意,我们使用了 WITH resolved 选项,它会定期发送时间戳更新,这对于监控同步延迟非常有用。format=jsonenvelope=key_only 意味着我们只在消息中获取主键,然后回查数据库获取最新状态。这种模式在频繁更新的场景下可以减少 CDC 流量,但会增加回查的延迟和负载。在我们的场景中,我们选择 envelope=wrapped 来获取完整的行数据。

-- changefeed.sql
CREATE CHANGEFEED FOR TABLE products
INTO 'kafka://[broker-address]:9092?topic_name=products_cdc'
WITH
    format = json,
    envelope = wrapped,
    updated,
    resolved = '10s';

在生产环境中,直接将 Changefeed 写入 Kafka 是更稳健的做法。它为 Projector 服务提供了一个持久化的缓冲层,解耦了数据库和消费者。但为了简化示例,我们将演示 Go 服务直接连接到 CockroachDB Changefeed 的实验性 SQL 接口。

3. Go Projector 服务代码

下面是 Projector 服务的核心代码。它使用了 database/sql 连接 CockroachDB,官方的 Firestore Go SDK 和 Weaviate Go 客户端。

项目结构:

projector/
├── go.mod
├── go.sum
├── main.go         # 程序入口和配置加载
├── projector.go    # 核心的 CDC 处理逻辑
└── clients/
    ├── firestore.go  # Firestore 客户端封装
    └── weaviate.go # Weaviate 客户端封装

main.go

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/joho/godotenv"
)

func main() {
	// 在生产环境中,配置应该来自环境变量或配置服务,而不是 .env 文件
	if err := godotenv.Load(); err != nil {
		log.Println("Warning: .env file not found, using environment variables")
	}

	config, err := loadConfig()
	if err != nil {
		log.Fatalf("FATAL: Failed to load configuration: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 初始化 Projector
	p, err := NewProjector(ctx, config)
	if err != nil {
		log.Fatalf("FATAL: Failed to initialize projector: %v", err)
	}

	// 监听中断信号以实现优雅退出
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
		<-sigchan
		log.Println("INFO: Shutdown signal received, closing projector...")
		cancel()
	}()

	log.Println("INFO: Starting projector...")
	if err := p.Run(ctx); err != nil {
		// 如果 Run 返回错误,意味着发生了不可恢复的错误
		log.Fatalf("FATAL: Projector run failed: %v", err)
	}

	log.Println("INFO: Projector shut down gracefully.")
}

// loadConfig 从环境变量中加载配置
func loadConfig() (Config, error) {
    // ... 此处省略具体的配置加载代码 ...
    // 需要加载 COCKROACH_DSN, FIRESTORE_PROJECT_ID, WEAVIATE_HOST, WEAVIATE_SCHEME, WEAVIATE_API_KEY 等
    // 返回一个 Config struct
    return Config{}, nil 
}

type Config struct {
    CockroachDSN      string
    FirestoreProjectID string
    WeaviateHost      string
    WeaviateScheme    string
    WeaviateAPIKey    string
    // 其他配置...
}

projector.go

这是最核心的部分。它执行 EXPERIMENTAL_SQL CREATE CHANGEFEED 语句,并循环读取结果集。

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	_ "github.com/jackc/pgx/v5/stdlib"
	"projector/clients"
)

// CDCEvent 代表从 CockroachDB Changefeed 接收到的事件结构
type CDCEvent struct {
	Resolved *string `json:"resolved"` // Resolved timestamp
	Payload  []struct {
		After  *Product `json:"after"`
		Before *Product `json:"before"`
		Key    []string `json:"key"` // 主键,通常是 UUID
		Topic  string   `json:"topic"`
	} `json:"payload"`
}

// Product 对应 CockroachDB products 表的结构
type Product struct {
	ID            string    `json:"id"`
	Name          string    `json:"name"`
	Description   *string   `json:"description"`
	Tags          []string  `json:"tags"`
	Price         float64   `json:"price,string"` // JSON 中为字符串
	StockQuantity int       `json:"stock_quantity"`
	UpdatedAt     time.Time `json:"updated_at"`
	CreatedAt     time.Time `json:"created_at"`
}

type Projector struct {
	db              *sql.DB
	firestoreClient *clients.FirestoreClient
	weaviateClient  *clients.WeaviateClient
	// ... 其他依赖
}

func NewProjector(ctx context.Context, cfg Config) (*Projector, error) {
	// 连接 CockroachDB
	db, err := sql.Open("pgx", cfg.CockroachDSN)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to CockroachDB: %w", err)
	}
	if err := db.PingContext(ctx); err != nil {
		return nil, fmt.Errorf("failed to ping CockroachDB: %w", err)
	}
    db.SetConnMaxLifetime(time.Minute * 5)
    db.SetMaxOpenConns(10)
    db.SetMaxIdleConns(5)

	// 初始化 Firestore 客户端
	fsClient, err := clients.NewFirestoreClient(ctx, cfg.FirestoreProjectID)
	if err != nil {
		return nil, fmt.Errorf("failed to create Firestore client: %w", err)
	}

	// 初始化 Weaviate 客户端
	wvClient, err := clients.NewWeaviateClient(cfg.WeaviateHost, cfg.WeaviateScheme, cfg.WeaviateAPIKey)
	if err != nil {
		return nil, fmt.Errorf("failed to create Weaviate client: %w", err)
	}

	return &Projector{
		db:              db,
		firestoreClient: fsClient,
		weaviateClient:  wvClient,
	}, nil
}

func (p *Projector) Run(ctx context.Context) error {
	defer p.db.Close()
	defer p.firestoreClient.Close()

	// 这里的 SQL 是直接连接 Changefeed 的方式。
	// 在生产中,更推荐的模式是 `CREATE CHANGEFEED ... INTO 'kafka://...'`,
	// 然后让这个服务作为 Kafka 消费者。
	const changefeedQuery = `EXPERIMENTAL_SQL CREATE CHANGEFEED FOR TABLE products WITH format = json, envelope = wrapped`

	rows, err := p.db.QueryContext(ctx, changefeedQuery)
	if err != nil {
		return fmt.Errorf("failed to start changefeed: %w", err)
	}
	defer rows.Close()

	log.Println("INFO: Changefeed connection established. Waiting for events...")

	for rows.Next() {
		var topic string
		var key, value []byte

		if err := rows.Scan(&topic, &key, &value); err != nil {
			log.Printf("ERROR: Failed to scan changefeed row: %v. Retrying connection might be necessary.", err)
            // 简单的重试逻辑,生产环境需要更复杂的 backoff策略
            time.Sleep(5 * time.Second)
			continue
		}

        // `value` 为空表示删除事件
		if len(value) == 0 {
			if err := p.handleDelete(ctx, key); err != nil {
				log.Printf("ERROR: Failed to handle delete event for key %s: %v", string(key), err)
			}
			continue
		}

		var event Product
		if err := json.Unmarshal(value, &event); err != nil {
			log.Printf("ERROR: Failed to unmarshal CDC event value: %v. Raw value: %s", err, string(value))
			continue
		}

		if err := p.handleUpsert(ctx, &event); err != nil {
			log.Printf("ERROR: Failed to handle upsert event for product %s: %v", event.ID, err)
		}
	}

	if err := rows.Err(); err != nil {
		// 如果循环因为 context canceled 退出,这里的 err 会是 context.Canceled
		if ctx.Err() != nil {
			return nil // 正常关闭
		}
		return fmt.Errorf("changefeed query loop exited with error: %w", err)
	}

	return nil
}

// handleDelete 处理删除事件
func (p *Projector) handleDelete(ctx context.Context, key []byte) error {
	var primaryKey []string
	if err := json.Unmarshal(key, &primaryKey); err != nil {
		return fmt.Errorf("failed to unmarshal delete key: %w", err)
	}

	if len(primaryKey) == 0 {
		return fmt.Errorf("empty key received for delete event")
	}
	productID := primaryKey[0]

	log.Printf("INFO: Handling delete for product ID: %s", productID)

	// 并行删除,任何一个失败都应记录日志,但不应阻塞另一个
	errChan := make(chan error, 2)

	go func() {
		errChan <- p.firestoreClient.DeleteProduct(ctx, productID)
	}()
	go func() {
		errChan <- p.weaviateClient.DeleteProduct(ctx, productID)
	}()

	var combinedErr error
	for i := 0; i < 2; i++ {
		if err := <-errChan; err != nil {
            // 在生产中,我们会使用更复杂的错误聚合库
			if combinedErr == nil {
				combinedErr = err
			} else {
				combinedErr = fmt.Errorf("%v; %w", combinedErr, err)
			}
		}
	}

	return combinedErr
}


// handleUpsert 处理创建和更新事件
func (p *Projector) handleUpsert(ctx context.Context, product *Product) error {
	log.Printf("INFO: Handling upsert for product ID: %s, Name: %s", product.ID, product.Name)
    
    // 这里的坑在于,我们需要一个文本嵌入模型来为 Weaviate 生成向量。
    // 在真实项目中,这里会调用一个独立的 embedding 服务。
    // 为简化,我们假设 Weaviate 配置了 text2vec-transformers 模块自动生成。
	textToEmbed := fmt.Sprintf("Product name: %s. Description: %s. Tags: %v", product.Name, *product.Description, product.Tags)

	errChan := make(chan error, 2)
	go func() {
		errChan <- p.firestoreClient.UpsertProduct(ctx, product)
	}()
	go func() {
        // Weaviate 需要的是一个属性 map
		properties := map[string]interface{}{
			"name":           product.Name,
			"description":    *product.Description,
			"price":          product.Price,
			"stock_quantity": product.StockQuantity,
			"source_id":      product.ID, // 保存原始 ID 以便关联
			"text_for_embedding": textToEmbed,
		}
		errChan <- p.weaviateClient.UpsertProduct(ctx, product.ID, properties)
	}()
    
	var combinedErr error
	for i := 0; i < 2; i++ {
		if err := <-errChan; err != nil {
			if combinedErr == nil {
				combinedErr = err
			} else {
				combinedErr = fmt.Errorf("%v; %w", combinedErr, err)
			}
		}
	}
	return combinedErr
}

clients/firestore.goclients/weaviate.go

这两个文件封装了与下游数据库交互的逻辑。它们应该包含具体的 Set/Delete/Update 操作。例如,firestore.go 看起来会是这样:

package clients

import (
	"context"
	"fmt"

	"cloud.google.com/go/firestore"
)

type FirestoreClient struct {
	client *firestore.Client
}

func NewFirestoreClient(ctx context.Context, projectID string) (*FirestoreClient, error) {
	client, err := firestore.NewClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("firestore.NewClient: %w", err)
	}
	return &FirestoreClient{client: client}, nil
}

// UpsertProduct 将产品数据写入 Firestore
func (c *FirestoreClient) UpsertProduct(ctx context.Context, product *main.Product) error {
	// 在 Firestore 中,我们通常会为了查询而对数据进行反范式化。
	// 这里直接使用 product 结构作为示例。
	collection := c.client.Collection("products_view")
	_, err := collection.Doc(product.ID).Set(ctx, product)
	return err
}

// DeleteProduct 从 Firestore 删除产品
func (c *FirestoreClient) DeleteProduct(ctx context.Context, productID string) error {
	_, err := c.client.Collection("products_view").Doc(productID).Delete(ctx)
	return err
}

func (c *FirestoreClient) Close() {
	c.client.Close()
}

weaviate.go 的实现会类似,使用 Weaviate 的 Go 客户端来执行 upsert 和 delete 操作。一个关键点是 Weaviate 中的对象 ID 最好使用 CockroachDB 的 UUID,以保持一致性,方便删除和更新。

架构的扩展性与局限性

这种架构的优势是显而易见的。写入路径和读取路径被完全解耦,可以独立扩展。如果写入量增大,可以为 CockroachDB 增加节点。如果实时读取的用户增多,Firestore 可以自动处理。如果语义搜索请求量大,可以为 Weaviate 增加副本。Projector 服务本身是无状态的,也可以水平扩展(当使用 Kafka 作为中间层并按 key 分区时)。

然而,这个架构也引入了它自身的复杂性。

  1. 最终一致性延迟: 从数据写入 CockroachDB 到它出现在 Firestore 和 Weaviate 之间,存在一个可观测的延迟。这个延迟由 Changefeed 的 resolved 间隔、Projector 的处理速度以及下游数据库的写入性能共同决定。对于某些对一致性要求极高的场景,需要在客户端层面处理这种延迟,例如在写入后短时间内从 CockroachDB 直接读取,或者在 UI 上明确告知用户“数据正在同步中”。

  2. 操作复杂性: 维护三个不同的数据库系统外加一个自定义的同步服务,对运维团队提出了更高的要求。你需要为每个组件建立独立的监控、告警和备份恢复策略。Projector 服务的日志和指标(如处理延迟、错误率、CDC 事件队列长度)是诊断问题的生命线。

  3. Schema 演进: 当 CockroachDB 中的 products 表结构发生变化时(例如增加一个字段),需要同步更新 Projector 服务中的 Product 结构体,以及 Firestore 和 Weaviate 中的数据模型。这个过程需要仔细协调,以避免数据丢失或处理错误。一个常见的错误是只更新了应用代码,却忘了更新数据管道。

  4. 回填 (Backfilling) 问题: 当为已有的海量数据表启用这个架构,或者增加一个新的读取模型时,需要一个“回填”机制。即,需要一次性地将 CockroachDB 中的存量数据全部转换并加载到新的读取模型中。这通常通过一个独立的批处理任务完成,在任务执行期间,CDC 流需要被正确处理以避免数据覆盖。

这个架构并非万能药。它适用于那些写入一致性要求高,且读取模式复杂多样的系统。对于简单的 CRUD 应用,它的复杂性可能会超过其带来的好处。但在处理复杂的、面向未来的系统时,这种拥抱多态持久化和 CQRS 的思想,是构建高伸缩性、高性能服务的关键所在。


  目录