Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/agent/dummy_data_gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *dummyDataGatherer) WaitForCacheSync(ctx context.Context) error {
return nil
}

func (c *dummyDataGatherer) Fetch() (any, int, error) {
func (c *dummyDataGatherer) Fetch(ctx context.Context) (any, int, error) {
var err error
if c.attemptNumber < c.FailedAttempts {
err = fmt.Errorf("First %d attempts will fail", c.FailedAttempts)
Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ const schemaVersion string = "v2.0.0"

// Run starts the agent process
func Run(cmd *cobra.Command, args []string) (returnErr error) {
ctx, cancel := context.WithCancel(cmd.Context())
baseCtx, cancel := context.WithCancel(cmd.Context())
defer cancel()
log := klog.FromContext(ctx).WithName("Run")
log := klog.FromContext(baseCtx).WithName("Run")

log.Info("Starting", "version", version.PreflightVersion, "commit", version.Commit)

Expand All @@ -78,7 +78,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
return fmt.Errorf("While evaluating configuration: %v", err)
}

group, gctx := errgroup.WithContext(ctx)
group, gctx := errgroup.WithContext(baseCtx)
defer func() {
cancel()
if groupErr := group.Wait(); groupErr != nil {
Expand Down Expand Up @@ -123,13 +123,14 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
})

group.Go(func() error {
listenCtx := klog.NewContext(gctx, log)
err := listenAndServe(
klog.NewContext(gctx, log),
listenCtx,
&http.Server{
Addr: serverAddress,
Handler: server,
BaseContext: func(_ net.Listener) context.Context {
return gctx
return listenCtx
},
},
)
Expand Down Expand Up @@ -239,7 +240,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
// be cancelled, which will cause this blocking loop to exit
// instead of waiting for the time period.
for {
if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil {
if err := gatherAndOutputData(gctx, eventf, config, preflightClient, dataGatherers); err != nil {
return err
}

Expand Down Expand Up @@ -316,7 +317,7 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf
}
} else {
var err error
readings, err = gatherData(klog.NewContext(ctx, log), config, dataGatherers)
readings, err = gatherData(ctx, config, dataGatherers)
if err != nil {
return err
}
Expand All @@ -338,7 +339,7 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf
postCtx, cancel := context.WithTimeout(ctx, config.BackoffMaxTime)
defer cancel()

return struct{}{}, postData(klog.NewContext(postCtx, log), config, preflightClient, readings)
return struct{}{}, postData(postCtx, config, preflightClient, readings)
}

group.Go(func() error {
Expand All @@ -361,7 +362,7 @@ func gatherData(ctx context.Context, config CombinedConfig, dataGatherers map[st

var dgError *multierror.Error
for k, dg := range dataGatherers {
dgData, count, err := dg.Fetch()
dgData, count, err := dg.Fetch(ctx)
if err != nil {
dgError = multierror.Append(dgError, fmt.Errorf("error in datagatherer %s: %w", k, err))

Expand Down Expand Up @@ -406,7 +407,6 @@ func gatherData(ctx context.Context, config CombinedConfig, dataGatherers map[st

func postData(ctx context.Context, config CombinedConfig, preflightClient client.Client, readings []*api.DataReading) error {
log := klog.FromContext(ctx).WithName("postData")
ctx = klog.NewContext(ctx, log)
err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{
ClusterName: config.ClusterName,
ClusterDescription: config.ClusterDescription,
Expand Down
2 changes: 1 addition & 1 deletion pkg/datagatherer/datagatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type DataGatherer interface {
// Fetch retrieves data.
// count is the number of items that were discovered. A negative count means the number
// of items was indeterminate.
Fetch() (data any, count int, err error)
Fetch(ctx context.Context) (data any, count int, err error)
// Run starts the data gatherer's informers for resource collection.
// Returns error if the data gatherer informer wasn't initialized
Run(ctx context.Context) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/datagatherer/k8sdiscovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error {
}

// Fetch will fetch discovery data from the apiserver, or return an error
func (g *DataGathererDiscovery) Fetch() (any, int, error) {
func (g *DataGathererDiscovery) Fetch(ctx context.Context) (any, int, error) {
data, err := g.cl.ServerVersion()
if err != nil {
return nil, -1, fmt.Errorf("failed to get server version: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/datagatherer/k8sdynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (g *DataGathererDynamic) WaitForCacheSync(ctx context.Context) error {

// Fetch will fetch the requested data from the apiserver, or return an error
// if fetching the data fails.
func (g *DataGathererDynamic) Fetch() (any, int, error) {
func (g *DataGathererDynamic) Fetch(ctx context.Context) (any, int, error) {
if g.groupVersionResource.String() == "" {
return nil, -1, fmt.Errorf("resource type must be specified")
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/datagatherer/k8sdynamic/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func sortGatheredResources(list []*api.GatheredResource) {

func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) {
ctx := t.Context()

config := ConfigDynamic{
ExcludeNamespaces: []string{"kube-system"},
GroupVersionResource: schema.GroupVersionResource{Group: "foobar", Version: "v1", Resource: "foos"},
Expand Down Expand Up @@ -748,7 +749,7 @@ func TestDynamicGatherer_Fetch(t *testing.T) {
if waitTimeout(&wg, 30*time.Second) {
t.Fatalf("unexpected timeout")
}
res, expectCount, err := dynamiDg.Fetch()
res, expectCount, err := dynamiDg.Fetch(ctx)
if err != nil && !tc.err {
t.Errorf("expected no error but got: %v", err)
}
Expand Down Expand Up @@ -1061,7 +1062,7 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) {
if waitTimeout(&wg, 5*time.Second) {
t.Fatalf("unexpected timeout")
}
rawRes, count, err := dynamiDg.Fetch()
rawRes, count, err := dynamiDg.Fetch(ctx)
if tc.err {
require.Error(t, err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datagatherer/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (g *DataGatherer) WaitForCacheSync(ctx context.Context) error {
}

// Fetch loads and returns the data from the LocalDatagatherer's dataPath
func (g *DataGatherer) Fetch() (any, int, error) {
func (g *DataGatherer) Fetch(ctx context.Context) (any, int, error) {
dataBytes, err := os.ReadFile(g.dataPath)
if err != nil {
return nil, -1, err
Expand Down
4 changes: 1 addition & 3 deletions pkg/datagatherer/oidc/oidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func (g *DataGathererOIDC) WaitForCacheSync(ctx context.Context) error {
}

// Fetch will fetch the OIDC discovery document and JWKS from the cluster API server.
func (g *DataGathererOIDC) Fetch() (any, int, error) {
ctx := context.Background()

func (g *DataGathererOIDC) Fetch(ctx context.Context) (any, int, error) {
oidcResponse, oidcErr := g.fetchOIDCConfig(ctx)
jwksResponse, jwksErr := g.fetchJWKS(ctx)

Expand Down
4 changes: 2 additions & 2 deletions pkg/datagatherer/oidc/oidc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestFetch_Success(t *testing.T) {
rc := makeRESTClient(t, ts)
g := &DataGathererOIDC{cl: rc}

anyRes, count, err := g.Fetch()
anyRes, count, err := g.Fetch(t.Context())
require.NoError(t, err)
require.Equal(t, 1, count)

Expand Down Expand Up @@ -197,7 +197,7 @@ func TestFetch_Errors(t *testing.T) {
rc := makeRESTClient(t, ts)
g := &DataGathererOIDC{cl: rc}

anyRes, count, err := g.Fetch()
anyRes, count, err := g.Fetch(t.Context())
require.NoError(t, err)
require.Equal(t, 1, count)

Expand Down
Loading