Kubelet – Pod 创建之 CRI 和 CNI 源码剖析
本文源码跟踪基于 1.12.6
CRI
创建 Pod 入口
k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
// kubeGenericRuntimeManager::runtimeService: newInstrumentedRuntimeService(runtimeService) // SyncPod syncs the running pod into the desired pod by executing following steps: // // 1. Compute sandbox and container changes. // 2. Kill pod sandbox if necessary. // 3. Kill any containers that should not be running. // 4. Create sandbox if necessary. // 5. Create init containers. // 6. Create normal containers. func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // Step 1: Compute sandbox and container changes. // Step 2: Kill the pod if the sandbox has changed. if podContainerChanges.KillPod { // ... } else { // Step 3: kill any running containers in this pod which are not to keep. // ... } // Keep terminated init containers fairly aggressively controlled // This is an optimization because container removals are typically handled // by container garbage collector. m.pruneInitContainersBeforeStart(pod, podStatus) // We pass the value of the podIP down to generatePodSandboxConfig and // generateContainerConfig, which in turn passes it to various other // functions, in order to facilitate functionality that requires this // value (hosts file and downward API) and avoid races determining // the pod IP in cases where a container requires restart but the // podIP isn't in the status manager yet. // // We default to the IP in the passed-in pod status, and overwrite it if the // sandbox needs to be (re)started. podIP := "" if podStatus != nil { podIP = podStatus.IP } // Step 4: Create a sandbox for the pod if necessary. // 创建使用 pause 镜像创建的 sandbox podSandboxID := podContainerChanges.SandboxID if podContainerChanges.CreateSandbox { // ... 
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt) // 底层调用 m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler) glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod)) podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID) } // Get podSandboxConfig for containers to start. podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt) // Step 5: start the init container. 启动所有的 init container if container := podContainerChanges.NextInitContainerToStart; container != nil { // Start the next init container. glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod)) if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil { // ... } // Successfully started the container; clear the entry in the failure glog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod)) } // Step 6: start containers in podContainerChanges.ContainersToStart. 启动 container for _, idx := range podContainerChanges.ContainersToStart { container := &pod.Spec.Containers[idx] // ... glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod)) if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil { // ... } } return }
CRI 接口调用和实现
CRI 相关文档: Introducing Container Runtime Interface (CRI) in Kubernetes
k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go
dockerService 结构体实现了 CRI 接口:
type dockerService struct { client libdocker.Interface os kubecontainer.OSInterface podSandboxImage string streamingRuntime *streamingRuntime streamingServer streaming.Server network *network.PluginManager // 实现了网络 // Map of podSandboxID :: network-is-ready networkReady map[string]bool networkReadyLock sync.Mutex containerManager cm.ContainerManager // cgroup driver used by Docker runtime. cgroupDriver string checkpointManager checkpointmanager.CheckpointManager // caches the version of the runtime. // To be compatible with multiple docker versions, we need to perform // version checking for some operations. Use this cache to avoid querying // the docker daemon every time we need to do such checks. versionCache *cache.ObjectCache // startLocalStreamingServer indicates whether dockershim should start a // streaming server on localhost. startLocalStreamingServer bool }
RuntimeService 接口定义:
// RuntimeService interface should be implemented by a container runtime. // The methods should be thread-safe. type RuntimeService interface { RuntimeVersioner ContainerManager PodSandboxManager ContainerStatsManager // UpdateRuntimeConfig updates runtime configuration if specified UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error // Status returns the status of the runtime. Status() (*runtimeapi.RuntimeStatus, error) }
PodSandboxManager 接口定义:
// PodSandboxManager contains methods for operating on PodSandboxes. The methods // are thread-safe. type PodSandboxManager interface { // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be force terminated. StopPodSandbox(podSandboxID string) error // RemovePodSandbox removes the sandbox. If there are running containers in the // sandbox, they should be forcibly removed. RemovePodSandbox(podSandboxID string) error // PodSandboxStatus returns the Status of the PodSandbox. PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error) // ListPodSandbox returns a list of Sandbox. ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) }
以 RunPodSandbox
接口为例:
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. // For docker, PodSandbox is implemented by a container holding the network // namespace for the pod. // Note: docker doesn't use LogDirectory (yet). func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { config := r.GetConfig() // Step 1: Pull the image for the sandbox. image := defaultSandboxImage if err := ensureSandboxImageExists(ds.client, image); err != nil { } createConfig, err := ds.makeSandboxDockerConfig(config, image) createResp, err := ds.client.CreateContainer(*createConfig) resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID} ds.setNetworkReady(createResp.ID, false) // Step 3: Create Sandbox Checkpoint. if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } // Step 4: Start the sandbox container. // Assume kubelet's garbage collector would remove the sandbox later, if // startContainer failed. err = ds.client.StartContainer(createResp.ID) // 设置 DNS config // Do not invoke network plugins if in hostNetwork mode. if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { return resp, nil } // Step 5: Setup networking for the sandbox. // All pod networking is setup by a CNI plugin discovered at startup time. // This plugin assigns the pod ip, sets up routes inside the sandbox, // creates interfaces etc. In theory, its jurisdiction ends with pod // sandbox networking, but it might insert iptables rules or open ports // on the host as well, to satisfy parts of the pod spec that aren't // recognized by the CNI standard yet. cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations) return resp, nil }
CNI
CNI 接口调用和实现
在 dockershim 模式下, cniNetworkPlugin
实现了 kubelet 面向 Pod 的 NetworkPlugin 接口,其 SetUpPod
函数定义如下:
k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { netnsPath, err := plugin.host.GetNetNS(id.ID) // Windows doesn't have loNetwork. It comes only with Linux if plugin.loNetwork != nil { if _, err = plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath, annotations); err != nil { } } _, err = plugin.addToNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath, annotations) return err }
network 相关的在函数 NewDockerService
中初始化:
k8s.io/kubernetes/pkg/kubelet/dockershim/docker_service.go
// NewDockerService (abridged; the parameter list and everything unrelated to
// networking are omitted) shows how the network plugin manager is wired up.
func NewDockerService(...) {
	// dockershim currently only supports CNI plugins.
	// The plugins are initialized from the configured CNI conf directory and
	// bin directories so that one of them can be picked afterwards.
	pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString)
	cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginBinDirs)
	cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs))

	// cniPlugins carries every plugin collected above; the one to use is
	// presumably chosen by pluginSettings.PluginName.
	plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
	ds.network = network.NewPluginManager(plug)
}
CNI 接口定义
在 DockerShim 中使用 cniNetworkPlugin
对接 CNI 插件,其结构体定义如下:
k8s.io/kubernetes/pkg/kubelet/dockershim/network/cni/cni.go
type cniNetworkPlugin struct { network.NoopNetworkPlugin loNetwork *cniNetwork // 指向 lo CNI 的实现,参见后续分析 defaultNetwork *cniNetwork // 指向 default CNI 的实现,参见后续分析 sync.RWMutex defaultNetwork *cniNetwork host network.Host execer utilexec.Interface nsenterPath string confDir string binDirs []string podCidr string }
它实现的 NetworkPlugin 接口定义如下:
k8s.io/kubernetes/pkg/kubelet/dockershim/network/plugins.go 中定义了面向 Pod 的 NetworkPlugin
接口,CNI 规范参见: https://github.com/containernetworking/cni/blob/master/SPEC.md#network-configuration
// Plugin is an interface to network plugins for the kubelet type NetworkPlugin interface { // Init initializes the plugin. This will be called exactly once // before any other methods are called. Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error // Called on various events like: // NET_PLUGIN_EVENT_POD_CIDR_CHANGE Event(name string, details map[string]interface{}) // Name returns the plugin's name. This will be used when searching // for a plugin by name, e.g. Name() string // Returns a set of NET_PLUGIN_CAPABILITY_* Capabilities() utilsets.Int // SetUpPod is the method called after the infra container of // the pod has been created but before the other containers of the // pod are launched. SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations map[string]string) error // TearDownPod is the method called before a pod's infra container will be deleted TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error // GetPodNetworkStatus is the method called to obtain the ipv4 or ipv6 addresses of the container GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error) // Status returns error if the network plugin is in error state Status() error }
cniNetworkPlugin
结构体的 cniNetwork 类型字段中包含成员 CNIConfig
,其类型为 libcni.CNI
接口
type cniNetworkPlugin struct { network.NoopNetworkPlugin loNetwork *cniNetwork // lo 本地端口 defaultNetwork *cniNetwork // 默认网络 } type cniNetwork struct { name string NetworkConfig *libcni.NetworkConfigList CNIConfig libcni.CNI // 指向 CNI 接口定义 }
CNI 接口定义
type CNI interface { AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error) DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error) DelNetwork(net *NetworkConfig, rt *RuntimeConf) error }
CNIConfig
的初始化:
func ProbeNetworkPlugins(confDir string, binDirs []string) []network.NetworkPlugin { plugin := &cniNetworkPlugin{ defaultNetwork: nil, loNetwork: getLoNetwork(binDirs), // lo 初始化 execer: utilexec.New(), confDir: confDir, binDirs: binDirs, } return []network.NetworkPlugin{plugin} }
getLoNetwork 函数定义:
func getLoNetwork(binDirs []string) *cniNetwork { loConfig, err := libcni.ConfListFromBytes([]byte(`{ "cniVersion": "0.2.0", "name": "cni-loopback", "plugins":[{ "type": "loopback" }] }`)) if err != nil { // The hardcoded config above should always be valid and unit tests will // catch this panic(err) } loNetwork := &cniNetwork{ name: "lo", NetworkConfig: loConfig, CNIConfig: &libcni.CNIConfig{Path: binDirs}, } return loNetwork }
cniNetworkPlugin
结构体中默认网络字段 defaultNetwork
的初始化如下,该函数在 cni.ProbeNetworkPlugins
函数中被调用:
func (plugin *cniNetworkPlugin) syncNetworkConfig() { network, err := getDefaultCNINetwork(plugin.confDir, plugin.binDirs) plugin.setDefaultNetwork(network) }
getDefaultCNINetwork
的定义如下,主要工作是从 CNI 的 conf 目录中读取 conf 文件,完成默认 network 的配置初始化:
func getDefaulgotCNINetwork(confDir string, binDirs []string) (*cniNetwork, error) { files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"}) switch { case err != nil: return nil, err case len(files) == 0: return nil, fmt.Errorf("No networks found in %s", confDir) } sort.Strings(files) for _, confFile := range files { var confList *libcni.NetworkConfigList if strings.HasSuffix(confFile, ".conflist") { confList, err = libcni.ConfListFromFile(confFile) } else { conf, err := libcni.ConfFromFile(confFile) // Ensure the config has a "type" so we know what plugin to run. // Also catches the case where somebody put a conflist into a conf file. if conf.Network.Type == "" { glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile) continue } confList, err = libcni.ConfListFromConf(conf) if err != nil { glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err) continue } } if len(confList.Plugins) == 0 { glog.Warningf("CNI config list %s has no networks, skipping", confFile) continue } glog.V(4).Infof("Using CNI configuration file %s", confFile) network := &cniNetwork{ name: confList.Name, NetworkConfig: confList, CNIConfig: &libcni.CNIConfig{Path: binDirs}, } return network, nil } return nil, fmt.Errorf("No valid networks found in %s", confDir) }