kubectl exec 原理
Contents
执行 kubectl exec
时,首先 kubectl 会向 apiserver 发起 GET 和 POST 请求,apiserver 返回 101 Upgrade
响应,向客户端表示已经升级到 SPDY 协议
随后由 apiserver 向 pod 所在节点上的 kubelet 发送流式请求 /exec。kubelet 接到 apiserver 的请求后,通过 CRI 接口向 CRI Shim 请求 exec 的 URL,CRI Shim 向 Kubelet 返回 exec URL,kubelet 再将这个 URL 以 Redirect
的方式返回给 apiserver
最后 apiserver 重定向流式请求到对应 Streaming Server 上发起的 exec 请求,并维护长链
整个过程,由 apiserver 代理 kubelet,kubelet 再代理 container,完成了 kubectl 和容器的交互,最终的响应都是由容器执行的
源码实现
kubelet 的源码阉割模拟
func main() {
container := restful.NewContainer()
ws := new(restful.WebService)
ws.Path("/exec")
{
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(GetExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(GetExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(GetExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(GetExec).
Operation("getExec"))
}
container.Add(ws)
klog.Info("启动http服务,监听9090端口")
http.ListenAndServe(":9090", container)
}
func GetExec(request *restful.Request, response *restful.Response) {
url, err := GetUrl()
if err != nil {
streaming.WriteError(err, response.ResponseWriter)
return
}
proxyStream(response.ResponseWriter, request.Request, url)
}
func GetUrl() (*url.URL, error) {
req := &runtimeapi.ExecRequest{
ContainerId: ContainerId,
Cmd: []string{"ls"},
Tty: false,
Stdin: true,
Stdout: true,
Stderr: true,
}
resp, err := runtimeExec(req)
if err != nil {
return nil, err
}
// 需要修改containerd配置:vi /etc/containerd/config.toml
// stream_server_address改为0.0.0.0 (默认127.0.0.1 只能本地访问)
// stream_server_port改为一个固定的端口如6595 (默认0 代表随机生成端口)
// 这个地址是容器运行时生成的,每次都不一样,他会启动时监听一个地址用于给我们exec
klog.Info("得到的URL是:", resp.Url)
resp.Url = strings.Replace(resp.Url, "[::]", RemoteRuntimeIp, -1)
klog.Info("修改过后的URL是:", resp.Url)
return url.Parse(resp.Url)
}
const RemoteRuntimeAddress = "" // 远程的containerd地址
const RemoteRuntimeIp = ""
const ContainerId = "" // 容器id kubectl get pod -o yaml查看
func initRuntimeClient() runtimeapi.RuntimeServiceClient {
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
conn, err := grpc.DialContext(ctx, RemoteRuntimeAddress, gopts...)
if err != nil {
log.Fatalln(err)
}
return runtimeapi.NewRuntimeServiceClient(conn)
}
func runtimeExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
runtimeClient := initRuntimeClient()
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
// 发起grpc调用让运行时准备一个流式通信的端点,该端点用于在容器执行命令
resp, err := runtimeClient.Exec(ctx, req)
if err != nil {
klog.ErrorS(err, "Exec cmd from runtime service failed", "containerID", req.ContainerId, "cmd", req.Cmd)
return nil, err
}
klog.V(10).InfoS("[RemoteRuntimeService] Exec Response")
if resp.Url == "" {
errorMessage := "URL is not set"
err := errors.New(errorMessage)
klog.ErrorS(err, "Exec failed")
return nil, err
}
return resp, nil
}
type responder struct{}
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.ErrorS(err, "Error while proxying request")
http.Error(w, err.Error(), http.StatusInternalServerError)
}
func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
handler.ServeHTTP(w, r)
}
模拟客户端直接请求 kubelet
func main() {
execUrl := "http://localhost:9090/exec/default/mypod/mycontainer"
req, _ := http.NewRequest("GET", execUrl, nil)
req.Header.Set("Upgrade", "SPDY/3.1")
req.Header.Set("Connection", "Upgrade")
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
rt := spdy.NewRoundTripper(tlsConfig, true, false)
executor, err := remotecommand.NewSPDYExecutorForTransports(rt, rt, http.MethodGet, req.URL)
if err != nil {
log.Fatal(err)
}
err = executor.Stream(remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: true,
})
if err != nil {
log.Fatalln(err)
}
}
apiserver 的源码阉割模拟
func main() {
urlStr := "http://localhost:9090/exec/default/mypod/mycontainer"
urlObj, err := url.Parse(urlStr)
if err != nil {
panic(err)
}
proxyHandler := proxy.NewUpgradeAwareHandler(urlObj, http.DefaultTransport, false, true, proxy.NewErrorResponder(nil))
log.Println("启动假的apiserver")
http.ListenAndServe(":6443", proxyHandler)
}
// 创建一个代理的handler,代理kubelet
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}
修改模拟客户端的 execUrl
execUrl := "http://localhost:6443"