Featured image of post containerd通信机制分析

containerd通信机制分析

深入分析containerd源码

containerd通信机制

简述

Containerd 是一个云原生(容器)领域行业标准容器运行时。以操作系统守护进程方式提供服务,管理一台机器上每个容器生命周期,包括:

  • 镜像下载和存储。
  • 容器 rootfs (根文件系统)的生成。
  • 容器的启动和守护。
  • 容器的低级存储和附加网络。

Containerd 是 CNCF 毕业项目,是 Kubernetes 和 Docker 的默认容器运行时。

Containerd 守护进程默认提供了两套 API:

  • Containerd 原生 GRPC API (源码),并提供了 Go SDK (参见:源码),Docker 以该方式集成 Containerd。
  • Kubernetes 的 CRI GRPC API(源码),形态上通过 Containerd Plugin 的方式提供服务(原生插件,打包到了 Containerd 二进制中),架构参见:docs。Kubernetes 以该方式集成 Containerd。

在底层容器运行时方面,Containerd 采用 OCI-runtime 标准,默认使用 runc 作为运行时。

简单概述典型场景中 Kubernetes、Containerd、Runc 的 层级关系如下:

  • Kubernetes 负责集群(多节点)的调度和管理,在单个节点,通过 Kubelet 组件通过 CRI GRPC 接口调用 Containerd。
  • Containerd 提供单个节点的容器生命周期管理,包括镜像、存储、rootfs、网络,启动容器是 Containerd 通过 OCI-runtime 标准调用 runc。
  • Runc 容器引导器,负责根据一个容器的具体配置,在指定 rootfs 上引导启动一个容器进程。

img

gRPC框架原理

gRPC(Google Remote Procedure Call)是一个现代化、高性能的远程过程调用(RPC)框架,设计用于在分布式系统中实现跨网络、跨语言的服务调用。它利用了 HTTP/2 和 Protocol Buffers,为开发者提供了一种简便的方式来调用远程服务,就像调用本地函数一样。下面是 gRPC 的基本原理及其关键组件。

gRPC 的工作流程

服务定义与接口生成

  • Protocol Buffers 定义服务: containerd 使用 Protocol Buffers(protobuf)来定义 gRPC 服务。这些服务接口和消息结构通常定义在 .proto 文件中。例如,containers.proto 文件定义了容器管理的服务接口。

    protobufCopy codesyntax = "proto3";
    
    service Containers {
        rpc Create (CreateContainerRequest) returns (CreateContainerResponse);
        rpc Start (StartContainerRequest) returns (StartContainerResponse);
        rpc Stop (StopContainerRequest) returns (StopContainerResponse);
    }
    
    message CreateContainerRequest {
        string id = 1;
        // 其他字段...
    }
    
    message CreateContainerResponse {
        // 响应字段...
    }
    
  • 生成客户端和服务端代码: 使用 protoc 编译器,.proto 文件会生成相应的客户端和服务端代码。对于 containerd,这些生成的代码是与 containerd 核心功能集成在一起的,客户端代码通常由外部工具或服务使用。

gRPC 服务的实现

  • 服务实现: containerd 中每个 gRPC 服务都有一个具体的实现,这些实现负责处理来自客户端的请求并执行相应的操作。服务的实现通常位于 containerdservices 目录中,下面是一个示例。

    type containersServer struct {
        // 相关字段和依赖注入
    }
    
    func (s *containersServer) Create(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
        // 实现容器创建逻辑
    }
    
    func (s *containersServer) Start(ctx context.Context, req *StartContainerRequest) (*StartContainerResponse, error) {
        // 实现容器启动逻辑
    }
    
    // 其他服务方法...
    
  • 服务注册: 在 containerd 启动时,gRPC 服务器会启动并监听一个 Unix Domain Socket (UDS) 地址或 TCP 端口,然后将所有的服务注册到 gRPC 服务器中。

    grpcServer := grpc.NewServer()
    containerspb.RegisterContainersServer(grpcServer, &containersService)
    // 注册其他服务...
    

gRPC 通信机制

  • 请求的发送与接收: 客户端通过生成的 gRPC 客户端代码向 containerd 发送请求。请求通过 gRPC 框架在客户端进行序列化(使用 Protocol Buffers),然后通过底层传输协议(通常是 HTTP/2 over Unix Domain Sockets)发送到 containerd
  • 服务器端处理: containerd 的 gRPC 服务器接收到请求后,将其反序列化为相应的消息对象,并调用对应服务的实现方法进行处理。处理完成后,响应结果再通过 gRPC 返回给客户端。
  • 连接管理: gRPC 支持长连接,多个请求可以在同一个连接中并发进行。对于本地通信,containerd 通常使用 Unix Domain Sockets (UDS) 作为传输层,提升通信效率并增强安全性。

关键应用场景

  • 客户端与 containerd 的通信: 外部客户端(如 ctr 工具或 Docker 引擎)通过 gRPC 与 containerd 进行通信,执行容器的创建、启动、停止、删除等操作。
  • containerd 插件的集成: 插件通过 gRPC 提供服务接口,containerd 核心通过调用这些接口来管理镜像、快照和容器运行时等。
  • containerd-shimcontainerd 的通信: containerd-shim 进程负责管理每个容器的生命周期,与 containerd 核心通过 gRPC 通信来报告容器状态、接收管理命令等。

gRPC中使用UDS的场景

containerd 中,Unix Domain Sockets (UDS) 通信通常用于以下场景:

containerd 守护进程与客户端之间的通信

  • 场景: containerd 守护进程与外部客户端(如 ctr 工具或 Docker 引擎)之间的通信。
  • 例子: 当 Docker 引擎需要与 containerd 交互时,它通过 gRPC 连接到 containerd 的 UDS 地址,如 /run/containerd/containerd.sock,以请求管理容器、镜像、快照等操作。
  • 详细描述:
    • containerd 启动时,会在配置的地址(通常是 /run/containerd/containerd.sock)上创建一个 UDS 监听器。
    • Docker 引擎或其他客户端通过 gRPC 连接到这个 UDS 地址来与 containerd 交互。
    • 这种方式避免了网络开销,并限制了通信只能发生在本地主机上,提高了安全性。

containerd-shimcontainerd 守护进程之间的通信

  • 场景: 每个容器都有一个 containerd-shim 进程,containerd-shim 负责管理容器的生命周期,而 containerd-shimcontainerd 守护进程之间的通信也是通过 UDS 实现的。
  • 例子: 当 containerd 启动一个新的容器时,它会创建并启动一个对应的 containerd-shim 进程。containerd-shim 通过一个 UDS 与 containerd 进行通信,以报告容器的状态和接受命令。
  • 详细描述:
    • containerd-shim 进程会为每个容器创建一个 UDS,用于与 containerd 交互。
    • 通过这个 UDS,containerd 可以向 containerd-shim 发送指令,如启动或停止容器,并接收来自 containerd-shim 的状态更新。

containerd 插件与外部服务的通信

  • 场景: 某些插件可能作为外部服务运行,需要通过 UDS 与 containerd 通信。
  • 例子: 一些外部存储插件可能独立运行并通过 UDS 暴露其 gRPC 服务,containerd 通过连接这个 UDS 来调用插件的服务。
  • 详细描述:
    • 插件可以在不同的进程中运行,通过 UDS 暴露其服务接口。
    • containerd 连接到这些 UDS 地址来与插件交互,执行诸如存储管理或网络管理等操作。

守护进程之间的通信

  • 场景: 如果需要在多个守护进程(例如 containerd 和其他依赖服务)之间建立安全且高效的本地通信,可以使用 UDS。
  • 例子: 在一些复杂的集群环境中,containerd 可能需要与其他守护进程(如 CRI-O、etcd 等)进行本地通信,这种情况下也可能使用 UDS。
  • 详细描述:
    • 多个守护进程在同一台主机上运行时,通过 UDS 通信,避免了使用 TCP/IP 带来的网络开销和安全风险。

containerd源码分析

containerd启动流程

文件路径:containerd/cmd/containerd/main.go,整个程序从这里开始启动,调用command.App()函数。

package main

import (
	"crypto"
	"fmt"
	"os"

	"github.com/containerd/containerd/v2/cmd/containerd/command"
	"github.com/containerd/containerd/v2/internal/hasher"

	_ "github.com/containerd/containerd/v2/cmd/containerd/builtins"
)

func init() {
	crypto.RegisterHash(crypto.SHA256, hasher.NewSHA256)
}

func main() {
	app := command.App()
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
		os.Exit(1)
	}
}

文件路径:containerd/cmd/containerd/command/main.go,该文件主要用于实现 containerd 守护进程的启动和管理逻辑。该代码包括了如何初始化 containerd 守护进程、解析命令行参数、设置配置项,以及处理信号和服务终止。

// App returns a *cli.App instance.
func App() *cli.App {
	app := cli.NewApp()
	app.Name = "containerd"
	app.Version = version.Version
	app.Usage = usage
	app.Flags = append(app.Flags, serviceFlags()...)
	app.Commands = []*cli.Command{
		configCommand,
		publishCommand,
		ociHook,
	}
	app.Action = func(cliContext *cli.Context) error {
		var (
			start       = time.Now()
			signals     = make(chan os.Signal, 2048)
			serverC     = make(chan *server.Server, 1)
			ctx, cancel = context.WithCancel(cliContext.Context)
			config      = defaultConfig()
		)

//···此处省略部分代码

		if config.Debug.Address != "" {
			var l net.Listener
			if isLocalAddress(config.Debug.Address) {
				if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {
					return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
				}
			} else {
				if l, err = net.Listen("tcp", config.Debug.Address); err != nil {
					return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
				}
			}
			serve(ctx, l, server.ServeDebug)
		}
		if config.Metrics.Address != "" {
			l, err := net.Listen("tcp", config.Metrics.Address)
			if err != nil {
				return fmt.Errorf("failed to get listener for metrics endpoint: %w", err)
			}
			serve(ctx, l, server.ServeMetrics)
		}
		// setup the ttrpc endpoint
		tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
		if err != nil {
			return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
		}
		serve(ctx, tl, server.ServeTTRPC)

		if config.GRPC.TCPAddress != "" {
			l, err := net.Listen("tcp", config.GRPC.TCPAddress)
			if err != nil {
				return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
			}
			serve(ctx, l, server.ServeTCP)
		}
		// setup the main grpc endpoint
		l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
		if err != nil {
			return fmt.Errorf("failed to get listener for main endpoint: %w", err)
		}
		serve(ctx, l, server.ServeGRPC)

		readyC := make(chan struct{})
		go func() {
			server.Wait()
			close(readyC)
		}()
        
		return nil
	}
	return app
}

func serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) {
	path := l.Addr().String()
	log.G(ctx).WithField("address", path).Info("serving...")
	go func() {
		defer l.Close()
		if err := serveFunc(l); err != nil {
			log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")
		}
	}()
}
//···此处省略部分代码

详细分析(我只挑些重点的分析):

主逻辑 (app.Action)

