Implementing Kubernetes Jobs Client And Controller For FlowForge


Introduction

Kubernetes has become a pivotal platform for orchestrating containerized workloads. This article walks through the implementation of a Kubernetes Jobs client and controller tailored for FlowForge, addressing the user story of a backend developer who wants to use the Kubernetes Jobs API to orchestrate Claude Code execution containers. By setting up a robust Kubernetes client, managing job lifecycles, and enforcing resource limits, FlowForge can make effective use of containerization. The sections below cover the acceptance criteria, the technical implementation, architecture references, and the definition of done.

User Story: Implement Kubernetes Jobs Client and Controller

The Challenge

As a backend developer, I need to implement a Kubernetes Jobs client and controller in Go so that FlowForge can orchestrate Claude Code execution containers using the Kubernetes Jobs API. The goal is a system that seamlessly manages jobs within a Kubernetes cluster while ensuring proper resource allocation, security, and observability.

Acceptance Criteria

To ensure the successful implementation of the Kubernetes Jobs client and controller, several key acceptance criteria must be met. These criteria cover various aspects, from Kubernetes client setup to job lifecycle control and resource management. Here's a detailed breakdown:

1. Kubernetes Client Setup

The first step is setting up the Kubernetes client. This involves several considerations to ensure robust and reliable connectivity with the Kubernetes cluster; a short sketch of the retry and throttling pieces follows the list.

  • In-Cluster Config Support: The client must be able to initialize using the in-cluster configuration. This is essential for applications running within the Kubernetes cluster, as it allows them to automatically discover and connect to the Kubernetes API server.
  • Out-of-Cluster Config Support (Development): For development purposes, the client should also support out-of-cluster configurations. This typically involves using a kubeconfig file to connect to the cluster from a local development environment.
  • Proper Authentication and RBAC Setup: Authentication and Role-Based Access Control (RBAC) are crucial for securing access to the Kubernetes API. The client must be configured to authenticate correctly and have the necessary permissions to create and manage jobs.
  • Connection Pooling and Retry Logic: To ensure resilience and efficiency, the client should implement connection pooling and retry logic. This minimizes the overhead of establishing new connections and handles transient network issues.
  • Graceful Degradation on Connection Issues: The client should be able to gracefully handle connection issues, providing informative error messages and avoiding cascading failures.
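One way the connection-handling items above might look with client-go is sketched below. The QPS/Burst values and the retry predicate are illustrative assumptions, not requirements from the story; client-go reuses HTTP/2 connections internally, so "connection pooling" mostly reduces to tuning client-side throttling.

// A minimal sketch of config fallback, throttling, and retry with client-go.
// The QPS/Burst numbers and retriable-error set are assumptions for illustration.
package k8s

import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/retry"
)

func newClientset() (kubernetes.Interface, error) {
    // Prefer in-cluster config; fall back to the local kubeconfig for development.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        cfg, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            return nil, err
        }
    }
    // Client-side throttling instead of explicit connection pooling.
    cfg.QPS = 20
    cfg.Burst = 40
    return kubernetes.NewForConfig(cfg)
}

// withRetry wraps an API call with exponential backoff on transient API errors.
func withRetry(fn func() error) error {
    return retry.OnError(retry.DefaultBackoff, func(err error) bool {
        return apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err) || apierrors.IsTimeout(err)
    }, fn)
}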

2. Job Creation and Management

Creating and managing Kubernetes Jobs is at the heart of this implementation. Several aspects must be addressed to ensure jobs are created correctly and efficiently; a sketch of conflict-safe job creation follows the list.

  • Create Kubernetes Job from Job Configuration: The system must be able to create Kubernetes Jobs based on a provided job configuration. This configuration should include details such as the image to use, command to run, and resource requirements.
  • Apply Security Contexts and Resource Limits: Security contexts and resource limits are essential for securing and isolating jobs. The system should apply appropriate security contexts, such as running containers as non-root users, and set resource limits to prevent jobs from consuming excessive resources.
  • Set Proper Labels and Annotations: Labels and annotations are crucial for organizing and managing jobs within the cluster. The system should set meaningful labels, such as app, component, job-id, and user-id, and annotations for additional metadata, such as repository and branch information.
  • Configure Volume Mounts for Workspace: Jobs often require access to persistent storage. The system should configure volume mounts to provide jobs with access to the necessary workspace volumes.
  • Handle Job Naming Conflicts: Job names must be unique within a namespace. The system should handle potential naming conflicts, possibly by generating unique names or providing informative error messages.
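One possible approach to the naming-conflict item is sketched below: append a short random suffix and retry when the API server reports AlreadyExists. The suffix length and naming scheme are assumptions, not part of the acceptance criteria.

// Sketch: create a Job with a unique name, retrying on name collisions.
package k8s

import (
    "context"
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/rand"
    "k8s.io/client-go/kubernetes"
)

func createWithUniqueName(ctx context.Context, client kubernetes.Interface, namespace string, job *batchv1.Job) (*batchv1.Job, error) {
    base := job.Name
    for attempt := 0; attempt < 3; attempt++ {
        // e.g. claude-job-abc123-x7k2q
        job.Name = fmt.Sprintf("%s-%s", base, rand.String(5))
        created, err := client.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
        if apierrors.IsAlreadyExists(err) {
            continue // collision: try a new suffix
        }
        return created, err
    }
    return nil, fmt.Errorf("could not find a unique name for job %q", base)
}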

3. Job Lifecycle Control

Managing the lifecycle of jobs is critical for ensuring efficient resource utilization and proper execution. This includes monitoring job status, handling timeouts, and cleaning up completed jobs; a sketch of mapping Job conditions to lifecycle states follows the list.

  • Monitor Job Status Changes: The system must monitor the status of jobs, tracking changes such as pending, running, succeeded, or failed.
  • Implement Job Timeout Handling: Jobs should have a timeout to prevent them from running indefinitely. The system should implement a mechanism to automatically terminate jobs that exceed their timeout.
  • Support Job Cancellation: Users should be able to cancel running jobs. The system should provide a way to gracefully terminate jobs upon user request.
  • Clean Up Completed Jobs (TTL): To prevent the accumulation of completed jobs, the system should implement a Time-To-Live (TTL) mechanism to automatically delete jobs after a specified duration.
  • Handle Pod Failures and Retries: Pod failures are a common occurrence in Kubernetes. The system should handle pod failures and implement retry mechanisms as necessary.
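Status monitoring ultimately comes down to reading the Job's conditions. The sketch below anticipates the JobsClient type and the getJobStatus helper used in the client code later in this article; the JobStatus names are assumptions and the real project may name them differently.

// Sketch: map Job conditions and counters onto a coarse lifecycle state.
package k8s

import (
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
)

type JobStatus string

const (
    JobStatusPending   JobStatus = "pending"
    JobStatusRunning   JobStatus = "running"
    JobStatusSucceeded JobStatus = "succeeded"
    JobStatusFailed    JobStatus = "failed"
)

// getJobStatus returns the coarse state of a Job based on its conditions.
func (c *JobsClient) getJobStatus(job *batchv1.Job) JobStatus {
    for _, cond := range job.Status.Conditions {
        if cond.Status != corev1.ConditionTrue {
            continue
        }
        switch cond.Type {
        case batchv1.JobComplete:
            return JobStatusSucceeded
        case batchv1.JobFailed:
            return JobStatusFailed
        }
    }
    if job.Status.Active > 0 {
        return JobStatusRunning
    }
    return JobStatusPending
}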

4. Resource Management

Effective resource management is essential for ensuring the stability and performance of the Kubernetes cluster. This involves enforcing CPU and memory limits, configuring node selectors, and setting toleration rules; a sketch of the toleration settings follows the list.

  • Enforce CPU and Memory Limits: The system should enforce CPU and memory limits to prevent jobs from consuming excessive resources and impacting other workloads.
  • Implement Ephemeral Storage Limits: Ephemeral storage is used for temporary data within a pod. The system should implement limits on ephemeral storage to prevent jobs from filling up the node's disk.
  • Node Selector Configuration: Node selectors allow jobs to be scheduled on specific nodes based on labels. The system should support configuring node selectors to ensure jobs are run on appropriate nodes.
  • Toleration Rules for Worker Nodes: Tolerations allow jobs to be scheduled on nodes with specific taints. The system should implement toleration rules to allow jobs to be scheduled on worker nodes.
  • Priority Class Assignment: Priority classes allow jobs to be prioritized within the cluster. The system should assign appropriate priority classes to jobs to ensure critical workloads are given preference.
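The client code later in this article calls a buildTolerations helper that is not shown. A minimal sketch is given below, assuming worker nodes carry a workload-type=worker:NoSchedule taint and that a "flowforge-worker" PriorityClass exists in the cluster; both are assumptions for illustration.

// Sketch: tolerate the (assumed) worker-node taint so jobs can schedule there.
package k8s

import corev1 "k8s.io/api/core/v1"

func (c *JobsClient) buildTolerations() []corev1.Toleration {
    return []corev1.Toleration{
        {
            Key:      "workload-type",
            Operator: corev1.TolerationOpEqual,
            Value:    "worker",
            Effect:   corev1.TaintEffectNoSchedule,
        },
    }
}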

5. Observability

Observability is crucial for understanding the behavior of jobs and troubleshooting issues. This includes streaming job logs, exposing metrics, and tracking job lifecycle events; a sketch of the Prometheus metrics follows the list.

  • Stream Job Logs to Centralized Logging: Job logs should be streamed to a centralized logging system for analysis and troubleshooting.
  • Expose Job Metrics to Prometheus: Key job metrics, such as CPU usage, memory usage, and execution time, should be exposed to Prometheus for monitoring and alerting.
  • Track Job Lifecycle Events: Job lifecycle events, such as creation, start, completion, and failure, should be tracked for auditing and analysis.
  • Error Reporting and Alerting: The system should provide error reporting and alerting mechanisms to notify administrators of job failures and other issues.
  • Job Execution History: A history of job executions should be maintained, including details such as status, start time, and end time.
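As a sketch of the Prometheus side, the metrics below could be registered with client_golang and incremented from the controller's event loop. The package name, metric names, label sets, and bucket boundaries are illustrative assumptions.

// pkg/metrics/jobs.go (sketch; names and labels are assumptions)
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // Counts terminal job outcomes by status (e.g. succeeded, failed, cancelled).
    JobsCompleted = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "flowforge_jobs_completed_total",
        Help: "Number of Kubernetes jobs that reached a terminal state.",
    }, []string{"status"})

    // Observes wall-clock execution time per job.
    JobDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "flowforge_job_duration_seconds",
        Help:    "Execution time of Kubernetes jobs.",
        Buckets: prometheus.ExponentialBuckets(30, 2, 8), // 30s .. ~64min
    }, []string{"status"})
)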

Technical Implementation

The technical implementation involves creating a Kubernetes Jobs client and controller in Go. The client handles the interaction with the Kubernetes API, while the controller manages the lifecycle of jobs.

Kubernetes Jobs Client

The Kubernetes Jobs client is responsible for interacting with the Kubernetes API to create, manage, and monitor jobs. The client includes functions for creating jobs, monitoring job status, deleting jobs, and retrieving job logs.

// pkg/k8s/client.go
package k8s

import (
    "context"
    "fmt"
    "io"
    "time"

    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

type JobsClient struct {
    client    kubernetes.Interface
    namespace string
    config    *Config
}

type Config struct {
    Namespace          string
    ServiceAccountName string
    ImagePullPolicy    string
    WorkerImage        string
    DefaultTimeout     time.Duration
    MaxRetries         int32
}

// NewJobsClient creates a new Kubernetes Jobs client
func NewJobsClient(config *Config) (*JobsClient, error) {
    // Try in-cluster config first
    k8sConfig, err := rest.InClusterConfig()
    if err != nil {
        // Fall back to kubeconfig for development
        k8sConfig, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            return nil, fmt.Errorf("failed to create k8s config: %w", err)
        }
    }
    
    clientset, err := kubernetes.NewForConfig(k8sConfig)
    if err != nil {
        return nil, fmt.Errorf("failed to create k8s client: %w", err)
    }
    
    return &JobsClient{
        client:    clientset,
        namespace: config.Namespace,
        config:    config,
    }, nil
}

// CreateJob creates a new Kubernetes Job for Claude Code execution
func (c *JobsClient) CreateJob(ctx context.Context, jobConfig *JobConfig) (*batchv1.Job, error) {
    job := &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("claude-job-%s", jobConfig.ID),
            Namespace: c.namespace,
            Labels: map[string]string{
                "app":        "flowforge",
                "component":  "worker",
                "job-id":     jobConfig.ID,
                "user-id":    jobConfig.UserID,
            },
            Annotations: map[string]string{
                "flowforge.io/repository": jobConfig.Repository,
                "flowforge.io/branch":     jobConfig.Branch,
            },
        },
        Spec: batchv1.JobSpec{
            TTLSecondsAfterFinished: ptr(int32(3600)), // 1 hour
            BackoffLimit:            &c.config.MaxRetries,
            ActiveDeadlineSeconds:   ptr(int64(jobConfig.Timeout.Seconds())),
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app":       "flowforge",
                        "component": "worker",
                        "job-id":    jobConfig.ID,
                    },
                    Annotations: map[string]string{
                        "container.apparmor.security.beta.kubernetes.io/worker": "runtime/default",
                    },
                },
                Spec: c.buildPodSpec(jobConfig),
            },
        },
    }
    
    return c.client.BatchV1().Jobs(c.namespace).Create(ctx, job, metav1.CreateOptions{})
}

// buildPodSpec creates the pod specification with security constraints
func (c *JobsClient) buildPodSpec(config *JobConfig) corev1.PodSpec {
    return corev1.PodSpec{
        ServiceAccountName:           c.config.ServiceAccountName,
        AutomountServiceAccountToken: ptr(false),
        RestartPolicy:               corev1.RestartPolicyOnFailure,
        RuntimeClassName:            ptr("gvisor"), // Enhanced isolation
        SecurityContext: &corev1.PodSecurityContext{
            RunAsNonRoot: ptr(true),
            RunAsUser:    ptr(int64(1000)),
            RunAsGroup:   ptr(int64(1000)),
            FSGroup:      ptr(int64(1000)),
            SeccompProfile: &corev1.SeccompProfile{
                Type:             corev1.SeccompProfileTypeLocalhost,
                LocalhostProfile: ptr("claude-worker.json"),
            },
        },
        Containers: []corev1.Container{
            {
                Name:            "worker",
                Image:           c.config.WorkerImage,
                ImagePullPolicy: corev1.PullPolicy(c.config.ImagePullPolicy),
                Command:         config.Command,
                Args:           config.Args,
                Env:            c.buildEnvironment(config),
                Resources:      c.buildResourceRequirements(config),
                VolumeMounts:   c.buildVolumeMounts(),
                SecurityContext: &corev1.SecurityContext{
                    ReadOnlyRootFilesystem:   ptr(true),
                    AllowPrivilegeEscalation: ptr(false),
                    Capabilities: &corev1.Capabilities{
                        Drop: []corev1.Capability{"ALL"},
                        Add:  []corev1.Capability{"NET_BIND_SERVICE"},
                    },
                },
                LivenessProbe: &corev1.Probe{
                    ProbeHandler: corev1.ProbeHandler{
                        Exec: &corev1.ExecAction{
                            Command: []string{"/bin/sh", "-c", "pgrep -f claude-code || exit 1"},
                        },
                    },
                    InitialDelaySeconds: 60,
                    PeriodSeconds:       30,
                },
            },
        },
        Volumes:       c.buildVolumes(),
        NodeSelector:  config.NodeSelector,
        Tolerations:   c.buildTolerations(),
        PriorityClassName: "flowforge-worker",
    }
}

// buildEnvironment creates environment variables for the container
func (c *JobsClient) buildEnvironment(config *JobConfig) []corev1.EnvVar {
    return []corev1.EnvVar{
        {
            Name:  "JOB_ID",
            Value: config.ID,
        },
        {
            Name: "ANTHROPIC_API_KEY",
            ValueFrom: &corev1.EnvVarSource{
                SecretKeyRef: &corev1.SecretKeySelector{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: "claude-secret",
                    },
                    Key: "api-key",
                },
            },
        },
        {
            Name: "GIT_CREDENTIALS",
            ValueFrom: &corev1.EnvVarSource{
                SecretKeyRef: &corev1.SecretKeySelector{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: fmt.Sprintf("git-credentials-%s", config.ID),
                    },
                    Key: "credentials",
                },
            },
        },
        {
            Name:  "REPOSITORY_URL",
            Value: config.RepositoryURL,
        },
        {
            Name:  "BRANCH",
            Value: config.Branch,
        },
        {
            Name:  "TASK",
            Value: config.Task,
        },
    }
}

// buildResourceRequirements sets resource limits and requests
func (c *JobsClient) buildResourceRequirements(config *JobConfig) corev1.ResourceRequirements {
    return corev1.ResourceRequirements{
        Requests: corev1.ResourceList{
            corev1.ResourceCPU:              resource.MustParse(config.Resources.CPURequest),
            corev1.ResourceMemory:           resource.MustParse(config.Resources.MemoryRequest),
            corev1.ResourceEphemeralStorage: resource.MustParse("10Gi"),
        },
        Limits: corev1.ResourceList{
            corev1.ResourceCPU:              resource.MustParse(config.Resources.CPULimit),
            corev1.ResourceMemory:           resource.MustParse(config.Resources.MemoryLimit),
            corev1.ResourceEphemeralStorage: resource.MustParse("20Gi"),
        },
    }
}

// MonitorJob watches job status and streams events
func (c *JobsClient) MonitorJob(ctx context.Context, jobName string, eventChan chan<- JobEvent) error {
    watcher, err := c.client.BatchV1().Jobs(c.namespace).Watch(ctx, metav1.ListOptions{
        FieldSelector: fmt.Sprintf("metadata.name=%s", jobName),
    })
    if err != nil {
        return fmt.Errorf("failed to watch job: %w", err)
    }
    defer watcher.Stop()
    
    for event := range watcher.ResultChan() {
        job, ok := event.Object.(*batchv1.Job)
        if !ok {
            continue
        }
        
        jobEvent := JobEvent{
            Type:      string(event.Type),
            JobName:   job.Name,
            Status:    c.getJobStatus(job),
            Timestamp: time.Now(),
        }
        
        select {
        case eventChan <- jobEvent:
        case <-ctx.Done():
            return ctx.Err()
        }
        
        // Check if job is complete
        if jobEvent.Status == JobStatusSucceeded || jobEvent.Status == JobStatusFailed {
            return nil
        }
    }
    
    return nil
}

// DeleteJob deletes a Kubernetes job
func (c *JobsClient) DeleteJob(ctx context.Context, jobName string) error {
    deletePolicy := metav1.DeletePropagationBackground
    return c.client.BatchV1().Jobs(c.namespace).Delete(ctx, jobName, metav1.DeleteOptions{
        PropagationPolicy: &deletePolicy,
    })
}

// GetJobLogs retrieves logs from the job's pod
func (c *JobsClient) GetJobLogs(ctx context.Context, jobName string, follow bool) (io.ReadCloser, error) {
    // Find the pod for this job
    pods, err := c.client.CoreV1().Pods(c.namespace).List(ctx, metav1.ListOptions{
        LabelSelector: fmt.Sprintf("job-name=%s", jobName),
    })
    if err != nil {
        return nil, fmt.Errorf("failed to list pods: %w", err)
    }
    
    if len(pods.Items) == 0 {
        return nil, fmt.Errorf("no pods found for job %s", jobName)
    }
    
    // Get logs from the first pod
    podName := pods.Items[0].Name
    req := c.client.CoreV1().Pods(c.namespace).GetLogs(podName, &corev1.PodLogOptions{
        Container: "worker",
        Follow:    follow,
    })
    
    return req.Stream(ctx)
}

// Helper function for pointer creation
func ptr[T any](v T) *T {
    return &v
}

The JobsClient struct encapsulates the Kubernetes client, namespace, and configuration. The NewJobsClient function initializes the client, attempting to use the in-cluster configuration first and falling back to a kubeconfig file for development environments. The CreateJob function creates a new Kubernetes Job with specified labels, annotations, and security contexts. The buildPodSpec function constructs the pod specification, including resource limits, security settings, and container details. The MonitorJob function watches the job status and streams events to a channel, allowing the controller to react to job status changes. Lastly, the DeleteJob function deletes a Kubernetes Job, and the GetJobLogs function retrieves logs from the job's pod.
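The client code above also references a few supporting types that the snippet does not define. A minimal sketch of what they might look like is shown below, with field names inferred from how they are used above (the JobStatus constants were sketched earlier; the exact shapes in the FlowForge codebase may differ):

// pkg/k8s/types.go (sketch; field names inferred from usage above)
package k8s

import "time"

// JobConfig describes a single Claude Code execution request.
type JobConfig struct {
    ID            string
    UserID        string
    Repository    string
    RepositoryURL string
    Branch        string
    Task          string
    Command       []string
    Args          []string
    Timeout       time.Duration
    Resources     ResourceConfig
    NodeSelector  map[string]string
}

// ResourceConfig carries CPU/memory requests and limits as Kubernetes quantity strings.
type ResourceConfig struct {
    CPURequest    string
    CPULimit      string
    MemoryRequest string
    MemoryLimit   string
}

// JobEvent is emitted by MonitorJob for every observed status change.
type JobEvent struct {
    Type      string
    JobName   string
    Status    JobStatus
    Timestamp time.Time
}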

Job Controller Implementation

The Job Controller is responsible for managing the lifecycle of jobs. It receives job requests, creates Kubernetes Jobs, monitors their status, and handles job completion or failure. The controller uses a worker pool to process jobs concurrently, improving efficiency and scalability.

// pkg/controller/job_controller.go
package controller

import (
    "context"
    "fmt"
    "sync"
    "time"
    
    "github.com/flowforge/flowforge/pkg/k8s"
    "github.com/flowforge/flowforge/pkg/models"
    "go.uber.org/zap"
)

type JobController struct {
    k8sClient   *k8s.JobsClient
    jobStore    models.JobStore
    logger      *zap.Logger
    workers     int
    jobQueue    chan *models.Job
    wg          sync.WaitGroup
    ctx         context.Context
    cancel      context.CancelFunc
}

// NewJobController creates a new job controller
func NewJobController(k8sClient *k8s.JobsClient, jobStore models.JobStore, logger *zap.Logger) *JobController {
    ctx, cancel := context.WithCancel(context.Background())
    
    return &JobController{
        k8sClient: k8sClient,
        jobStore:  jobStore,
        logger:    logger,
        workers:   5,
        jobQueue:  make(chan *models.Job, 100),
        ctx:       ctx,
        cancel:    cancel,
    }
}

// Start begins processing jobs
func (c *JobController) Start() error {
    c.logger.Info("Starting job controller", zap.Int("workers", c.workers))
    
    // Start worker goroutines
    for i := 0; i < c.workers; i++ {
        c.wg.Add(1)
        go c.worker(i)
    }
    
    // Start job status monitor
    c.wg.Add(1)
    go c.statusMonitor()
    
    return nil
}

// SubmitJob adds a job to the processing queue
func (c *JobController) SubmitJob(job *models.Job) error {
    select {
    case c.jobQueue <- job:
        return nil
    case <-c.ctx.Done():
        return fmt.Errorf("controller is shutting down")
    default:
        return fmt.Errorf("job queue is full")
    }
}

// worker processes jobs from the queue
func (c *JobController) worker(id int) {
    defer c.wg.Done()
    
    logger := c.logger.With(zap.Int("worker", id))
    logger.Info("Worker started")
    
    for {
        select {
        case job := <-c.jobQueue:
            c.processJob(job, logger)
        case <-c.ctx.Done():
            logger.Info("Worker shutting down")
            return
        }
    }
}

// processJob handles the execution of a single job
func (c *JobController) processJob(job *models.Job, logger *zap.Logger) {
    logger = logger.With(zap.String("job_id", job.ID))
    logger.Info("Processing job")
    
    // Update job status to running
    job.Status = models.JobStatusRunning
    job.StartedAt = time.Now()
    if err := c.jobStore.UpdateJob(job); err != nil {
        logger.Error("Failed to update job status", zap.Error(err))
        return
    }
    
    // Create Kubernetes job
    k8sJobConfig := &k8s.JobConfig{
        ID:            job.ID,
        UserID:        job.UserID,
        Repository:    job.Repository.Name,
        RepositoryURL: job.Repository.URL,
        Branch:        job.Config.Branch,
        Task:          job.Config.Task,
        Timeout:       time.Duration(job.Config.Timeout) * time.Second,
        Resources: k8s.ResourceConfig{
            CPURequest:    "1",
            CPULimit:      "2",
            MemoryRequest: "2Gi",
            MemoryLimit:   "4Gi",
        },
        NodeSelector: map[string]string{
            "workload-type": "worker",
        },
    }
    
    k8sJob, err := c.k8sClient.CreateJob(c.ctx, k8sJobConfig)
    if err != nil {
        logger.Error("Failed to create Kubernetes job", zap.Error(err))
        c.handleJobError(job, err)
        return
    }
    
    // Monitor job execution
    eventChan := make(chan k8s.JobEvent, 10)
    go func() {
        if err := c.k8sClient.MonitorJob(c.ctx, k8sJob.Name, eventChan); err != nil {
            logger.Error("Job monitoring error", zap.Error(err))
        }
        close(eventChan)
    }()
    
    // Process job events
    for event := range eventChan {
        logger.Info("Job event", 
            zap.String("type", event.Type),
            zap.String("status", string(event.Status)))
        
        switch event.Status {
        case k8s.JobStatusSucceeded:
            c.handleJobSuccess(job, k8sJob.Name)
        case k8s.JobStatusFailed:
            c.handleJobFailure(job, k8sJob.Name)
        }
    }
}

// Stop gracefully shuts down the controller
func (c *JobController) Stop() error {
    c.logger.Info("Stopping job controller")
    c.cancel()
    c.wg.Wait()
    return nil
}

The JobController struct includes the Kubernetes client, a job store, a logger, and a worker pool. The NewJobController function initializes the controller, setting up the worker pool and job queue. The Start function starts the controller, launching worker goroutines and a job status monitor. The SubmitJob function adds a job to the processing queue. The worker function processes jobs from the queue, creating Kubernetes Jobs and monitoring their execution. The processJob function handles the creation of the Kubernetes Job, monitoring of job execution, and handling of job success or failure. Finally, the Stop function gracefully shuts down the controller, ensuring all workers complete their tasks.
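The controller also calls a few handlers that the snippet does not show (handleJobSuccess, handleJobFailure, handleJobError, and the statusMonitor goroutine). A minimal sketch of the three handlers is given below, assuming models defines JobStatusSucceeded and JobStatusFailed alongside the JobStatusRunning constant used above:

// pkg/controller/job_handlers.go (sketch; assumes models.JobStatusSucceeded and
// models.JobStatusFailed exist alongside models.JobStatusRunning)
package controller

import (
    "github.com/flowforge/flowforge/pkg/models"
    "go.uber.org/zap"
)

// handleJobSuccess records a terminal success state for the job.
func (c *JobController) handleJobSuccess(job *models.Job, k8sJobName string) {
    job.Status = models.JobStatusSucceeded
    if err := c.jobStore.UpdateJob(job); err != nil {
        c.logger.Error("Failed to record job success",
            zap.String("job_id", job.ID), zap.String("k8s_job", k8sJobName), zap.Error(err))
    }
}

// handleJobFailure records a terminal failure state for the job.
func (c *JobController) handleJobFailure(job *models.Job, k8sJobName string) {
    job.Status = models.JobStatusFailed
    if err := c.jobStore.UpdateJob(job); err != nil {
        c.logger.Error("Failed to record job failure",
            zap.String("job_id", job.ID), zap.String("k8s_job", k8sJobName), zap.Error(err))
    }
}

// handleJobError marks the job as failed when it could not be submitted or monitored.
func (c *JobController) handleJobError(job *models.Job, cause error) {
    c.logger.Error("Job errored", zap.String("job_id", job.ID), zap.Error(cause))
    job.Status = models.JobStatusFailed
    if err := c.jobStore.UpdateJob(job); err != nil {
        c.logger.Error("Failed to record job error", zap.String("job_id", job.ID), zap.Error(err))
    }
}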

Architecture References

The architecture references provide insights into how Kubernetes Jobs are executed within FlowForge.

Kubernetes Job Executor

The Kubernetes Job Executor is responsible for creating Kubernetes Jobs based on job configurations. The reference code snippet illustrates how a secure job executor can be implemented in Python.

Reference: docs/02-system-components.md:719-763

class SecureJobExecutor:
    def create_claude_job(self, job_config):
        job = client.V1Job(
            metadata=client.V1ObjectMeta(
                name=f"claude-{job_config['id']}",
                namespace="flowforge-jobs"
            ),
            spec=client.V1JobSpec(
                template=client.V1PodTemplateSpec(
                    spec=client.V1PodSpec(
                        service_account_name="claude-job-sa",
                        security_context=client.V1PodSecurityContext(
                            run_as_non_root=True,
                            run_as_user=1000,
                            fs_group=1000
                        )
                    )
                )
            )
        )

Kubernetes Job Specification

The Kubernetes Job Specification defines the structure and configuration of a Kubernetes Job. The YAML snippet provides an example of a job specification, including metadata and specifications for the pod template.

Reference: docs/02-system-components.md:810-911

apiVersion: batch/v1
kind: Job
metadata:
  name: claude-job-{id}
spec:
  ttlSecondsAfterFinished: 3600
  backoffLimit: 3
  activeDeadlineSeconds: 7200
  template:
    spec:
      serviceAccountName: claude-job-sa
      automountServiceAccountToken: false
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000

Job Execution Flow

The Job Execution Flow describes the sequence of steps involved in executing a job within the system. This includes job assignment, container initialization, and job completion acknowledgment.

Reference: docs/03-data-flow.md:53-78

Queue->>Worker: Job assignment
Worker->>Container: Start container
Container->>Container: Initialize environment
Container->>JobMgr: Job complete
JobMgr->>Queue: Acknowledge job

Definition of Done

The definition of done outlines the criteria that must be met for the implementation to be considered complete. These criteria ensure the functionality, reliability, and maintainability of the system.

  • Kubernetes client fully functional: The Kubernetes client must be able to connect to the cluster and perform all necessary operations, such as creating, monitoring, and deleting jobs.
  • Job creation with security constraints: Jobs must be created with the appropriate security contexts and resource limits to ensure isolation and security.
  • Job monitoring and status updates: The system must monitor job status changes and provide real-time updates to users.
  • Log streaming implemented: Job logs must be streamed to a centralized logging system for analysis and troubleshooting.
  • Resource limits enforced: Resource limits, such as CPU and memory, must be enforced to prevent jobs from consuming excessive resources.
  • Unit tests with >80% coverage: Unit tests must cover at least 80% of the codebase to ensure code quality and reliability (a fake-clientset test sketch follows this list).
  • Integration tests with test cluster: Integration tests must be performed with a test Kubernetes cluster to ensure the system functions correctly in a realistic environment.
  • Documentation updated: Documentation must be updated to reflect the changes made and provide guidance on how to use the system.
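As a starting point for the test coverage items, CreateJob can be exercised against client-go's fake clientset so no real cluster is needed. The sketch below assumes the Config, JobConfig, and ResourceConfig shapes shown earlier; the concrete values are illustrative.

// pkg/k8s/client_test.go (sketch; uses client-go's fake clientset)
package k8s

import (
    "context"
    "testing"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
)

func TestCreateJobSetsLabels(t *testing.T) {
    c := &JobsClient{
        client:    fake.NewSimpleClientset(),
        namespace: "flowforge-jobs",
        config: &Config{
            Namespace:          "flowforge-jobs",
            ServiceAccountName: "claude-job-sa",
            ImagePullPolicy:    "IfNotPresent",
            WorkerImage:        "flowforge/worker:latest",
            MaxRetries:         3,
        },
    }

    job, err := c.CreateJob(context.Background(), &JobConfig{
        ID:      "test-123",
        UserID:  "user-1",
        Timeout: 30 * time.Minute,
        Resources: ResourceConfig{
            CPURequest: "500m", CPULimit: "1",
            MemoryRequest: "512Mi", MemoryLimit: "1Gi",
        },
    })
    if err != nil {
        t.Fatalf("CreateJob failed: %v", err)
    }
    if job.Labels["job-id"] != "test-123" {
        t.Errorf("expected job-id label to be set, got %q", job.Labels["job-id"])
    }

    // The fake clientset records the object, so it can be listed back.
    list, err := c.client.BatchV1().Jobs("flowforge-jobs").List(context.Background(), metav1.ListOptions{})
    if err != nil {
        t.Fatalf("listing jobs from the fake clientset failed: %v", err)
    }
    if len(list.Items) != 1 {
        t.Fatalf("expected exactly one job in the fake clientset, got %d", len(list.Items))
    }
}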

Dependencies

The implementation depends on several factors, including the worker container image, Kubernetes cluster access, and RBAC configuration.

  • Worker container image (previous story): The worker container image must be available for jobs to be executed.
  • Kubernetes cluster access: Access to a Kubernetes cluster is required for deploying and managing jobs.
  • RBAC configuration: Proper RBAC configuration is necessary to ensure the system has the necessary permissions to create and manage jobs.

Effort Estimate

Story Points: 13

Labels

  • backend
  • kubernetes
  • containers
  • epic-4

Conclusion

Implementing a Kubernetes Jobs client and controller for FlowForge is a significant undertaking that enhances the platform's ability to orchestrate containerized workloads efficiently. By adhering to the acceptance criteria, technical implementation details, and definition of done outlined in this article, developers can ensure a robust and scalable solution. The integration of security contexts, resource limits, and observability features further solidifies the reliability and maintainability of the system. This comprehensive guide serves as a valuable resource for developers seeking to leverage Kubernetes Jobs API within FlowForge, ultimately contributing to the platform's overall effectiveness and efficiency.