在GKE上构建基于Workload Identity与最小权限原则的多租户BASE搜索服务


我们面临一个棘手的多租户数据隔离问题。团队负责维护一个中心化的搜索服务,后端是Elasticsearch集群,为公司内部数十个应用提供日志、指标和业务数据的检索能力。最初的架构简单粗暴:一个巨大的ES集群,通过不同的index前缀区分租户,并在应用层代码中硬编码API Key来控制写入和查询权限。这个模式很快就暴露了其脆弱性:API Key泄露风险、权限变更的巨大运维成本、以及无法有效审计哪个服务在什么时间访问了什么数据。一个服务的安全漏洞可能导致整个搜索集群的数据被横向访问。这在生产环境中是完全无法接受的。

我们需要一个从根本上解决身份认证和授权的方案,目标是将权限控制下沉到基础设施层面,实现真正的最小权限原则。我们的基础设施是GCP GKE,因此,GCP的IAM系统是理想的权限控制中心。核心构想是:让运行在GKE Pod中的每个租户服务,都能获得一个临时的、具有严格范围权限的GCP身份,并用这个身份去执行操作,而不是依赖共享的、长期的密钥。

这直接指向了GKE的Workload Identity特性。它允许我们将Kubernetes Service Account (KSA)与GCP IAM Service Account (GSA)进行绑定。当一个Pod使用这个KSA时,它可以通过GKE元数据服务器获取一个临时的GCP访问令牌,该令牌的权限完全等同于绑定的GSA。这就构成了我们新架构的基石。

我们的搜索服务本质上是一个BASE系统(Basically Available, Soft state, Eventually consistent)。对于搜索场景,数据的最终一致性是可以接受的,而服务的高可用性至关重要。将安全模型与基础设施深度绑定,可以让我们更专注于保证服务的可用性和性能,而不是在应用层徒劳地构建复杂的安全壁垒。

下面是整个改造过程的复盘,从IAM配置、GKE部署,到一个可直接运行的、能体现该架构优势的索引器(Indexer)服务的完整实现。

第一步:基础设施准备与IAM绑定

要启用Workload Identity,必须在GKE集群创建或更新时指定。对于一个已存在的集群,可以通过更新命令开启。

# 假设我们有一个名为'secure-search-cluster'的集群
# 在'asia-east1'区域
gcloud container clusters update secure-search-cluster \
    --zone=asia-east1 \
    --workload-pool=[PROJECT_ID].svc.id.goog

workload-pool是关键,它为集群创建了一个唯一的身份池。

接下来,为我们的一个租户(假设为tenant-alpha)创建一个专用的GCP IAM Service Account (GSA)。这个GSA的权限将被严格限制,比如,它只能读取某个特定的GCS Bucket中的原始数据。

# 1. 创建GSA
gcloud iam service-accounts create tenant-alpha-indexer-gsa \
    --project=[PROJECT_ID] \
    --description="IAM service account for tenant-alpha's search indexer" \
    --display-name="Tenant Alpha Indexer GSA"

# 2. 为GSA授予最小权限
# 假设原始数据存储在名为 'tenant-alpha-raw-data' 的GCS Bucket中
# 我们只授予它读取该Bucket中对象的权限
gcloud projects add-iam-policy-binding [PROJECT_ID] \
    --member="serviceAccount:tenant-alpha-indexer-gsa@[PROJECT_ID].iam.gserviceaccount.com" \
    --role="roles/storage.objectViewer" \
    --condition='expression=resource.name.startsWith("projects/_/buckets/tenant-alpha-raw-data/"),title=bucket_prefix'

这里的IAM Condition是实现最小权限的关键。我们不是将权限授予整个项目或整个GCS,而是精确到某个特定Bucket,甚至可以到特定的对象前缀。

现在,我们需要在Kubernetes中创建一个对应的Service Account (KSA),并将其与刚刚创建的GSA绑定。

# k8s/tenant-alpha-ksa.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: tenant-alpha-indexer-ksa
  namespace: search-services
  annotations:
    iam.gke.io/gcp-service-account: tenant-alpha-indexer-gsa@[PROJECT_ID].iam.gserviceaccount.com

这个annotations是Workload Identity的核心,它声明了tenant-alpha-indexer-ksa这个KSA将模拟tenant-alpha-indexer-gsa

使用kubectl apply创建KSA后,执行最后的绑定命令:

# 允许KSA模拟GSA
gcloud iam service-accounts add-iam-policy-binding tenant-alpha-indexer-gsa@[PROJECT_ID].iam.gserviceaccount.com \
    --role="roles/iam.workloadIdentityUser" \
    --member="serviceAccount:[PROJECT_ID].svc.id.goog[search-services/tenant-alpha-indexer-ksa]"

至此,身份绑定的基础设施已经就绪。任何在search-services命名空间下、使用tenant-alpha-indexer-ksa这个Service Account的Pod,都将自动获得tenant-alpha-indexer-gsa的GCP IAM权限。

架构流程的可视化

为了更清晰地理解这个流程,我们可以用Mermaid图来表示。

graph TD
    subgraph GKE Cluster
        A[Pod: tenant-alpha-indexer] -- "Uses KSA" --> B(KSA: tenant-alpha-indexer-ksa);
        B -- "Asks for GCP token" --> C{GKE Metadata Server};
    end

    subgraph GCP IAM
        D(GSA: tenant-alpha-indexer-gsa);
        D -- "Has role: storage.objectViewer" --> E[GCS Bucket: tenant-alpha-raw-data];
    end

    C -- "1. Verifies Pod's KSA" --> B;
    C -- "2. Requests STS token for GSA" --> F[Google STS API];
    F -- "3. Validates IAM binding (workloadIdentityUser)" --> D;
    F -- "4. Issues short-lived GCP token" --> C;
    C -- "5. Provides token to Pod" --> A;
    A -- "6. Authenticates with GCP token" --> E;

    style A fill:#d4edda,stroke:#155724
    style E fill:#f8d7da,stroke:#721c24

这个图清晰地展示了Pod如何通过一系列的验证和令牌交换,最终获得了访问GCS Bucket的权限,全程没有一个静态密钥文件参与。

生产级的索引器服务实现

现在,我们来编写这个索引器服务的代码。这是一个Go应用,它会作为Pod的主进程运行。它的任务是:

  1. 从环境中自动获取GCP凭证(由Workload Identity提供)。
  2. 使用凭证访问指定的GCS Bucket。
  3. 拉取原始数据文件。
  4. 解析数据并将其索引到Elasticsearch中。

下面是这个服务的核心代码,它被设计为可配置、带错误处理和日志的生产级应用。

main.go:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"cloud.google.com/go/storage"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

// Config holds the application configuration.
// In a real project, this would be loaded from env vars or a config file.
type Config struct {
	ProjectID       string
	BucketName      string
	ElasticsearchURL string
	IndexName       string
	PollInterval    time.Duration
}

// loadConfig loads configuration from environment variables.
func loadConfig() (*Config, error) {
	projectID := os.Getenv("GCP_PROJECT_ID")
	if projectID == "" {
		return nil, fmt.Errorf("GCP_PROJECT_ID environment variable not set")
	}
	bucketName := os.Getenv("GCS_BUCKET_NAME")
	if bucketName == "" {
		return nil, fmt.Errorf("GCS_BUCKET_NAME environment variable not set")
	}
	esURL := os.Getenv("ELASTICSEARCH_URL")
	if esURL == "" {
		return nil, fmt.Errorf("ELASTICSEARCH_URL environment variable not set")
	}
	indexName := os.Getenv("ELASTICSEARCH_INDEX_NAME")
	if indexName == "" {
		return nil, fmt.Errorf("ELASTICSEARCH_INDEX_NAME environment variable not set")
	}
	pollIntervalStr := os.Getenv("POLL_INTERVAL")
	if pollIntervalStr == "" {
		pollIntervalStr = "60s" // Default to 60 seconds
	}
	pollInterval, err := time.ParseDuration(pollIntervalStr)
	if err != nil {
		return nil, fmt.Errorf("invalid POLL_INTERVAL: %w", err)
	}

	return &Config{
		ProjectID:       projectID,
		BucketName:      bucketName,
		ElasticsearchURL: esURL,
		IndexName:       indexName,
		PollInterval:    pollInterval,
	}, nil
}

// LogEntry represents a sample log structure to be indexed.
type LogEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Level     string    `json:"level"`
	Message   string    `json:"message"`
	ServiceID string    `json:"service_id"`
}