app.Action 中的主逻辑是 containerd 守护进程启动的核心部分。它包括初始化上下文、加载和应用配置、启动 gRPC 和 TTRPC 服务、信号处理,以及最终的服务启动和管理。以下是对主逻辑的详细分析:

  1. 初始化

    var (
        start       = time.Now()
        signals     = make(chan os.Signal, 2048)
        serverC     = make(chan *server.Server, 1)
        ctx, cancel = context.WithCancel(cliContext.Context)
        config      = defaultConfig()
    )
    defer cancel()
    

    时间记录 (start): 记录守护进程启动的时间,用于后续日志记录启动耗时。

    信号通道 (signals): 创建一个缓冲区为 2048 的通道,用于接收系统信号,如 SIGTERMSIGHUP,以便进行优雅的关闭操作。

    服务器通道 (serverC): 用于在异步初始化完成后传递 containerd 服务器实例。

    上下文 (ctx, cancel): 使用 context.WithCancel 创建一个可取消的上下文,确保在需要时能够取消所有与上下文关联的操作。

    配置 (config): 调用 defaultConfig() 函数创建一个默认配置对象,用于存储加载的配置选项。

  2. 加载和应用配置

    configPath := cliContext.String("config")
    _, err := os.Stat(configPath)
    if !os.IsNotExist(err) || cliContext.IsSet("config") {
        if err := srvconfig.LoadConfig(ctx, configPath, config); err != nil {
            return err
        }
    }
    

    配置文件路径: 从命令行参数中获取配置文件路径(configPath),默认情况下指向 /etc/containerd/config.toml 或其他默认路径。

    检查配置文件存在性: 使用 os.Stat 检查配置文件是否存在,或者用户是否明确指定了配置文件路径。如果存在或指定了路径,调用 srvconfig.LoadConfig 加载配置文件到 config 对象中。

    错误处理: 如果加载配置失败,立即返回错误并终止启动流程。

  3. 确保必要配置存在

    if config.GRPC.Address == "" {
        return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
    }
    if config.TTRPC.Address == "" {
        config.TTRPC.Address = config.GRPC.Address + ".ttrpc"
        config.TTRPC.UID = config.GRPC.UID
        config.TTRPC.GID = config.GRPC.GID
    }
    
    • 验证 gRPC 地址: 确保 gRPC 服务的地址已在配置中设置。如果没有设置,则返回错误,因为 gRPC 地址是 containerd 服务的关键配置。
    • 设置 TTRPC 地址: 如果 TTRPC 地址未配置,则使用 gRPC 地址附加 .ttrpc 作为默认地址,并复制 gRPC 的用户 ID 和组 ID 设置。
  4. 处理服务注册和信号

    stop, err := registerUnregisterService(config.Root)
    if err != nil {
        log.L.Fatal(err)
    }
    if stop {
        return nil
    }
    
    done := handleSignals(ctx, signals, serverC, cancel)
    signal.Notify(signals, handledSignals...)
    
    • 服务注册/注销: 调用 registerUnregisterService 函数,处理 Windows 服务的注册或注销。如果处理完毕则终止程序(用于在 Windows 上操作服务控制管理器)。
    • 信号处理: 调用 handleSignals 函数,设置信号处理器 done,用于处理来自系统的信号(如 SIGTERMSIGINT)。然后调用 signal.Notify,将指定的信号注册到 signals 通道中,以便在接收到信号时触发相应的操作。
  5. 启动 containerd 服务器

    type srvResp struct {
        s   *server.Server
        err error
    }
    
    chsrv := make(chan srvResp)
    go func() {
        defer close(chsrv)
    
        server, err := server.New(ctx, config)
        if err != nil {
            select {
            case chsrv <- srvResp{err: err}:
            case <-ctx.Done():
            }
            return
        }
    
        if err := launchService(server, done); err != nil {
            log.L.Fatal(err)
        }
        select {
        case <-ctx.Done():
            server.Stop()
        case chsrv <- srvResp{s: server}:
        }
    }()
    
    • 异步初始化服务器: 使用 goroutine 异步初始化 containerd 服务器,避免在主线程中阻塞,例如在 Bolt 数据库初始化过程中。
    • 创建 containerd 服务器: 调用 server.New 使用配置初始化 containerd 服务器实例。如果出现错误,通过 chsrv 通道返回错误并退出。
    • 启动服务: 如果需要,调用 launchService 启动 containerd 服务器作为 Windows 服务(仅在 Windows 平台上)。
    • 停止服务器: 监听 ctx.Done() 信号,当上下文被取消时,调用 server.Stop 停止服务器。

    进一步分析server.New()函数,这个函数在containerd/cmd/containerd/server/server.go中。

    这个函数用于创建和初始化 gRPC 服务器的核心部分。它涉及配置的迁移、插件的加载与初始化、gRPC 服务的注册,以及其他服务器设置。(我这里只分析了部分重要源码)

    (1)配置迁移

    if currentVersion < version.ConfigVersion {
        // Migrate config to latest version
        t1 := time.Now()
        err := config.MigrateConfig(ctx)
        if err != nil {
            return nil, err
        }
        migrationT = time.Since(t1)
    }
    
    • 版本检查与迁移:如果当前配置的版本低于系统要求的版本,则进行配置迁移。迁移时间被记录在 migrationT 中。

    • 配置迁移:通过 config.MigrateConfig(ctx) 方法进行迁移,将配置更新到最新版本。

    (2)配置 Stream 处理器

    for id, p := range config.StreamProcessors {
        diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
    }
    
    • 注册流处理器:根据配置中的 StreamProcessors 注册流处理器,这些处理器用于处理数据流。

    (3)、初始化 gRPC 服务器

    serverOpts := []grpc.ServerOption{
        grpc.StatsHandler(otelgrpc.NewServerHandler()),
        grpc.ChainStreamInterceptor(
            streamNamespaceInterceptor,
            prometheusServerMetrics.StreamServerInterceptor(),
        ),
        grpc.ChainUnaryInterceptor(
            unaryNamespaceInterceptor,
            prometheusServerMetrics.UnaryServerInterceptor(),
        ),
    }
    
    • gRPC 服务器选项:配置 gRPC 服务器的选项,如统计处理程序、拦截器等。这些拦截器可以用于处理命名空间、监控等。

    (4)、注册服务

    for _, service := range grpcServices {
        if err := service.Register(grpcServer); err != nil {
            return nil, err
        }
    }
    
  6. 启动 gRPC 和其他服务(重点)

    if config.Debug.Address != "" {
        var l net.Listener
        if isLocalAddress(config.Debug.Address) {
            if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {
                return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
            }
        } else {
            if l, err = net.Listen("tcp", config.Debug.Address); err != nil {
                return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
            }
        }
        serve(ctx, l, server.ServeDebug)
    }
    if config.Metrics.Address != "" {
        l, err := net.Listen("tcp", config.Metrics.Address)
        if err != nil {
            return fmt.Errorf("failed to get listener for metrics endpoint: %w", err)
        }
        serve(ctx, l, server.ServeMetrics)
    }
    
    tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
    if err != nil {
        return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
    }
    serve(ctx, tl, server.ServeTTRPC)
    
    if config.GRPC.TCPAddress != "" {
        l, err := net.Listen("tcp", config.GRPC.TCPAddress)
        if err != nil {
            return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
        }
        serve(ctx, l, server.ServeTCP)
    }
    l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
    if err != nil {
        return fmt.Errorf("failed to get listener for main endpoint: %w", err)
    }
    serve(ctx, l, server.ServeGRPC)
    
    • 启动调试服务: 如果配置了调试地址,调用 serve 启动调试服务 (ServeDebug)。
    • 启动度量服务: 如果配置了度量地址,调用 serve 启动度量服务 (ServeMetrics)。
    • 启动 TTRPC 服务: 使用 sys.GetLocalListener 获取本地监听器并启动 TTRPC 服务 (ServeTTRPC)。
    • 启动 gRPC 服务: 如果配置了 TCP gRPC 地址,则通过 TCP 启动 gRPC 服务 (ServeTCP)。同时,还会为主要的 gRPC 服务配置 Unix Domain Socket (UDS) 并启动服务 (ServeGRPC)。
  7. 服务启动与处理

    serve 函数用于启动指定的服务,接受一个监听器 l 和一个服务函数 serveFunc 作为参数。它在新的 goroutine 中执行服务函数,并在服务结束后关闭监听器。

    func serve(ctx context.Context, l net.Listener, serveFunc func(net.Listener) error) {
    	path := l.Addr().String()
    	log.G(ctx).WithField("address", path).Info("serving...")
    	go func() {
    		defer l.Close()
    		if err := serveFunc(l); err != nil {
    			log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")
    		}
    	}()
    }
    

在启动 gRPC 中调用了sys.GetLocalListener(),这个函数在下面的函数中被定义。

文件路径:containerd/pkg/sys/socket_unix.go,这个文件涉及到了uds文件的创建。

package sys

import (
	"fmt"
	"net"
	"os"
	"path/filepath"

	"golang.org/x/sys/unix"
)

// CreateUnixSocket creates a unix socket and returns the listener
func CreateUnixSocket(path string) (net.Listener, error) {
	// BSDs have a 104 limit
	if len(path) > 104 {
		return nil, fmt.Errorf("%q: unix socket path too long (> 104)", path)
	}
	if err := os.MkdirAll(filepath.Dir(path), 0660); err != nil {
		return nil, err
	}
	if err := unix.Unlink(path); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	return net.Listen("unix", path)
}

// GetLocalListener returns a listener out of a unix socket.
func GetLocalListener(path string, uid, gid int) (net.Listener, error) {
	// Ensure parent directory is created
	if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {
		return nil, err
	}

	l, err := CreateUnixSocket(path)
	if err != nil {
		return l, err
	}

	if err := os.Chmod(path, 0660); err != nil {
		l.Close()
		return nil, err
	}

	if err := os.Chown(path, uid, gid); err != nil {
		l.Close()
		return nil, err
	}

	return l, nil
}

func mkdirAs(path string, uid, gid int) error {
	if _, err := os.Stat(path); !os.IsNotExist(err) {
		return err
	}

	if err := os.MkdirAll(path, 0770); err != nil {
		return err
	}

	return os.Chown(path, uid, gid)
}

详细分析:

CreateUnixSocket 函数

func CreateUnixSocket(path string) (net.Listener, error) {
	// BSDs have a 104 limit
	if len(path) > 104 {
		return nil, fmt.Errorf("%q: unix socket path too long (> 104)", path)
	}
	if err := os.MkdirAll(filepath.Dir(path), 0660); err != nil {
		return nil, err
	}
	if err := unix.Unlink(path); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	return net.Listen("unix", path)
}

CreateUnixSocket 函数用于创建一个 Unix Domain Socket,并返回一个用于监听连接的 net.Listener 实例。

详细分析:

  • 路径长度检查:首先,函数检查套接字路径的长度是否超过了 104 个字符。这是因为在一些 BSD 系统中,Unix Domain Socket 的路径长度限制为 104 个字符。如果路径太长,函数会返回一个错误。
  • 创建父目录:使用 os.MkdirAll 创建 Unix Socket 的父目录。如果该目录不存在,MkdirAll 会递归地创建目录。目录权限设置为 0660
  • 删除现有的 Unix Socket 文件:调用 unix.Unlink 尝试删除指定路径上的现有文件(如果存在),以确保新创建的套接字文件不会与旧文件冲突。如果文件不存在,Unlink 会返回一个错误,但如果错误类型是 os.IsNotExist,表示文件本来就不存在,这时会忽略这个错误。
  • 创建 Unix Socket:最后,使用 net.Listen("unix", path) 创建一个 Unix Domain Socket,并返回一个 net.Listener,供后续使用。

GetLocalListener 函数

func GetLocalListener(path string, uid, gid int) (net.Listener, error) {
	// Ensure parent directory is created
	if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {
		return nil, err
	}

	l, err := CreateUnixSocket(path)
	if err != nil {
		return l, err
	}

	if err := os.Chmod(path, 0660); err != nil {
		l.Close()
		return nil, err
	}

	if err := os.Chown(path, uid, gid); err != nil {
		l.Close()
		return nil, err
	}

	return l, nil
}

GetLocalListener 函数用于创建一个本地的 Unix Domain Socket 监听器,并设置相应的权限和所有者信息。

详细分析:

  • 创建父目录:调用 mkdirAs 函数确保 Unix Socket 的父目录已经创建,并设置了适当的用户 ID (uid) 和组 ID (gid)。
  • 创建 Unix Socket:使用 CreateUnixSocket 函数创建 Unix Domain Socket。
  • 设置权限:使用 os.Chmod 设置 Unix Socket 文件的权限为 0660,即文件所有者和组成员可读写,其他用户无权限。
  • 设置所有者:使用 os.Chown 将 Unix Socket 文件的所有者和组设置为指定的 uidgid
  • 返回监听器:如果所有操作都成功,返回创建的 net.Listener 实例。如果在过程中发生错误,关闭已创建的监听器,并返回错误信息。

mkdirAs 函数

func mkdirAs(path string, uid, gid int) error {
	if _, err := os.Stat(path); !os.IsNotExist(err) {
		return err
	}

	if err := os.MkdirAll(path, 0770); err != nil {
		return err
	}

	return os.Chown(path, uid, gid)
}

mkdirAs 函数用于创建指定的目录,并设置该目录的所有者和权限。

详细分析:

  • 检查目录是否存在:使用 os.Stat 检查目标目录是否已经存在。如果目录存在,返回该目录的状态信息或错误。如果目录不存在,继续执行。
  • 创建目录:使用 os.MkdirAll 创建目标目录,并将权限设置为 0770(即所有者和组成员可以读写执行,其他用户无权限)。
  • 设置所有者:使用 os.Chown 将目录的所有者和组设置为指定的 uidgid
  • 返回结果:如果所有操作都成功,返回 nil。如果任何步骤出错,则返回相应的错误信息。

client与containerd守护进程的通信

文件路径:containerd/client/client.go,这段代码是 containerd 项目中 client 包的实现,负责创建与 containerd 守护进程通信的客户端实例。它包含了与 containerd 进行 gRPC 通信的基础设施,提供了一组方法,用于与 containerd 的各种服务进行交互,例如容器管理、镜像管理、快照管理等。

  1. Client 结构体
type Client struct {
	services
	connMu    sync.Mutex
	conn      *grpc.ClientConn
	runtime   string
	defaultns string
	platform  platforms.MatchComparer
	connector func() (*grpc.ClientConn, error)
}
  • services: 内嵌的 services 结构体,包含了与 containerd 服务通信的具体方法,如 ContainerServiceImageService 等。
  • conn: 一个 gRPC 连接对象,用于与 containerd 守护进程通信。
  • connMu: 一个互斥锁,用于在多线程环境下保护 conn 对象的并发访问。
  • runtime: 表示当前使用的容器运行时。
  • defaultns: 客户端的默认命名空间。
  • platform: 平台匹配器,用于确定运行在特定平台上的容器。
  • connector: 一个函数,用于重新连接到 containerd 守护进程。
  1. New 函数

New 函数用于创建一个新的 Client 实例,连接到指定的 containerd 守护进程地址。

func New(address string, opts ...Opt) (*Client, error) {
	var copts clientOpts
	for _, o := range opts {
		if err := o(&copts); err != nil {
			return nil, err
		}
	}
	if copts.timeout == 0 {
		copts.timeout = 10 * time.Second
	}

	c := &Client{
		defaultns: copts.defaultns,
	}

	if copts.defaultRuntime != "" {
		c.runtime = copts.defaultRuntime
	} else {
		c.runtime = defaults.DefaultRuntime
	}

	if copts.defaultPlatform != nil {
		c.platform = copts.defaultPlatform
	} else {
		c.platform = platforms.Default()
	}

	if copts.services != nil {
		c.services = *copts.services
	}

	if address != "" {
		backoffConfig := backoff.DefaultConfig
		backoffConfig.MaxDelay = copts.timeout
		connParams := grpc.ConnectParams{
			Backoff: backoffConfig,
		}
		gopts := []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithConnectParams(connParams),
			grpc.WithContextDialer(dialer.ContextDialer),
		}
		if len(copts.dialOptions) > 0 {
			gopts = copts.dialOptions
		}
		gopts = append(gopts, grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize),
			grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)))
		if len(copts.callOptions) > 0 {
			gopts = append(gopts, grpc.WithDefaultCallOptions(copts.callOptions...))
		}
		if copts.defaultns != "" {
			unary, stream := newNSInterceptors(copts.defaultns)
			gopts = append(gopts, grpc.WithChainUnaryInterceptor(unary))
			gopts = append(gopts, grpc.WithChainStreamInterceptor(stream))
		}

		connector := func() (*grpc.ClientConn, error) {
			conn, err := grpc.NewClient(dialer.DialAddress(address), gopts...) //gRPC连接
			if err != nil {
				return nil, fmt.Errorf("failed to dial %q: %w", address, err)
			}
			return conn, nil
		}
		conn, err := connector()
		if err != nil {
			return nil, err
		}
		c.conn, c.connector = conn, connector
	}

	if copts.services == nil && c.conn == nil {
		return nil, fmt.Errorf("no grpc connection or services is available: %w", errdefs.ErrUnavailable)
	}

	// check namespace labels for default runtime
	if copts.defaultRuntime == "" && c.defaultns != "" {
		if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
			return nil, err
		} else if label != "" {
			c.runtime = label
		}
	}

	return c, nil
}
  • 选项解析: copts 用于存储客户端配置选项,通过传入的 opts 进行解析配置。
  • 默认配置设置: 如果未指定运行时、平台等选项,使用默认值进行初始化。
  • gRPC 连接:
    • 配置了连接参数和拨号选项(如超时、TLS、安全凭证等)。
    • 使用 grpc.NewClient 建立到 containerd 守护进程的 gRPC 连接。
    • 连接成功后,保存连接对象 conn 和重新连接的函数 connector
  • 命名空间标签检查: 如果没有指定运行时,并且设置了默认命名空间,会尝试从命名空间标签中获取默认运行时。
  • 错误处理: 如果无法建立 gRPC 连接,或未提供连接或服务,则返回错误。
  1. NewWithConn 函数

NewWithConn 函数用于创建一个与现有的 gRPC 连接关联的 Client 实例。

func NewWithConn(conn *grpc.ClientConn, opts ...Opt) (*Client, error) {
	var copts clientOpts
	for _, o := range opts {
		if err := o(&copts); err != nil {
			return nil, err
		}
	}
	c := &Client{
		defaultns: copts.defaultns,
		conn:      conn,
		runtime:   defaults.DefaultRuntime,
	}

	if copts.defaultPlatform != nil {
		c.platform = copts.defaultPlatform
	} else {
		c.platform = platforms.Default()
	}

	// check namespace labels for default runtime
	if copts.defaultRuntime == "" && c.defaultns != "" {
		if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
			return nil, err
		} else if label != "" {
			c.runtime = label
		}
	}

	if copts.services != nil {
		c.services = *copts.services
	}
	return c, nil
}
  • 参数说明:
    • conn: 一个现有的 gRPC 连接实例。
    • opts: 可选的客户端配置选项。
  • 初始化:
    • New 函数类似,初始化 Client 实例,设置默认运行时和平台。
    • 如果传入了服务配置 services,则使用该配置。
  1. Reconnect 函数

Reconnect 函数用于重新建立与 containerd 守护进程的 gRPC 连接。

func (c *Client) Reconnect() error {
	if c.connector == nil {
		return fmt.Errorf("unable to reconnect to containerd, no connector available: %w", errdefs.ErrUnavailable)
	}
	c.connMu.Lock()
	defer c.connMu.Unlock()
	c.conn.Close()
	conn, err := c.connector()
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}
  • 锁定连接: 使用 connMu 互斥锁来防止并发修改 conn 对象。
  • 关闭旧连接: 关闭现有的 gRPC 连接。
  • 重新连接: 使用 connector 函数重新建立连接,并将新连接赋值给 conn
  1. IsServing 函数

IsServing 函数用于检查 containerd 守护进程是否正在运行,并返回 SERVING 状态。

func (c *Client) IsServing(ctx context.Context) (bool, error) {
	c.connMu.Lock()
	if c.conn == nil {
		c.connMu.Unlock()
		return false, fmt.Errorf("no grpc connection available: %w", errdefs.ErrUnavailable)
	}
	c.connMu.Unlock()
	r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.WaitForReady(true))
	if err != nil {
		return false, err
	}
	return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil
}
  • 锁定连接: 确保在检查连接状态时不会有其他线程修改 conn 对象。
  • 健康检查: 调用 HealthService().Check 方法,检查 containerd 的健康状态,确定服务是否可用。
  • 返回值: 如果服务正在运行并返回 SERVING 状态,则返回 true,否则返回 false 和错误信息。

containerd 的代码中,调用 grpc.NewClient 函数实际上是直接使用 gRPC 库中的这个函数来创建 gRPC 客户端连接。这个函数内部会创建和管理与 containerd 服务器的 gRPC 连接,下一部分将详细分析gRPC框架。

源码链接:grpc-go/clientconn.go at v1.65.0 · grpc/grpc-go (github.com)

func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[*addrConn]struct{}),
		dopts:  defaultDialOptions(),
	}

	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply dial options.
	disableGlobalOpts := false
	for _, opt := range opts {
		if _, ok := opt.(*disableGlobalDialOptions); ok {
			disableGlobalOpts = true
			break
		}
	}

	if !disableGlobalOpts {
		for _, opt := range globalDialOptions {
			opt.apply(&cc.dopts)
		}
	}

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Determine the resolver to use.
	if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
		return nil, err
	}

	for _, opt := range globalPerTargetDialOptions {
		opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	if err := cc.validateTransportCredentials(); err != nil {
		return nil, err
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if err = cc.initAuthority(); err != nil {
		return nil, err
	}

	// Register ClientConn with channelz. Note that this is only done after
	// channel creation cannot fail.
	cc.channelzRegistration(target)
	channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
	channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
	cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
	cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
	return cc, nil
}

在 gRPC 框架中,NewClient 函数负责为给定的目标 URI 创建一个 gRPC “通道”(即 ClientConn)。ClientConn 是 gRPC 客户端与服务器之间的虚拟连接,它可以根据需要创建多个实际连接。这个函数主要执行以下任务:

  1. 解析目标 URI:根据目标 URI 的方案(如 dns://passthrough://),选择合适的解析器来解析服务地址。
  2. 配置连接参数:应用各种拨号选项(如安全凭证、负载均衡策略、重试策略等),并初始化 ClientConn 的状态。
  3. 创建连接:在后台尝试与目标地址建立连接,并根据连接状态更新 ClientConn 的状态。
  4. 管理连接生命周期:在连接的生命周期中,ClientConn 会自动处理连接失败、重连、负载均衡等逻辑。

gRPC框架中的RPC通信

gRPC-Go 中 RPC 通信

Go版本gRPC通信机制概述

在 gRPC-Go 中,客户端发起的每一个 RPC 调用都会涉及到以下几个步骤:

  1. 发起 RPC 请求:客户端创建一个 RPC 调用,并发送元数据和请求数据。
  2. 服务器处理请求:服务器接收到请求,处理并生成响应。
  3. 发送响应:服务器将响应发送回客户端。
  4. 接收响应:客户端接收到响应,并将结果返回给用户。

Go版本gRPC源码结构

gRPC-Go 的源码主要分为以下几个模块:

  • transport:负责底层 HTTP/2 传输的实现。
  • stream:管理和控制 gRPC 的流。
  • serverclient:分别管理 gRPC 服务器和客户端的生命周期。
  • codec:用于消息的编码和解码。

gRPC-Go 中 RPC 通信流程的源码分析

ClientConn 的初始化
func NewClient(target string, opts ...DialOption) (*ClientConn, error) {
    cc := &ClientConn{
        target: target,
        conns:  make(map[*addrConn]struct{}),
        dopts:  defaultDialOptions(),
    }
    
    // 解析 target 地址,选择合适的 resolver (解析器)
    if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
        return nil, err
    }

    // 初始化连接管理和负载均衡
    cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
    cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)


    return cc, nil
}
RPC 请求的发起

在 gRPC-Go 中,RPC 请求的发起主要通过 invoke 方法。这个方法位于 google.golang.org/grpc 包的 call.go 文件中:

func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}

invoke 方法是客户端执行单个 RPC 调用的入口。它首先通过 newClientStream 方法创建一个新的客户端流(ClientStream),然后发送消息并接收响应。

创建 ClientStream

newClientStream 的实现位于 google.golang.org/grpc 包的 stream.go 文件中,它负责初始化 gRPC 的流:

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
    // 初始化 stream,设置传输层
    t, err := cc.getTransport(ctx, opts...)
    if err != nil {
        return nil, err
    }
    
    // 开始流
    s, err := t.NewStream(ctx, hdr)
    if err != nil {
        return nil, err
    }

    return &clientStream{s: s, desc: desc}, nil
}

这里的 NewStream 方法是由 transport 层来处理的,它将创建一个新的 HTTP/2 流,用于后续的消息传输。

传输层(Transport Layer)

在 gRPC-Go 中,传输层的核心实现位于 transport/http2_client.go 中。这个文件包含了 gRPC 使用 HTTP/2 进行数据传输的核心逻辑。

func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
    // 创建 HTTP/2 流
    t.mu.Lock()
    s := &Stream{
        ...
    }
    t.activeStreams[s.id] = s
    t.mu.Unlock()

    // 发送请求头部
    err = t.framer.writeHeaders(s, ...)
    if err != nil {
        return nil, err
    }
    
    return s, nil
}

NewStream 方法创建一个新的 HTTP/2 流,并通过 writeHeaders 方法将 gRPC 请求的元数据发送到服务器。

消息的发送和接收

在客户端流中,消息的发送和接收通过 SendMsgRecvMsg 方法来完成,这些方法同样位于 stream.go 文件中。

//(cs *clientStream) SendMsg(m any) (err error)函数gRPC-Go 客户端用于发送消息的核心方法。它执行了从消息准备到实际发送的整个过程,并处理错误和重试逻辑。
func (cs *clientStream) SendMsg(m any) (err error) {
	//错误处理的 defer 逻辑
    defer func() {
		if err != nil && err != io.EOF {
			cs.finish(err)
		}
	}()
	if cs.sentLast {   //防止重复调用
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {   //处理非客户端流式的 RPC
		cs.sentLast = true
	}

	// load hdr, payload, data
	hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
	if err != nil {  //准备消息数据,比如编码什么的
		return err
	}

	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {  //检查消息大小
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	op := func(a *csAttempt) error {   //消息发送逻辑和重试机制
		return a.sendMsg(m, hdr, payload, data)
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
    
	if len(cs.binlogs) != 0 && err == nil {  //二进制日志记录
		cm := &binarylog.ClientMessage{
			OnClientSide: true,
			Message:      data,
		}
		for _, binlog := range cs.binlogs {  
			binlog.Log(cs.ctx, cm)
		}
	}
	return err
}

//func (cs *clientStream) RecvMsg(m any) error 是 gRPC-Go 客户端用于接收服务器响应消息的核心方法。这个函数在客户端接收服务器的响应时执行,并包括了错误处理、重试机制、以及日志记录等功能。
func (cs *clientStream) RecvMsg(m any) error {
    //二进制日志记录初始化
	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {  
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
    //接收信息的初始化
	var recvInfo *payloadInfo
	if len(cs.binlogs) != 0 {
		recvInfo = &payloadInfo{}
	}
    //消息接收逻辑及重试机制
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
    //二进制日志记录处理
	if len(cs.binlogs) != 0 && err == nil {
		sm := &binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, sm)
		}
	}
    // 结束流或处理错误
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)
	}
	return err
}

上面的SendMsgRecvMsg 方法实际调用了更底层的sendMsgrecvMsg

//sendMsg 函数的主要职责是将编码后的消息通过底层传输层发送给服务器,并处理发送过程中的错误和统计信息。
func (a *csAttempt) sendMsg(m any, hdr, payld, data []byte) error {
	cs := a.cs
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it.  finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}

//recvMsg 函数的主要职责是从服务器接收消息并解码,同时处理接收过程中的错误、解压缩和统计信息。
func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if len(a.statsHandlers) != 0 && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}

		return toRPCErr(err)
	}
	if a.trInfo != nil {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	for _, sh := range a.statsHandlers {
		sh.HandleRPC(a.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:             payInfo.uncompressedBytes,
			WireLength:       payInfo.compressedLength + headerLen,
			CompressedLength: payInfo.compressedLength,
			Length:           len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}
服务器端的处理

在服务器端,RPC 调用的处理通过 handleStream 函数进行管理。这个函数位于 server.go 文件中,handleStream 方法处理传入的流,解析客户端的请求,并调用相应的服务方法来生成响应。

上下文初始化和追踪信息

ctx := stream.Context()
ctx = contextWithServer(ctx, s)
var ti *traceInfo
if EnableTracing {
	tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
	ctx = newTraceContext(ctx, tr)
	ti = &traceInfo{
		tr: tr,
		firstLine: firstLine{
			client:     false,
			remoteAddr: t.Peer().Addr,
		},
	}
	if dl, ok := ctx.Deadline(); ok {
		ti.firstLine.deadline = time.Until(dl)
	}
}
  • 上下文准备:从流中提取出上下文,并将服务器相关的信息添加到上下文中。
  • 追踪信息:如果启用了追踪(EnableTracing),函数会创建一个新的追踪对象,并将其添加到上下文中。追踪对象用于记录请求的处理过程和相关的元数据(如客户端地址、截止时间等)。

解析方法名称

sm := stream.Method()
if sm != "" && sm[0] == '/' {
	sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
	// 处理错误的请求方法
	if ti != nil {
		ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
		ti.tr.SetError()
	}
	errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if ti != nil {
			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
			ti.tr.SetError()
		}
		channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if ti != nil {
		ti.tr.Finish()
	}
	return
}
service := sm[:pos]
method := sm[pos+1:]
  • 方法名称解析:从请求的 Method 中解析出服务名和方法名。如果解析失败(例如方法名称格式不正确),则立即返回一个 Unimplemented 错误状态。

处理元数据和统计信息

md, _ := metadata.FromIncomingContext(ctx)
for _, sh := range s.opts.statsHandlers {
	ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
	sh.HandleRPC(ctx, &stats.InHeader{
		FullMethod:  stream.Method(),
		RemoteAddr:  t.Peer().Addr,
		LocalAddr:   t.Peer().LocalAddr,
		Compression: stream.RecvCompress(),
		WireLength:  stream.HeaderWireLength(),
		Header:      md,
	})
}
stream.SetContext(ctx)
  • 元数据提取:从上下文中提取元数据(metadata),例如请求头中的信息。
  • 统计处理:遍历所有已配置的统计处理器,并调用它们来记录此次 RPC 调用的统计数据。

根据方法名称选择处理器

srv, knownService := s.services[service]
if knownService {
	if md, ok := srv.methods[method]; ok {
		s.processUnaryRPC(ctx, t, stream, srv, md, ti)
		return
	}
	if sd, ok := srv.streams[method]; ok {
		s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
		return
	}
}
  • 服务和方法查找:根据解析出的服务名和方法名,在注册的服务中查找对应的服务对象和方法。
  • 处理请求:如果找到对应的处理器,则调用 processUnaryRPCprocessStreamingRPC 来处理请求。这两个函数分别用于处理单向 RPC 和流式 RPC。

处理未知服务或方法

if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
	s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
	return
}
var errDesc string
if !knownService {
	errDesc = fmt.Sprintf("unknown service %v", service)
} else {
	errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
}
if ti != nil {
	ti.tr.LazyPrintf("%s", errDesc)
	ti.tr.SetError()
}
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
	if ti != nil {
		ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
		ti.tr.SetError()
	}
	channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
}
if ti != nil {
	ti.tr.Finish()
}
  • 处理未知服务或方法:如果服务或方法未找到,函数会返回一个 Unimplemented 状态,表示客户端请求的服务或方法不存在。如果配置了 unknownStreamDesc,将调用其处理器来处理未知的请求,否则直接返回错误。

我们这里继续往下分析processStreamingRPC函数

processStreamingRPC 是 gRPC-Go 服务器端处理流式 RPC 请求的关键函数。它负责处理客户端与服务器之间的双向或单向流式 RPC 调用。这个函数执行的主要任务包括:初始化上下文和流、处理压缩和解压缩、调用实际的 RPC 方法处理器,并在完成后记录日志和状态。

1、函数结构概述

processStreamingRPC 处理流式 RPC 的核心逻辑可以分为以下几个主要部分:

  1. 初始化上下文和追踪信息。
  2. 初始化流(serverStream)对象。
  3. 处理压缩和解压缩逻辑。
  4. 调用实际的 RPC 处理器。
  5. 处理 RPC 调用后的清理和日志记录。

2、关键步骤解析

初始化上下文和追踪信息

if channelz.IsOn() {
	s.incrCallsStarted()
}
shs := s.opts.statsHandlers
var statsBegin *stats.Begin
if len(shs) != 0 {
	beginTime := time.Now()
	statsBegin = &stats.Begin{
		BeginTime:      beginTime,
		IsClientStream: sd.ClientStreams,
		IsServerStream: sd.ServerStreams,
	}
	for _, sh := range shs {
		sh.HandleRPC(ctx, statsBegin)
	}
}
ctx = NewContextWithServerTransportStream(ctx, stream)
  • 追踪和统计信息:如果启用了 channelz,服务器会增加启动的 RPC 调用计数。然后,如果有配置统计处理器,会记录 RPC 调用的开始时间和其他元数据。
  • 上下文初始化:通过 NewContextWithServerTransportStream 函数将流对象加入到上下文中,便于后续处理。

初始化 serverStream 对象

ss := &serverStream{
	ctx:                   ctx,
	t:                     t,
	s:                     stream,
	p:                     &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
	codec:                 s.getCodec(stream.ContentSubtype()),
	maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
	maxSendMessageSize:    s.opts.maxSendMessageSize,
	trInfo:                trInfo,
	statsHandler:          shs,
}
  • 创建 serverStreamserverStream 是 gRPC-Go 用于处理流式 RPC 请求的核心对象。它包含了与当前流相关的所有信息,如上下文、传输层、编解码器等。

处理压缩和解压缩逻辑

if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
	ss.dc = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
	ss.decomp = encoding.GetCompressor(rc)
	if ss.decomp == nil {
		st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
		t.WriteStatus(ss.s, st)
		return st.Err()
	}
}

if s.opts.cp != nil {
	ss.cp = s.opts.cp
	ss.sendCompressorName = s.opts.cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
	ss.comp = encoding.GetCompressor(rc)
	if ss.comp != nil {
		ss.sendCompressorName = rc
	}
}
  • 解压缩设置:检查客户端请求中使用的压缩方法,如果服务器端配置了对应的解压缩器,则设置在 serverStream 对象中。如果找不到对应的解压缩器,则返回 Unimplemented 错误。
  • 压缩设置:如果服务器配置了压缩器,或者客户端请求中指定了压缩方法,服务器会尝试使用相同的压缩方法响应。

调用实际的 RPC 处理器

var appErr error
var server any
if info != nil {
	server = info.serviceImpl
}
if s.opts.streamInt == nil {
	appErr = sd.Handler(server, ss)
} else {
	info := &StreamServerInfo{
		FullMethod:     stream.Method(),
		IsClientStream: sd.ClientStreams,
		IsServerStream: sd.ServerStreams,
	}
	appErr = s.opts.streamInt(server, ss, info, sd.Handler)
}
  • 实际调用:这里根据是否配置了拦截器(streamInt)来决定如何调用实际的 RPC 处理器(Handler)。如果有拦截器,处理器会被拦截器包装;否则直接调用处理器。
  • Handler 函数Handler 是用户定义的处理流式 RPC 请求的函数,它会在服务器端处理从客户端接收到的数据流,并根据逻辑生成响应。

处理 RPC 调用后的清理和日志记录

if appErr != nil {
	appStatus, ok := status.FromError(appErr)
	if !ok {
		appStatus = status.FromContextError(appErr)
		appErr = appStatus.Err()
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
		ss.trInfo.tr.SetError()
		ss.mu.Unlock()
	}
	if len(ss.binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range ss.binlogs {
			binlog.Log(ctx, st)
		}
	}
	t.WriteStatus(ss.s, appStatus)
	return appErr
}
if trInfo != nil {
	ss.mu.Lock()
	ss.trInfo.tr.LazyLog(stringer("OK"), false)
	ss.mu.Unlock()
}
if len(ss.binlogs) != 0 {
	st := &binarylog.ServerTrailer{
		Trailer: ss.s.Trailer(),
		Err:     appErr,
	}
	for _, binlog := range ss.binlogs {
		binlog.Log(ctx, st)
	}
}
return t.WriteStatus(ss.s, statusOK)
  • 错误处理:如果处理过程中发生错误,函数会将错误转换为 gRPC 状态码并返回给客户端。
  • 日志和追踪:记录处理过程中的日志和追踪信息(如错误信息、响应状态等),确保在调试或监控时有充分的上下文。
  • 返回状态:函数最后会通过 WriteStatus 将处理结果的状态码写回客户端。

接着分析sd.Handler 的来源

在 gRPC 中,服务和方法的定义通常是在 .proto 文件中定义的,之后通过 gRPC 编译器生成相应的 Go 代码。在生成的代码中,每个服务方法都会有一个对应的 Handler 函数。这个 Handler 函数会在服务注册时被传递给 gRPC 服务器。(所以containerd中的服务都是通过handler来处理的,而非每个服务都是一个单独的进程,同时这里与前面形成了闭环,在containerd的启动中有涉及到服务注册

  • sd 对象sd*StreamDesc 类型的对象,描述了流式 RPC 方法的特性,包括是否是客户端流、是否是服务器端流,以及处理该 RPC 调用的 Handler 函数。
  • Handler 字段sd.Handler 是一个函数类型,用于处理特定的流式 RPC 请求。这个函数是用户在服务注册时提供的,用于执行实际的业务逻辑。

Containerd与Containerd-shim通信机制

containerd-shimcontainerd 之间的通信是容器运行时的核心部分,确保容器的创建、管理和删除等操作能够顺利进行。它们之间的通信主要通过以下方式进行:

ttRPC

containerdcontainerd-shim 之间的通信主要通过一种称为 ttRPC 的轻量级 RPC 框架进行。ttRPCcontainerd 项目中引入的,专门设计用于高效的进程间通信(IPC),特别是在同一主机上运行的进程之间。

  • ttRPC:
    • ttRPCcontainerd 团队开发的一种优化后的 RPC 框架,旨在提供比 gRPC 更低的延迟和更小的开销。它直接通过 Unix 域套接字进行通信,没有 HTTP/2 的开销。
    • containerd 使用 ttRPC 来向 containerd-shim 发送控制命令,如启动、停止、挂起和删除容器。

containerd中是这样启动的

tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
if err != nil {
    return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
}
serve(ctx, tl, server.ServeTTRPC)

Unix 域套接字

containerdcontainerd-shim 之间的通信通常通过 Unix 域套接字进行。这种通信方式非常适合同一主机上的进程间通信,具有低延迟和高效的特点。

  • Unix 域套接字:
    • 在启动时,containerd-shim 进程会通过一个专用的 Unix 域套接字与 containerd 建立通信。
    • 这个套接字通常位于 /run/containerd//run/containerd/io.containerd.runtime.v2.linux/ 目录下,并带有容器 ID 相关的命名。

容器生命周期管理

containerd 通过 ttRPC 和 Unix 域套接字与 containerd-shim 进行通信,以管理容器的生命周期:

  • 容器创建:
    • containerd 收到创建容器的请求时,它会启动一个新的 containerd-shim 进程。这个进程负责在 runc 或其他 OCI 兼容的运行时上运行容器。
  • 容器管理:
    • containerd-shim 进程负责处理容器的标准输入/输出流、信号管理以及其他与容器相关的操作。containerd 通过 ttRPCcontainerd-shim 发送命令(如启动、停止容器)。
  • 容器删除:
    • 当容器退出时,containerd-shim 进程将继续运行,直到 containerd 通过 ttRPC 命令告知 containerd-shim 进程可以安全退出。这确保了即使 containerd 崩溃,容器进程也不会被终止。

容器与守护进程的分离

containerd-shim 的存在还使得容器与 containerd 守护进程分离。这意味着:

  • 独立性:
    • 即使 containerd 守护进程崩溃或重启,已经运行的容器仍然能够继续运行,因为它们由独立的 containerd-shim 进程管理。
  • 减少依赖:
    • 这种架构减少了对单点故障的依赖,增强了容器运行的稳定性。

Containerd-shim与Container通信机制

通信流程

containerd 通过一个runtime来实现对多个容器的控制,例如 create、start 和 stop。

通信流程如下:

  1. 来自 containerd 的创建容器的客户端请求
  2. containerd 设置容器的文件系统,并创建必要的配置信息
  3. containerd 调用 shim,包括容器配置,这个容器配置决定是启动新的套接字侦听器(shim与container 1:1)还是使用现有的套接字侦听器(1:多)
    • 如果使用现有套接字,则返回现有 socket 的地址并退出
    • 如果是使用新的套接字,则shim
      • a. 创建一个新进程来侦听套接字中来自 containerd 的 ttRPC 命令
      • b. 将该套接字的地址返回给 containerd
      • c. 退出
  4. containerd 向 shim 发送一个命令来启动容器
  5. containerd 通过 API 调用runtime来创建/启动/停止容器

但是,containerd 本身实际上并不直接调用运行时来启动容器。相反,它期望调用运行时,这将暴露一个套接字 , 在类 Unix 系统上是 Unix 域,在 Windows 上名为 pipe, 并通过该套接字上的 ttRPC 侦听容器命令。

运行时有两种常见的模式:

  • 一个用于运行时的二进制文件,它既侦听套接字又创建/启动/停止容器
  • 一个分离的 Shim 二进制文件,用于侦听套接字,并调用一个单独的运行时引擎来创建/启动/停止容器

使用单独的“shim + engine”模式是因为它可以更轻松地集成实现特定运行时引擎规范(如 OCI 运行时规范)的不同运行时。ttRPC 协议可以通过一个runtime shim进行处理,而可以使用不同的运行时引擎实现,只要它们实现 OCI 运行时规范即可。

最常用的运行时引擎是 runc,它实施 OCI 运行时规范。由于这是一个运行时引擎,因此 containerd 不会直接调用它;相反,它由 Shim 调用,该 Shim 侦听套接字并调用运行时引擎。

以下序列图显示了执行 ctr run 命令时的操作流程。

ctr run操作流程

源码解析

shim启动入口:containerd/cmd/containerd-shim-runc-v2/main.go

func main() {
	shim.Run(context.Background(a), manager.NewShimManager("io.containerd.runc.v2"))
}

Run函数:containerd/pkg/shim/shim.go

// Run initializes and runs a shim server.
func Run(ctx context.Context, manager Manager, opts ...BinaryOpts) {
	var config Config
	for _, o := range opts {
		o(&config)
	}

	ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name()))

	if err := run(ctx, manager, config); err != nil {
		fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err)
		os.Exit(1)
	}
}

