diff --git a/bicep/modules/blade_common.bicep b/bicep/modules/blade_common.bicep
index b9eabeed..dc830147 100644
--- a/bicep/modules/blade_common.bicep
+++ b/bicep/modules/blade_common.bicep
@@ -446,6 +446,10 @@ module csvDagShareUpload './script-share-csvdag/main.bicep' = {
     shareName: 'airflow-dags'
     filename: 'airflowdags'
     fileurl: 'https://community.opengroup.org/osdu/platform/data-flow/ingestion/csv-parser/csv-parser/-/archive/master/csv-parser-master.tar.gz'
+    keyVaultUrl: keyvault.outputs.uri
+    insightsKey: insights.outputs.instrumentationKey
+    clientId: applicationClientId
+    clientSecret: applicationClientSecret
     useExistingManagedIdentity: true
     managedIdentityName: deploymentScriptIdentity
     existingManagedIdentitySubId: subscription().subscriptionId
diff --git a/bicep/modules/script-share-csvdag/README.md b/bicep/modules/script-share-csvdag/README.md
new file mode 100644
index 00000000..dcc80c14
--- /dev/null
+++ b/bicep/modules/script-share-csvdag/README.md
@@ -0,0 +1,5 @@
+# NOTE
+
+This module is tightly coupled to the csv-parser DAG. It downloads the DAG source, rewrites template placeholders via a deployment script, and uploads the result to the storage account's airflow-dags file share.
+
+This is a stopgap. The work belongs in a Kubernetes Job that runs a Python script and copies the rendered DAG straight into a PVC mount (see the sketch below).
\ No newline at end of file
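For reference, one possible shape for that replacement is sketched below. Treat every name in it as an assumption: the `csv-dag-loader` job, the `airflow-dags` PVC claim, and the `render.py` substitution entrypoint do not exist in this repo.

```bash
# Hypothetical sketch: a Kubernetes Job that fetches the csv-parser source,
# renders the DAG template, and copies the result into the Airflow DAGs PVC.
kubectl apply -n airflow -f - <<'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: csv-dag-loader                   # assumed name
spec:
  backoffLimit: 2
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: loader
          image: python:3.11-alpine
          command: ["/bin/sh", "-c"]
          args:
            - |
              apk add --no-cache curl
              mkdir -p /work
              curl -so /work/src.tar.gz "https://community.opengroup.org/osdu/platform/data-flow/ingestion/csv-parser/csv-parser/-/archive/master/csv-parser-master.tar.gz"
              tar -xzf /work/src.tar.gz --strip-components=1 -C /work
              python /work/render.py          # placeholder substitution (hypothetical script)
              cp /work/csv-parser.py /dags/   # straight into the PVC mount
          volumeMounts:
            - name: dags
              mountPath: /dags
      volumes:
        - name: dags
          persistentVolumeClaim:
            claimName: airflow-dags          # assumed claim name
EOF
```

This would keep the rendering logic in testable Python and drop the storage-account round trip entirely.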
diff --git a/bicep/modules/script-share-csvdag/main.bicep b/bicep/modules/script-share-csvdag/main.bicep
index 6e857e7f..bf45aa1c 100644
--- a/bicep/modules/script-share-csvdag/main.bicep
+++ b/bicep/modules/script-share-csvdag/main.bicep
@@ -44,6 +44,18 @@ param initialScriptDelay string = '30s'
 @description('When the script resource is cleaned up')
 param cleanupPreference string = 'OnSuccess'
 
+@description('Key Vault URL')
+param keyVaultUrl string
+
+@description('App Insights instrumentation key')
+param insightsKey string
+
+@description('Client ID for the service principal')
+param clientId string
+
+@description('Client secret for the service principal')
+@secure()
+param clientSecret string
 
 resource storageAccount 'Microsoft.Storage/storageAccounts@2023-04-01' existing = {
   name: storageAccountName
@@ -69,16 +81,48 @@
-var searchAndReplace = [
+var findAndReplace = [
   {
     find: '{| DAG_NAME |}'
     replace: 'csv-parser'
   }
   {
     find: '{| DOCKER_IMAGE |}'
-    replace: 'msosdu.azurecr.io/csv-parser-msi:v5'
+    replace: 'community.opengroup.org:5555/osdu/platform/data-flow/ingestion/csv-parser/csv-parser/csv-parser-v0-27-0-azure-1:60747714ac490be0defe8f3e821497b3cce03390'
+  }
+  {
+    find: '{| NAMESPACE |}'
+    replace: 'airflow'
+  }
+  {
+    find: '{| K8S_POD_OPERATOR_KWARGS or {} |}'
+    replace: {
+      labels: {
+        aadpodidbinding: 'osdu-identity'
+      }
+      annotations: {
+        'sidecar.istio.io/inject': 'false'
+      }
+    }
+  }
+  {
+    find: '{| ENV_VARS or {} |}'
+    replace: {
+      storage_service_endpoint: 'http://storage.osdu-core.svc.cluster.local/api/storage/v2'
+      schema_service_endpoint: 'http://schema.osdu-core.svc.cluster.local/api/schema-service/v1'
+      search_service_endpoint: 'http://search.osdu-core.svc.cluster.local/api/search/v2'
+      partition_service_endpoint: 'http://partition.osdu-core.svc.cluster.local/api/partition/v1'
+      unit_service_endpoint: 'http://unit.osdu-core.svc.cluster.local/api/unit/v2/unit/symbol'
+      file_service_endpoint: 'http://file.osdu-core.svc.cluster.local/api/file/v2'
+      KEYVAULT_URI: keyVaultUrl
+      appinsights_key: insightsKey
+      azure_paas_podidentity_isEnabled: 'false'
+      AZURE_TENANT_ID: subscription().tenantId
+      AZURE_CLIENT_ID: clientId
+      AZURE_CLIENT_SECRET: clientSecret
+      aad_client_id: clientId
+    }
   }
-
 ]
 
 resource uploadFile 'Microsoft.Resources/deploymentScripts@2023-08-01' = {
@@ -102,7 +146,7 @@ resource uploadFile 'Microsoft.Resources/deploymentScripts@2023-08-01' = {
       { name: 'URL', value: fileurl }
      { name: 'SHARE', value: shareName }
       { name: 'initialDelay', value: initialScriptDelay }
-      { name: 'SEARCH_AND_REPLACE', value: string(searchAndReplace) }
+      { name: 'SEARCH_AND_REPLACE', value: string(findAndReplace) }
     ]
     scriptContent: loadTextContent('script.sh')
     cleanupPreference: cleanupPreference
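One subtlety worth a note: `string(findAndReplace)` flattens the array to compact JSON before it is handed to the script as the `SEARCH_AND_REPLACE` environment variable, which is how the mixed string/object `replace` values survive the trip. A quick local simulation of what the script's jq calls then see (the Key Vault URL below is a made-up example):

```bash
# Simulate the SEARCH_AND_REPLACE payload script.sh receives (values are examples).
SEARCH_AND_REPLACE='[{"find":"{| DAG_NAME |}","replace":"csv-parser"},{"find":"{| ENV_VARS or {} |}","replace":{"KEYVAULT_URI":"https://example.vault.azure.net/"}}]'

echo "${SEARCH_AND_REPLACE}" | jq '. | length'              # 2
echo "${SEARCH_AND_REPLACE}" | jq -r '.[0].replace | type'  # string
echo "${SEARCH_AND_REPLACE}" | jq -r '.[1].replace | type'  # object
echo "${SEARCH_AND_REPLACE}" | jq -c '.[1].replace'         # compact JSON, as the script stores it
```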
diff --git a/bicep/modules/script-share-csvdag/script.sh b/bicep/modules/script-share-csvdag/script.sh
index f1578e1b..ebab9ba1 100644
--- a/bicep/modules/script-share-csvdag/script.sh
+++ b/bicep/modules/script-share-csvdag/script.sh
@@ -22,14 +22,18 @@ set -e
 #
 # The SEARCH_AND_REPLACE variable is required for the script to perform the find/replace operations.
-
+# Ensure necessary packages are installed
+apk add --no-cache curl zip jq
 
 echo "Waiting on Identity RBAC replication (${initialDelay})"
 sleep "${initialDelay}"
-apk add --no-cache curl zip
+
+echo "###########################"
+echo "${SEARCH_AND_REPLACE}"
+echo "###########################"
 
 # Download the source code and extract it.
-url_basename=$(basename ${URL})
+url_basename=$(basename "${URL}")
 echo "Derived filename from URL: ${url_basename}"
 echo "Downloading file from ${URL} to ${url_basename}"
 curl -so "${url_basename}" "${URL}"
@@ -37,40 +41,94 @@ echo "Extracting tar.gz archive..."
 mkdir -p extracted_files
 tar -xzf "${url_basename}" --strip-components=1 -C extracted_files
 
-
-# Find and Replace.
+# Process the replacements
 csv_file="extracted_files/${FILE}/csv_ingestion_all_steps.py"
+output_file="extracted_files/${FILE}/csv-parser.py"
+
 if [ -f "${csv_file}" ]; then
     echo "Processing ${csv_file} file"
-    # Escape patterns for sed
-    escape_sed_pattern() {
-        printf '%s' "$1" | sed 's/[\/&]/\\&/g; s/[][$.*^]/\\&/g'
-    }
-    escape_sed_replacement() {
-        printf '%s' "$1" | sed 's/[\/&]/\\&/g'
-    }
-
-    # Create sed script from search and replace JSON
-    sed_script_file="sed_script.sed"
-
-    echo "${SEARCH_AND_REPLACE}" | jq -c '.[]' | while IFS= read -r item; do
-        find=$(echo "$item" | jq -r '.find')
-        replace=$(echo "$item" | jq -r '.replace')
-
-        find_escaped=$(escape_sed_pattern "$find")
-        replace_escaped=$(escape_sed_replacement "$replace")
-
-        echo "find: ${find_escaped}"
-        echo "replace: ${replace_escaped}"
-
-        echo "s/${find_escaped}/${replace_escaped}/g" >> "$sed_script_file"
+    # Number of replacements
+    num_replacements=$(echo "${SEARCH_AND_REPLACE}" | jq '. | length')
+
+    # Initialize arrays
+    declare -a finds
+    declare -a replaces
+    declare -a replace_types
+
+    # Build arrays of find/replace pairs, keeping each value's JSON type
+    for (( idx=0; idx<num_replacements; idx++ )); do
+        finds[$idx]=$(echo "${SEARCH_AND_REPLACE}" | jq -r ".[$idx].find")
+        replace_type=$(echo "${SEARCH_AND_REPLACE}" | jq -r ".[$idx].replace | type")
+        replace_types[$idx]=$replace_type
+        if [ "$replace_type" == "string" ]; then
+            replaces[$idx]=$(echo "${SEARCH_AND_REPLACE}" | jq -r ".[$idx].replace")
+        else
+            replaces[$idx]=$(echo "${SEARCH_AND_REPLACE}" | jq -c ".[$idx].replace")
+        fi
     done
-    echo "Running sed script:"
-    cat "$sed_script_file"
-    sed -f "$sed_script_file" "$csv_file" > "extracted_files/${FILE}/csv-parser.py"
-    rm "$sed_script_file"
+    # Empty the output file
+    > "$output_file"
+
+    # Read the input file line by line
+    while IFS= read -r line || [[ -n "$line" ]]; do
+        replaced=0
+        # For each 'find'/'replace' pair
+        for idx in "${!finds[@]}"; do
+            find_placeholder="${finds[$idx]}"
+            replace_value="${replaces[$idx]}"
+            replace_type="${replace_types[$idx]}"
+
+            if [[ "$line" == *"$find_placeholder"* ]]; then
+                # Line contains the placeholder
+
+                if [ "$replace_type" == "object" ]; then
+                    # 'replace_value' is a JSON object
+
+                    # Split the line at the placeholder (quoted so it matches literally, not as a glob)
+                    line_before_placeholder="${line%%"$find_placeholder"*}"
+                    line_after_placeholder="${line#*"$find_placeholder"}"
+
+                    # Get the indentation up to the placeholder ([[:space:]] is portable; \s is GNU-only)
+                    leading_spaces=$(echo "$line_before_placeholder" | sed -n 's/^\([[:space:]]*\).*$/\1/p')
+
+                    # Format the JSON with jq
+                    formatted_json=$(echo "$replace_value" | jq '.')
+
+                    # Indent the JSON
+                    indented_json=$(echo "$formatted_json" | sed "s/^/${leading_spaces}/")
+
+                    # Output the line before the placeholder (excluding placeholder)
+                    echo -n "$line_before_placeholder" >> "$output_file"
+
+                    # Output the indented JSON
+                    echo "$indented_json" >> "$output_file"
+
+                    # Output the rest of the line after the placeholder, if any
+                    if [ -n "$line_after_placeholder" ]; then
+                        echo "$line_after_placeholder" >> "$output_file"
+                    fi
+                else
+                    # 'replace_value' is a string
+
+                    # Replace the placeholder in the line
+                    replaced_line="${line//"$find_placeholder"/$replace_value}"
+
+                    # Output the modified line
+                    echo "$replaced_line" >> "$output_file"
+                fi
+                replaced=1
+                break # Skip checking other placeholders for this line
+            fi
+        done
+        if [[ $replaced -eq 0 ]]; then
+            # Line did not contain any placeholder
+            echo "$line" >> "$output_file"
+        fi
+    done < "$csv_file"
+
+    # Remove the original file
     rm "$csv_file"
 fi
@@ -83,4 +141,4 @@
 zip -r "${current_dir}/${zip_filename}" .
 cd - || exit 1
 az storage file upload -s "${SHARE}" --source "${zip_filename}" -onone
-echo "Zip file ${zip_filename} uploaded to file share ${SHARE}."
+echo "Zip file ${zip_filename} uploaded to file share ${SHARE}."
\ No newline at end of file
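To make the object branch above easier to follow, here is a standalone repro of its splitting and indentation behavior with a made-up template line (run it with bash and jq installed):

```bash
# Standalone repro of the object-replacement branch (all input values are made up).
line='    env_vars={| ENV_VARS or {} |},'
placeholder='{| ENV_VARS or {} |}'
replace='{"KEYVAULT_URI":"https://example.vault.azure.net/","appinsights_key":"0000"}'

before="${line%%"$placeholder"*}"    # '    env_vars='
after="${line#*"$placeholder"}"      # ','
indent=$(printf '%s' "$before" | sed -n 's/^\([[:space:]]*\).*$/\1/p')

printf '%s' "$before"                            # line up to the placeholder, no newline
echo "$replace" | jq '.' | sed "s/^/${indent}/"  # pretty-print and indent the JSON
[ -n "$after" ] && echo "$after"                 # whatever followed the placeholder
```

One quirk this makes visible: the indent prefix is also applied to the JSON's first line, so the rendered line reads `env_vars=    {`. That is harmless in Python but worth knowing when eyeballing the generated csv-parser.py.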
-""" - -# import modules and functions -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.bash_operator import BashOperator - -# default DAG arguments -default_args = { - "owner": "OSDU", - "retries": 1, - "retry_delay": timedelta(minutes=5) -} - -# set DAG -dag = DAG( - "test_fetch_remote", - default_args = default_args, - description = "A simple DAG to test remote data fetching", - schedule_interval = "@daily", - start_date = datetime(2024, 1, 1), - catchup = False -) - - - -# set tasks -fetch_task = BashOperator( - task_id = "fetch_task", - bash_command = "curl --location 'https://postman-echo.com/get?foo1=bar1&foo2=bar2' ", # list services, processes and python packages - dag = dag -) - -# run tasks -fetch_task \ No newline at end of file diff --git a/dags/test_pip_packages.py b/dags/test_pip_packages.py deleted file mode 100644 index e3e20b47..00000000 --- a/dags/test_pip_packages.py +++ /dev/null @@ -1,36 +0,0 @@ -from airflow import DAG -from airflow.operators.python_operator import PythonOperator -from datetime import datetime -import subprocess - -# Define a function to get the list of installed pip packages -def list_installed_packages(): - # Run the pip freeze command to get installed packages - installed_packages = subprocess.check_output(['pip', 'freeze']).decode('utf-8') - - # Log the output to Airflow logs - print("Installed pip packages:") - print(installed_packages) - -# Define the DAG -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 1, 1), - 'retries': 1 -} - -with DAG( - dag_id='list_pip_packages', - default_args=default_args, - schedule_interval=None, - catchup=False -) as dag: - - # Define the task using PythonOperator - check_pip_packages = PythonOperator( - task_id='check_pip_packages', - python_callable=list_installed_packages - ) - - # Set task dependencies (if needed) - check_pip_packages