func main() {
	log.Println("Starting indexer service...")
	cfg, err := loadConfig()
	if err != nil {
		log.Fatalf("Failed to load configuration: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The Google Cloud Go SDK automatically handles credential acquisition
	// from the environment, which is exactly what Workload Identity provides.
	// No explicit credential logic is needed here. This is the core benefit.
	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("Failed to create Google Cloud Storage client: %v", err)
	}
	defer storageClient.Close()
	log.Printf("Successfully connected to Google Cloud Storage API.")

	// Setup Elasticsearch client
	esClient, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{cfg.ElasticsearchURL},
		// In a production setup, add retry logic, username/password, etc.
	})
	if err != nil {
		log.Fatalf("Error creating the Elasticsearch client: %s", err)
	}
	log.Printf("Successfully connected to Elasticsearch at %s.", cfg.ElasticsearchURL)

	// Setup a graceful shutdown mechanism
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

	ticker := time.NewTicker(cfg.PollInterval)
	defer ticker.Stop()

	log.Printf("Starting polling GCS bucket '%s' every %v.", cfg.BucketName, cfg.PollInterval)

	// Main application loop
	for {
		select {
		case <-ticker.C:
			err := processBucketObjects(ctx, storageClient, esClient, cfg)
			if err != nil {
				log.Printf("ERROR: Failed to process bucket objects: %v", err)
			}
		case <-shutdown:
			log.Println("Shutdown signal received, exiting gracefully.")
			return
		}
	}
}

// processBucketObjects lists objects, processes them, and moves them to an archive.
func processBucketObjects(ctx context.Context, sc *storage.Client, esc *elasticsearch.Client, cfg *Config) error {
	log.Println("Checking for new objects in bucket...")
	bucket := sc.Bucket(cfg.BucketName)
	
	// Create a bulk indexer for Elasticsearch for efficiency
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:      cfg.IndexName,
		Client:     esc,
		NumWorkers: 4,
		FlushBytes: int(5e6), // 5 MB
	})
	if err != nil {
		return fmt.Errorf("error creating bulk indexer: %w", err)
	}
	defer bi.Close(context.Background())

	// A more robust implementation would use GCS notifications via Pub/Sub
	// instead of polling. Polling is used here for simplicity.
	it := bucket.Objects(ctx, &storage.Query{Prefix: "new/"})
	
	processedCount := 0
	for objAttrs, err := it.Next(); err != io.EOF; objAttrs, err = it.Next() {
		if err != nil {
			log.Printf("Warning: error iterating bucket objects: %v", err)
			continue
		}
		
		log.Printf("Processing object: %s", objAttrs.Name)

		rc, err := bucket.Object(objAttrs.Name).NewReader(ctx)
		if err != nil {
			log.Printf("Warning: failed to create reader for object %s: %v", objAttrs.Name, err)
			continue
		}
		
		// Assuming each file contains JSON objects, one per line
		decoder := json.NewDecoder(rc)
		for decoder.More() {
			var entry LogEntry
			if err := decoder.Decode(&entry); err != nil {
				log.Printf("Warning: failed to decode JSON object in %s: %v", objAttrs.Name, err)
				continue
			}

			// Add to bulk indexer
			data, err := json.Marshal(entry)
			if err != nil {
				log.Printf("Warning: failed to marshal log entry: %v", err)
				continue
			}

			err = bi.Add(
				ctx,
				esutil.BulkIndexerItem{
					Action: "index",
					Body:   strings.NewReader(string(data)),
					OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
						if err != nil {
							log.Printf("ERROR: %s", err)
						} else {
							log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
						}
					},
				},
			)
			if err != nil {
				log.Printf("Warning: failed to add item to bulk indexer: %v", err)
			}
		}
		rc.Close()

		// Once processed, move the object to an "archived" folder to avoid reprocessing
		archiveName := strings.Replace(objAttrs.Name, "new/", "archived/", 1)
		dst := bucket.Object(archiveName)
		src := bucket.Object(objAttrs.Name)
		
		if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
			log.Printf("ERROR: failed to copy object %s to archive: %v", objAttrs.Name, err)
			continue // Don't delete original if copy failed
		}
		if err := src.Delete(ctx); err != nil {
			log.Printf("ERROR: failed to delete original object %s: %v", objAttrs.Name, err)
		}
		processedCount++
		log.Printf("Successfully processed and archived %s", objAttrs.Name)
	}

	stats := bi.Stats()
	log.Printf("Bulk indexing finished. Added: %d, Failed: %d", stats.NumAdded, stats.NumFailed)
	
	if processedCount == 0 {
		log.Println("No new objects found to process.")
	}

	return nil
}

