Skip to content

Commit

Permalink
Refactor poller into a service
Browse files Browse the repository at this point in the history
  • Loading branch information
deadlycoconuts committed Jan 17, 2025
1 parent 1cf57ff commit a58a714
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
3 changes: 3 additions & 0 deletions plugins/turing/runner/experiment_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ func (er *experimentRunner) startBackgroundServices(
}
}()
}
if er.appContext.PollerService != nil {
er.appContext.PollerService.Start()
}
}

func (er *experimentRunner) getRequestParams(
Expand Down
7 changes: 7 additions & 0 deletions treatment-service/appcontext/appcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type AppContext struct {

AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger
LocalStorage *models.LocalStorage
PollerService *services.PollerService
}

func NewAppContext(cfg *config.Config) (*AppContext, error) {
Expand Down Expand Up @@ -122,6 +123,11 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) {
return nil, err
}

var pollerService *services.PollerService
if cfg.ManagementServicePollerConfig.Enabled {
pollerService = services.NewPollerService(cfg.ManagementServicePollerConfig, localStorage)
}

appContext := &AppContext{
ExperimentService: experimentSvc,
MetricService: metricService,
Expand All @@ -131,6 +137,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) {
AssignedTreatmentLogger: logger,
MessageQueueService: messageQueueService,
LocalStorage: localStorage,
PollerService: pollerService,
}

return appContext, nil
Expand Down
17 changes: 4 additions & 13 deletions treatment-service/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type Server struct {
subscribe bool
// cleanup captures all the actions to be executed on server shut down
cleanup []func()
// poller captures the poller instance
poller *Poller
}

// NewServer creates and configures an APIServer serving all application routes.
Expand Down Expand Up @@ -108,11 +106,6 @@ func NewServer(configFiles []string) (*Server, error) {
subscribe = true
}

var poller *Poller
if cfg.ManagementServicePollerConfig.Enabled {
poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage)
}

srv := http.Server{
Addr: cfg.ListenAddress(),
Handler: mux,
Expand All @@ -123,7 +116,6 @@ func NewServer(configFiles []string) (*Server, error) {
appContext: appCtx,
subscribe: subscribe,
cleanup: cleanup,
poller: poller,
}, nil
}

Expand All @@ -141,11 +133,6 @@ func (srv *Server) Start() {
}()
log.Printf("Listening on %s\n", srv.Addr)

if srv.poller != nil {
log.Println("Starting poller...")
srv.poller.Start()
}

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)

Expand Down Expand Up @@ -193,5 +180,9 @@ func (srv *Server) startBackgroundService(errChannel chan error) context.CancelF
}
}()

if srv.appContext.PollerService != nil {
srv.appContext.PollerService.Start()
}

return cancel
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package services

import (
"log"
Expand All @@ -8,24 +8,25 @@ import (
"github.com/caraml-dev/xp/treatment-service/models"
)

type Poller struct {
type PollerService struct {
pollerConfig config.ManagementServicePollerConfig
localStorage *models.LocalStorage
stopChannel chan struct{}
}

// NewPoller creates a new Poller instance with the given configuration and local storage.
// NewPollerService creates a new PollerService instance with the given configuration and local storage.
// pollerConfig: configuration for the poller
// localStorage: local storage to be used by the poller
func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller {
return &Poller{
func NewPollerService(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *PollerService {
return &PollerService{
pollerConfig: pollerConfig,
localStorage: localStorage,
stopChannel: make(chan struct{}),
}
}

func (p *Poller) Start() {
func (p *PollerService) Start() {
log.Println("Starting management service poller service...")
pollInterval := time.Duration(p.pollerConfig.PollIntervalSeconds) * time.Second
ticker := time.NewTicker(pollInterval)
go func() {
Expand All @@ -46,11 +47,11 @@ func (p *Poller) Start() {
}()
}

func (p *Poller) Stop() {
func (p *PollerService) Stop() {
close(p.stopChannel)
}

func (p *Poller) Refresh() error {
func (p *PollerService) Refresh() error {
err := p.localStorage.Init()
return err
}

0 comments on commit a58a714

Please sign in to comment.