shim真正启动:

//shim 启动
func run(ctx context.Context, manager Manager, config Config) error {
    //...

	setRuntime()
	//...

	// Handle explicit actions
	switch action {
	case "delete":
		//...

	case "start":
		opts := StartOpts{
			Address:      addressFlag,
			TTRPCAddress: ttrpcAddress,
			Debug:        debugFlag,
		}
		// 第一个启动的shim接收的action 就是 start。这里启动第二个shim。address是根据ns和id哈希出来的,会传递给第二个shim,第二个shim会以这个地址起一个server,同时会通过stdout发送给containerd(因为c启动的本进程,所以可以收到),这就是containerd和第二个shim交流的.sock地址。
		params, err := manager.Start(ctx, id, opts)
		if err != nil {
			return err
		}

		data, err := json.Marshal(&params)
		if err != nil {
			return fmt.Errorf("failed to marshal bootstrap params to json: %w", err)
		}

		if _, err := os.Stdout.Write(data); err != nil {
			return err
		}

		return nil
	}

	//...
	unaryInterceptor := chainUnaryServerInterceptors(ttrpcUnaryInterceptors...)
	server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
	if err != nil {
		return fmt.Errorf("failed creating server: %w", err)
	}

	for _, srv := range ttrpcServices {
		if err := srv.RegisterTTRPC(server); err != nil {
			return fmt.Errorf("failed to register service: %w", err)
		}
	}

	if err := serve(ctx, server, signals, sd.Shutdown); err != nil {
		if !errors.Is(err, shutdown.ErrShutdown) {
			cleanupSockets(ctx)
			return err
		}
	}
	//...
}

启动第二个shim:

这里有一个有意思地方是第二个shim如何获取自身作为server的socket地址。从代码上看是通过把套接字转换成文件描述符传递给第二个shim,然后第二个shim再还原成listener实现的。

func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shim.BootstrapParams, retErr error) {
	var params shim.BootstrapParams
	params.Version = 3
	params.Protocol = "ttrpc"

	cmd, err := newCommand(ctx, id, opts.Address, opts.TTRPCAddress, opts.Debug)
	if err != nil {
		return params, err
	}
	grouping := id
	spec, err := readSpec()
    
    //...

	var sockets []*shimSocket

	s, err := newShimSocket(ctx, opts.Address, grouping, false)
	if err != nil {
		if errdefs.IsAlreadyExists(err) {
			params.Address = s.addr
			return params, nil
		}
		return params, err
	}
	sockets = append(sockets, s)
	cmd.ExtraFiles = append(cmd.ExtraFiles, s.f)


	goruntime.LockOSThread()
	if os.Getenv("SCHED_CORE") != "" {
		if err := schedcore.Create(schedcore.ProcessGroup); err != nil {
			return params, fmt.Errorf("enable sched core support: %w", err)
		}
	}

	if err := cmd.Start(); err != nil {
		return params, err
	}

	goruntime.UnlockOSThread()

    // 启动成功后,第一个 shim 退出,执行清理操作
    defer func() {
		if retErr != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start
	go cmd.Wait()
    
    //...

	params.Address = sockets[0].addr
	return params, nil
}

shim socket套接字创建:

//shim socket创建
func newShimSocket(ctx context.Context, path, id string, debug bool) (*shimSocket, error) {
	address, err := shim.SocketAddress(ctx, path, id, debug)
	socket, err := shim.NewSocket(address)
	//...
	s := &shimSocket{
		addr: address,
		s:    socket,
	}
	f, err := socket.File()
	if err != nil {
		s.Close()
		return nil, err
	}
	s.f = f
	return s, nil
}

SocketAddress:生成一个唯一的位于/run/containerd/s/<哈希值>下的Unix 套接字地址

// SocketAddress returns a socket address
func SocketAddress(ctx context.Context, socketPath, id string, debug bool) (string, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return "", err
	}
	path := filepath.Join(socketPath, ns, id)
	if debug {
		path = filepath.Join(path, "debug")
	}
	d := sha256.Sum256([]byte(path))
	return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d), nil
}

NewSocket:设置sock文件权限

//真正的socket创建
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
	var (
		sock       = socket(address)
		path       = sock.path()
		isAbstract = sock.isAbstract()
		perm       = os.FileMode(0600)
	)

	// Darwin needs +x to access socket, otherwise it'll fail with "bind: permission denied" when running as non-root.
	if runtime.GOOS == "darwin" {
		perm = 0700
	}

	if !isAbstract {
		if err := os.MkdirAll(filepath.Dir(path), perm); err != nil {
			return nil, fmt.Errorf("mkdir failed for %s: %w", path, err)
		}
	}
	l, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}

	if !isAbstract {
		if err := os.Chmod(path, perm); err != nil {
			os.Remove(sock.path())
			l.Close()
			return nil, fmt.Errorf("chmod failed for %s: %w", path, err)
		}
	}

	return l.(*net.UnixListener), nil
}

serve函数,启动 ttrpc 服务,并提供RPC服务:

// serve serves the ttrpc API over a unix socket in the current working directory
// and blocks until the context is canceled
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func()) error {
	dump := make(chan os.Signal, 32)
	setupDumpStacks(dump)

	path, err := os.Getwd()
	if err != nil {
		return err
	}
    //创建 Unix 套接字监听器
	l, err := serveListener(socketFlag, 3)
	if err != nil {
		return err
	}
    // 启动 ttrpc 服务器
	go func() {
		defer l.Close()
		if err := server.Serve(ctx, l); err != nil && !errors.Is(err, net.ErrClosed) {
			log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure")
		}
	}()

	//...

	go handleExitSignals(ctx, logger, shutdown)
	return reap(ctx, logger, signals)
}

创建一个用于监听 Unix 套接字的 net.Listener

// serve()会最终调用这个函数来启动服务监听
func serveListener(path string, fd uintptr) (net.Listener, error) {
    //创建监听器的逻辑
	var (
		l   net.Listener
		err error
	)
    //处理继承的文件描述符
	if path == "" {
       //os.NewFile(fd, "socket") 将文件描述符 fd 封装成
       //一个*os.File 对象,并使用 net.FileListener 将其
       //转换为 net.Listener,这样可以通过套接字进行通信。
		l, err = net.FileListener(os.NewFile(fd, "socket"))
		path = "[inherited from parent]"
	} else {  //创建新的 Unix 套接字
		if len(path) > socketPathLimit {
			return nil, fmt.Errorf("%q: unix socket path too long (> %d)", path, socketPathLimit)
		}
		l, err = net.Listen("unix", path)
	}
	if err != nil {
		return nil, err
	}
	log.L.WithField("socket", path).Debug("serving api on socket")·
	return l, nil
}

回到serve函数,containerd-shim以注册服务的形式来对containerd提供容器相关操作,下面是相关服务注册的源码,可以看到shim通过调用runc容器运行时来创建容器。

func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
	taskAPI.RegisterTTRPCTaskService(server, s)
	return nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.lifecycleMu.Lock()
	handleStarted, cleanup := s.preStart(nil)
	s.lifecycleMu.Unlock()
	defer cleanup()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	// The following line cannot return an error as the only state in which that
	// could happen would also cause the container.Pid() call above to
	// nil-deference panic.
	proc, _ := container.Process("")
	handleStarted(container, proc)

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

注:shim创建的与containerd通信的sock文件的mode为0600,实际情况与源码一致。

sock文件权限

容器启动流程分析

分析流程图如下,task.Start没有往下分析,它的函数传递流程与Newtask类似。

ctr指令解析图

ctr解析命令

//调用command.NewClient()->client.LoadContainer()->NewTask()->task.Start()

/*
	1、command.NewClient() 创建containerd client
	2、LoadContainer()
	3、NewTask()
	4、task.Start()
	//如果收到退出信号
	5、task.Delete(ctx)
*/

var startCommand = &cli.Command{
	Name:      "start",
	Usage:     "Start a container that has been created",
	ArgsUsage: "CONTAINER",
	Flags: append(platformStartFlags, []cli.Flag{
		&cli.BoolFlag{
			Name:  "null-io",
			Usage: "Send all IO to /dev/null",
		},
		&cli.StringFlag{
			Name:  "log-uri",
			Usage: "Log uri",
		},
		&cli.StringFlag{
			Name:  "fifo-dir",
			Usage: "Directory used for storing IO FIFOs",
		},
		&cli.StringFlag{
			Name:  "pid-file",
			Usage: "File path to write the task's pid",
		},
		&cli.BoolFlag{
			Name:    "detach",
			Aliases: []string{"d"},
			Usage:   "Detach from the task after it has started execution",
		},
	}...),
	Action: func(cliContext *cli.Context) error {
		var (
			err    error
			id     = cliContext.Args().Get(0)
			detach = cliContext.Bool("detach")
		)
		if id == "" {
			return errors.New("container id must be provided")
		}
		client, ctx, cancel, err := commands.NewClient(cliContext)

		container, err := client.LoadContainer(ctx, id)

		spec, err := container.Spec(ctx)

		var (
			tty    = spec.Process.Terminal
			opts   = GetNewTaskOpts(cliContext)
			ioOpts = []cio.Opt{cio.WithFIFODir(cliContext.String("fifo-dir"))}
		)
		var con console.Console
		if tty {
			con = console.Current()
			defer con.Reset()
			if err := con.SetRaw(); err != nil {
				return err
			}
		}

		task, err := NewTask(ctx, client, container, "", con, cliContext.Bool("null-io"), cliContext.String("log-uri"), ioOpts, opts...)
        
        //...

		if err := task.Start(ctx); err != nil {
			return err
		}

		if tty {
			if err := HandleConsoleResize(ctx, task, con); err != nil {
				log.L.WithError(err).Error("console resize")
			}
		} else {
			sigc := commands.ForwardAllSignals(ctx, task)
			defer commands.StopCatch(sigc)
		}

		status := <-statusC
		code, _, err := status.Result()
		if err != nil {
			return err
		}
		if _, err := task.Delete(ctx); err != nil {
			return err
		}
		if code != 0 {
			return cli.Exit("", int(code))
		}
		return nil
	},
}

1、containerd client创建

// containerd\containerd\cmd\ctr\commands\client.go
// NewClient returns a new containerd client
func NewClient(cliContext *cli.Context, opts ...containerd.Opt) (*containerd.Client, context.Context, context.CancelFunc, error) {
	timeoutOpt := containerd.WithTimeout(cliContext.Duration("connect-timeout"))
	opts = append(opts, timeoutOpt)
	client, err := containerd.New(cliContext.String("address"), opts...)
	if err != nil {
		return nil, nil, nil, err
	}
	ctx, cancel := AppContext(cliContext)
	var suppressDeprecationWarnings bool
    
	if !suppressDeprecationWarnings {
		resp, err := client.IntrospectionService().Server(ctx)
		if err != nil {
			log.L.WithError(err).Warn("Failed to check deprecations")
		} else {
			for _, d := range resp.Deprecations {
				log.L.Warn("DEPRECATION: " + d.Message)
			}
		}
	}
	return client, ctx, cancel, nil
}

1、实际调用New函数创建

// containerd\containerd\client\client.go
// New returns a new containerd client that is connected to the containerd
// instance provided by address
func New(address string, opts ...Opt) (*Client, error) {
	var copts clientOpts
	for _, o := range opts {
		if err := o(&copts); err != nil {
			return nil, err
		}
	}
	if copts.timeout == 0 {
		copts.timeout = 10 * time.Second
	}

	c := &Client{
		defaultns: copts.defaultns,
	}

	if copts.defaultRuntime != "" {
		c.runtime = copts.defaultRuntime
	} else {
		c.runtime = defaults.DefaultRuntime
	}

	if copts.defaultPlatform != nil {
		c.platform = copts.defaultPlatform
	} else {
		c.platform = platforms.Default()
	}

	if copts.services != nil {
		c.services = *copts.services
	}
	if address != "" {
		backoffConfig := backoff.DefaultConfig
		backoffConfig.MaxDelay = copts.timeout
		connParams := grpc.ConnectParams{
			Backoff: backoffConfig,
		}
		gopts := []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithConnectParams(connParams),
			grpc.WithContextDialer(dialer.ContextDialer),
		}
		if len(copts.dialOptions) > 0 {
			gopts = copts.dialOptions
		}
		gopts = append(gopts, grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize),
			grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)))
		if len(copts.callOptions) > 0 {
			gopts = append(gopts, grpc.WithDefaultCallOptions(copts.callOptions...))
		}
		if copts.defaultns != "" {
			unary, stream := newNSInterceptors(copts.defaultns)
			gopts = append(gopts, grpc.WithChainUnaryInterceptor(unary))
			gopts = append(gopts, grpc.WithChainStreamInterceptor(stream))
		}

		connector := func() (*grpc.ClientConn, error) {
			conn, err := grpc.NewClient(dialer.DialAddress(address), gopts...)
			if err != nil {
				return nil, fmt.Errorf("failed to dial %q: %w", address, err)
			}
			return conn, nil
		}
		conn, err := connector()
		if err != nil {
			return nil, err
		}
		c.conn, c.connector = conn, connector
	}
	if copts.services == nil && c.conn == nil {
		return nil, fmt.Errorf("no grpc connection or services is available: %w", errdefs.ErrUnavailable)
	}

	// check namespace labels for default runtime
	if copts.defaultRuntime == "" && c.defaultns != "" {
		if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
			return nil, err
		} else if label != "" {
			c.runtime = label
		}
	}

	return c, nil
}