Kubernetes部署清单

现在,我们将上面的Go应用容器化(Dockerfile省略,但它会编译并运行这个Go程序),然后通过下面的Deployment清单部署到GKE。

k8s/tenant-alpha-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: tenant-alpha-indexer
  namespace: search-services
  labels:
    app: indexer
    tenant: alpha
spec:
  replicas: 1
  selector:
    matchLabels:
      app: indexer
      tenant: alpha
  template:
    metadata:
      labels:
        app: indexer
        tenant: alpha
    spec:
      # This is the crucial link to our IAM-bound KSA
      serviceAccountName: tenant-alpha-indexer-ksa
      containers:
      - name: indexer
        image: gcr.io/[PROJECT_ID]/tenant-indexer:v1.0.0
        env:
        - name: GCP_PROJECT_ID
          value: "[PROJECT_ID]"
        - name: GCS_BUCKET_NAME
          value: "tenant-alpha-raw-data"
        - name: ELASTICSEARCH_URL
          value: "http://elasticsearch-master.logging.svc.cluster.local:9200"
        - name: ELASTICSEARCH_INDEX_NAME
          value: "tenant-alpha-logs"
        - name: POLL_INTERVAL
          value: "30s"
        resources:
          requests:
            cpu: "100m"
            memory: "128Mi"
          limits:
            cpu: "500m"
            memory: "256Mi"

当我们部署这个Deployment时,会发生什么?

  1. Kubernetes在search-services命名空间创建一个Pod。
  2. Pod被分配了tenant-alpha-indexer-ksa这个Service Account。
  3. Pod内的Go应用启动,它需要访问GCS。
  4. Go的GCP SDK会探测环境,发现自己在一个GKE Pod中,并向GKE元数据服务器 http://metadata.google.internal/... 发起请求获取凭证。
  5. GKE元数据服务器检查该Pod的KSA,发现它与GSA tenant-alpha-indexer-gsa 绑定。
  6. 它代表Pod向Google STS(Security Token Service)请求一个短期的OAuth 2.0访问令牌,这个令牌拥有tenant-alpha-indexer-gsa的所有权限。
  7. SDK获得令牌,并用它来签署对GCS API的请求。
  8. GCP IAM验证令牌,并检查该令牌(也就是tenant-alpha-indexer-gsa)是否有权读取tenant-alpha-raw-data Bucket。权限检查通过,返回数据。

如果另一个租户(tenant-beta)的Pod,哪怕它被攻破,尝试使用同样的代码去访问tenant-alpha-raw-data Bucket,会因为它的Pod没有绑定到正确的KSA-GSA对而直接被IAM拒绝。这就是基础设施级别的安全隔离。

当前方案的局限性与未来展望

这个架构有效地解决了索引器侧的认证和授权问题,实现了对数据源的最小权限访问。然而,它并非银弹。

首先,查询侧的授权依然依赖于Elasticsearch自身的安全机制(例如,X-Pack Security)。用户或服务在查询数据时,仍然需要通过ES的认证。一个更完整的零信任方案是构建一个API网关,置于ES之前。这个网关也运行在GKE上,并使用Workload Identity验证传入请求的调用者身份(比如另一个GKE服务),然后将GCP IAM身份映射到ES的查询权限上。这将实现端到端的GCP IAM授权。

其次,我们的Elasticsearch集群本身是一个共享资源。虽然通过索引隔离了数据,但一个行为异常的租户(例如,发起了代价高昂的聚合查询)仍然可能影响其他租户的性能,造成“吵闹的邻居”问题。未来的迭代方向是探索单元化架构,即通过Kubernetes Operator自动化部署和管理更小的、租户独享或分组共享的ES集群,实现更彻底的资源隔离。

最后,手动创建和绑定GSA、KSA以及IAM策略的过程虽然清晰,但在租户数量增多时会变得非常繁琐且容易出错。将这一整套资源的创建流程通过Terraform或Crossplane等IaC(基础设施即代码)工具进行模板化和自动化,是确保该模式能够规模化应用的关键。


  目录