cronjob_controller.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. /*
  2. Copyright 2023.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package controllers
  14. import (
  15. "context"
  16. "fmt"
  17. "sort"
  18. "time"
  19. "github.com/robfig/cron"
  20. kbatch "k8s.io/api/batch/v1"
  21. corev1 "k8s.io/api/core/v1"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/runtime"
  24. ref "k8s.io/client-go/tools/reference"
  25. ctrl "sigs.k8s.io/controller-runtime"
  26. "sigs.k8s.io/controller-runtime/pkg/client"
  27. "sigs.k8s.io/controller-runtime/pkg/log"
  28. batchv1 "tutorial.kubebuilder.io/project/api/v1"
  29. )
  30. // 我们需要一个时钟,它允许我们在测试中伪造计时
  31. // CronJobReconciler reconciles a CronJob object
  32. type CronJobReconciler struct {
  33. client.Client
  34. Scheme *runtime.Scheme
  35. Clock
  36. }
  37. type realClock struct{}
  38. // 我们将模拟时钟,以便在测试时更容易及时跳转,“真实”时钟只调用 time.Now
  39. func (_ realClock) Now() time.Time { return time.Now() }
  40. // clock knows how to get the current time.
  41. // It can be used to fake out timing for testing.
  42. type Clock interface {
  43. Now() time.Time
  44. }
  45. //+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
  46. //+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/status,verbs=get;update;patch
  47. //+kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/finalizers,verbs=update
  48. //+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
  49. //+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get
  50. var (
  51. scheduledTimeAnnotation = "batch.tutorial.kubebuilder.io/scheduled-at"
  52. )
  53. // Reconcile is part of the main kubernetes reconciliation loop which aims to
  54. // move the current state of the cluster closer to the desired state.
  55. // TODO(user): Modify the Reconcile function to compare the state specified by
  56. // the CronJob object against the actual cluster state, and then
  57. // perform operations to make the cluster state reflect the state specified by
  58. // the user.
  59. //
  60. // For more details, check Reconcile and its Result here:
  61. // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.1/pkg/reconcile
  62. func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
  63. log := log.FromContext(ctx)
  64. // TODO(user): your logic here
  65. // 1: Load the CronJob by name
  66. var cronJob batchv1.CronJob
  67. if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
  68. log.Error(err, "unable to fetch CronJob")
  69. // we'll ignore not-found errors, since they can't be fixed by an immediate
  70. // requeue (we'll need to wait for a new notification), and we can get them
  71. // on deleted requests.
  72. return ctrl.Result{}, client.IgnoreNotFound(err)
  73. }
  74. // 2: List all active jobs, and update the status
  75. var childJobs kbatch.JobList
  76. if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
  77. log.Error(err, "unable to list child Jobs")
  78. return ctrl.Result{}, err
  79. }
  80. // find the active list of jobs
  81. var activeJobs []*kbatch.Job
  82. var successfulJobs []*kbatch.Job
  83. var failedJobs []*kbatch.Job
  84. var mostRecentTime *time.Time // find the last run so we can update the status
  85. // We consider a job “finished” if it has a “Complete” or “Failed” condition marked as true.
  86. // Status conditions allow us to add extensible status information to our objects that other humans and controllers can examine to check things like completion and health.
  87. // 如果作业的“完成”或“失败”条件标记为真,则我们认为该作业“已完成”。
  88. // 状态条件允许我们向我们的对象添加可扩展的状态信息,其他人和控制器可以检查这些信息以检查诸如完成和健康之类的事情。
  89. isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
  90. for _, c := range job.Status.Conditions {
  91. if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
  92. return true, c.Type
  93. }
  94. }
  95. return false, ""
  96. }
  97. // We’ll use a helper to extract the scheduled time from the annotation that we added during job creation.
  98. // 我们将使用帮助程序从我们在创建作业期间添加的注释中提取预定时间。
  99. getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
  100. timeRaw := job.Annotations[scheduledTimeAnnotation]
  101. if len(timeRaw) == 0 {
  102. return nil, nil
  103. }
  104. timeParsed, err := time.Parse(time.RFC3339, timeRaw)
  105. if err != nil {
  106. return nil, err
  107. }
  108. return &timeParsed, nil
  109. }
  110. for i, job := range childJobs.Items {
  111. _, finishedType := isJobFinished(&job)
  112. switch finishedType {
  113. case "": // ongoing
  114. activeJobs = append(activeJobs, &childJobs.Items[i])
  115. case kbatch.JobFailed:
  116. failedJobs = append(failedJobs, &childJobs.Items[i])
  117. case kbatch.JobComplete:
  118. successfulJobs = append(successfulJobs, &childJobs.Items[i])
  119. }
  120. // We'll store the launch time in an annotation, so we'll reconstitute that from
  121. // the active jobs themselves.
  122. scheduledTimeForJob, err := getScheduledTimeForJob(&job)
  123. if err != nil {
  124. log.Error(err, "unable to parse schedule time for child job", "job", &job)
  125. continue
  126. }
  127. if scheduledTimeForJob != nil {
  128. if mostRecentTime == nil {
  129. mostRecentTime = scheduledTimeForJob
  130. } else if mostRecentTime.Before(*scheduledTimeForJob) {
  131. mostRecentTime = scheduledTimeForJob
  132. }
  133. }
  134. }
  135. if mostRecentTime != nil {
  136. cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
  137. } else {
  138. cronJob.Status.LastScheduleTime = nil
  139. }
  140. cronJob.Status.Active = nil
  141. for _, activeJob := range activeJobs {
  142. jobRef, err := ref.GetReference(r.Scheme, activeJob)
  143. if err != nil {
  144. log.Error(err, "unable to make reference to active job", "job", activeJob)
  145. continue
  146. }
  147. cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
  148. }
  149. log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
  150. if err := r.Status().Update(ctx, &cronJob); err != nil {
  151. log.Error(err, "unable to update CronJob status")
  152. return ctrl.Result{}, err
  153. }
  154. // 3: Clean up old jobs according to the history limit
  155. // NB: deleting these are "best effort" -- if we fail on a particular one,
  156. // we won't requeue just to finish the deleting.
  157. if cronJob.Spec.FailedJobsHistoryLimit != nil {
  158. sort.Slice(failedJobs, func(i, j int) bool {
  159. if failedJobs[i].Status.StartTime == nil {
  160. return failedJobs[j].Status.StartTime != nil
  161. }
  162. return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
  163. })
  164. for i, job := range failedJobs {
  165. if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
  166. break
  167. }
  168. if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
  169. log.Error(err, "unable to delete old failed job", "job", job)
  170. } else {
  171. log.V(0).Info("deleted old failed job", "job", job)
  172. }
  173. }
  174. }
  175. if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
  176. sort.Slice(successfulJobs, func(i, j int) bool {
  177. if successfulJobs[i].Status.StartTime == nil {
  178. return successfulJobs[j].Status.StartTime != nil
  179. }
  180. return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
  181. })
  182. for i, job := range successfulJobs {
  183. if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
  184. break
  185. }
  186. if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
  187. log.Error(err, "unable to delete old successful job", "job", job)
  188. } else {
  189. log.V(0).Info("deleted old successful job", "job", job)
  190. }
  191. }
  192. }
  193. // 4: Check if we’re suspended
  194. if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
  195. log.V(1).Info("cronjob suspended, skipping")
  196. return ctrl.Result{}, nil
  197. }
  198. getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
  199. sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
  200. if err != nil {
  201. return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
  202. }
  203. // for optimization purposes, cheat a bit and start from our last observed run time
  204. // we could reconstitute this here, but there's not much point, since we've
  205. // just updated it.
  206. var earliestTime time.Time
  207. if cronJob.Status.LastScheduleTime != nil {
  208. earliestTime = cronJob.Status.LastScheduleTime.Time
  209. } else {
  210. earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
  211. }
  212. if cronJob.Spec.StartingDeadlineSeconds != nil {
  213. // controller is not going to schedule anything below this point
  214. schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
  215. if schedulingDeadline.After(earliestTime) {
  216. earliestTime = schedulingDeadline
  217. }
  218. }
  219. if earliestTime.After(now) {
  220. return time.Time{}, sched.Next(now), nil
  221. }
  222. starts := 0
  223. for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
  224. lastMissed = t
  225. // An object might miss several starts. For example, if
  226. // controller gets wedged on Friday at 5:01pm when everyone has
  227. // gone home, and someone comes in on Tuesday AM and discovers
  228. // the problem and restarts the controller, then all the hourly
  229. // jobs, more than 80 of them for one hourly scheduledJob, should
  230. // all start running with no further intervention (if the scheduledJob
  231. // allows concurrency and late starts).
  232. //
  233. // However, if there is a bug somewhere, or incorrect clock
  234. // on controller's server or apiservers (for setting creationTimestamp)
  235. // then there could be so many missed start times (it could be off
  236. // by decades or more), that it would eat up all the CPU and memory
  237. // of this controller. In that case, we want to not try to list
  238. // all the missed start times.
  239. starts++
  240. if starts > 100 {
  241. // We can't get the most recent times so just return an empty slice
  242. return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
  243. }
  244. }
  245. return lastMissed, sched.Next(now), nil
  246. }
  247. // 5: Get the next scheduled run
  248. // figure out the next times that we need to create
  249. // jobs at (or anything we missed).
  250. missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
  251. if err != nil {
  252. log.Error(err, "unable to figure out CronJob schedule")
  253. // we don't really care about requeuing until we get an update that
  254. // fixes the schedule, so don't return an error
  255. return ctrl.Result{}, nil
  256. }
  257. scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
  258. log = log.WithValues("now", r.Now(), "next run", nextRun)
  259. // 6: Run a new job if it’s on schedule, not past the deadline, and not blocked by our concurrency policy
  260. // 如果我们错过了一次运行,而我们仍在截止日期之内开始运行,我们就需要运行一个作业。
  261. if missedRun.IsZero() {
  262. log.V(1).Info("no upcoming scheduled times, sleeping until next")
  263. return scheduledResult, nil
  264. }
  265. // make sure we're not too late to start the run
  266. log = log.WithValues("current run", missedRun)
  267. tooLate := false
  268. if cronJob.Spec.StartingDeadlineSeconds != nil {
  269. tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
  270. }
  271. if tooLate {
  272. log.V(1).Info("missed starting deadline for last run, sleeping till next")
  273. // TODO(directxman12): events
  274. return scheduledResult, nil
  275. }
  276. // 如果我们真的必须运行一个作业,我们需要等到现有的完成,替换现有的,或者只是添加新的。如果我们的信息由于缓存延迟而过时,我们将在获取最新信息时重新排队。
  277. // figure out how to run this job -- concurrency policy might forbid us from running
  278. // multiple at the same time...
  279. if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
  280. log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
  281. return scheduledResult, nil
  282. }
  283. // ...or instruct us to replace existing ones...
  284. if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
  285. for _, activeJob := range activeJobs {
  286. // we don't care if the job was already deleted
  287. if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
  288. log.Error(err, "unable to delete active job", "job", activeJob)
  289. return ctrl.Result{}, err
  290. }
  291. }
  292. }
  293. // 一旦我们弄清楚如何处理现有的工作,我们实际上就会创造我们想要的工作
  294. constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
  295. // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
  296. name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())
  297. job := &kbatch.Job{
  298. ObjectMeta: metav1.ObjectMeta{
  299. Labels: make(map[string]string),
  300. Annotations: make(map[string]string),
  301. Name: name,
  302. Namespace: cronJob.Namespace,
  303. },
  304. Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
  305. }
  306. for k, v := range cronJob.Spec.JobTemplate.Annotations {
  307. job.Annotations[k] = v
  308. }
  309. job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
  310. for k, v := range cronJob.Spec.JobTemplate.Labels {
  311. job.Labels[k] = v
  312. }
  313. if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
  314. return nil, err
  315. }
  316. return job, nil
  317. }
  318. // actually make the job...
  319. job, err := constructJobForCronJob(&cronJob, missedRun)
  320. if err != nil {
  321. log.Error(err, "unable to construct job from template")
  322. // don't bother requeuing until we get a change to the spec
  323. return scheduledResult, nil
  324. }
  325. // ...and create it on the cluster
  326. if err := r.Create(ctx, job); err != nil {
  327. log.Error(err, "unable to create Job for CronJob", "job", job)
  328. return ctrl.Result{}, err
  329. }
  330. log.V(1).Info("created Job for CronJob run", "job", job)
  331. // 7: Requeue when we either see a running job or it’s time for the next scheduled run
  332. // 7:当我们看到正在运行的作业或下一次预定运行时间时重新排队
  333. // we'll requeue once we see the running job, and update our status
  334. return scheduledResult, nil
  335. }
  336. var (
  337. jobOwnerKey = ".metadata.controller"
  338. apiGVStr = batchv1.GroupVersion.String()
  339. )
  340. // SetupWithManager sets up the controller with the Manager.
  341. func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
  342. // set up a real clock, since we're not in a test
  343. if r.Clock == nil {
  344. r.Clock = realClock{}
  345. }
  346. if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj client.Object) []string {
  347. // grab the job object, extract the owner...
  348. job := rawObj.(*kbatch.Job)
  349. owner := metav1.GetControllerOf(job)
  350. if owner == nil {
  351. return nil
  352. }
  353. // ...make sure it's a CronJob...
  354. if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
  355. return nil
  356. }
  357. // ...and if so, return it
  358. return []string{owner.Name}
  359. }); err != nil {
  360. return err
  361. }
  362. return ctrl.NewControllerManagedBy(mgr).
  363. For(&batchv1.CronJob{}).
  364. Owns(&kbatch.Job{}).
  365. Complete(r)
  366. }