2、加载container

// containerd\containerd\client\client.go
// LoadContainer loads an existing container from metadata
func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
	ctx, span := tracing.StartSpan(ctx, "client.LoadContainer")
	defer span.End()
	r, err := c.ContainerService().Get(ctx, id)
	if err != nil {
		return nil, err
	}

	span.SetAttributes(
		tracing.Attribute("container.id", r.ID),
		tracing.Attribute("container.image.ref", r.Image),
		tracing.Attribute("container.runtime.name", r.Runtime.Name),
		tracing.Attribute("container.snapshotter.name", r.Snapshotter),
		tracing.Attribute("container.createdAt", r.CreatedAt.Format(time.RFC3339)),
		tracing.Attribute("container.updatedAt", r.UpdatedAt.Format(time.RFC3339)),
	)
	return containerFromRecord(c, r), nil
}

2、ContainerService实际是调用的NewRemoteContainerStore,返回NewContainersClient

// ContainerService returns the underlying container Store
func (c *Client) ContainerService() containers.Store {
	if c.containerStore != nil {
		return c.containerStore
	}
	c.connMu.Lock()
	defer c.connMu.Unlock()
	return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
}

2、gRPC调用containerd的containers.Get函数

containerd/containerd/api/services/containers/v1/containers_grpc.pb.go

func (c *containersClient) Get(ctx context.Context, in *GetContainerRequest, opts ...grpc.CallOption) (*GetContainerResponse, error) {
	out := new(GetContainerResponse)
	err := c.cc.Invoke(ctx, "/containerd.services.containers.v1.Containers/Get", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

2、containerd接收并处理请求

// containerd/containerd/api/services/containers/v1/containers_grpc.pb.go
func _Containers_Get_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(GetContainerRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(ContainersServer).Get(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/containerd.services.containers.v1.Containers/Get",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(ContainersServer).Get(ctx, req.(*GetContainerRequest))
	}
	return interceptor(ctx, in, info, handler)
}

2、接着调用local.Get()函数处理(这里有个对象的转换ContainersServer转换到Service再转换到local)

// containerd/containerd/api/services/containers/v1/containers_grpc.pb.go
// 这里调用底层的数据库获取contianer
func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) {
	var resp api.GetContainerResponse

	return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
		container, err := l.Store.Get(ctx, req.ID)
		if err != nil {
			return err
		}
		containerpb := containerToProto(&container)
		resp.Container = containerpb

		return nil
	}))
}

3、NewTask创建容器请求

// containerd\containerd\cmd\ctr\commands\tasks\tasks_unix.go
//NewTask creates a new task
func NewTask(ctx context.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
	stdinC := &stdinCloser{
		stdin: os.Stdin,
	}
	if checkpoint != "" {
		im, err := client.GetImage(ctx, checkpoint)
		if err != nil {
			return nil, err
		}
		opts = append(opts, containerd.WithTaskCheckpoint(im))
	}

	spec, err := container.Spec(ctx)
	if err != nil {
		return nil, err
	}
	if spec.Linux != nil {
		if len(spec.Linux.UIDMappings) != 0 {
			opts = append(opts, containerd.WithUIDOwner(spec.Linux.UIDMappings[0].HostID))
		}
		if len(spec.Linux.GIDMappings) != 0 {
			opts = append(opts, containerd.WithGIDOwner(spec.Linux.GIDMappings[0].HostID))
		}
	}

	var ioCreator cio.Creator
	if con != nil {
		if nullIO {
			return nil, errors.New("tty and null-io cannot be used together")
		}
		ioCreator = cio.NewCreator(append([]cio.Opt{cio.WithStreams(con, con, nil), cio.WithTerminal}, ioOpts...)...)
	} else if nullIO {
		ioCreator = cio.NullIO
	} else if logURI != "" {
		u, err := url.Parse(logURI)
		if err != nil {
			return nil, err
		}
		ioCreator = cio.LogURI(u)
	} else {
		ioCreator = cio.NewCreator(append([]cio.Opt{cio.WithStreams(stdinC, os.Stdout, os.Stderr)}, ioOpts...)...)
	}
	t, err := container.NewTask(ctx, ioCreator, opts...)
	if err != nil {
		return nil, err
	}
	stdinC.closer = func() {
		t.CloseIO(ctx, containerd.WithStdinCloser)
	}
	return t, nil
}

3、调用container.NewTask函数

//containerd\containerd\client\container.go
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
	ctx, span := tracing.StartSpan(ctx, "container.NewTask")
	defer span.End()
	i, err := ioCreate(c.id)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil && i != nil {
			i.Cancel()
			i.Close()
		}
	}()
	cfg := i.Config()
	request := &tasks.CreateTaskRequest{
		ContainerID: c.id,
		Terminal:    cfg.Terminal,
		Stdin:       cfg.Stdin,
		Stdout:      cfg.Stdout,
		Stderr:      cfg.Stderr,
	}
	r, err := c.get(ctx)
	if err != nil {
		return nil, err
	}
	if r.SnapshotKey != "" {
		if r.Snapshotter == "" {
			return nil, fmt.Errorf("unable to resolve rootfs mounts without snapshotter on container: %w", errdefs.ErrInvalidArgument)
		}

		// get the rootfs from the snapshotter and add it to the request
		s, err := c.client.getSnapshotter(ctx, r.Snapshotter)
		if err != nil {
			return nil, err
		}
		mounts, err := s.Mounts(ctx, r.SnapshotKey)
		if err != nil {
			return nil, err
		}
		spec, err := c.Spec(ctx)
		if err != nil {
			return nil, err
		}
		for _, m := range mounts {
			if spec.Linux != nil && spec.Linux.MountLabel != "" {
				if ml := label.FormatMountLabel("", spec.Linux.MountLabel); ml != "" {
					m.Options = append(m.Options, ml)
				}
			}
			request.Rootfs = append(request.Rootfs, &types.Mount{
				Type:    m.Type,
				Source:  m.Source,
				Target:  m.Target,
				Options: m.Options,
			})
		}
	}
	info := TaskInfo{
		runtime: r.Runtime.Name,
	}
	for _, o := range opts {
		if err := o(ctx, c.client, &info); err != nil {
			return nil, err
		}
	}
	for _, m := range info.RootFS {
		request.Rootfs = append(request.Rootfs, &types.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}
	request.RuntimePath = info.RuntimePath
	if info.Options != nil {
		o, err := typeurl.MarshalAny(info.Options)
		if err != nil {
			return nil, err
		}
		request.Options = typeurl.MarshalProto(o)
	}
	t := &task{
		client: c.client,
		io:     i,
		id:     c.id,
		c:      c,
	}
	if info.Checkpoint != nil {
		request.Checkpoint = info.Checkpoint
	}

	span.SetAttributes(
		tracing.Attribute("task.container.id", request.ContainerID),
		tracing.Attribute("task.request.options", request.Options.String()),
		tracing.Attribute("task.runtime.name", info.runtime),
	)
	response, err := c.client.TaskService().Create(ctx, request)
	if err != nil {
		return nil, errdefs.FromGRPC(err)
	}

	span.AddEvent("task created",
		tracing.Attribute("task.process.id", int(response.Pid)),
	)
	t.pid = response.Pid
	return t, nil
}

3、Taskservice获取TasksClient实例

// TaskService returns the underlying TasksClient
func (c *Client) TaskService() tasks.TasksClient {
	if c.taskService != nil {
		return c.taskService
	}
	c.connMu.Lock()
	defer c.connMu.Unlock()
	return tasks.NewTasksClient(c.conn)
}

3、通过gRPC发送给containerd

源码路径:containerd/api/services/tasks/v1/tasks_grpc.pb.go

func (c *tasksClient) Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) {
	out := new(CreateTaskResponse)
	err := c.cc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Create", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

3、containerd接收并处理请求

源码路径:containerd/api/services/tasks/v1/tasks_grpc.pb.go

func _Tasks_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(CreateTaskRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(TasksServer).Create(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/containerd.services.tasks.v1.Tasks/Create",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(TasksServer).Create(ctx, req.(*CreateTaskRequest))
	}
	return interceptor(ctx, in, info, handler)
}

3、实际调用local的相关函数处理(这里也有对象的转换)

源码路径:containerd/plugins/services/tasks/local.go

func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	var (
		checkpointPath string
		taskAPIAddress string
		taskAPIVersion uint32
	)

	if r.Options != nil {
		taskOptions, err := formatOptions(container.Runtime.Name, r.Options)
		if err != nil {
			return nil, err
		}
		checkpointPath = taskOptions.CriuImagePath
		taskAPIAddress = taskOptions.TaskApiAddress
		taskAPIVersion = taskOptions.TaskApiVersion
	}

	// jump get checkpointPath from checkpoint image
	if checkpointPath == "" && r.Checkpoint != nil {
		checkpointPath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
		if err != nil {
			return nil, err
		}
		if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
			return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
		}
		reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
			MediaType:   r.Checkpoint.MediaType,
			Digest:      digest.Digest(r.Checkpoint.Digest),
			Size:        r.Checkpoint.Size,
			Annotations: r.Checkpoint.Annotations,
		})
		if err != nil {
			return nil, err
		}
		_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
		reader.Close()
		if err != nil {
			return nil, err
		}
	}
	opts := runtime.CreateOpts{
		Spec: container.Spec,
		IO: runtime.IO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint:     checkpointPath,
		Runtime:        container.Runtime.Name,
		RuntimeOptions: container.Runtime.Options,
		TaskOptions:    r.Options,
		SandboxID:      container.SandboxID,
		Address:        taskAPIAddress,
		Version:        taskAPIVersion,
	}
	if r.RuntimePath != "" {
		opts.Runtime = r.RuntimePath
	}
	for _, m := range r.Rootfs {
		opts.Rootfs = append(opts.Rootfs, mount.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}

	rtime := l.v2Runtime

	_, err = rtime.Get(ctx, r.ContainerID)
	if err != nil && !errdefs.IsNotFound(err) {
		return nil, errdefs.ToGRPC(err)
	}
	if err == nil {
		return nil, errdefs.ToGRPC(fmt.Errorf("task %s: %w", r.ContainerID, errdefs.ErrAlreadyExists))
	}
	c, err := rtime.Create(ctx, r.ContainerID, opts)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	labels := map[string]string{"runtime": container.Runtime.Name}
	if err := l.monitor.Monitor(c, labels); err != nil {
		return nil, fmt.Errorf("monitor task: %w", err)
	}
	pid, err := c.PID(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get task pid: %w", err)
	}
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         pid,
	}, nil
}

3、调用runtime.PlatformRuntime.create,PlatformRuntime接口实际由TaskManager实现,也就是TaskManager.Create。Create函数的关键调用流程有点多,我们一一分析。

1.m.manager.Start()

2.newShimTask(shim)

3.shimTask.Create()

//源码路径:containerd\containerd\core\runtime\v2\task_manager.go
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
	bundle, err := NewBundle(ctx, m.root, m.state, taskID, opts.Spec)
	if err != nil {
		return nil, err
	}
	defer func() {
		if retErr != nil {
			bundle.Delete()
		}
	}()

	shim, err := m.manager.Start(ctx, taskID, bundle, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to start shim: %w", err)
	}

	// Cast to shim task and call task service to create a new container task instance.
	// This will not be required once shim service / client implemented.
	shimTask, err := newShimTask(shim)
	if err != nil {
		return nil, err
	}

	// runc ignores silently features it doesn't know about, so for things that this is
	// problematic let's check if this runc version supports them.
	if err := m.validateRuntimeFeatures(ctx, opts); err != nil {
		return nil, fmt.Errorf("failed to validate OCI runtime features: %w", err)
	}

	t, err := shimTask.Create(ctx, opts)
	if err != nil {
		// NOTE: ctx contains required namespace information.
		m.manager.shims.Delete(ctx, taskID)

		dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
		defer cancel()

		sandboxed := opts.SandboxID != ""
		_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
		if errShim != nil {
			if errdefs.IsDeadlineExceeded(errShim) {
				dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
				defer cancel()
			}

			shimTask.Shutdown(dctx)
			shimTask.Close()
		}

		return nil, fmt.Errorf("failed to create shim task: %w", err)
	}

	return t, nil
}

3.1、m.manager.Start()调用ShimManager.Start

containerd\containerd\core\runtime\v2\shim_manager.go

// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
	// This container belongs to sandbox which supposed to be already started via sandbox API.
	if opts.SandboxID != "" {
		var params shimbinary.BootstrapParams
		if opts.Address != "" {
			// The address returned from sandbox controller should be in the form like ttrpc+unix://<uds-path>
			// or grpc+vsock://<cid>:<port>, we should get the protocol from the url first.
			protocol, address, ok := strings.Cut(opts.Address, "+")
			if !ok {
				return nil, fmt.Errorf("the scheme of sandbox address should be in " +
					" the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock")
			}
			params = shimbinary.BootstrapParams{
				Version:  int(opts.Version),
				Protocol: protocol,
				Address:  address,
			}
		} else {
			// For those sandbox we can not get endpoint,
			// fallback to legacy implementation
			process, err := m.Get(ctx, opts.SandboxID)
			if err != nil {
				return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID)
			}
			p, restoreErr := restoreBootstrapParams(process.Bundle())
			if restoreErr != nil {
				return nil, fmt.Errorf("failed to get bootstrap "+
					"params of sandbox %s, %v, legacy restore error %v", opts.SandboxID, err, restoreErr)
			}
			params = p
		}

		// Write sandbox ID this task belongs to.
		if err := os.WriteFile(filepath.Join(bundle.Path, "sandbox"), []byte(opts.SandboxID), 0600); err != nil {
			return nil, err
		}

		if err := writeBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"), params); err != nil {
			return nil, fmt.Errorf("failed to write bootstrap.json for bundle %s: %w", bundle.Path, err)
		}

		shim, err := loadShim(ctx, bundle, func() {})
		if err != nil {
			return nil, fmt.Errorf("failed to load sandbox task %q: %w", opts.SandboxID, err)
		}

		if err := m.shims.Add(ctx, shim); err != nil {
			return nil, err
		}

		return shim, nil
	}

	shim, err := m.startShim(ctx, bundle, id, opts)
	if err != nil {
		return nil, err
	}
	defer func() {
		if retErr != nil {
			m.cleanupShim(ctx, shim)
		}
	}()

	if err := m.shims.Add(ctx, shim); err != nil {
		return nil, fmt.Errorf("failed to add task: %w", err)
	}

	return shim, nil
}

3.1、上面如果SandboxID不为空,调用loadShim函数加载shim,否则调用startShim函数启动shim,这里我们分析startShim。

源码路径:containerd\containerd\core\runtime\v2\shim.go

func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	ctx = log.WithLogger(ctx, log.G(ctx).WithField("namespace", ns))

	topts := opts.TaskOptions
	if topts == nil || topts.GetValue() == nil {
		topts = opts.RuntimeOptions
	}

	runtimePath, err := m.resolveRuntimePath(opts.Runtime)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve runtime path: %w", err)
	}

	b := shimBinary(bundle, shimBinaryConfig{
		runtime:      runtimePath,
		address:      m.containerdAddress,
		ttrpcAddress: m.containerdTTRPCAddress,
		env:          m.env,
	})
	shim, err := b.Start(ctx, typeurl.MarshalProto(topts), func() {
		log.G(ctx).WithField("id", id).Info("shim disconnected")

		cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
		// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
		// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
		// disconnect and there is no chance to remove this dead task from runtime task lists.
		// Thus it's better to delete it here.
		m.shims.Delete(ctx, id)
	})
	if err != nil {
		return nil, fmt.Errorf("start failed: %w", err)
	}

	return shim, nil
}

3.2、newShimTask(shim)调用NewTaskClient

//containerd\containerd\core\runtime\v2\shim.go
func newShimTask(shim ShimInstance) (*shimTask, error) {
	_, version := shim.Endpoint()
	taskClient, err := NewTaskClient(shim.Client(), version)
	if err != nil {
		return nil, err
	}

	return &shimTask{
		ShimInstance: shim,
		task:         taskClient,
	}, nil
}

3.2、NewTaskClient()根据传入的版本号选择不同的通信框架与shim通信,并返回一个shimTask实例

//containerd\containerd\core\runtime\v2\bridge.go
func NewTaskClient(client interface{}, version int) (TaskServiceClient, error) {
	switch c := client.(type) {
	case *ttrpc.Client:
		switch version {
		case 2:
			return &ttrpcV2Bridge{client: v2.NewTaskClient(c)}, nil
		case 3:
			return v3.NewTTRPCTaskClient(c), nil
		default:
			return nil, fmt.Errorf("containerd client supports only v2 and v3 TTRPC task client (got %d)", version)
		}

	case grpc.ClientConnInterface:
		if version != 3 {
			return nil, fmt.Errorf("containerd client supports only v3 GRPC task service (got %d)", version)
		}

		return &grpcV3Bridge{v3.NewTaskClient(c)}, nil
	default:
		return nil, fmt.Errorf("unsupported shim client type %T", c)
	}
}

3.3、shimTask.Create(ctx, opts)创建task

func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
	topts := opts.TaskOptions
	if topts == nil || topts.GetValue() == nil {
		topts = opts.RuntimeOptions
	}
	request := &task.CreateTaskRequest{
		ID:         s.ID(),
		Bundle:     s.Bundle(),
		Stdin:      opts.IO.Stdin,
		Stdout:     opts.IO.Stdout,
		Stderr:     opts.IO.Stderr,
		Terminal:   opts.IO.Terminal,
		Checkpoint: opts.Checkpoint,
		Options:    typeurl.MarshalProto(topts),
	}
	for _, m := range opts.Rootfs {
		request.Rootfs = append(request.Rootfs, &types.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}

	_, err := s.task.Create(ctx, request)
	if err != nil {
		return nil, errdefs.FromGRPC(err)
	}

	return s, nil
}

3.3、task.Create根据NewTaskClient传入的版本号选择调用不同的RPC(这个NewTaskClient在前面newShimTask函数调用过),这里我们选择分析ttrpctaskClient.Create,这里调用ttRPC接口向shim发出请求。

//containerd\containerd\api\runtime\task\v3\shim_ttrpc.pb.go
func (c *ttrpctaskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
	var resp CreateTaskResponse
	if err := c.client.Call(ctx, "containerd.task.v3.Task", "Create", req, &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

3.3、shim在之前启动后会调用RegisterTTRPC注册服务,所以会接收到containerd发出的TTRPC请求,在这里进行处理。

源码路径:containerd\containerd\cmd\containerd-shim-runc-v2\task\service.go

func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
	taskAPI.RegisterTTRPCTaskService(server, s)
	return nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.lifecycleMu.Lock()
	handleStarted, cleanup := s.preStart(nil)
	s.lifecycleMu.Unlock()
	defer cleanup()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	// The following line cannot return an error as the only state in which that
	// could happen would also cause the container.Pid() call above to
	// nil-deference panic.
	proc, _ := container.Process("")
	handleStarted(container, proc)

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

3.3、接着调用runc.NewContainer函数

源码路径:containerd\containerd\cmd\containerd-shim-runc-v2\runc\container.go

// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, fmt.Errorf("create namespace: %w", err)
	}

	opts := &options.Options{}
	if r.Options.GetValue() != nil {
		v, err := typeurl.UnmarshalAny(r.Options)
		if err != nil {
			return nil, err
		}
		if v != nil {
			opts = v.(*options.Options)
		}
	}

	var pmounts []process.Mount
	for _, m := range r.Rootfs {
		pmounts = append(pmounts, process.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}

	rootfs := ""
	if len(pmounts) > 0 {
		rootfs = filepath.Join(r.Bundle, "rootfs")
		if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
			return nil, err
		}
	}

	config := &process.CreateConfig{
		ID:               r.ID,
		Bundle:           r.Bundle,
		Runtime:          opts.BinaryName,
		Rootfs:           pmounts,
		Terminal:         r.Terminal,
		Stdin:            r.Stdin,
		Stdout:           r.Stdout,
		Stderr:           r.Stderr,
		Checkpoint:       r.Checkpoint,
		ParentCheckpoint: r.ParentCheckpoint,
		Options:          r.Options,
	}

	if err := WriteOptions(r.Bundle, opts); err != nil {
		return nil, err
	}
	// For historical reason, we write opts.BinaryName as well as the entire opts
	if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
		return nil, err
	}

	var mounts []mount.Mount
	for _, pm := range pmounts {
		mounts = append(mounts, mount.Mount{
			Type:    pm.Type,
			Source:  pm.Source,
			Target:  pm.Target,
			Options: pm.Options,
		})
	}
	defer func() {
		if retErr != nil {
			if err := mount.UnmountMounts(mounts, rootfs, 0); err != nil {
				log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount")
			}
		}
	}()
	if err := mount.All(mounts, rootfs); err != nil {
		return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
	}

	p, err := newInit(
		ctx,
		r.Bundle,
		filepath.Join(r.Bundle, "work"),
		ns,
		platform,
		config,
		opts,
		rootfs,
	)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	if err := p.Create(ctx, config); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	container := &Container{
		ID:              r.ID,
		Bundle:          r.Bundle,
		process:         p,
		processes:       make(map[string]process.Process),
		reservedProcess: make(map[string]struct{}),
	}
	pid := p.Pid()
	if pid > 0 {
		var cg interface{}
		if cgroups.Mode() == cgroups.Unified {
			g, err := cgroupsv2.PidGroupPath(pid)
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", pid)
				return container, nil
			}
			cg, err = cgroupsv2.Load(g)
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", pid)
			}
		} else {
			cg, err = cgroup1.Load(cgroup1.PidPath(pid))
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup for %d", pid)
			}
		}
		container.cgroup = cg
	}
	return container, nil
}

3.3、接着调用newInit.Create

源码路径:containerd\containerd\cmd\containerd-shim-runc-v2\process\init.go

// Create the process with the provided config
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
	var (
		err     error
		socket  *runc.Socket
		pio     *processIO
		pidFile = newPidFile(p.Bundle)
	)

	if r.Terminal {
        //这里创建一个临时的socket文件
		if socket, err = runc.NewTempConsoleSocket(); err != nil {
			return fmt.Errorf("failed to create OCI runtime console socket: %w", err)
		}
		defer socket.Close()
	} else {
		if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
			return fmt.Errorf("failed to create init process I/O: %w", err)
		}
		p.io = pio
	}
	if r.Checkpoint != "" {
		return p.createCheckpointedState(r, pidFile)
	}
	opts := &runc.CreateOpts{
		PidFile:      pidFile.Path(),
		NoPivot:      p.NoPivotRoot,
		NoNewKeyring: p.NoNewKeyring,
	}
	if p.io != nil {
		opts.IO = p.io.IO()
	}
	if socket != nil {
		opts.ConsoleSocket = socket
	}

	if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
		return p.runtimeError(err, "OCI runtime create failed")
	}
	if r.Stdin != "" {
		if err := p.openStdin(r.Stdin); err != nil {
			return err
		}
	}
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	if socket != nil {
		console, err := socket.ReceiveMaster()
		if err != nil {
			return fmt.Errorf("failed to retrieve console master: %w", err)
		}
		console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
		if err != nil {
			return fmt.Errorf("failed to start console copy: %w", err)
		}
		p.console = console
	} else {
		if err := pio.Copy(ctx, &p.wg); err != nil {
			return fmt.Errorf("failed to start io pipe copy: %w", err)
		}
	}
	pid, err := pidFile.Read()
	if err != nil {
		return fmt.Errorf("failed to retrieve OCI runtime container pid: %w", err)
	}
	p.pid = pid
	return nil
}

3.3、接着调用runtime.Create

源码路径:containerd\go-runc\runc.go

// Create creates a new container and returns its pid if it was created successfully
func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
	args := []string{"create", "--bundle", bundle}
	if opts == nil {
		opts = &CreateOpts{}
	}

	oargs, err := opts.args()
	if err != nil {
		return err
	}
	args = append(args, oargs...)
	cmd := r.command(context, append(args, id)...)
	if opts.IO != nil {
		opts.Set(cmd)
	}
	cmd.ExtraFiles = opts.ExtraFiles

	if cmd.Stdout == nil && cmd.Stderr == nil {
		data, err := r.cmdOutput(cmd, true, nil)
		defer putBuf(data)
		if err != nil {
			return fmt.Errorf("%s: %s", err, data.String())
		}
		return nil
	}
	ec, err := r.startCommand(cmd)
	if err != nil {
		return err
	}
	if opts.IO != nil {
		if c, ok := opts.IO.(StartCloser); ok {
			if err := c.CloseAfterStart(); err != nil {
				return err
			}
		}
	}
	status, err := Monitor.Wait(cmd, ec)
	if err == nil && status != 0 {
		err = fmt.Errorf("%s did not terminate successfully: %w", cmd.Args[0], &ExitError{status})
	}
	return err
}

3.3、接着往下调用runc.startCommand

源码路径:containerd\go-runc\monitor.go

func (r *Runc) startCommand(cmd *exec.Cmd) (chan Exit, error) {
	if r.PdeathSignal != 0 {
		return Monitor.StartLocked(cmd)
	}
	return Monitor.Start(cmd)
}

3.3、再往下调用Monitor.Start

源码路径:containerd\containerd\pkg\sys\reaper\reaper_unix.go

// Start starts the command and registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
	ec := m.Subscribe()
	if err := c.Start(); err != nil {
		m.Unsubscribe(ec)
		return nil, err
	}
	return ec, nil
}

3.3、接着往下调用Cmd.Start,最后调用了一个系统调用os.StartProcess启动进程。

源码路径:Go\src\os\exec\exec.go

func (c *Cmd) Start() error {
	// Check for doubled Start calls before we defer failure cleanup. If the prior
	// call to Start succeeded, we don't want to spuriously close its pipes.
	if c.Process != nil {
		return errors.New("exec: already started")
	}

	started := false

	lp := c.Path
	c.Process, err = os.StartProcess(lp, c.argv(), &os.ProcAttr{
		Dir:   c.Dir,
		Files: childFiles,
		Env:   env,
		Sys:   c.SysProcAttr,
	})
	started = true


	return nil
}

3、Start函数调用RPC

containerd/client/process.go

