Skip to content

Commit

Permalink
Use bulk API for importing variables (apache#45744)
Browse files Browse the repository at this point in the history
* initial refactor

* colour

* remove import api

* remove import tests
  • Loading branch information
shubhamraj-git authored Jan 17, 2025
1 parent 3af9ddd commit 262537b
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 447 deletions.
92 changes: 0 additions & 92 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5989,68 +5989,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/variables/import:
post:
tags:
- Variable
summary: Import Variables
description: Import variables from a JSON file.
operationId: import_variables
parameters:
- name: action_if_exists
in: query
required: false
schema:
enum:
- overwrite
- fail
- skip
type: string
default: fail
title: Action If Exists
requestBody:
required: true
content:
multipart/form-data:
schema:
$ref: '#/components/schemas/Body_import_variables'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/VariablesImportResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
get:
tags:
Expand Down Expand Up @@ -6629,16 +6567,6 @@ components:
- status
title: BaseInfoResponse
description: Base info serializer for responses.
Body_import_variables:
properties:
file:
type: string
format: binary
title: File
type: object
required:
- file
title: Body_import_variables
ClearTaskInstancesBody:
properties:
dry_run:
Expand Down Expand Up @@ -10066,26 +9994,6 @@ components:
- is_encrypted
title: VariableResponse
description: Variable serializer for responses.
VariablesImportResponse:
properties:
created_variable_keys:
items:
type: string
type: array
title: Created Variable Keys
import_count:
type: integer
title: Import Count
created_count:
type: integer
title: Created Count
type: object
required:
- created_variable_keys
- import_count
- created_count
title: VariablesImportResponse
description: Import Variables serializer for responses.
VersionInfo:
properties:
version:
Expand Down
59 changes: 2 additions & 57 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
# under the License.
from __future__ import annotations

import json
from typing import Annotated, Literal
from typing import Annotated

from fastapi import Depends, HTTPException, Query, UploadFile, status
from fastapi import Depends, HTTPException, Query, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select
Expand All @@ -39,7 +38,6 @@
VariableBulkResponse,
VariableCollectionResponse,
VariableResponse,
VariablesImportResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.services.public.variables import (
Expand Down Expand Up @@ -192,59 +190,6 @@ def post_variable(
return variable


@variables_router.post(
"/import",
status_code=status.HTTP_200_OK,
responses=create_openapi_http_exception_doc(
[status.HTTP_400_BAD_REQUEST, status.HTTP_409_CONFLICT, status.HTTP_422_UNPROCESSABLE_ENTITY]
),
)
def import_variables(
file: UploadFile,
session: SessionDep,
action_if_exists: Literal["overwrite", "fail", "skip"] = "fail",
) -> VariablesImportResponse:
"""Import variables from a JSON file."""
try:
file_content = file.file.read().decode("utf-8")
variables = json.loads(file_content)

if not isinstance(variables, dict):
raise ValueError("Uploaded JSON must contain key-value pairs.")
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid JSON format: {e}")

if not variables:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="No variables found in the provided JSON.",
)

existing_keys = {variable for variable in session.execute(select(Variable.key)).scalars()}
import_keys = set(variables.keys())

matched_keys = existing_keys & import_keys

if action_if_exists == "fail" and matched_keys:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"The variables with these keys: {matched_keys} already exists.",
)
elif action_if_exists == "skip":
create_keys = import_keys - matched_keys
else:
create_keys = import_keys

for key in create_keys:
Variable.set(key=key, value=variables[key], session=session)

return VariablesImportResponse(
created_count=len(create_keys),
import_count=len(import_keys),
created_variable_keys=list(create_keys),
)


@variables_router.patch("")
def bulk_variables(
request: VariableBulkBody,
Expand Down
3 changes: 0 additions & 3 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1625,9 +1625,6 @@ export type PoolServicePostPoolMutationResult = Awaited<ReturnType<typeof PoolSe
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
export type VariableServiceImportVariablesMutationResult = Awaited<
ReturnType<typeof VariableService.importVariables>
>;
export type BackfillServicePauseBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.pauseBackfill>
>;
Expand Down
41 changes: 0 additions & 41 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import {
} from "../requests/services.gen";
import {
BackfillPostBody,
Body_import_variables,
ClearTaskInstancesBody,
ConnectionBody,
ConnectionBulkBody,
Expand Down Expand Up @@ -3124,46 +3123,6 @@ export const useVariableServicePostVariable = <
VariableService.postVariable({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Import Variables
* Import variables from a JSON file.
* @param data The data for the request.
* @param data.formData
* @param data.actionIfExists
* @returns VariablesImportResponse Successful Response
* @throws ApiError
*/
export const useVariableServiceImportVariables = <
TData = Common.VariableServiceImportVariablesMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
actionIfExists?: "overwrite" | "fail" | "skip";
formData: Body_import_variables;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
actionIfExists?: "overwrite" | "fail" | "skip";
formData: Body_import_variables;
},
TContext
>({
mutationFn: ({ actionIfExists, formData }) =>
VariableService.importVariables({ actionIfExists, formData }) as unknown as Promise<TData>,
...options,
});
/**
* Pause Backfill
* @param data The data for the request.
Expand Down
37 changes: 0 additions & 37 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,19 +482,6 @@ export const $BaseInfoResponse = {
description: "Base info serializer for responses.",
} as const;

export const $Body_import_variables = {
properties: {
file: {
type: "string",
format: "binary",
title: "File",
},
},
type: "object",
required: ["file"],
title: "Body_import_variables",
} as const;

export const $ClearTaskInstancesBody = {
properties: {
dry_run: {
Expand Down Expand Up @@ -5749,30 +5736,6 @@ export const $VariableResponse = {
description: "Variable serializer for responses.",
} as const;

export const $VariablesImportResponse = {
properties: {
created_variable_keys: {
items: {
type: "string",
},
type: "array",
title: "Created Variable Keys",
},
import_count: {
type: "integer",
title: "Import Count",
},
created_count: {
type: "integer",
title: "Created Count",
},
},
type: "object",
required: ["created_variable_keys", "import_count", "created_count"],
title: "VariablesImportResponse",
description: "Import Variables serializer for responses.",
} as const;

export const $VersionInfo = {
properties: {
version: {
Expand Down
30 changes: 0 additions & 30 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ import type {
PostVariableResponse,
BulkVariablesData,
BulkVariablesResponse,
ImportVariablesData,
ImportVariablesResponse,
ReparseDagFileData,
ReparseDagFileResponse,
GetHealthResponse,
Expand Down Expand Up @@ -3095,34 +3093,6 @@ export class VariableService {
},
});
}

/**
* Import Variables
* Import variables from a JSON file.
* @param data The data for the request.
* @param data.formData
* @param data.actionIfExists
* @returns VariablesImportResponse Successful Response
* @throws ApiError
*/
public static importVariables(data: ImportVariablesData): CancelablePromise<ImportVariablesResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/variables/import",
query: {
action_if_exists: data.actionIfExists,
},
formData: data.formData,
mediaType: "multipart/form-data",
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
409: "Conflict",
422: "Unprocessable Entity",
},
});
}
}

export class DagParsingService {
Expand Down
Loading

0 comments on commit 262537b

Please sign in to comment.