Skip to content

Commit

Permalink
🐛 fix log retrieval func
Browse files Browse the repository at this point in the history
  • Loading branch information
zcubbs committed May 9, 2024
1 parent 1c8ba7e commit b444546
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 11 deletions.
4 changes: 2 additions & 2 deletions cmd/server/api/rpc_get_job_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"google.golang.org/grpc/status"
)

func (s *Server) GetJobLogs(ctx context.Context, req *sparkpb.GetJobLogsRequest) (*sparkpb.GetJobLogsResponse, error) {
func (s *Server) GetJobLogs(_ context.Context, req *sparkpb.GetJobLogsRequest) (*sparkpb.GetJobLogsResponse, error) {
// Get the job logs
logs, err := s.k8sRunner.GetLogs(ctx, req.JobId)
logs, err := s.k8sRunner.GetLogsForTaskFromDB(req.JobId)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get logs: %v", err)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/k8s/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (r *Runner) createAndMonitorJob(ctx context.Context, namespace string, task
},
}

log.Debug("Creating job", "jobId", task.ID, "image", task.Image, "command", task.Command)
// Create the Kubernetes job
job, err := r.cs.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{})
if err != nil {
Expand All @@ -42,6 +43,7 @@ func (r *Runner) createAndMonitorJob(ctx context.Context, namespace string, task
}

// Monitor the job status until completion or failure
log.Debug("Waiting for job to complete...", "jobId", task.ID, "image", task.Image, "command", task.Command)
err = r.waitForJobCompletion(ctx, job)
if err != nil {
log.Error("Failed to monitor job", "error", err)
Expand Down Expand Up @@ -106,6 +108,7 @@ func (r *Runner) evaluateJobStatus(event watch.Event) error {
// Delete deletes a pod in a Kubernetes cluster.
func (r *Runner) delete(ctx context.Context, jobId string) {
deletePolicy := metav1.DeletePropagationForeground
log.Debug("Deleting job", "jobId", jobId)
err := r.cs.BatchV1().Jobs(r.namespace).Delete(ctx, jobId, metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
Expand All @@ -115,9 +118,9 @@ func (r *Runner) delete(ctx context.Context, jobId string) {
}

// GetLogs gets the logs of a running pod in a Kubernetes cluster.
func (r *Runner) GetLogs(ctx context.Context, jobId string) (string, error) {
func (r *Runner) getLogs(ctx context.Context, jobId string) (string, error) {
pods, err := r.cs.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-id=%s", jobId),
LabelSelector: fmt.Sprintf("job-name=%s", jobId),
})
if err != nil {
return "", fmt.Errorf("error fetching pods: %v", err)
Expand All @@ -141,7 +144,10 @@ func (r *Runner) GetLogs(ctx context.Context, jobId string) (string, error) {
continue
}

logsAggregate += logsData + "\n" // Collect logs for all pods
if logsAggregate != "" && logsData != "" {
logsAggregate += "\n"
}
logsAggregate += logsData
}

return logsAggregate, nil
Expand Down
22 changes: 22 additions & 0 deletions pkg/k8s/jobs/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package k8sJobs

import (
"github.com/tidwall/buntdb"
"strings"
)

func (r *Runner) GetLogsForTaskFromDB(taskId string) (string, error) {
var logs string
err := r.db.View(func(tx *buntdb.Tx) error {
val, err := tx.Get(taskId)
if err != nil {
return err
}
logs = strings.Split(val, ",")[4]
return nil
})
if err != nil {
return "", err
}
return logs, nil
}
2 changes: 0 additions & 2 deletions pkg/k8s/jobs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func loadK8sConfig(kubeconfig string) (*rest.Config, error) {
return nil, fmt.Errorf("failed to load in-cluster config: %v", err)
}
} else {
fmt.Println("Using kubeconfig file")

if kubeconfig == "default" {
homeDir, err := os.UserHomeDir()
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions pkg/k8s/jobs/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,21 @@ func (r *Runner) processTasks(ctx context.Context) {
func (r *Runner) handleTask(ctx context.Context, t Task) {
defer r.wg.Done()

log.Debug("Processing task", "jobId", t.ID, "image", t.Image, "command", t.Command)

// Set initial task status to RUNNING
if !(r.updateTaskStatus(t.ID, t.Image, t.Command, "RUNNING", "")) {
// If the task status update fails, return
return
}

defaultTimeout := 60
if t.Timeout > 0 {
defaultTimeout = t.Timeout
}

// Process the task with a timeout
jobCtx, cancel := context.WithTimeout(ctx, time.Duration(t.Timeout)*time.Second)
jobCtx, cancel := context.WithTimeout(ctx, time.Duration(defaultTimeout)*time.Second)
defer cancel()

_, err := r.createAndMonitorJob(jobCtx, r.namespace, t)
Expand All @@ -76,11 +83,13 @@ func (r *Runner) handleTask(ctx context.Context, t Task) {
}

// Retrieve logs and update task status to SUCCEEDED or FAILED based on log retrieval status
logs, err := r.GetLogs(ctx, t.ID)
log.Debug("Retrieving logs", "jobId", t.ID)
logs, err := r.getLogs(ctx, t.ID)
finalStatus := "SUCCEEDED"
if err != nil {
finalStatus = "FAILED"
logs = fmt.Sprintf("Failed to get logs: %v", err)
log.Error("Failed to get logs", "error", err, "jobId", t.ID)
}

r.updateTaskStatus(t.ID, t.Image, t.Command, finalStatus, logs)
Expand Down
5 changes: 3 additions & 2 deletions tests.http
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ Content-Type: application/json

{
"image": "busybox",
"command": ["sh", "-c", "echo Hello World"]
"command": ["sh", "-c", "echo Hello World && sleep 10"],
"timeout": 120
}

###
Expand All @@ -21,5 +22,5 @@ POST http://localhost:8000/v1/get_job_logs
Content-Type: application/json

{
"jobId": "spark-test-job"
"jobId": "spark-job-jukq7m0a7sgh12c"
}

0 comments on commit b444546

Please sign in to comment.