// Start starts the exec process
func (p *process) Start(ctx context.Context) error {
	ctx, span := tracing.StartSpan(ctx, "process.Start",
		tracing.WithAttribute("process.id", p.ID()),
		tracing.WithAttribute("process.task.id", p.task.ID()),
	)
	defer span.End()
	r, err := p.task.client.TaskService().Start(ctx, &tasks.StartRequest{
		ContainerID: p.task.id,
		ExecID:      p.id,
	})
	if err != nil {
		if p.io != nil {
			p.io.Cancel()
			p.io.Wait()
			p.io.Close()
		}
		return errdefs.FromGRPC(err)
	}
	span.SetAttributes(tracing.Attribute("process.pid", int(r.Pid)))
	p.pid = r.Pid
	return nil
}

3、RPC请求shim

containerd/api/runtime/task/v3/shim_grpc.pb.go

func (c *taskClient) Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption) (*CreateTaskResponse, error) {
	out := new(CreateTaskResponse)
	err := c.cc.Invoke(ctx, "/containerd.task.v3.Task/Create", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (c *taskClient) Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) {
	out := new(StartResponse)
	err := c.cc.Invoke(ctx, "/containerd.task.v3.Task/Start", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

3、shim收到请求并调用处理函数

containerd/api/runtime/task/v3/shim_grpc.pb.go

func _Task_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(CreateTaskRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(TaskServer).Create(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/containerd.task.v3.Task/Create",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(TaskServer).Create(ctx, req.(*CreateTaskRequest))
	}
	return interceptor(ctx, in, info, handler)
}

func _Task_Start_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(StartRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(TaskServer).Start(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/containerd.task.v3.Task/Start",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(TaskServer).Start(ctx, req.(*StartRequest))
	}
	return interceptor(ctx, in, info, handler)
}

3、实际调用的处理函数

containerd/cmd/containerd-shim-runc-v2/task/service.go

func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
	taskAPI.RegisterTTRPCTaskService(server, s)
	return nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.lifecycleMu.Lock()
	handleStarted, cleanup := s.preStart(nil)
	s.lifecycleMu.Unlock()
	defer cleanup()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	// The following line cannot return an error as the only state in which that
	// could happen would also cause the container.Pid() call above to
	// nil-deference panic.
	proc, _ := container.Process("")
	handleStarted(container, proc)

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	var cinit *runc.Container
	s.lifecycleMu.Lock()
	if r.ExecID == "" {
		cinit = container
	} else {
		if _, initExited := s.containerInitExit[container]; initExited {
			s.lifecycleMu.Unlock()
			return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
		}
		s.runningExecs[container]++
	}
	handleStarted, cleanup := s.preStart(cinit)
	s.lifecycleMu.Unlock()
	defer cleanup()

	p, err := container.Start(ctx, r)
	if err != nil {
		// If we failed to even start the process, s.runningExecs
		// won't get decremented in s.handleProcessExit. We still need
		// to update it.
		if r.ExecID != "" {
			s.lifecycleMu.Lock()
			s.runningExecs[container]--
			if ch, ok := s.execCountSubscribers[container]; ok {
				ch <- s.runningExecs[container]
			}
			s.lifecycleMu.Unlock()
		}
		handleStarted(container, p)
		return nil, errdefs.ToGRPC(err)
	}

	switch r.ExecID {
	case "":
		switch cg := container.Cgroup().(type) {
		case cgroup1.Cgroup:
			if err := s.ep.Add(container.ID, cg); err != nil {
				log.G(ctx).WithError(err).Error("add cg to OOM monitor")
			}
		case *cgroupsv2.Manager:
			allControllers, err := cg.RootControllers()
			if err != nil {
				log.G(ctx).WithError(err).Error("failed to get root controllers")
			} else {
				if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
					if userns.RunningInUserNS() {
						log.G(ctx).WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
					} else {
						log.G(ctx).WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
					}
				}
			}
			if err := s.ep.Add(container.ID, cg); err != nil {
				log.G(ctx).WithError(err).Error("add cg to OOM monitor")
			}
		}

		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	handleStarted(container, p)
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

问题与解答

1、containerd的 gRPC 目前启用加解密保护通信与否?是否有双向认证?

答:在 containerd 中,gRPC 的加密是通过配置文件中的 TLS 设置实现的。你可以在 containerd 的配置文件 /etc/containerd/config.toml 中配置 TLS 相关的选项。

config.toml 文件中,你可以为 gRPC 服务启用 TLS:

[grpc]
  address = "/run/containerd/containerd.sock"
  # 开启TCP监听,默认是关闭的
  tcp_address = "0.0.0.0:2375"
  # 配置TLS证书和密钥
  tls_cert = "/etc/containerd/tls/containerd.crt"
  tls_key = "/etc/containerd/tls/containerd.key"
  # 可选的,CA证书路径
  tls_ca = "/etc/containerd/tls/ca.crt"

tls_certtls_key 是服务器的证书和私钥文件,containerd 将使用这些文件来加密 gRPC 通信。

tls_ca 是可选的 CA 证书,用于验证客户端证书,从而支持双向认证(Mutual TLS, mTLS)。

2、2.1.4中提到“插件通过 gRPC 提供服务接口,containerd 核心通过调用这些接口来管理镜像、快照和容器运行时等”。 是否意味着containerd与插件之间也是使用gRPC进行的通信?也就是containerd内部也有gRPC通信?

答:containerd内部应该不是gRPC通信,gRPC是一个RPC通信框架,一般用于进程间,在一个进程中使用gRPC没有太大的必要,一个进程内部各模块的通信应该是通过接口和直接的函数调用来实现内部通信的。

3、2.1.4 以及 3.1 分别提到:“守护进程与外部客户端(如 ctr 工具或 Docker 引擎)之间的通信 ”使用 gRPC/UDS ? 特别是 3.1 如何理解?3.1 后面提到一个例子,Docker引擎通过 gRPC 连接到守护进程的UDS地址,以请求管理容器、镜像、快照等操作。问:gRPC 如何连接的这个 UDS 地址(技术原理)?

答:这个问题理解的关键点在于uds是如何与gRPC结合使用的,containerd在服务启动的时候使用 sys.GetLocalListener 来监听一个 Unix Domain Socket。然后将监听器传递给 serve函数处理grpc请求。(具体的代码可以在4.1中找到)

监听uds

serve函数

4、创建的容器的所属 user、group 默认是与containerd 同组同用户吗?

答:我在这个问题上做了一些实验来验证。首先,创建的容器的用户和用户组肯定是可以设置的,无论是在镜像中指定,还是在创建容器的时候指定;但默认不指定的情况下镜像一般都是将容器内的进程设置为以root用户进行。这也印证了下面这句话:在 containerd 中,创建的容器默认情况下不会自动继承 containerd 进程的用户(user)和组(group)。相反,容器的用户和组是根据容器的配置(如 OCI 规范)来决定的。

下面是我使用containerd创建alpine服务容器的权限的配置文件:

5、3.3 提到“插件可能作为外部服务运行,需要通过 UDS 与 containerd 通信。”,例子介绍中说:“外部存储插件可能独立运行并通过 UDS 暴露其 gRPC 服务,containerd 通过连接这个 UDS 来调用插件的服务。” 问:containerd 通过 UDS 连接外部插件,外部插件暴露的 gRPC 给谁?

答:这是containerd官方文档查到的,containerd通过gRPC连接外部插件,那外部插件暴露的gRPC肯定是给containerd的配置文件。

外部插件

代理插件

6、systemd 提供使用socket activationt机制,是针对内核态进程还是用户态进程,亦或是两者都支持?

答:针对用户态进程。

7、containerd的socket文件的mode和uid、gid分别是?由谁设置该内容?

答:具体回答可以看问题9,至于socket文件的mode、uid、gid的设置肯定是由containerd自己设置,具体可看源码。

//首先传入config.toml中设置的gid,uid
[ttrpc]
  address = ""
  gid = 0
  uid = 0

//接着调用GetLocalListener
tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)

//再创建socket文件,之后调用chmod为0660,chown为上面传入的gid、uid
func GetLocalListener(path string, uid, gid int) (net.Listener, error) {
	// Ensure parent directory is created
	if err := mkdirAs(filepath.Dir(path), uid, gid); err != nil {
		return nil, err
	}

	l, err := CreateUnixSocket(path)
	if err != nil {
		return l, err
	}

	if err := os.Chmod(path, 0660); err != nil {
		l.Close()
		return nil, err
	}

	if err := os.Chown(path, uid, gid); err != nil {
		l.Close()
		return nil, err
	}

	return l, nil
}

8、root权限下:containerd、shim以及容器的UID、GID是否都一样?如何设置容器不同的UID、GID

答:root权限下containerd、shim的UID、GID都是root

hacker@LAPTOP-V47UU71B:/mnt/c/Users/L$ ps -eo pid,user,group,comm | grep containerd
    248 root     root     containerd
   8561 root     root     containerd-shim
  13295 root     root     containerd-shim

如何设置容器不同的uid、gid,可以在containerd创建容器时指定用户及用户组,比如我的用户和用户组都是1000,我可以这么创建容器。

hacker@LAPTOP-V47UU71B:/mnt/c/Users/L$ sudo ctr container create -u 1000:1000 m.daocloud.io/docker.io/library/alpine:latest mycont
ainer

进入容器后查看id

hacker@LAPTOP-V47UU71B:/mnt/c/Users/L$ sudo ctr task exec -t --exec-id exec-1 mycontainer /bin/sh
~ $ id
uid=1000 gid=1000 groups=1000

9、如果用systemd启动containerd守护进程以及用于与containerd通信的client进程,那么socket文件是由systemd创建还是containerd创建?

如果使用 systemd 来启动 containerd 守护进程以及用于与 containerd 通信的客户端进程,并且启用了 socket activation 机制(可以不启动),那么 socket 文件将由 systemd 创建,而不是 containerd

(事实上containerd应该是不支持systemd的socket activation机制的,github也有人提出过在containerd中加入此机制:add socket activation · Issue #164 · containerd/containerd (github.com)

一般如果支持socket activation机制的话会有类似下图的逻辑,服务器会先调用sd_listen_fds函数,看systemd是否创建了socket文件,如果创建了就不会再创建了。所以如果使用system的socket activation启动进程,那么就一定是systemd创建socket文件。

socket activation代码逻辑

具体过程如下:

  1. systemd 创建 socket:在 socket activation 机制下,systemd 会首先根据配置创建一个监听 socket 文件,并将其置于监听状态。这是在 containerd 守护进程启动之前完成的。
  2. systemd 启动 containerd:当有客户端连接到由 systemd 创建的 socket 时,systemd 检测到连接请求并启动 containerd 守护进程,并将这个 socket 传递给 containerd
  3. containerd 使用 socketcontainerd 启动后,接收 systemd 传递的 socket,并使用该 socket 来处理客户端的通信请求。

为了更好的理解为什么是systemd创建我们写一个使用 systemd 启动 containerd 守护进程和客户端进程的流程:

1、创建 systemd 的 socket 单元文件

你需要为 containerd 创建一个 .socket 单元文件,通常放在 /etc/systemd/system/containerd.socket/usr/lib/systemd/system/containerd.socket

示例如下:

[Unit]
Description=containerd Socket

[Socket]
ListenStream=/run/containerd/containerd.sock
SocketMode=0660

[Install]
WantedBy=sockets.target

2、创建 systemd 的 service 单元文件

你需要确保 containerd.service 单元文件正确配置。如果默认的 service 文件不支持 socket activation,你可能需要提供一个自定义的 service 文件。放在 /etc/systemd/system/containerd.service/usr/lib/systemd/system/containerd.service

示例如下:

[Unit]
Description=containerd container runtime
Documentation=https://containerd.io
Wants=network-online.target
After=network-online.target

[Service]
ExecStart=/usr/bin/containerd
Type=notify
Restart=always
LimitNOFILE=1048576
LimitNPROC=infinity
LimitCORE=infinity
TasksMax=infinity
Delegate=yes
KillMode=process
OOMScoreAdjust=-999
ExecStartPre=-/sbin/modprobe overlay
ExecReload=/bin/kill -s HUP $MAINPID
KillSignal=SIGTERM
TimeoutStartSec=0

[Install]
WantedBy=multi-user.target

3、启用并启动 socket 和 service

启用并启动 socket 和 service:

sudo systemctl enable containerd.socket
sudo systemctl start containerd.socket

此时,systemd 会监听 /run/containerd/containerd.sock,并在有客户端连接时自动启动 containerd 服务。

10、OCI 标准是什么?

OCI(Open Container Initiative) 是一个开源项目,旨在定义容器运行时和镜像的标准。它由 Linux Foundation 组织主导,主要包括两个关键规范:

  1. OCI Runtime Specification(OCI 运行时规范)
    • 这个规范定义了容器的运行时行为,包括如何创建、配置、启动、停止和删除容器。它定义了容器生命周期的各个阶段,以及容器进程的环境、命名空间、cgroups 等配置。
  2. OCI Image Specification(OCI 镜像规范)
    • 这个规范定义了容器镜像的格式及其内容。这包括如何打包应用程序及其依赖项,以便镜像可以被各种容器运行时拉取和解压,以一致的方式运行。

11、为什么containerd最终调用的是net.listen()创建socket文件?

因为net.listen()调用的是Go标准库net包中的一个函数,Go对底层uds的syscall做了一个封装,实际创建socket文件还是bind阶段,具体可以查看net的实现,我稍微看了一下源码确实是做了封装。

参考资料

containerd-shim文档

Licensed under CC BY-NC-SA 4.0
最后更新于 2024-12-11