From fdaa24a6cee24cfbe0a61242c282621d0d3f5613 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Sun, 15 Dec 2024 18:17:04 +0530 Subject: [PATCH 01/19] Added changes for using queue only if poller is not enabled in config --- management-service/appcontext/appcontext.go | 14 +++++++++++--- management-service/config/config.go | 6 ++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/management-service/appcontext/appcontext.go b/management-service/appcontext/appcontext.go index 8e7ec6aa..3cfaf94b 100644 --- a/management-service/appcontext/appcontext.go +++ b/management-service/appcontext/appcontext.go @@ -30,9 +30,17 @@ func NewAppContext(db *gorm.DB, authorizer *mw.Authorizer, cfg *config.Config) ( } // Init Services - messageQueueService, err := messagequeue.NewMessageQueueService(*cfg.MessageQueueConfig) - if err != nil { - return nil, err + var messageQueueService messagequeue.MessageQueueService + if cfg.PollerConfig != nil && cfg.PollerConfig.Enabled { + messageQueueService, err = messagequeue.NewNoopMQService() + if err != nil { + return nil, err + } + } else { + messageQueueService, err = messagequeue.NewMessageQueueService(*cfg.MessageQueueConfig) + if err != nil { + return nil, err + } } segmenterSvc, err := services.NewSegmenterService(&allServices, cfg.SegmenterConfig, db) diff --git a/management-service/config/config.go b/management-service/config/config.go index 24a15e10..ca65405f 100644 --- a/management-service/config/config.go +++ b/management-service/config/config.go @@ -26,6 +26,7 @@ type Config struct { NewRelicConfig newrelic.Config SentryConfig sentry.Config XpUIConfig *XpUIConfig + PollerConfig *PollerConfig } // AuthorizationConfig captures the config for MLP authz @@ -81,6 +82,11 @@ type XpUIConfig struct { Homepage string `default:"/xp"` } +type PollerConfig struct { + Enabled bool `default:"false"` + PollInterval time.Duration `default:"30s"` +} + // ListenAddress returns the Management API app's port func (c *Config) ListenAddress() string { return fmt.Sprintf(":%d", c.Port) From 70c140774b71d45c95f0ff1f86eafc73f0ccd63f Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Sun, 15 Dec 2024 19:55:48 +0530 Subject: [PATCH 02/19] Added changes for poller config in treatment service --- management-service/config/config.go | 3 +- treatment-service/appcontext/appcontext.go | 4 +- treatment-service/config/config.go | 50 +++++++++++++++++----- treatment-service/go.mod | 4 ++ treatment-service/go.sum | 6 +++ treatment-service/server/server.go | 2 +- 6 files changed, 54 insertions(+), 15 deletions(-) diff --git a/management-service/config/config.go b/management-service/config/config.go index ca65405f..a30ff90b 100644 --- a/management-service/config/config.go +++ b/management-service/config/config.go @@ -83,8 +83,7 @@ type XpUIConfig struct { } type PollerConfig struct { - Enabled bool `default:"false"` - PollInterval time.Duration `default:"30s"` + Enabled bool `default:"false"` } // ListenAddress returns the Management API app's port diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 58f4defd..06912973 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -99,7 +99,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { messageQueueService, err = messagequeue.NewMessageQueueService( context.Background(), localStorage, - cfg.MessageQueueConfig, + *cfg.MessageQueueConfig, cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) @@ -110,7 +110,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { messageQueueService, err = messagequeue.NewMessageQueueService( pubsubInitContext, localStorage, - cfg.MessageQueueConfig, + *cfg.MessageQueueConfig, cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index 7058d6cc..22f4c5eb 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -3,9 +3,11 @@ package config import ( "fmt" "strconv" + "time" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" + "github.com/go-playground/validator/v10" common_config "github.com/caraml-dev/xp/common/config" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" @@ -24,16 +26,17 @@ type Config struct { Port int `json:"port" default:"8080" validate:"required"` ProjectIds []string `json:"project_ids" default:""` - AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` - DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` - NewRelicConfig newrelic.Config `json:"new_relic_config"` - SentryConfig sentry.Config `json:"sentry_config"` - DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` - MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"` - ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` - MonitoringConfig Monitoring `json:"monitoring_config"` - SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` - SegmenterConfig map[string]interface{} `json:"segmenter_config"` + AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` + DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` + NewRelicConfig newrelic.Config `json:"new_relic_config"` + SentryConfig sentry.Config `json:"sentry_config"` + DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` + MessageQueueConfig *common_mq_config.MessageQueueConfig `json:"message_queue_config"` + ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` + MonitoringConfig Monitoring `json:"monitoring_config"` + SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` + SegmenterConfig map[string]interface{} `json:"segmenter_config"` + PollerConfig *PollerConfig `json:"poller_config"` } type AssignedTreatmentLoggerConfig struct { @@ -94,6 +97,11 @@ type ManagementServiceConfig struct { AuthorizationEnabled bool `json:"authorization_enabled"` } +type PollerConfig struct { + Enabled bool `default:"false"` + PollInterval time.Duration `default:"30s"` +} + func (c *Config) GetProjectIds() []models.ProjectId { projectIds := make([]models.ProjectId, 0) for _, projectIdString := range c.ProjectIds { @@ -109,6 +117,24 @@ func (c *Config) ListenAddress() string { return fmt.Sprintf(":%d", c.Port) } +func (c *Config) Validate() error { + validate := validator.New() + + if c.MessageQueueConfig != nil { + if err := validate.Struct(c.MessageQueueConfig); err != nil { + return fmt.Errorf("invalid message queue configuration: %w", err) + } + } + + if c.PollerConfig != nil { + if err := validate.Struct(c.PollerConfig); err != nil { + return fmt.Errorf("invalid poller configuration: %w", err) + } + } + + return nil +} + func Load(filepaths ...string) (*Config, error) { var cfg Config err := common_config.ParseConfig(&cfg, filepaths) @@ -116,5 +142,9 @@ func Load(filepaths ...string) (*Config, error) { return nil, fmt.Errorf("failed to update viper config: %s", err) } + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid configuration: %w", err) + } + return &cfg, nil } diff --git a/treatment-service/go.mod b/treatment-service/go.mod index fda4fabc..c4c1d473 100644 --- a/treatment-service/go.mod +++ b/treatment-service/go.mod @@ -14,6 +14,7 @@ require ( github.com/deepmap/oapi-codegen v1.11.0 github.com/getkin/kin-openapi v0.94.0 github.com/go-chi/chi/v5 v5.0.7 + github.com/go-playground/validator/v10 v10.11.1 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 github.com/google/go-cmp v0.6.0 @@ -103,6 +104,8 @@ require ( github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.3 // indirect + github.com/go-playground/locales v0.14.0 // indirect + github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/goccy/go-json v0.9.11 // indirect github.com/gofrs/flock v0.8.1 // indirect @@ -136,6 +139,7 @@ require ( github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/leodido/go-urn v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/treatment-service/go.sum b/treatment-service/go.sum index f4f0b24d..788fc763 100644 --- a/treatment-service/go.sum +++ b/treatment-service/go.sum @@ -333,13 +333,18 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs= github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho= github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= +github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ= +github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-sql-driver/mysql v1.3.0 h1:pgwjLi/dvffoP9aabwkT3AKpXQM93QARkjFhDDqC1UE= github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -599,6 +604,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.7.2/go.mod h1:xkCDAdFCIf8jsFQ5NnbK7oqaF/yU1A1X20Ltm0OvSks= github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lestrrat-go/backoff/v2 v2.0.8/go.mod h1:rHP/q/r9aT27n24JQLa7JhSQZCKBBOiM/uP402WwN8Y= github.com/lestrrat-go/blackmagic v1.0.0/go.mod h1:TNgH//0vYSs8VXDCfkZLgIrVTTXQELZffUV0tz3MtdQ= diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index 924a65ce..ca75d3a1 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -102,7 +102,7 @@ func NewServer(configFiles []string) (*Server, error) { } subscribe := false - if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { + if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ && !cfg.PollerConfig.Enabled { subscribe = true } From 2b9fd4439b4cf710d1b37bca828a9e927c01aa08 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Mon, 16 Dec 2024 22:59:40 +0530 Subject: [PATCH 03/19] Added changes for poller in treatment service --- treatment-service/appcontext/appcontext.go | 34 ++++++++++++---------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 06912973..a46d4920 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -92,10 +92,8 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { return nil, err } - log.Println("Initializing message queue subscriber...") var messageQueueService messagequeue.MessageQueueService - switch cfg.MessageQueueConfig.Kind { - case common_mq_config.NoopMQ: + if cfg.PollerConfig.Enabled || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { messageQueueService, err = messagequeue.NewMessageQueueService( context.Background(), localStorage, @@ -103,20 +101,24 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) - case common_mq_config.PubSubMQ: - pubsubInitContext, cancel := context.WithTimeout( - context.Background(), time.Duration(cfg.MessageQueueConfig.PubSubConfig.PubSubTimeoutSeconds)*time.Second) - defer cancel() - messageQueueService, err = messagequeue.NewMessageQueueService( - pubsubInitContext, - localStorage, - *cfg.MessageQueueConfig, - cfg.GetProjectIds(), - cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, - ) - default: - err = fmt.Errorf("unrecognized Message Queue Kind: %s", loggerConfig.Kind) + } else { + if cfg.MessageQueueConfig.Kind == common_mq_config.PubSubMQ { + log.Println("Initializing message queue subscriber...") + pubsubInitContext, cancel := context.WithTimeout( + context.Background(), time.Duration(cfg.MessageQueueConfig.PubSubConfig.PubSubTimeoutSeconds)*time.Second) + defer cancel() + messageQueueService, err = messagequeue.NewMessageQueueService( + pubsubInitContext, + localStorage, + *cfg.MessageQueueConfig, + cfg.GetProjectIds(), + cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, + ) + } else { + err = fmt.Errorf("unrecognized Message Queue Kind: %s", loggerConfig.Kind) + } } + if err != nil { return nil, err } From 46f6ecb3fd6db273c14e0b56bd1e9c8e494d3c79 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Tue, 17 Dec 2024 12:47:30 +0530 Subject: [PATCH 04/19] Added changes for poller in treatment service --- treatment-service/server/poller.go | 40 ++++++++++++++++++++++++++++++ treatment-service/server/server.go | 13 +++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 treatment-service/server/poller.go diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go new file mode 100644 index 00000000..9059a19a --- /dev/null +++ b/treatment-service/server/poller.go @@ -0,0 +1,40 @@ +package server + +import ( + "log" + "time" + + "github.com/caraml-dev/xp/treatment-service/config" +) + +type Poller struct { + config config.PollerConfig + stopChan chan struct{} +} + +func NewPoller(cfg *config.PollerConfig) *Poller { + return &Poller{ + config: *cfg, + stopChan: make(chan struct{}), + } +} + +func (p *Poller) Start() { + ticker := time.NewTicker(p.config.PollInterval) + go func() { + for { + select { + case <-ticker.C: + // Polling logic here + log.Println("Polling...") + case <-p.stopChan: + ticker.Stop() + return + } + } + }() +} + +func (p *Poller) Stop() { + close(p.stopChan) +} diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index ca75d3a1..99e80d87 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -33,6 +33,8 @@ type Server struct { subscribe bool // cleanup captures all the actions to be executed on server shut down cleanup []func() + // poller captures the poller instance + poller *Poller } // NewServer creates and configures an APIServer serving all application routes. @@ -102,7 +104,10 @@ func NewServer(configFiles []string) (*Server, error) { } subscribe := false - if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ && !cfg.PollerConfig.Enabled { + var poller *Poller + if cfg.PollerConfig != nil && cfg.PollerConfig.Enabled { + poller = NewPoller(cfg.PollerConfig) + } else if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { subscribe = true } @@ -116,6 +121,7 @@ func NewServer(configFiles []string) (*Server, error) { appContext: appCtx, subscribe: subscribe, cleanup: cleanup, + poller: poller, }, nil } @@ -133,6 +139,11 @@ func (srv *Server) Start() { }() log.Printf("Listening on %s\n", srv.Addr) + if srv.poller != nil { + log.Println("Starting poller...") + srv.poller.Start() + } + stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt) From eb29b7642116024f4a8fc3e79952ba21018ba8ee Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Wed, 18 Dec 2024 14:22:31 +0530 Subject: [PATCH 05/19] Added changes for poller in treatment service --- treatment-service/appcontext/appcontext.go | 2 ++ treatment-service/appcontext/appcontext_test.go | 2 +- treatment-service/config/config_test.go | 4 ++-- treatment-service/models/storage.go | 4 ++-- treatment-service/server/poller.go | 7 +++++-- treatment-service/server/server.go | 2 +- 6 files changed, 13 insertions(+), 8 deletions(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index a46d4920..e4d5b023 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -23,6 +23,7 @@ type AppContext struct { SegmenterService services.SegmenterService AssignedTreatmentLogger *monitoring.AssignedTreatmentLogger + LocalStorage *models.LocalStorage } func NewAppContext(cfg *config.Config) (*AppContext, error) { @@ -131,6 +132,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { TreatmentService: treatmentSvc, AssignedTreatmentLogger: logger, MessageQueueService: messageQueueService, + LocalStorage: localStorage, } return appContext, nil diff --git a/treatment-service/appcontext/appcontext_test.go b/treatment-service/appcontext/appcontext_test.go index 172e309a..9c1b13cf 100644 --- a/treatment-service/appcontext/appcontext_test.go +++ b/treatment-service/appcontext/appcontext_test.go @@ -57,7 +57,7 @@ func TestContext(t *testing.T) { NewRelicConfig: newrelic.Config{}, SentryConfig: sentry.Config{}, DeploymentConfig: config.DeploymentConfig{}, - MessageQueueConfig: common_mq_config.MessageQueueConfig{ + MessageQueueConfig: &common_mq_config.MessageQueueConfig{ Kind: "pubsub", PubSubConfig: pubSubConfig, }, diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index 621c50aa..93bd145b 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -46,7 +46,7 @@ func TestDefaultConfigs(t *testing.T) { DebugConfig: DebugConfig{ OutputPath: "/tmp", }, - MessageQueueConfig: common_mq_config.MessageQueueConfig{ + MessageQueueConfig: &common_mq_config.MessageQueueConfig{ Kind: "", PubSubConfig: &common_mq_config.PubSubConfig{ Project: "dev", @@ -109,7 +109,7 @@ func TestLoadMultipleConfigs(t *testing.T) { DebugConfig: DebugConfig{ OutputPath: "/tmp1", }, - MessageQueueConfig: common_mq_config.MessageQueueConfig{ + MessageQueueConfig: &common_mq_config.MessageQueueConfig{ Kind: "", PubSubConfig: &common_mq_config.PubSubConfig{ Project: "dev", diff --git a/treatment-service/models/storage.go b/treatment-service/models/storage.go index 04af00d6..52d0529a 100644 --- a/treatment-service/models/storage.go +++ b/treatment-service/models/storage.go @@ -486,7 +486,7 @@ func (s *LocalStorage) DumpExperiments(filepath string) error { return os.WriteFile(filepath, file, 0644) } -func (s *LocalStorage) init() error { +func (s *LocalStorage) Init() error { s.Lock() defer s.Unlock() @@ -592,7 +592,7 @@ func NewLocalStorage( } segmenterCache := make(map[ProjectId]map[string]schema.SegmenterType) s := LocalStorage{managementClient: xpClient, subscribedProjectIds: projectIds, ProjectSegmenters: segmenterCache} - err = s.init() + err = s.Init() return &s, err } diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index 9059a19a..91ad9fce 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -5,16 +5,19 @@ import ( "time" "github.com/caraml-dev/xp/treatment-service/config" + "github.com/caraml-dev/xp/treatment-service/models" ) type Poller struct { config config.PollerConfig + storage *models.LocalStorage stopChan chan struct{} } -func NewPoller(cfg *config.PollerConfig) *Poller { +func NewPoller(cfg *config.PollerConfig, storage *models.LocalStorage) *Poller { return &Poller{ config: *cfg, + storage: storage, stopChan: make(chan struct{}), } } @@ -25,7 +28,7 @@ func (p *Poller) Start() { for { select { case <-ticker.C: - // Polling logic here + p.storage.Init() log.Println("Polling...") case <-p.stopChan: ticker.Stop() diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index 99e80d87..fb5b9e19 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -106,7 +106,7 @@ func NewServer(configFiles []string) (*Server, error) { subscribe := false var poller *Poller if cfg.PollerConfig != nil && cfg.PollerConfig.Enabled { - poller = NewPoller(cfg.PollerConfig) + poller = NewPoller(cfg.PollerConfig, appCtx.LocalStorage) } else if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { subscribe = true } From e7b91a33be0431a3937f7130bfba71c5523fc175 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 19 Dec 2024 09:46:26 +0530 Subject: [PATCH 06/19] Reverted poller changes for management-service --- management-service/appcontext/appcontext.go | 14 +++----------- management-service/config/config.go | 1 - 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/management-service/appcontext/appcontext.go b/management-service/appcontext/appcontext.go index 3cfaf94b..8e7ec6aa 100644 --- a/management-service/appcontext/appcontext.go +++ b/management-service/appcontext/appcontext.go @@ -30,17 +30,9 @@ func NewAppContext(db *gorm.DB, authorizer *mw.Authorizer, cfg *config.Config) ( } // Init Services - var messageQueueService messagequeue.MessageQueueService - if cfg.PollerConfig != nil && cfg.PollerConfig.Enabled { - messageQueueService, err = messagequeue.NewNoopMQService() - if err != nil { - return nil, err - } - } else { - messageQueueService, err = messagequeue.NewMessageQueueService(*cfg.MessageQueueConfig) - if err != nil { - return nil, err - } + messageQueueService, err := messagequeue.NewMessageQueueService(*cfg.MessageQueueConfig) + if err != nil { + return nil, err } segmenterSvc, err := services.NewSegmenterService(&allServices, cfg.SegmenterConfig, db) diff --git a/management-service/config/config.go b/management-service/config/config.go index a30ff90b..d01241b8 100644 --- a/management-service/config/config.go +++ b/management-service/config/config.go @@ -26,7 +26,6 @@ type Config struct { NewRelicConfig newrelic.Config SentryConfig sentry.Config XpUIConfig *XpUIConfig - PollerConfig *PollerConfig } // AuthorizationConfig captures the config for MLP authz From 7c8c331bf4b767fb67343c9d62934a2a40535a7e Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 19 Dec 2024 13:07:36 +0530 Subject: [PATCH 07/19] Fixed treatment service config test case --- treatment-service/config/config_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index 93bd145b..f76fcf53 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -2,6 +2,7 @@ package config import ( "testing" + "time" "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" @@ -64,6 +65,10 @@ func TestDefaultConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap}, SegmenterConfig: make(map[string]interface{}), + PollerConfig: &PollerConfig{ + Enabled: false, + PollInterval: 30 * time.Second, + }, } cfg, err := Load() require.NoError(t, err) @@ -127,6 +132,10 @@ func TestLoadMultipleConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}}, SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}}, + PollerConfig: &PollerConfig{ + Enabled: false, + PollInterval: 30 * time.Second, + }, } cfg, err := Load(configFiles...) From 7760cbd1c660a79319244fb5f3b81a0c1bc6395f Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 19 Dec 2024 13:53:36 +0530 Subject: [PATCH 08/19] Refactored poller --- treatment-service/server/poller.go | 46 ++++++++++++++++++------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index 91ad9fce..403143ba 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -2,6 +2,7 @@ package server import ( "log" + "sync" "time" "github.com/caraml-dev/xp/treatment-service/config" @@ -9,35 +10,44 @@ import ( ) type Poller struct { - config config.PollerConfig - storage *models.LocalStorage - stopChan chan struct{} + pollerConfig config.PollerConfig + storage *models.LocalStorage + stopChan chan struct{} + stopOnce sync.Once } -func NewPoller(cfg *config.PollerConfig, storage *models.LocalStorage) *Poller { +func NewPoller(pollerConfig *config.PollerConfig, storage *models.LocalStorage) *Poller { return &Poller{ - config: *cfg, - storage: storage, - stopChan: make(chan struct{}), + pollerConfig: *pollerConfig, + storage: storage, + stopChan: make(chan struct{}), } } func (p *Poller) Start() { - ticker := time.NewTicker(p.config.PollInterval) - go func() { - for { - select { - case <-ticker.C: - p.storage.Init() - log.Println("Polling...") - case <-p.stopChan: - ticker.Stop() + ticker := time.NewTicker(p.pollerConfig.PollInterval) + go p.poll(ticker) +} + +func (p *Poller) poll(ticker *time.Ticker) { + for { + select { + case <-ticker.C: + err := p.storage.Init() + log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) + if err != nil { + log.Printf("Error updating local storage: %v", err) return } + case <-p.stopChan: + ticker.Stop() + return } - }() + } } func (p *Poller) Stop() { - close(p.stopChan) + p.stopOnce.Do(func() { + close(p.stopChan) + }) } From a7ee54eb47cd908d39e5d66028764955554c3b31 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 19 Dec 2024 15:01:51 +0530 Subject: [PATCH 09/19] Fixed poller condition --- treatment-service/appcontext/appcontext.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index e4d5b023..38b8859c 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -94,7 +94,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { } var messageQueueService messagequeue.MessageQueueService - if cfg.PollerConfig.Enabled || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { + if (cfg.PollerConfig != nil && cfg.PollerConfig.Enabled) || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { messageQueueService, err = messagequeue.NewMessageQueueService( context.Background(), localStorage, From 814fc9de623b01d312d69fa0f1aea244ae4d0da2 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 19 Dec 2024 16:14:46 +0530 Subject: [PATCH 10/19] Refactored poller --- treatment-service/server/poller.go | 61 ++++++++++++++++++------------ 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index 403143ba..eb39aba3 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -11,43 +11,56 @@ import ( type Poller struct { pollerConfig config.PollerConfig - storage *models.LocalStorage - stopChan chan struct{} + localStorage *models.LocalStorage + stopChannel chan struct{} stopOnce sync.Once + waitGroup sync.WaitGroup } -func NewPoller(pollerConfig *config.PollerConfig, storage *models.LocalStorage) *Poller { +// NewPoller creates a new Poller instance with the given configuration and local storage. +// pollerConfig: configuration for the poller +// localStorage: local storage to be used by the poller +func NewPoller(pollerConfig *config.PollerConfig, localStorage *models.LocalStorage) *Poller { return &Poller{ pollerConfig: *pollerConfig, - storage: storage, - stopChan: make(chan struct{}), + localStorage: localStorage, + stopChannel: make(chan struct{}), } } func (p *Poller) Start() { - ticker := time.NewTicker(p.pollerConfig.PollInterval) - go p.poll(ticker) -} - -func (p *Poller) poll(ticker *time.Ticker) { - for { - select { - case <-ticker.C: - err := p.storage.Init() - log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) - if err != nil { - log.Printf("Error updating local storage: %v", err) - return + var startOnce sync.Once + startOnce.Do(func() { + ticker := time.NewTicker(p.pollerConfig.PollInterval) + p.waitGroup.Add(1) + go func() { + defer p.waitGroup.Done() + for { + select { + case <-ticker.C: + err := p.localStorage.Init() + log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) + if err != nil { + log.Printf("Error updating local storage: %v", err) + continue + } + case <-p.stopChannel: + ticker.Stop() + return + } } - case <-p.stopChan: - ticker.Stop() - return - } - } + }() + }) } func (p *Poller) Stop() { p.stopOnce.Do(func() { - close(p.stopChan) + select { + case <-p.stopChannel: + // Already closed + default: + close(p.stopChannel) + p.waitGroup.Wait() + } }) } From fa9f4d55f9ab9e18c709335d5192f7604d843634 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 19 Dec 2024 16:48:56 +0530 Subject: [PATCH 11/19] Removed unused struct --- management-service/config/config.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/management-service/config/config.go b/management-service/config/config.go index d01241b8..24a15e10 100644 --- a/management-service/config/config.go +++ b/management-service/config/config.go @@ -81,10 +81,6 @@ type XpUIConfig struct { Homepage string `default:"/xp"` } -type PollerConfig struct { - Enabled bool `default:"false"` -} - // ListenAddress returns the Management API app's port func (c *Config) ListenAddress() string { return fmt.Sprintf(":%d", c.Port) From 0e953f23342c34de82aed8538ac79b4c9f9c8d7b Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Fri, 20 Dec 2024 14:13:50 +0530 Subject: [PATCH 12/19] Integrated PR review comments --- treatment-service/appcontext/appcontext.go | 6 +-- treatment-service/config/config.go | 46 ++++++---------------- treatment-service/config/config_test.go | 8 ++-- treatment-service/server/poller.go | 46 ++++++++++++---------- treatment-service/server/server.go | 2 +- 5 files changed, 45 insertions(+), 63 deletions(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 38b8859c..ce022b9b 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -94,11 +94,11 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { } var messageQueueService messagequeue.MessageQueueService - if (cfg.PollerConfig != nil && cfg.PollerConfig.Enabled) || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { + if cfg.PollerConfig.Enabled || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { messageQueueService, err = messagequeue.NewMessageQueueService( context.Background(), localStorage, - *cfg.MessageQueueConfig, + cfg.MessageQueueConfig, cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) @@ -111,7 +111,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { messageQueueService, err = messagequeue.NewMessageQueueService( pubsubInitContext, localStorage, - *cfg.MessageQueueConfig, + cfg.MessageQueueConfig, cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index 22f4c5eb..32996523 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -7,8 +7,6 @@ import ( "github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic" "github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry" - "github.com/go-playground/validator/v10" - common_config "github.com/caraml-dev/xp/common/config" common_mq_config "github.com/caraml-dev/xp/common/messagequeue" "github.com/caraml-dev/xp/treatment-service/models" @@ -26,17 +24,17 @@ type Config struct { Port int `json:"port" default:"8080" validate:"required"` ProjectIds []string `json:"project_ids" default:""` - AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` - DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` - NewRelicConfig newrelic.Config `json:"new_relic_config"` - SentryConfig sentry.Config `json:"sentry_config"` - DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` - MessageQueueConfig *common_mq_config.MessageQueueConfig `json:"message_queue_config"` - ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` - MonitoringConfig Monitoring `json:"monitoring_config"` - SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` - SegmenterConfig map[string]interface{} `json:"segmenter_config"` - PollerConfig *PollerConfig `json:"poller_config"` + AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` + DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` + NewRelicConfig newrelic.Config `json:"new_relic_config"` + SentryConfig sentry.Config `json:"sentry_config"` + DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` + MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"` + ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` + MonitoringConfig Monitoring `json:"monitoring_config"` + SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` + SegmenterConfig map[string]interface{} `json:"segmenter_config"` + PollerConfig PollerConfig `json:"poller_config"` } type AssignedTreatmentLoggerConfig struct { @@ -117,24 +115,6 @@ func (c *Config) ListenAddress() string { return fmt.Sprintf(":%d", c.Port) } -func (c *Config) Validate() error { - validate := validator.New() - - if c.MessageQueueConfig != nil { - if err := validate.Struct(c.MessageQueueConfig); err != nil { - return fmt.Errorf("invalid message queue configuration: %w", err) - } - } - - if c.PollerConfig != nil { - if err := validate.Struct(c.PollerConfig); err != nil { - return fmt.Errorf("invalid poller configuration: %w", err) - } - } - - return nil -} - func Load(filepaths ...string) (*Config, error) { var cfg Config err := common_config.ParseConfig(&cfg, filepaths) @@ -142,9 +122,5 @@ func Load(filepaths ...string) (*Config, error) { return nil, fmt.Errorf("failed to update viper config: %s", err) } - if err := cfg.Validate(); err != nil { - return nil, fmt.Errorf("invalid configuration: %w", err) - } - return &cfg, nil } diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index f76fcf53..b0660b7b 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -47,7 +47,7 @@ func TestDefaultConfigs(t *testing.T) { DebugConfig: DebugConfig{ OutputPath: "/tmp", }, - MessageQueueConfig: &common_mq_config.MessageQueueConfig{ + MessageQueueConfig: common_mq_config.MessageQueueConfig{ Kind: "", PubSubConfig: &common_mq_config.PubSubConfig{ Project: "dev", @@ -65,7 +65,7 @@ func TestDefaultConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap}, SegmenterConfig: make(map[string]interface{}), - PollerConfig: &PollerConfig{ + PollerConfig: PollerConfig{ Enabled: false, PollInterval: 30 * time.Second, }, @@ -114,7 +114,7 @@ func TestLoadMultipleConfigs(t *testing.T) { DebugConfig: DebugConfig{ OutputPath: "/tmp1", }, - MessageQueueConfig: &common_mq_config.MessageQueueConfig{ + MessageQueueConfig: common_mq_config.MessageQueueConfig{ Kind: "", PubSubConfig: &common_mq_config.PubSubConfig{ Project: "dev", @@ -132,7 +132,7 @@ func TestLoadMultipleConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}}, SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}}, - PollerConfig: &PollerConfig{ + PollerConfig: PollerConfig{ Enabled: false, PollInterval: 30 * time.Second, }, diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index eb39aba3..bf18359d 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -12,39 +12,45 @@ import ( type Poller struct { pollerConfig config.PollerConfig localStorage *models.LocalStorage - stopChannel chan struct{} - stopOnce sync.Once - waitGroup sync.WaitGroup + control PollerControl +} + +type PollerControl struct { + stopChannel chan struct{} + startOnce sync.Once + stopOnce sync.Once + waitGroup sync.WaitGroup } // NewPoller creates a new Poller instance with the given configuration and local storage. // pollerConfig: configuration for the poller // localStorage: local storage to be used by the poller -func NewPoller(pollerConfig *config.PollerConfig, localStorage *models.LocalStorage) *Poller { +func NewPoller(pollerConfig config.PollerConfig, localStorage *models.LocalStorage) *Poller { return &Poller{ - pollerConfig: *pollerConfig, + pollerConfig: pollerConfig, localStorage: localStorage, - stopChannel: make(chan struct{}), + control: PollerControl{ + stopChannel: make(chan struct{}), + }, } } func (p *Poller) Start() { - var startOnce sync.Once - startOnce.Do(func() { + p.control.startOnce.Do(func() { ticker := time.NewTicker(p.pollerConfig.PollInterval) - p.waitGroup.Add(1) + p.control.waitGroup.Add(1) go func() { - defer p.waitGroup.Done() + defer p.control.waitGroup.Done() for { select { case <-ticker.C: - err := p.localStorage.Init() + err := p.Refresh() log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) if err != nil { log.Printf("Error updating local storage: %v", err) continue } - case <-p.stopChannel: + case <-p.control.stopChannel: ticker.Stop() return } @@ -54,13 +60,13 @@ func (p *Poller) Start() { } func (p *Poller) Stop() { - p.stopOnce.Do(func() { - select { - case <-p.stopChannel: - // Already closed - default: - close(p.stopChannel) - p.waitGroup.Wait() - } + p.control.stopOnce.Do(func() { + close(p.control.stopChannel) + p.control.waitGroup.Wait() }) } + +func (p *Poller) Refresh() error { + err := p.localStorage.Init() + return err +} diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index fb5b9e19..6797d687 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -105,7 +105,7 @@ func NewServer(configFiles []string) (*Server, error) { subscribe := false var poller *Poller - if cfg.PollerConfig != nil && cfg.PollerConfig.Enabled { + if cfg.PollerConfig.Enabled { poller = NewPoller(cfg.PollerConfig, appCtx.LocalStorage) } else if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { subscribe = true From 63bd2c62f7edb0a0c538b1f7770cb2f61da7cf1e Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Fri, 20 Dec 2024 14:22:08 +0530 Subject: [PATCH 13/19] Integrated PR review comments --- treatment-service/appcontext/appcontext_test.go | 2 +- treatment-service/config/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/treatment-service/appcontext/appcontext_test.go b/treatment-service/appcontext/appcontext_test.go index 9c1b13cf..172e309a 100644 --- a/treatment-service/appcontext/appcontext_test.go +++ b/treatment-service/appcontext/appcontext_test.go @@ -57,7 +57,7 @@ func TestContext(t *testing.T) { NewRelicConfig: newrelic.Config{}, SentryConfig: sentry.Config{}, DeploymentConfig: config.DeploymentConfig{}, - MessageQueueConfig: &common_mq_config.MessageQueueConfig{ + MessageQueueConfig: common_mq_config.MessageQueueConfig{ Kind: "pubsub", PubSubConfig: pubSubConfig, }, diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index 32996523..d157a592 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -34,7 +34,7 @@ type Config struct { MonitoringConfig Monitoring `json:"monitoring_config"` SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` SegmenterConfig map[string]interface{} `json:"segmenter_config"` - PollerConfig PollerConfig `json:"poller_config"` + PollerConfig PollerConfig `json:"poller_config" validate:"required,dive"` } type AssignedTreatmentLoggerConfig struct { From c79dde16c0fc511232f91712ff975a0d457b3177 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Fri, 20 Dec 2024 15:47:59 +0530 Subject: [PATCH 14/19] Integrated PR review comments --- treatment-service/appcontext/appcontext.go | 28 ++++++++++------------ 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index ce022b9b..87f9fa4c 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -102,22 +102,20 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) + } else if cfg.MessageQueueConfig.Kind == common_mq_config.PubSubMQ { + log.Println("Initializing message queue subscriber...") + pubsubInitContext, cancel := context.WithTimeout( + context.Background(), time.Duration(cfg.MessageQueueConfig.PubSubConfig.PubSubTimeoutSeconds)*time.Second) + defer cancel() + messageQueueService, err = messagequeue.NewMessageQueueService( + pubsubInitContext, + localStorage, + cfg.MessageQueueConfig, + cfg.GetProjectIds(), + cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, + ) } else { - if cfg.MessageQueueConfig.Kind == common_mq_config.PubSubMQ { - log.Println("Initializing message queue subscriber...") - pubsubInitContext, cancel := context.WithTimeout( - context.Background(), time.Duration(cfg.MessageQueueConfig.PubSubConfig.PubSubTimeoutSeconds)*time.Second) - defer cancel() - messageQueueService, err = messagequeue.NewMessageQueueService( - pubsubInitContext, - localStorage, - cfg.MessageQueueConfig, - cfg.GetProjectIds(), - cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, - ) - } else { - err = fmt.Errorf("unrecognized Message Queue Kind: %s", loggerConfig.Kind) - } + err = fmt.Errorf("unrecognized Message Queue Kind: %s", loggerConfig.Kind) } if err != nil { From 9a5357c705e8b30d43516cfe94b74301dc02e7d2 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Tue, 24 Dec 2024 09:24:19 +0530 Subject: [PATCH 15/19] Added sample poller config --- treatment-service/config/example.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/treatment-service/config/example.yaml b/treatment-service/config/example.yaml index a353f080..9a8e2d11 100644 --- a/treatment-service/config/example.yaml +++ b/treatment-service/config/example.yaml @@ -30,3 +30,7 @@ SegmenterConfig: S2_IDs: MinS2CellLevel: 10 MaxS2CellLevel: 14 + +PollerConfig: + Enabled: true + PollInterval: 10s From a8d46ae56c23563ae9110dd10aa08d3be551ad63 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 2 Jan 2025 13:09:56 +0530 Subject: [PATCH 16/19] Integrated PR review comments --- treatment-service/appcontext/appcontext.go | 2 +- treatment-service/config/config.go | 24 +++++++++++----------- treatment-service/config/config_test.go | 4 ++-- treatment-service/server/poller.go | 4 ++-- treatment-service/server/server.go | 4 ++-- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 87f9fa4c..9021993a 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -94,7 +94,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { } var messageQueueService messagequeue.MessageQueueService - if cfg.PollerConfig.Enabled || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { + if cfg.ManagementServicePollerConfig.Enabled || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { messageQueueService, err = messagequeue.NewMessageQueueService( context.Background(), localStorage, diff --git a/treatment-service/config/config.go b/treatment-service/config/config.go index d157a592..a738b02d 100644 --- a/treatment-service/config/config.go +++ b/treatment-service/config/config.go @@ -24,17 +24,17 @@ type Config struct { Port int `json:"port" default:"8080" validate:"required"` ProjectIds []string `json:"project_ids" default:""` - AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` - DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` - NewRelicConfig newrelic.Config `json:"new_relic_config"` - SentryConfig sentry.Config `json:"sentry_config"` - DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` - MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"` - ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` - MonitoringConfig Monitoring `json:"monitoring_config"` - SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` - SegmenterConfig map[string]interface{} `json:"segmenter_config"` - PollerConfig PollerConfig `json:"poller_config" validate:"required,dive"` + AssignedTreatmentLogger AssignedTreatmentLoggerConfig `json:"assigned_treatment_logger"` + DebugConfig DebugConfig `json:"debug_config" validate:"required,dive"` + NewRelicConfig newrelic.Config `json:"new_relic_config"` + SentryConfig sentry.Config `json:"sentry_config"` + DeploymentConfig DeploymentConfig `json:"deployment_config" validate:"required,dive"` + MessageQueueConfig common_mq_config.MessageQueueConfig `json:"message_queue_config" validate:"required,dive"` + ManagementService ManagementServiceConfig `json:"management_service" validate:"required,dive"` + MonitoringConfig Monitoring `json:"monitoring_config"` + SwaggerConfig SwaggerConfig `json:"swagger_config" validate:"required,dive"` + SegmenterConfig map[string]interface{} `json:"segmenter_config"` + ManagementServicePollerConfig ManagementServicePollerConfig `json:"management_service_poller_config" validate:"required,dive"` } type AssignedTreatmentLoggerConfig struct { @@ -95,7 +95,7 @@ type ManagementServiceConfig struct { AuthorizationEnabled bool `json:"authorization_enabled"` } -type PollerConfig struct { +type ManagementServicePollerConfig struct { Enabled bool `default:"false"` PollInterval time.Duration `default:"30s"` } diff --git a/treatment-service/config/config_test.go b/treatment-service/config/config_test.go index b0660b7b..9c04fd6c 100644 --- a/treatment-service/config/config_test.go +++ b/treatment-service/config/config_test.go @@ -65,7 +65,7 @@ func TestDefaultConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: false, Labels: emptyStringMap}, SegmenterConfig: make(map[string]interface{}), - PollerConfig: PollerConfig{ + ManagementServicePollerConfig: ManagementServicePollerConfig{ Enabled: false, PollInterval: 30 * time.Second, }, @@ -132,7 +132,7 @@ func TestLoadMultipleConfigs(t *testing.T) { }, SentryConfig: sentry.Config{Enabled: true, DSN: "my.amazing.sentry.dsn", Labels: map[string]string{"app": "xp-treatment-service"}}, SegmenterConfig: map[string]interface{}{"s2_ids": map[string]interface{}{"mins2celllevel": 9, "maxs2celllevel": 15}}, - PollerConfig: PollerConfig{ + ManagementServicePollerConfig: ManagementServicePollerConfig{ Enabled: false, PollInterval: 30 * time.Second, }, diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index bf18359d..899b8ac7 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -10,7 +10,7 @@ import ( ) type Poller struct { - pollerConfig config.PollerConfig + pollerConfig config.ManagementServicePollerConfig localStorage *models.LocalStorage control PollerControl } @@ -25,7 +25,7 @@ type PollerControl struct { // NewPoller creates a new Poller instance with the given configuration and local storage. // pollerConfig: configuration for the poller // localStorage: local storage to be used by the poller -func NewPoller(pollerConfig config.PollerConfig, localStorage *models.LocalStorage) *Poller { +func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage *models.LocalStorage) *Poller { return &Poller{ pollerConfig: pollerConfig, localStorage: localStorage, diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index 6797d687..ef1d8cc0 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -105,8 +105,8 @@ func NewServer(configFiles []string) (*Server, error) { subscribe := false var poller *Poller - if cfg.PollerConfig.Enabled { - poller = NewPoller(cfg.PollerConfig, appCtx.LocalStorage) + if cfg.ManagementServicePollerConfig.Enabled { + poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage) } else if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { subscribe = true } From 482006a014a78b0761f2f23ab25c529ba9898de6 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 2 Jan 2025 14:08:48 +0530 Subject: [PATCH 17/19] Integrated PR review comments --- treatment-service/server/poller.go | 53 ++++++++++-------------------- 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/treatment-service/server/poller.go b/treatment-service/server/poller.go index 899b8ac7..da636d4f 100644 --- a/treatment-service/server/poller.go +++ b/treatment-service/server/poller.go @@ -2,7 +2,6 @@ package server import ( "log" - "sync" "time" "github.com/caraml-dev/xp/treatment-service/config" @@ -12,14 +11,7 @@ import ( type Poller struct { pollerConfig config.ManagementServicePollerConfig localStorage *models.LocalStorage - control PollerControl -} - -type PollerControl struct { - stopChannel chan struct{} - startOnce sync.Once - stopOnce sync.Once - waitGroup sync.WaitGroup + stopChannel chan struct{} } // NewPoller creates a new Poller instance with the given configuration and local storage. @@ -29,41 +21,32 @@ func NewPoller(pollerConfig config.ManagementServicePollerConfig, localStorage * return &Poller{ pollerConfig: pollerConfig, localStorage: localStorage, - control: PollerControl{ - stopChannel: make(chan struct{}), - }, + stopChannel: make(chan struct{}), } } func (p *Poller) Start() { - p.control.startOnce.Do(func() { - ticker := time.NewTicker(p.pollerConfig.PollInterval) - p.control.waitGroup.Add(1) - go func() { - defer p.control.waitGroup.Done() - for { - select { - case <-ticker.C: - err := p.Refresh() - log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) - if err != nil { - log.Printf("Error updating local storage: %v", err) - continue - } - case <-p.control.stopChannel: - ticker.Stop() - return + ticker := time.NewTicker(p.pollerConfig.PollInterval) + go func() { + for { + select { + case <-ticker.C: + err := p.Refresh() + log.Printf("Polling at %v with interval %v", time.Now(), p.pollerConfig.PollInterval) + if err != nil { + log.Printf("Error updating local storage: %v", err) + continue } + case <-p.stopChannel: + ticker.Stop() + return } - }() - }) + } + }() } func (p *Poller) Stop() { - p.control.stopOnce.Do(func() { - close(p.control.stopChannel) - p.control.waitGroup.Wait() - }) + close(p.stopChannel) } func (p *Poller) Refresh() error { From 440e98a5fdd382a023a116385705c68a91639649 Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 2 Jan 2025 14:36:08 +0530 Subject: [PATCH 18/19] Integrated PR review comments --- treatment-service/appcontext/appcontext.go | 9 ++++----- treatment-service/server/server.go | 6 ++++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 9021993a..8510d814 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -94,7 +94,8 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { } var messageQueueService messagequeue.MessageQueueService - if cfg.ManagementServicePollerConfig.Enabled || cfg.MessageQueueConfig.Kind == common_mq_config.NoopMQ { + switch cfg.MessageQueueConfig.Kind { + case common_mq_config.NoopMQ: messageQueueService, err = messagequeue.NewMessageQueueService( context.Background(), localStorage, @@ -102,8 +103,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) - } else if cfg.MessageQueueConfig.Kind == common_mq_config.PubSubMQ { - log.Println("Initializing message queue subscriber...") + case common_mq_config.PubSubMQ: pubsubInitContext, cancel := context.WithTimeout( context.Background(), time.Duration(cfg.MessageQueueConfig.PubSubConfig.PubSubTimeoutSeconds)*time.Second) defer cancel() @@ -114,10 +114,9 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { cfg.GetProjectIds(), cfg.DeploymentConfig.GoogleApplicationCredentialsEnvVar, ) - } else { + default: err = fmt.Errorf("unrecognized Message Queue Kind: %s", loggerConfig.Kind) } - if err != nil { return nil, err } diff --git a/treatment-service/server/server.go b/treatment-service/server/server.go index ef1d8cc0..e94f17d4 100644 --- a/treatment-service/server/server.go +++ b/treatment-service/server/server.go @@ -104,11 +104,13 @@ func NewServer(configFiles []string) (*Server, error) { } subscribe := false + if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { + subscribe = true + } + var poller *Poller if cfg.ManagementServicePollerConfig.Enabled { poller = NewPoller(cfg.ManagementServicePollerConfig, appCtx.LocalStorage) - } else if cfg.MessageQueueConfig.Kind != common_mq_config.NoopMQ { - subscribe = true } srv := http.Server{ From bca7aac30666200709245f2bb91a2e56bb3be92f Mon Sep 17 00:00:00 2001 From: Anirudh Rautela Date: Thu, 2 Jan 2025 14:44:50 +0530 Subject: [PATCH 19/19] Integrated PR review comments --- treatment-service/appcontext/appcontext.go | 1 + 1 file changed, 1 insertion(+) diff --git a/treatment-service/appcontext/appcontext.go b/treatment-service/appcontext/appcontext.go index 8510d814..8e961263 100644 --- a/treatment-service/appcontext/appcontext.go +++ b/treatment-service/appcontext/appcontext.go @@ -93,6 +93,7 @@ func NewAppContext(cfg *config.Config) (*AppContext, error) { return nil, err } + log.Println("Initializing message queue subscriber...") var messageQueueService messagequeue.MessageQueueService switch cfg.MessageQueueConfig.Kind { case common_mq_config.NoopMQ: