Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactors to be more generic #3594

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions internal/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"encoding/json"

"github.com/amir20/dozzle/internal/agent/pb"
"github.com/amir20/dozzle/internal/docker"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/utils"
"github.com/rs/zerolog/log"
orderedmap "github.com/wk8/go-ordered-map/v2"
Expand Down Expand Up @@ -89,7 +89,7 @@ func rpcErrToErr(err error) error {
}
}

func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since time.Time, until time.Time, std docker.StdType) (<-chan *docker.LogEvent, error) {
func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since time.Time, until time.Time, std container.StdType) (<-chan *container.LogEvent, error) {
stream, err := c.client.LogsBetweenDates(ctx, &pb.LogsBetweenDatesRequest{
ContainerId: containerID,
Since: timestamppb.New(since),
Expand All @@ -101,7 +101,7 @@ func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since
return nil, err
}

events := make(chan *docker.LogEvent)
events := make(chan *container.LogEvent)

go func() {
sendLogs(stream, events)
Expand All @@ -111,7 +111,7 @@ func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since
return events, nil
}

func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, since time.Time, std docker.StdType, events chan<- *docker.LogEvent) error {
func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, since time.Time, std container.StdType, events chan<- *container.LogEvent) error {
stream, err := c.client.StreamLogs(ctx, &pb.StreamLogsRequest{
ContainerId: containerID,
Since: timestamppb.New(since),
Expand All @@ -125,7 +125,7 @@ func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, si
return sendLogs(stream, events)
}

func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *docker.LogEvent) error {
func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *container.LogEvent) error {
for {
resp, err := stream.Recv()
if err != nil {
Expand All @@ -151,19 +151,19 @@ func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *docker.Log
continue
}

events <- &docker.LogEvent{
events <- &container.LogEvent{
Id: resp.Event.Id,
ContainerID: resp.Event.ContainerId,
Message: message,
Timestamp: resp.Event.Timestamp.AsTime().Unix(),
Position: docker.LogPosition(resp.Event.Position),
Position: container.LogPosition(resp.Event.Position),
Level: resp.Event.Level,
Stream: resp.Event.Stream,
}
}
}

func (c *Client) StreamRawBytes(ctx context.Context, containerID string, since time.Time, until time.Time, std docker.StdType) (io.ReadCloser, error) {
func (c *Client) StreamRawBytes(ctx context.Context, containerID string, since time.Time, until time.Time, std container.StdType) (io.ReadCloser, error) {
out, err := c.client.StreamRawBytes(context.Background(), &pb.StreamRawBytesRequest{
ContainerId: containerID,
Since: timestamppb.New(since),
Expand Down Expand Up @@ -199,7 +199,7 @@ func (c *Client) StreamRawBytes(ctx context.Context, containerID string, since t
return r, nil
}

func (c *Client) StreamStats(ctx context.Context, stats chan<- docker.ContainerStat) error {
func (c *Client) StreamStats(ctx context.Context, stats chan<- container.ContainerStat) error {
stream, err := c.client.StreamStats(ctx, &pb.StreamStatsRequest{})
if err != nil {
return err
Expand All @@ -211,7 +211,7 @@ func (c *Client) StreamStats(ctx context.Context, stats chan<- docker.ContainerS
return rpcErrToErr(err)
}

stats <- docker.ContainerStat{
stats <- container.ContainerStat{
CPUPercent: resp.Stat.CpuPercent,
MemoryPercent: resp.Stat.MemoryPercent,
MemoryUsage: resp.Stat.MemoryUsage,
Expand All @@ -220,7 +220,7 @@ func (c *Client) StreamStats(ctx context.Context, stats chan<- docker.ContainerS
}
}

func (c *Client) StreamEvents(ctx context.Context, events chan<- docker.ContainerEvent) error {
func (c *Client) StreamEvents(ctx context.Context, events chan<- container.ContainerEvent) error {
stream, err := c.client.StreamEvents(ctx, &pb.StreamEventsRequest{})
if err != nil {
return err
Expand All @@ -232,7 +232,7 @@ func (c *Client) StreamEvents(ctx context.Context, events chan<- docker.Containe
return rpcErrToErr(err)
}

events <- docker.ContainerEvent{
events <- container.ContainerEvent{
ActorID: resp.Event.ActorId,
Name: resp.Event.Name,
Host: resp.Event.Host,
Expand All @@ -241,7 +241,7 @@ func (c *Client) StreamEvents(ctx context.Context, events chan<- docker.Containe
}
}

func (c *Client) StreamNewContainers(ctx context.Context, containers chan<- docker.Container) error {
func (c *Client) StreamNewContainers(ctx context.Context, containers chan<- container.Container) error {
stream, err := c.client.StreamContainerStarted(ctx, &pb.StreamContainerStartedRequest{})
if err != nil {
return err
Expand All @@ -253,7 +253,7 @@ func (c *Client) StreamNewContainers(ctx context.Context, containers chan<- dock
return rpcErrToErr(err)
}

containers <- docker.Container{
containers <- container.Container{
ID: resp.Container.Id,
Name: resp.Container.Name,
Image: resp.Container.Image,
Expand All @@ -271,24 +271,24 @@ func (c *Client) StreamNewContainers(ctx context.Context, containers chan<- dock
}
}

func (c *Client) FindContainer(ctx context.Context, containerID string) (docker.Container, error) {
func (c *Client) FindContainer(ctx context.Context, containerID string) (container.Container, error) {
response, err := c.client.FindContainer(ctx, &pb.FindContainerRequest{ContainerId: containerID})
if err != nil {
return docker.Container{}, err
return container.Container{}, err
}

var stats []docker.ContainerStat
var stats []container.ContainerStat

for _, stat := range response.Container.Stats {
stats = append(stats, docker.ContainerStat{
stats = append(stats, container.ContainerStat{
ID: stat.Id,
CPUPercent: stat.CpuPercent,
MemoryPercent: stat.MemoryPercent,
MemoryUsage: stat.MemoryUsage,
})
}

return docker.Container{
return container.Container{
ID: response.Container.Id,
Name: response.Container.Name,
Image: response.Container.Image,
Expand All @@ -306,7 +306,7 @@ func (c *Client) FindContainer(ctx context.Context, containerID string) (docker.
}, nil
}

func (c *Client) ListContainers(ctx context.Context, filter docker.ContainerFilter) ([]docker.Container, error) {
func (c *Client) ListContainers(ctx context.Context, filter container.ContainerFilter) ([]container.Container, error) {
in := &pb.ListContainersRequest{}

if filter != nil {
Expand All @@ -321,50 +321,50 @@ func (c *Client) ListContainers(ctx context.Context, filter docker.ContainerFilt
return nil, err
}

containers := make([]docker.Container, 0)
for _, container := range response.Containers {
var stats []docker.ContainerStat
for _, stat := range container.Stats {
stats = append(stats, docker.ContainerStat{
containers := make([]container.Container, 0)
for _, c := range response.Containers {
var stats []container.ContainerStat
for _, stat := range c.Stats {
stats = append(stats, container.ContainerStat{
ID: stat.Id,
CPUPercent: stat.CpuPercent,
MemoryPercent: stat.MemoryPercent,
MemoryUsage: stat.MemoryUsage,
})
}

containers = append(containers, docker.Container{
ID: container.Id,
Name: container.Name,
Image: container.Image,
Labels: container.Labels,
Group: container.Group,
Created: container.Created.AsTime(),
State: container.State,
Health: container.Health,
Host: container.Host,
Tty: container.Tty,
Command: container.Command,
StartedAt: container.Started.AsTime(),
FinishedAt: container.Finished.AsTime(),
containers = append(containers, container.Container{
ID: c.Id,
Name: c.Name,
Image: c.Image,
Labels: c.Labels,
Group: c.Group,
Created: c.Created.AsTime(),
State: c.State,
Health: c.Health,
Host: c.Host,
Tty: c.Tty,
Command: c.Command,
StartedAt: c.Started.AsTime(),
FinishedAt: c.Finished.AsTime(),
Stats: utils.RingBufferFrom(300, stats),
})
}

return containers, nil
}

func (c *Client) Host(ctx context.Context) (docker.Host, error) {
func (c *Client) Host(ctx context.Context) (container.Host, error) {
info, err := c.client.HostInfo(ctx, &pb.HostInfoRequest{})
if err != nil {
return docker.Host{
return container.Host{
Endpoint: c.endpoint,
Type: "agent",
Available: false,
}, err
}

return docker.Host{
return container.Host{
ID: info.Host.Id,
Name: info.Host.Name,
NCPU: int(info.Host.CpuCores),
Expand All @@ -376,16 +376,16 @@ func (c *Client) Host(ctx context.Context) (docker.Host, error) {
}, nil
}

func (c *Client) ContainerAction(ctx context.Context, containerId string, action docker.ContainerAction) error {
func (c *Client) ContainerAction(ctx context.Context, containerId string, action container.ContainerAction) error {
var containerAction pb.ContainerAction
switch action {
case docker.Start:
case container.Start:
containerAction = pb.ContainerAction_Start

case docker.Stop:
case container.Stop:
containerAction = pb.ContainerAction_Stop

case docker.Restart:
case container.Restart:
containerAction = pb.ContainerAction_Restart

}
Expand Down
Loading
Loading