我们面临一个棘手的多租户数据隔离问题。团队负责维护一个中心化的搜索服务,后端是Elasticsearch集群,为公司内部数十个应用提供日志、指标和业务数据的检索能力。最初的架构简单粗暴:一个巨大的ES集群,通过不同的index前缀区分租户,并在应用层代码中硬编码API Key来控制写入和查询权限。这个模式很快就暴露了其脆弱性:API Key泄露风险、权限变更的巨大运维成本、以及无法有效审计哪个服务在什么时间访问了什么数据。一个服务的安全漏洞可能导致整个搜索集群的数据被横向访问。这在生产环境中是完全无法接受的。
我们需要一个从根本上解决身份认证和授权的方案,目标是将权限控制下沉到基础设施层面,实现真正的最小权限原则。我们的基础设施是GCP GKE,因此,GCP的IAM系统是理想的权限控制中心。核心构想是:让运行在GKE Pod中的每个租户服务,都能获得一个临时的、具有严格范围权限的GCP身份,并用这个身份去执行操作,而不是依赖共享的、长期的密钥。
这直接指向了GKE的Workload Identity特性。它允许我们将Kubernetes Service Account (KSA)与GCP IAM Service Account (GSA)进行绑定。当一个Pod使用这个KSA时,它可以通过GKE元数据服务器获取一个临时的GCP访问令牌,该令牌的权限完全等同于绑定的GSA。这就构成了我们新架构的基石。
我们的搜索服务本质上是一个BASE系统(Basically Available, Soft state, Eventually consistent)。对于搜索场景,数据的最终一致性是可以接受的,而服务的高可用性至关重要。将安全模型与基础设施深度绑定,可以让我们更专注于保证服务的可用性和性能,而不是在应用层徒劳地构建复杂的安全壁垒。
下面是整个改造过程的复盘,从IAM配置、GKE部署,到一个可直接运行的、能体现该架构优势的索引器(Indexer)服务的完整实现。
第一步:基础设施准备与IAM绑定
要启用Workload Identity,必须在GKE集群创建或更新时指定。对于一个已存在的集群,可以通过更新命令开启。
# 假设我们有一个名为'secure-search-cluster'的集群
# 在'asia-east1'区域
gcloud container clusters update secure-search-cluster \
--zone=asia-east1 \
--workload-pool=[PROJECT_ID].svc.id.goog
workload-pool是关键,它为集群创建了一个唯一的身份池。
接下来,为我们的一个租户(假设为tenant-alpha)创建一个专用的GCP IAM Service Account (GSA)。这个GSA的权限将被严格限制,比如,它只能读取某个特定的GCS Bucket中的原始数据。
# 1. 创建GSA
gcloud iam service-accounts create tenant-alpha-indexer-gsa \
--project=[PROJECT_ID] \
--description="IAM service account for tenant-alpha's search indexer" \
--display-name="Tenant Alpha Indexer GSA"
# 2. 为GSA授予最小权限
# 假设原始数据存储在名为 'tenant-alpha-raw-data' 的GCS Bucket中
# 我们只授予它读取该Bucket中对象的权限
gcloud projects add-iam-policy-binding [PROJECT_ID] \
--member="serviceAccount:tenant-alpha-indexer-gsa@[PROJECT_ID].iam.gserviceaccount.com" \
--role="roles/storage.objectViewer" \
--condition='expression=resource.name.startsWith("projects/_/buckets/tenant-alpha-raw-data/"),title=bucket_prefix'
这里的IAM Condition是实现最小权限的关键。我们不是将权限授予整个项目或整个GCS,而是精确到某个特定Bucket,甚至可以到特定的对象前缀。
现在,我们需要在Kubernetes中创建一个对应的Service Account (KSA),并将其与刚刚创建的GSA绑定。
# k8s/tenant-alpha-ksa.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: tenant-alpha-indexer-ksa
namespace: search-services
annotations:
iam.gke.io/gcp-service-account: tenant-alpha-indexer-gsa@[PROJECT_ID].iam.gserviceaccount.com
这个annotations是Workload Identity的核心,它声明了tenant-alpha-indexer-ksa这个KSA将模拟tenant-alpha-indexer-gsa。
使用kubectl apply创建KSA后,执行最后的绑定命令:
# 允许KSA模拟GSA
gcloud iam service-accounts add-iam-policy-binding tenant-alpha-indexer-gsa@[PROJECT_ID].iam.gserviceaccount.com \
--role="roles/iam.workloadIdentityUser" \
--member="serviceAccount:[PROJECT_ID].svc.id.goog[search-services/tenant-alpha-indexer-ksa]"
至此,身份绑定的基础设施已经就绪。任何在search-services命名空间下、使用tenant-alpha-indexer-ksa这个Service Account的Pod,都将自动获得tenant-alpha-indexer-gsa的GCP IAM权限。
架构流程的可视化
为了更清晰地理解这个流程,我们可以用Mermaid图来表示。
graph TD
subgraph GKE Cluster
A[Pod: tenant-alpha-indexer] -- "Uses KSA" --> B(KSA: tenant-alpha-indexer-ksa);
B -- "Asks for GCP token" --> C{GKE Metadata Server};
end
subgraph GCP IAM
D(GSA: tenant-alpha-indexer-gsa);
D -- "Has role: storage.objectViewer" --> E[GCS Bucket: tenant-alpha-raw-data];
end
C -- "1. Verifies Pod's KSA" --> B;
C -- "2. Requests STS token for GSA" --> F[Google STS API];
F -- "3. Validates IAM binding (workloadIdentityUser)" --> D;
F -- "4. Issues short-lived GCP token" --> C;
C -- "5. Provides token to Pod" --> A;
A -- "6. Authenticates with GCP token" --> E;
style A fill:#d4edda,stroke:#155724
style E fill:#f8d7da,stroke:#721c24
这个图清晰地展示了Pod如何通过一系列的验证和令牌交换,最终获得了访问GCS Bucket的权限,全程没有一个静态密钥文件参与。
生产级的索引器服务实现
现在,我们来编写这个索引器服务的代码。这是一个Go应用,它会作为Pod的主进程运行。它的任务是:
- 从环境中自动获取GCP凭证(由Workload Identity提供)。
- 使用凭证访问指定的GCS Bucket。
- 拉取原始数据文件。
- 解析数据并将其索引到Elasticsearch中。
下面是这个服务的核心代码,它被设计为可配置、带错误处理和日志的生产级应用。
main.go:
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"cloud.google.com/go/storage"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
// Config holds the application configuration.
// In a real project, this would be loaded from env vars or a config file.
type Config struct {
ProjectID string
BucketName string
ElasticsearchURL string
IndexName string
PollInterval time.Duration
}
// loadConfig loads configuration from environment variables.
func loadConfig() (*Config, error) {
projectID := os.Getenv("GCP_PROJECT_ID")
if projectID == "" {
return nil, fmt.Errorf("GCP_PROJECT_ID environment variable not set")
}
bucketName := os.Getenv("GCS_BUCKET_NAME")
if bucketName == "" {
return nil, fmt.Errorf("GCS_BUCKET_NAME environment variable not set")
}
esURL := os.Getenv("ELASTICSEARCH_URL")
if esURL == "" {
return nil, fmt.Errorf("ELASTICSEARCH_URL environment variable not set")
}
indexName := os.Getenv("ELASTICSEARCH_INDEX_NAME")
if indexName == "" {
return nil, fmt.Errorf("ELASTICSEARCH_INDEX_NAME environment variable not set")
}
pollIntervalStr := os.Getenv("POLL_INTERVAL")
if pollIntervalStr == "" {
pollIntervalStr = "60s" // Default to 60 seconds
}
pollInterval, err := time.ParseDuration(pollIntervalStr)
if err != nil {
return nil, fmt.Errorf("invalid POLL_INTERVAL: %w", err)
}
return &Config{
ProjectID: projectID,
BucketName: bucketName,
ElasticsearchURL: esURL,
IndexName: indexName,
PollInterval: pollInterval,
}, nil
}
// LogEntry represents a sample log structure to be indexed.
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
ServiceID string `json:"service_id"`
}
func main() {
log.Println("Starting indexer service...")
cfg, err := loadConfig()
if err != nil {
log.Fatalf("Failed to load configuration: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// The Google Cloud Go SDK automatically handles credential acquisition
// from the environment, which is exactly what Workload Identity provides.
// No explicit credential logic is needed here. This is the core benefit.
storageClient, err := storage.NewClient(ctx)
if err != nil {
log.Fatalf("Failed to create Google Cloud Storage client: %v", err)
}
defer storageClient.Close()
log.Printf("Successfully connected to Google Cloud Storage API.")
// Setup Elasticsearch client
esClient, err := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{cfg.ElasticsearchURL},
// In a production setup, add retry logic, username/password, etc.
})
if err != nil {
log.Fatalf("Error creating the Elasticsearch client: %s", err)
}
log.Printf("Successfully connected to Elasticsearch at %s.", cfg.ElasticsearchURL)
// Setup a graceful shutdown mechanism
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
ticker := time.NewTicker(cfg.PollInterval)
defer ticker.Stop()
log.Printf("Starting polling GCS bucket '%s' every %v.", cfg.BucketName, cfg.PollInterval)
// Main application loop
for {
select {
case <-ticker.C:
err := processBucketObjects(ctx, storageClient, esClient, cfg)
if err != nil {
log.Printf("ERROR: Failed to process bucket objects: %v", err)
}
case <-shutdown:
log.Println("Shutdown signal received, exiting gracefully.")
return
}
}
}
// processBucketObjects lists objects, processes them, and moves them to an archive.
func processBucketObjects(ctx context.Context, sc *storage.Client, esc *elasticsearch.Client, cfg *Config) error {
log.Println("Checking for new objects in bucket...")
bucket := sc.Bucket(cfg.BucketName)
// Create a bulk indexer for Elasticsearch for efficiency
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: cfg.IndexName,
Client: esc,
NumWorkers: 4,
FlushBytes: int(5e6), // 5 MB
})
if err != nil {
return fmt.Errorf("error creating bulk indexer: %w", err)
}
defer bi.Close(context.Background())
// A more robust implementation would use GCS notifications via Pub/Sub
// instead of polling. Polling is used here for simplicity.
it := bucket.Objects(ctx, &storage.Query{Prefix: "new/"})
processedCount := 0
for objAttrs, err := it.Next(); err != io.EOF; objAttrs, err = it.Next() {
if err != nil {
log.Printf("Warning: error iterating bucket objects: %v", err)
continue
}
log.Printf("Processing object: %s", objAttrs.Name)
rc, err := bucket.Object(objAttrs.Name).NewReader(ctx)
if err != nil {
log.Printf("Warning: failed to create reader for object %s: %v", objAttrs.Name, err)
continue
}
// Assuming each file contains JSON objects, one per line
decoder := json.NewDecoder(rc)
for decoder.More() {
var entry LogEntry
if err := decoder.Decode(&entry); err != nil {
log.Printf("Warning: failed to decode JSON object in %s: %v", objAttrs.Name, err)
continue
}
// Add to bulk indexer
data, err := json.Marshal(entry)
if err != nil {
log.Printf("Warning: failed to marshal log entry: %v", err)
continue
}
err = bi.Add(
ctx,
esutil.BulkIndexerItem{
Action: "index",
Body: strings.NewReader(string(data)),
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
log.Printf("ERROR: %s", err)
} else {
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
if err != nil {
log.Printf("Warning: failed to add item to bulk indexer: %v", err)
}
}
rc.Close()
// Once processed, move the object to an "archived" folder to avoid reprocessing
archiveName := strings.Replace(objAttrs.Name, "new/", "archived/", 1)
dst := bucket.Object(archiveName)
src := bucket.Object(objAttrs.Name)
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
log.Printf("ERROR: failed to copy object %s to archive: %v", objAttrs.Name, err)
continue // Don't delete original if copy failed
}
if err := src.Delete(ctx); err != nil {
log.Printf("ERROR: failed to delete original object %s: %v", objAttrs.Name, err)
}
processedCount++
log.Printf("Successfully processed and archived %s", objAttrs.Name)
}
stats := bi.Stats()
log.Printf("Bulk indexing finished. Added: %d, Failed: %d", stats.NumAdded, stats.NumFailed)
if processedCount == 0 {
log.Println("No new objects found to process.")
}
return nil
}
Kubernetes部署清单
现在,我们将上面的Go应用容器化(Dockerfile省略,但它会编译并运行这个Go程序),然后通过下面的Deployment清单部署到GKE。
k8s/tenant-alpha-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: tenant-alpha-indexer
namespace: search-services
labels:
app: indexer
tenant: alpha
spec:
replicas: 1
selector:
matchLabels:
app: indexer
tenant: alpha
template:
metadata:
labels:
app: indexer
tenant: alpha
spec:
# This is the crucial link to our IAM-bound KSA
serviceAccountName: tenant-alpha-indexer-ksa
containers:
- name: indexer
image: gcr.io/[PROJECT_ID]/tenant-indexer:v1.0.0
env:
- name: GCP_PROJECT_ID
value: "[PROJECT_ID]"
- name: GCS_BUCKET_NAME
value: "tenant-alpha-raw-data"
- name: ELASTICSEARCH_URL
value: "http://elasticsearch-master.logging.svc.cluster.local:9200"
- name: ELASTICSEARCH_INDEX_NAME
value: "tenant-alpha-logs"
- name: POLL_INTERVAL
value: "30s"
resources:
requests:
cpu: "100m"
memory: "128Mi"
limits:
cpu: "500m"
memory: "256Mi"
当我们部署这个Deployment时,会发生什么?
- Kubernetes在
search-services命名空间创建一个Pod。 - Pod被分配了
tenant-alpha-indexer-ksa这个Service Account。 - Pod内的Go应用启动,它需要访问GCS。
- Go的GCP SDK会探测环境,发现自己在一个GKE Pod中,并向GKE元数据服务器
http://metadata.google.internal/...发起请求获取凭证。 - GKE元数据服务器检查该Pod的KSA,发现它与GSA
tenant-alpha-indexer-gsa绑定。 - 它代表Pod向Google STS(Security Token Service)请求一个短期的OAuth 2.0访问令牌,这个令牌拥有
tenant-alpha-indexer-gsa的所有权限。 - SDK获得令牌,并用它来签署对GCS API的请求。
- GCP IAM验证令牌,并检查该令牌(也就是
tenant-alpha-indexer-gsa)是否有权读取tenant-alpha-raw-dataBucket。权限检查通过,返回数据。
如果另一个租户(tenant-beta)的Pod,哪怕它被攻破,尝试使用同样的代码去访问tenant-alpha-raw-data Bucket,会因为它的Pod没有绑定到正确的KSA-GSA对而直接被IAM拒绝。这就是基础设施级别的安全隔离。
当前方案的局限性与未来展望
这个架构有效地解决了索引器侧的认证和授权问题,实现了对数据源的最小权限访问。然而,它并非银弹。
首先,查询侧的授权依然依赖于Elasticsearch自身的安全机制(例如,X-Pack Security)。用户或服务在查询数据时,仍然需要通过ES的认证。一个更完整的零信任方案是构建一个API网关,置于ES之前。这个网关也运行在GKE上,并使用Workload Identity验证传入请求的调用者身份(比如另一个GKE服务),然后将GCP IAM身份映射到ES的查询权限上。这将实现端到端的GCP IAM授权。
其次,我们的Elasticsearch集群本身是一个共享资源。虽然通过索引隔离了数据,但一个行为异常的租户(例如,发起了代价高昂的聚合查询)仍然可能影响其他租户的性能,造成“吵闹的邻居”问题。未来的迭代方向是探索单元化架构,即通过Kubernetes Operator自动化部署和管理更小的、租户独享或分组共享的ES集群,实现更彻底的资源隔离。
最后,手动创建和绑定GSA、KSA以及IAM策略的过程虽然清晰,但在租户数量增多时会变得非常繁琐且容易出错。将这一整套资源的创建流程通过Terraform或Crossplane等IaC(基础设施即代码)工具进行模板化和自动化,是确保该模式能够规模化应用的关键。