Skip to content

Commit

Permalink
sidecar: container ordered start/shutdown support
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwans committed Jul 11, 2019
1 parent a87e9a9 commit ba9e797
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 35 deletions.
14 changes: 12 additions & 2 deletions pkg/kubelet/kubelet.go
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -33,7 +33,7 @@ import (

cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -52,6 +52,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
"k8s.io/klog/glog"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand Down Expand Up @@ -2110,10 +2111,19 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
// to the pod manager.
kl.podManager.UpdatePod(pod)

sidecarsStatus := status.GetSidecarsStatus(pod)

// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
if status.NeedToReconcilePodReadiness(pod) {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
} else if sidecarsStatus.ContainersWaiting {
// if containers aren't running and the sidecars are all ready trigger a sync so that the containers get started
if sidecarsStatus.SidecarsPresent && sidecarsStatus.SidecarsReady {
glog.Infof("Pod: %s: sidecars: sidecars are ready, dispatching work", format.Pod(pod))
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}
}

// After an evicted pod is synced, all dead containers in the pod can be removed.
Expand Down
71 changes: 58 additions & 13 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -34,6 +34,7 @@ import (

"github.com/armon/circbuf"
"k8s.io/klog"
"k8s.io/klog/glog"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -176,8 +177,8 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", 0); err != nil {
glog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
}
return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
Expand Down Expand Up @@ -501,6 +502,12 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID

l := getContainerInfoFromLabels(s.Labels)
a := getContainerInfoFromAnnotations(s.Annotations)

annotations := make(map[string]string)
if a.Sidecar {
annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", l.ContainerName)] = "Sidecar"
}

// Notice that the followings are not full spec. The container killing code should not use
// un-restored fields.
pod = &v1.Pod{
Expand All @@ -509,6 +516,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID
Name: l.PodName,
Namespace: l.PodNamespace,
DeletionGracePeriodSeconds: a.PodDeletionGracePeriod,
Annotations: annotations,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: a.PodTerminationGracePeriod,
Expand All @@ -530,7 +538,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(containerID
// killContainer kills a container through the following steps:
// * Run the pre-stop lifecycle hooks (if applicable).
// * Stop the container.
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error {
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodDuration time.Duration) error {
var containerSpec *v1.Container
if pod != nil {
if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
Expand Down Expand Up @@ -573,9 +581,9 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
}
if gracePeriodOverride != nil {
gracePeriod = *gracePeriodOverride
klog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
if gracePeriodDuration > 0 {
gracePeriod = int64(gracePeriodDuration.Seconds())
glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
}

klog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)
Expand All @@ -593,24 +601,61 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec
}

// killContainersWithSyncResult kills all pod's containers with sync results.
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodDuration time.Duration) (syncResults []*kubecontainer.SyncResult) {
// split out sidecars and non-sidecars
var (
sidecars []*kubecontainer.Container
nonSidecars []*kubecontainer.Container
)
for _, container := range runningPod.Containers {
if isSidecar(pod, container.Name) {
sidecars = append(sidecars, container)
} else {
nonSidecars = append(nonSidecars, container)
}
}

containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
wg := sync.WaitGroup{}
// non-sidecars first
start := time.Now()
glog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %s termination period", runningPod.Name, len(nonSidecars), gracePeriodDuration)
nonSidecarsWg := sync.WaitGroup{}
nonSidecarsWg.Add(len(nonSidecars))
for _, container := range nonSidecars {
go func(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
defer nonSidecarsWg.Done()
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", gracePeriodDuration); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
}
containerResults <- killContainerResult
}(container)
}
nonSidecarsWg.Wait()

wg.Add(len(runningPod.Containers))
for _, container := range runningPod.Containers {
gracePeriodDuration = gracePeriodDuration - time.Since(start)
if gracePeriodDuration < 0 {
gracePeriodDuration = 0
}

// then sidecars
glog.Infof("Pod: %s, killContainersWithSyncResult: killing %d sidecars, %s left", runningPod.Name, len(sidecars), gracePeriodDuration)
wg := sync.WaitGroup{}
wg.Add(len(sidecars))
for _, container := range sidecars {
go func(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
defer wg.Done()

killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
if err := m.killContainer(pod, container.ID, container.Name, "", gracePeriodOverride); err != nil {
if err := m.killContainer(pod, container.ID, container.Name, "Need to kill Pod", gracePeriodDuration); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
}
containerResults <- killContainerResult
}(container)
}
wg.Wait()

close(containerResults)

for containerResult := range containerResults {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kubelet/kuberuntime/kuberuntime_container_test.go
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestKillContainer(t *testing.T) {
}

for _, test := range tests {
err := m.killContainer(test.pod, test.containerID, test.containerName, test.reason, &test.gracePeriodOverride)
err := m.killContainer(test.pod, test.containerID, test.containerName, test.reason, time.Duration(test.gracePeriodOverride)*time.Second)
if test.succeed != (err == nil) {
t.Errorf("%s: expected %v, got %v (%v)", test.caseName, test.succeed, (err == nil), err)
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestLifeCycleHook(t *testing.T) {
// Configured and works as expected
t.Run("PreStop-CMDExec", func(t *testing.T) {
testPod.Spec.Containers[0].Lifecycle = cmdLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", &gracePeriod)
m.killContainer(testPod, cID, "foo", "testKill", time.Duration(gracePeriod)*time.Second)
if fakeRunner.Cmd[0] != cmdLifeCycle.PreStop.Exec.Command[0] {
t.Errorf("CMD Prestop hook was not invoked")
}
Expand All @@ -279,7 +279,7 @@ func TestLifeCycleHook(t *testing.T) {
t.Run("PreStop-HTTPGet", func(t *testing.T) {
defer func() { fakeHTTP.url = "" }()
testPod.Spec.Containers[0].Lifecycle = httpLifeCycle
m.killContainer(testPod, cID, "foo", "testKill", &gracePeriod)
m.killContainer(testPod, cID, "foo", "testKill", time.Duration(gracePeriod)*time.Second)

if !strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
Expand All @@ -293,7 +293,7 @@ func TestLifeCycleHook(t *testing.T) {
testPod.DeletionGracePeriodSeconds = &gracePeriodLocal
testPod.Spec.TerminationGracePeriodSeconds = &gracePeriodLocal

m.killContainer(testPod, cID, "foo", "testKill", &gracePeriodLocal)
m.killContainer(testPod, cID, "foo", "testKill", time.Duration(gracePeriodLocal)*time.Second)

if strings.Contains(fakeHTTP.url, httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Should not execute when gracePeriod is 0")
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/kuberuntime/kuberuntime_gc.go
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -133,7 +133,7 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int
ID: containers[i].id,
}
message := "Container is in unknown state, try killing it before removal"
if err := cgc.manager.killContainer(nil, id, containers[i].name, message, nil); err != nil {
if err := cgc.manager.killContainer(nil, id, containers[i].name, message, 0); err != nil {
klog.Errorf("Failed to stop container %q: %v", containers[i].id, err)
continue
}
Expand Down

0 comments on commit ba9e797

Please sign in to comment.