diff --git a/packages/@aws-cdk/aws-glue/README.md b/packages/@aws-cdk/aws-glue/README.md index 20e08d7c14e31..f5e200f0465e7 100644 --- a/packages/@aws-cdk/aws-glue/README.md +++ b/packages/@aws-cdk/aws-glue/README.md @@ -23,6 +23,69 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aws-cdk) project. +## Job + +A `Job` encapsulates a script that connects to data sources, processes them, and then writes output to a data target. + +There are 3 types of jobs supported by AWS Glue: Spark ETL, Spark Streaming, and Python Shell jobs. + +The `glue.JobExecutable` allows you to specify the type of job, the language to use and the code assets required by the job. + +`glue.Code` allows you to refer to the different code assets required by the job, either from an existing S3 location or from a local file path. + +### Spark Jobs + +These jobs run in an Apache Spark environment managed by AWS Glue. + +#### ETL Jobs + +An ETL job processes data in batches using Apache Spark. + +```ts +new glue.Job(stack, 'ScalaSparkEtlJob', { + executable: glue.JobExecutable.scalaEtl({ + glueVersion: glue.GlueVersion.V2_0, + script: glue.Code.fromBucket(bucket, 'src/com/example/HelloWorld.scala'), + className: 'com.example.HelloWorld', + extraJars: [glue.Code.fromBucket(bucket, 'jars/HelloWorld.jar')], + }), + description: 'an example Scala ETL job', +}); +``` + +#### Streaming Jobs + +A Streaming job is similar to an ETL job, except that it performs ETL on data streams. It uses the Apache Spark Structured Streaming framework. Some Spark job features are not available to streaming ETL jobs. 
+
+```ts
+new glue.Job(stack, 'PythonSparkStreamingJob', {
+  executable: glue.JobExecutable.pythonStreaming({
+    glueVersion: glue.GlueVersion.V2_0,
+    pythonVersion: glue.PythonVersion.THREE,
+    script: glue.Code.fromAsset(path.join(__dirname, 'job-script/hello_world.py')),
+  }),
+  description: 'an example Python Streaming job',
+});
+```
+
+### Python Shell Jobs
+
+A Python shell job runs Python scripts as a shell and supports a Python version that depends on the AWS Glue version you are using.
+This can be used to schedule and run tasks that don't require an Apache Spark environment.
+
+```ts
+new glue.Job(stack, 'PythonShellJob', {
+  executable: glue.JobExecutable.pythonShell({
+    glueVersion: glue.GlueVersion.V1_0,
+    pythonVersion: glue.PythonVersion.THREE,
+    script: glue.Code.fromBucket(bucket, 'script.py'),
+  }),
+  description: 'an example Python Shell job',
+});
+```
+
+See [documentation](https://docs.aws.amazon.com/glue/latest/dg/add-job.html) for more information on adding jobs in Glue.
+
 ## Connection
 
 A `Connection` allows Glue jobs, crawlers and development endpoints to access certain types of data stores. For example, to create a network connection to connect to a data source within a VPC:
@@ -41,16 +104,6 @@ If you need to use a connection type that doesn't exist as a static member on `C
 
 See [Adding a Connection to Your Data Store](https://docs.aws.amazon.com/glue/latest/dg/populate-add-connection.html) and [Connection Structure](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-connections.html#aws-glue-api-catalog-connections-Connection) documentation for more information on the supported data stores and their configurations.
 
-## Database
-
-A `Database` is a logical grouping of `Tables` in the Glue Catalog.
-
-```ts
-new glue.Database(stack, 'MyDatabase', {
-  databaseName: 'my_database'
-});
-```
-
 ## SecurityConfiguration
 
 A `SecurityConfiguration` is a set of security properties that can be used by AWS Glue to encrypt data at rest.
@@ -84,6 +137,15 @@ new glue.SecurityConfiguration(stack, 'MySecurityConfiguration', { See [documentation](https://docs.aws.amazon.com/glue/latest/dg/encryption-security-configuration.html) for more info for Glue encrypting data written by Crawlers, Jobs, and Development Endpoints. +## Database + +A `Database` is a logical grouping of `Tables` in the Glue Catalog. + +```ts +new glue.Database(stack, 'MyDatabase', { + databaseName: 'my_database' +}); +``` ## Table diff --git a/packages/@aws-cdk/aws-glue/lib/code.ts b/packages/@aws-cdk/aws-glue/lib/code.ts new file mode 100644 index 0000000000000..9f2f03d9884be --- /dev/null +++ b/packages/@aws-cdk/aws-glue/lib/code.ts @@ -0,0 +1,112 @@ +import * as crypto from 'crypto'; +import * as fs from 'fs'; +import * as iam from '@aws-cdk/aws-iam'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as s3assets from '@aws-cdk/aws-s3-assets'; +import * as cdk from '@aws-cdk/core'; +import * as constructs from 'constructs'; + +/** + * Represents a Glue Job's Code assets (an asset can be a scripts, a jar, a python file or any other file). + */ +export abstract class Code { + + /** + * Job code as an S3 object. + * @param bucket The S3 bucket + * @param key The object key + */ + public static fromBucket(bucket: s3.IBucket, key: string): S3Code { + return new S3Code(bucket, key); + } + + /** + * Job code from a local disk path. + * + * @param path code file (not a directory). + */ + public static fromAsset(path: string, options?: s3assets.AssetOptions): AssetCode { + return new AssetCode(path, options); + } + + /** + * Called when the Job is initialized to allow this object to bind. + */ + public abstract bind(scope: constructs.Construct, grantable: iam.IGrantable): CodeConfig; +} + +/** + * Glue job Code from an S3 bucket. 
+ */ +export class S3Code extends Code { + constructor(private readonly bucket: s3.IBucket, private readonly key: string) { + super(); + } + + public bind(_scope: constructs.Construct, grantable: iam.IGrantable): CodeConfig { + this.bucket.grantRead(grantable, this.key); + return { + s3Location: { + bucketName: this.bucket.bucketName, + objectKey: this.key, + }, + }; + } +} + +/** + * Job Code from a local file. + */ +export class AssetCode extends Code { + private asset?: s3assets.Asset; + + /** + * @param path The path to the Code file. + */ + constructor(private readonly path: string, private readonly options: s3assets.AssetOptions = { }) { + super(); + + if (fs.lstatSync(this.path).isDirectory()) { + throw new Error(`Code path ${this.path} is a directory. Only files are supported`); + } + } + + public bind(scope: constructs.Construct, grantable: iam.IGrantable): CodeConfig { + // If the same AssetCode is used multiple times, retain only the first instantiation. + if (!this.asset) { + this.asset = new s3assets.Asset(scope, `Code${this.hashcode(this.path)}`, { + path: this.path, + ...this.options, + }); + } else if (cdk.Stack.of(this.asset) !== cdk.Stack.of(scope)) { + throw new Error(`Asset is already associated with another stack '${cdk.Stack.of(this.asset).stackName}'. ` + + 'Create a new Code instance for every stack.'); + } + this.asset.grantRead(grantable); + return { + s3Location: { + bucketName: this.asset.s3BucketName, + objectKey: this.asset.s3ObjectKey, + }, + }; + } + + /** + * Hash a string + */ + private hashcode(s: string): string { + const hash = crypto.createHash('md5'); + hash.update(s); + return hash.digest('hex'); + }; +} + +/** + * Result of binding `Code` into a `Job`. + */ +export interface CodeConfig { + /** + * The location of the code in S3. 
+ */ + readonly s3Location: s3.Location; +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/lib/index.ts b/packages/@aws-cdk/aws-glue/lib/index.ts index a3dfa85b3be71..d1da5e9385349 100644 --- a/packages/@aws-cdk/aws-glue/lib/index.ts +++ b/packages/@aws-cdk/aws-glue/lib/index.ts @@ -4,6 +4,9 @@ export * from './glue.generated'; export * from './connection'; export * from './data-format'; export * from './database'; +export * from './job'; +export * from './job-executable'; +export * from './code'; export * from './schema'; export * from './security-configuration'; export * from './table'; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/lib/job-executable.ts b/packages/@aws-cdk/aws-glue/lib/job-executable.ts new file mode 100644 index 0000000000000..8fd7c39da5508 --- /dev/null +++ b/packages/@aws-cdk/aws-glue/lib/job-executable.ts @@ -0,0 +1,393 @@ +import { Code } from './code'; + +/** + * AWS Glue version determines the versions of Apache Spark and Python that are available to the job. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/add-job.html. + * + * If you need to use a GlueVersion that doesn't exist as a static member, you + * can instantiate a `GlueVersion` object, e.g: `GlueVersion.of('1.5')`. 
+ */ +export class GlueVersion { + /** + * Glue version using Spark 2.2.1 and Python 2.7 + */ + public static readonly V0_9 = new GlueVersion('0.9'); + + /** + * Glue version using Spark 2.4.3, Python 2.7 and Python 3.6 + */ + public static readonly V1_0 = new GlueVersion('1.0'); + + /** + * Glue version using Spark 2.4.3 and Python 3.7 + */ + public static readonly V2_0 = new GlueVersion('2.0'); + + /** + * Glue version using Spark 3.1.1 and Python 3.7 + */ + public static readonly V3_0 = new GlueVersion('3.0'); + + /** + * Custom Glue version + * @param version custom version + */ + public static of(version: string): GlueVersion { + return new GlueVersion(version); + } + + /** + * The name of this GlueVersion, as expected by Job resource. + */ + public readonly name: string; + + private constructor(name: string) { + this.name = name; + } +} + +/** + * Runtime language of the Glue job + */ +export enum JobLanguage { + /** + * Scala + */ + SCALA = 'scala', + + /** + * Python + */ + PYTHON = 'python', +} + +/** + * Python version + */ +export enum PythonVersion { + /** + * Python 2 (the exact version depends on GlueVersion and JobCommand used) + */ + TWO = '2', + + /** + * Python 3 (the exact version depends on GlueVersion and JobCommand used) + */ + THREE = '3', +} + +/** + * The job type. + * + * If you need to use a JobType that doesn't exist as a static member, you + * can instantiate a `JobType` object, e.g: `JobType.of('other name')`. + */ +export class JobType { + /** + * Command for running a Glue ETL job. + */ + public static readonly ETL = new JobType('glueetl'); + + /** + * Command for running a Glue streaming job. + */ + public static readonly STREAMING = new JobType('gluestreaming'); + + /** + * Command for running a Glue python shell job. 
+ */ + public static readonly PYTHON_SHELL = new JobType('pythonshell'); + + /** + * Custom type name + * @param name type name + */ + public static of(name: string): JobType { + return new JobType(name); + } + + /** + * The name of this JobType, as expected by Job resource. + */ + public readonly name: string; + + private constructor(name: string) { + this.name = name; + } +} + +interface PythonExecutableProps { + /** + * The Python version to use. + */ + readonly pythonVersion: PythonVersion; + + /** + * Additional Python files that AWS Glue adds to the Python path before executing your script. + * Only individual files are supported, directories are not supported. + * + * @default - no extra python files and argument is not set + * + * @see `--extra-py-files` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly extraPythonFiles?: Code[]; +} + +interface SharedJobExecutableProps { + /** + * Glue version. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/release-notes.html + */ + readonly glueVersion: GlueVersion; + + /** + * The script that executes a job. + */ + readonly script: Code; + + /** + * Additional files, such as configuration files that AWS Glue copies to the working directory of your script before executing it. + * Only individual files are supported, directories are not supported. + * + * @default [] - no extra files are copied to the working directory + * + * @see `--extra-files` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly extraFiles?: Code[]; +} + +interface SharedSparkJobExecutableProps extends SharedJobExecutableProps { + /** + * Additional Java .jar files that AWS Glue adds to the Java classpath before executing your script. + * Only individual files are supported, directories are not supported. 
+ * + * @default [] - no extra jars are added to the classpath + * + * @see `--extra-jars` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly extraJars?: Code[]; + + /** + * Setting this value to true prioritizes the customer's extra JAR files in the classpath. + * + * @default false - priority is not given to user-provided jars + * + * @see `--user-jars-first` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly extraJarsFirst?: boolean; +} + +/** + * Props for creating a Scala Spark (ETL or Streaming) job executable + */ +export interface ScalaJobExecutableProps extends SharedSparkJobExecutableProps { + /** + * The fully qualified Scala class name that serves as the entry point for the job. + * + * @see `--class` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly className: string; +} + +/** + * Props for creating a Python Spark (ETL or Streaming) job executable + */ +export interface PythonSparkJobExecutableProps extends SharedSparkJobExecutableProps, PythonExecutableProps {} + +/** + * Props for creating a Python shell job executable + */ +export interface PythonShellExecutableProps extends SharedJobExecutableProps, PythonExecutableProps {} + +/** + * The executable properties related to the Glue job's GlueVersion, JobType and code + */ +export class JobExecutable { + + /** + * Create Scala executable props for Apache Spark ETL job. + * + * @param props Scala Apache Spark Job props + */ + public static scalaEtl(props: ScalaJobExecutableProps): JobExecutable { + return new JobExecutable({ + ...props, + type: JobType.ETL, + language: JobLanguage.SCALA, + }); + } + + /** + * Create Scala executable props for Apache Spark Streaming job. 
+ * + * @param props Scala Apache Spark Job props + */ + public static scalaStreaming(props: ScalaJobExecutableProps): JobExecutable { + return new JobExecutable({ + ...props, + type: JobType.STREAMING, + language: JobLanguage.SCALA, + }); + } + + /** + * Create Python executable props for Apache Spark ETL job. + * + * @param props Python Apache Spark Job props + */ + public static pythonEtl(props: PythonSparkJobExecutableProps): JobExecutable { + return new JobExecutable({ + ...props, + type: JobType.ETL, + language: JobLanguage.PYTHON, + }); + } + + /** + * Create Python executable props for Apache Spark Streaming job. + * + * @param props Python Apache Spark Job props + */ + public static pythonStreaming(props: PythonSparkJobExecutableProps): JobExecutable { + return new JobExecutable({ + ...props, + type: JobType.STREAMING, + language: JobLanguage.PYTHON, + }); + } + + /** + * Create Python executable props for python shell jobs. + * + * @param props Python Shell Job props. + */ + public static pythonShell(props: PythonShellExecutableProps): JobExecutable { + return new JobExecutable({ + ...props, + type: JobType.PYTHON_SHELL, + language: JobLanguage.PYTHON, + }); + } + + /** + * Create a custom JobExecutable. + * + * @param config custom job executable configuration. 
+ */ + public static of(config: JobExecutableConfig): JobExecutable { + return new JobExecutable(config); + } + + private config: JobExecutableConfig; + + private constructor(config: JobExecutableConfig) { + if (JobType.PYTHON_SHELL === config.type) { + if (config.language !== JobLanguage.PYTHON) { + throw new Error('Python shell requires the language to be set to Python'); + } + if ([GlueVersion.V0_9, GlueVersion.V2_0, GlueVersion.V3_0].includes(config.glueVersion)) { + throw new Error(`Specified GlueVersion ${config.glueVersion.name} does not support Python Shell`); + } + } + if (config.extraJarsFirst && [GlueVersion.V0_9, GlueVersion.V1_0].includes(config.glueVersion)) { + throw new Error(`Specified GlueVersion ${config.glueVersion.name} does not support extraJarsFirst`); + } + if (config.pythonVersion === PythonVersion.TWO && ![GlueVersion.V0_9, GlueVersion.V1_0].includes(config.glueVersion)) { + throw new Error(`Specified GlueVersion ${config.glueVersion.name} does not support PythonVersion ${config.pythonVersion}`); + } + if (JobLanguage.PYTHON !== config.language && config.extraPythonFiles) { + throw new Error('extraPythonFiles is not supported for languages other than JobLanguage.PYTHON'); + } + this.config = config; + } + + /** + * Called during Job initialization to get JobExecutableConfig. + */ + public bind(): JobExecutableConfig { + return this.config; + } +} + +/** + * Result of binding a `JobExecutable` into a `Job`. + */ +export interface JobExecutableConfig { + /** + * Glue version. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/release-notes.html + */ + readonly glueVersion: GlueVersion; + + /** + * The language of the job (Scala or Python). + * + * @see `--job-language` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly language: JobLanguage; + + /** + * Specify the type of the job whether it's an Apache Spark ETL or streaming one or if it's a Python shell job. 
+   */
+  readonly type: JobType;
+
+  /**
+   * The Python version to use.
+   *
+   * @default - no python version specified
+   */
+  readonly pythonVersion?: PythonVersion;
+
+  /**
+   * The script that is executed by a job.
+   */
+  readonly script: Code;
+
+  /**
+   * The Scala class that serves as the entry point for the job. This applies only if the job language is Scala.
+   *
+   * @default - no scala className specified
+   *
+   * @see `--class` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+   */
+  readonly className?: string;
+
+  /**
+   * Additional Java .jar files that AWS Glue adds to the Java classpath before executing your script.
+   *
+   * @default - no extra jars specified.
+   *
+   * @see `--extra-jars` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+   */
+  readonly extraJars?: Code[];
+
+  /**
+   * Additional Python files that AWS Glue adds to the Python path before executing your script.
+   *
+   * @default - no extra python files specified.
+   *
+   * @see `--extra-py-files` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+   */
+  readonly extraPythonFiles?: Code[];
+
+  /**
+   * Additional files, such as configuration files that AWS Glue copies to the working directory of your script before executing it.
+   *
+   * @default - no extra files specified.
+   *
+   * @see `--extra-files` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+   */
+  readonly extraFiles?: Code[];
+
+  /**
+   * Setting this value to true prioritizes the customer's extra JAR files in the classpath.
+   *
+   * @default - extra jars are not prioritized.
+ * + * @see `--user-jars-first` in https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly extraJarsFirst?: boolean; +} diff --git a/packages/@aws-cdk/aws-glue/lib/job.ts b/packages/@aws-cdk/aws-glue/lib/job.ts new file mode 100644 index 0000000000000..0233783f94869 --- /dev/null +++ b/packages/@aws-cdk/aws-glue/lib/job.ts @@ -0,0 +1,803 @@ +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; +import * as events from '@aws-cdk/aws-events'; +import * as iam from '@aws-cdk/aws-iam'; +import * as logs from '@aws-cdk/aws-logs'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as cdk from '@aws-cdk/core'; +import * as constructs from 'constructs'; +import { Code, JobExecutable, JobExecutableConfig, JobType } from '.'; +import { IConnection } from './connection'; +import { CfnJob } from './glue.generated'; +import { ISecurityConfiguration } from './security-configuration'; + +/** + * The type of predefined worker that is allocated when a job runs. + * + * If you need to use a WorkerType that doesn't exist as a static member, you + * can instantiate a `WorkerType` object, e.g: `WorkerType.of('other type')`. + */ +export class WorkerType { + /** + * Each worker provides 4 vCPU, 16 GB of memory and a 50GB disk, and 2 executors per worker. + */ + public static readonly STANDARD = new WorkerType('Standard'); + + /** + * Each worker maps to 1 DPU (4 vCPU, 16 GB of memory, 64 GB disk), and provides 1 executor per worker. Suitable for memory-intensive jobs. + */ + public static readonly G_1X = new WorkerType('G.1X'); + + /** + * Each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker. Suitable for memory-intensive jobs. 
+ */ + public static readonly G_2X = new WorkerType('G.2X'); + + /** + * Custom worker type + * @param workerType custom worker type + */ + public static of(workerType: string): WorkerType { + return new WorkerType(workerType); + } + + /** + * The name of this WorkerType, as expected by Job resource. + */ + public readonly name: string; + + private constructor(name: string) { + this.name = name; + } +} + +/** + * Job states emitted by Glue to CloudWatch Events. + * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types for more information. + */ +export enum JobState { + /** + * State indicating job run succeeded + */ + SUCCEEDED = 'SUCCEEDED', + + /** + * State indicating job run failed + */ + FAILED = 'FAILED', + + /** + * State indicating job run timed out + */ + TIMEOUT = 'TIMEOUT', + + /** + * State indicating job is starting + */ + STARTING = 'STARTING', + + /** + * State indicating job is running + */ + RUNNING = 'RUNNING', + + /** + * State indicating job is stopping + */ + STOPPING = 'STOPPING', + + /** + * State indicating job stopped + */ + STOPPED = 'STOPPED', +} + +/** + * The Glue CloudWatch metric type. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitoring-awsglue-with-cloudwatch-metrics.html + */ +export enum MetricType { + /** + * A value at a point in time. + */ + GAUGE = 'gauge', + + /** + * An aggregate number. + */ + COUNT = 'count', +} + +/** + * Interface representing a created or an imported {@link Job}. + */ +export interface IJob extends cdk.IResource, iam.IGrantable { + /** + * The name of the job. + * @attribute + */ + readonly jobName: string; + + /** + * The ARN of the job. + * @attribute + */ + readonly jobArn: string; + + /** + * Defines a CloudWatch event rule triggered when something happens with this job. 
+ * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types + */ + onEvent(id: string, options?: events.OnEventOptions): events.Rule; + + /** + * Defines a CloudWatch event rule triggered when this job moves to the input jobState. + * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types + */ + onStateChange(id: string, jobState: JobState, options?: events.OnEventOptions): events.Rule; + + /** + * Defines a CloudWatch event rule triggered when this job moves to the SUCCEEDED state. + * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types + */ + onSuccess(id: string, options?: events.OnEventOptions): events.Rule; + + /** + * Defines a CloudWatch event rule triggered when this job moves to the FAILED state. + * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types + */ + onFailure(id: string, options?: events.OnEventOptions): events.Rule; + + /** + * Defines a CloudWatch event rule triggered when this job moves to the TIMEOUT state. + * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types + */ + onTimeout(id: string, options?: events.OnEventOptions): events.Rule; + + /** + * Create a CloudWatch metric. + * + * @param metricName name of the metric typically prefixed with `glue.driver.`, `glue..` or `glue.ALL.`. + * @param type the metric type. + * @param props metric options. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitoring-awsglue-with-cloudwatch-metrics.html + */ + metric(metricName: string, type: MetricType, props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Create a CloudWatch Metric indicating job success. + */ + metricSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Create a CloudWatch Metric indicating job failure. 
+ */ + metricFailure(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Create a CloudWatch Metric indicating job timeout. + */ + metricTimeout(props?: cloudwatch.MetricOptions): cloudwatch.Metric; +} + +abstract class JobBase extends cdk.Resource implements IJob { + + public abstract readonly jobArn: string; + public abstract readonly jobName: string; + public abstract readonly grantPrincipal: iam.IPrincipal; + + /** + * Create a CloudWatch Event Rule for this Glue Job when it's in a given state + * + * @param id construct id + * @param options event options. Note that some values are overridden if provided, these are + * - eventPattern.source = ['aws.glue'] + * - eventPattern.detailType = ['Glue Job State Change', 'Glue Job Run Status'] + * - eventPattern.detail.jobName = [this.jobName] + * + * @see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types + */ + public onEvent(id: string, options: events.OnEventOptions = {}): events.Rule { + const rule = new events.Rule(this, id, options); + rule.addTarget(options.target); + rule.addEventPattern({ + source: ['aws.glue'], + detailType: ['Glue Job State Change', 'Glue Job Run Status'], + detail: { + jobName: [this.jobName], + }, + }); + return rule; + } + + /** + * Create a CloudWatch Event Rule for the transition into the input jobState. + * + * @param id construct id. + * @param jobState the job state. + * @param options optional event options. + */ + public onStateChange(id: string, jobState: JobState, options: events.OnEventOptions = {}): events.Rule { + const rule = this.onEvent(id, { + description: `Rule triggered when Glue job ${this.jobName} is in ${jobState} state`, + ...options, + }); + rule.addEventPattern({ + detail: { + state: [jobState], + }, + }); + return rule; + } + + /** + * Create a CloudWatch Event Rule matching JobState.SUCCEEDED. + * + * @param id construct id. + * @param options optional event options. default is {}. 
+ */ + public onSuccess(id: string, options: events.OnEventOptions = {}): events.Rule { + return this.onStateChange(id, JobState.SUCCEEDED, options); + } + + /** + * Return a CloudWatch Event Rule matching FAILED state. + * + * @param id construct id. + * @param options optional event options. default is {}. + */ + public onFailure(id: string, options: events.OnEventOptions = {}): events.Rule { + return this.onStateChange(id, JobState.FAILED, options); + } + + /** + * Return a CloudWatch Event Rule matching TIMEOUT state. + * + * @param id construct id. + * @param options optional event options. default is {}. + */ + public onTimeout(id: string, options: events.OnEventOptions = {}): events.Rule { + return this.onStateChange(id, JobState.TIMEOUT, options); + } + + /** + * Create a CloudWatch metric. + * + * @param metricName name of the metric typically prefixed with `glue.driver.`, `glue..` or `glue.ALL.`. + * @param type the metric type. + * @param props metric options. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitoring-awsglue-with-cloudwatch-metrics.html + */ + public metric(metricName: string, type: MetricType, props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return new cloudwatch.Metric({ + metricName, + namespace: 'Glue', + dimensions: { + JobName: this.jobName, + JobRunId: 'ALL', + Type: type, + }, + ...props, + }).attachTo(this); + } + + /** + * Return a CloudWatch Metric indicating job success. + * + * This metric is based on the Rule returned by no-args onSuccess() call. + */ + public metricSuccess(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return metricRule(this.metricJobStateRule('SuccessMetricRule', JobState.SUCCEEDED), props); + } + + /** + * Return a CloudWatch Metric indicating job failure. + * + * This metric is based on the Rule returned by no-args onFailure() call. 
+ */ + public metricFailure(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return metricRule(this.metricJobStateRule('FailureMetricRule', JobState.FAILED), props); + } + + /** + * Return a CloudWatch Metric indicating job timeout. + * + * This metric is based on the Rule returned by no-args onTimeout() call. + */ + public metricTimeout(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return metricRule(this.metricJobStateRule('TimeoutMetricRule', JobState.TIMEOUT), props); + } + + /** + * Creates or retrieves a singleton event rule for the input job state for use with the metric JobState methods. + * + * @param id construct id. + * @param jobState the job state. + * @private + */ + private metricJobStateRule(id: string, jobState: JobState): events.Rule { + return this.node.tryFindChild(id) as events.Rule ?? this.onStateChange(id, jobState); + } +} + +/** + * Properties for enabling Spark UI monitoring feature for Spark-based Glue jobs. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ +export interface SparkUIProps { + /** + * Enable Spark UI. + */ + readonly enabled: boolean + + /** + * The bucket where the Glue job stores the logs. + * + * @default a new bucket will be created. + */ + readonly bucket?: s3.IBucket; + + /** + * The path inside the bucket (objects prefix) where the Glue job stores the logs. + * + * @default '/' - the logs will be written at the root of the bucket + */ + readonly prefix?: string; +} + +/** + * The Spark UI logging location. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ +export interface SparkUILoggingLocation { + /** + * The bucket where the Glue job stores the logs. 
+   */
+  readonly bucket: s3.IBucket;
+
+  /**
+   * The path inside the bucket (objects prefix) where the Glue job stores the logs.
+   *
+   * @default '/' - the logs will be written at the root of the bucket
+   */
+  readonly prefix?: string;
+}
+
+/**
+ * Properties for enabling Continuous Logging for Glue Jobs.
+ *
+ * @see https://docs.aws.amazon.com/glue/latest/dg/monitor-continuous-logging-enable.html
+ * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
+ */
+export interface ContinuousLoggingProps {
+  /**
+   * Enable continuous logging.
+   */
+  readonly enabled: boolean;
+
+  /**
+   * Specify a custom CloudWatch log group name.
+   *
+   * @default - a log group is created with name `/aws-glue/jobs/logs-v2/`.
+   */
+  readonly logGroup?: logs.ILogGroup;
+
+  /**
+   * Specify a custom CloudWatch log stream prefix.
+   *
+   * @default - the job run ID.
+   */
+  readonly logStreamPrefix?: string;
+
+  /**
+   * Filter out non-useful Apache Spark driver/executor and Apache Hadoop YARN heartbeat log messages.
+   *
+   * @default true
+   */
+  readonly quiet?: boolean;
+
+  /**
+   * Apply the provided conversion pattern.
+   *
+   * This is a Log4j Conversion Pattern to customize driver and executor logs.
+   *
+   * @default `%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n`
+   */
+  readonly conversionPattern?: string;
+}
+
+/**
+ * Attributes for importing {@link Job}.
+ */
+export interface JobAttributes {
+  /**
+   * The name of the job.
+   */
+  readonly jobName: string;
+
+  /**
+   * The IAM role assumed by Glue to run this job.
+   *
+   * @default - undefined
+   */
+  readonly role?: iam.IRole;
+}
+
+/**
+ * Construction properties for {@link Job}.
+ */
+export interface JobProps {
+  /**
+   * The job's executable properties.
+   */
+  readonly executable: JobExecutable;
+
+  /**
+   * The name of the job.
+   *
+   * @default - a name is automatically generated
+   */
+  readonly jobName?: string;
+
+  /**
+   * The description of the job.
+ * + * @default - no value + */ + readonly description?: string; + + /** + * The number of AWS Glue data processing units (DPUs) that can be allocated when this job runs. + * Cannot be used for Glue version 2.0 and later - workerType and workerCount should be used instead. + * + * @default - 10 when job type is Apache Spark ETL or streaming, 0.0625 when job type is Python shell + */ + readonly maxCapacity?: number; + + /** + * The maximum number of times to retry this job after a job run fails. + * + * @default 0 + */ + readonly maxRetries?: number; + + /** + * The maximum number of concurrent runs allowed for the job. + * + * An error is returned when this threshold is reached. The maximum value you can specify is controlled by a service limit. + * + * @default 1 + */ + readonly maxConcurrentRuns?: number; + + /** + * The number of minutes to wait after a job run starts, before sending a job run delay notification. + * + * @default - no delay notifications + */ + readonly notifyDelayAfter?: cdk.Duration; + + /** + * The maximum time that a job run can consume resources before it is terminated and enters TIMEOUT status. + * + * @default cdk.Duration.hours(48) + */ + readonly timeout?: cdk.Duration; + + /** + * The type of predefined worker that is allocated when a job runs. + * + * @default - differs based on specific Glue version + */ + readonly workerType?: WorkerType; + + /** + * The number of workers of a defined {@link WorkerType} that are allocated when a job runs. + * + * @default - differs based on specific Glue version/worker type + */ + readonly workerCount?: number; + + /** + * The {@link Connection}s used for this job. + * + * Connections are used to connect to other AWS Service or resources within a VPC. + * + * @default [] - no connections are added to the job + */ + readonly connections?: IConnection[]; + + /** + * The {@link SecurityConfiguration} to use for this job. + * + * @default - no security configuration. 
+ */ + readonly securityConfiguration?: ISecurityConfiguration; + + /** + * The default arguments for this job, specified as name-value pairs. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html for a list of reserved parameters + * @default - no arguments + */ + readonly defaultArguments?: { [key: string]: string }; + + /** + * The tags to add to the resources on which the job runs + * + * @default {} - no tags + */ + readonly tags?: { [key: string]: string }; + + /** + * The IAM role assumed by Glue to run this job. + * + * If providing a custom role, it needs to trust the Glue service principal (glue.amazonaws.com) and be granted sufficient permissions. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/getting-started-access.html + * + * @default - a role is automatically generated + */ + readonly role?: iam.IRole; + + /** + * Enables the collection of metrics for job profiling. + * + * @default - no profiling metrics emitted. + * + * @see `--enable-metrics` at https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly enableProfilingMetrics? :boolean; + + /** + * Enables the Spark UI debugging and monitoring with the specified props. + * + * @default - Spark UI debugging and monitoring is disabled. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly sparkUI?: SparkUIProps, + + /** + * Enables continuous logging with the specified props. + * + * @default - continuous logging is disabled. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitor-continuous-logging-enable.html + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + readonly continuousLogging?: ContinuousLoggingProps, +} + +/** + * A Glue Job. 
+ */ +export class Job extends JobBase { + /** + * Creates a Glue Job + * + * @param scope The scope creating construct (usually `this`). + * @param id The construct's id. + * @param attrs Import attributes + */ + public static fromJobAttributes(scope: constructs.Construct, id: string, attrs: JobAttributes): IJob { + class Import extends JobBase { + public readonly jobName = attrs.jobName; + public readonly jobArn = jobArn(scope, attrs.jobName); + public readonly grantPrincipal = attrs.role ?? new iam.UnknownPrincipal({ resource: this }); + } + + return new Import(scope, id); + } + + /** + * The ARN of the job. + */ + public readonly jobArn: string; + + /** + * The name of the job. + */ + public readonly jobName: string; + + /** + * The IAM role Glue assumes to run this job. + */ + public readonly role: iam.IRole; + + /** + * The principal this Glue Job is running as. + */ + public readonly grantPrincipal: iam.IPrincipal; + + /** + * The Spark UI logs location if Spark UI monitoring and debugging is enabled. + * + * @see https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui-jobs.html + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + public readonly sparkUILoggingLocation?: SparkUILoggingLocation; + + constructor(scope: constructs.Construct, id: string, props: JobProps) { + super(scope, id, { + physicalName: props.jobName, + }); + + const executable = props.executable.bind(); + + this.role = props.role ?? new iam.Role(this, 'ServiceRole', { + assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'), + managedPolicies: [iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole')], + }); + this.grantPrincipal = this.role; + + const sparkUI = props.sparkUI?.enabled ? this.setupSparkUI(executable, this.role, props.sparkUI) : undefined; + this.sparkUILoggingLocation = sparkUI?.location; + const continuousLoggingArgs = props.continuousLogging?.enabled ? 
this.setupContinuousLogging(this.role, props.continuousLogging) : {}; + const profilingMetricsArgs = props.enableProfilingMetrics ? { '--enable-metrics': '' } : {}; + + const defaultArguments = { + ...this.executableArguments(executable), + ...continuousLoggingArgs, + ...profilingMetricsArgs, + ...sparkUI?.args, + ...this.checkNoReservedArgs(props.defaultArguments), + }; + + const jobResource = new CfnJob(this, 'Resource', { + name: props.jobName, + description: props.description, + role: this.role.roleArn, + command: { + name: executable.type.name, + scriptLocation: this.codeS3ObjectUrl(executable.script), + pythonVersion: executable.pythonVersion, + }, + glueVersion: executable.glueVersion.name, + workerType: props.workerType?.name, + numberOfWorkers: props.workerCount, + maxCapacity: props.maxCapacity, + maxRetries: props.maxRetries, + executionProperty: props.maxConcurrentRuns ? { maxConcurrentRuns: props.maxConcurrentRuns } : undefined, + notificationProperty: props.notifyDelayAfter ? { notifyDelayAfter: props.notifyDelayAfter.toMinutes() } : undefined, + timeout: props.timeout?.toMinutes(), + connections: props.connections ? { connections: props.connections.map((connection) => connection.connectionName) } : undefined, + securityConfiguration: props.securityConfiguration?.securityConfigurationName, + tags: props.tags, + defaultArguments, + }); + + const resourceName = this.getResourceNameAttribute(jobResource.ref); + this.jobArn = jobArn(this, resourceName); + this.jobName = resourceName; + } + + /** + * Check no usage of reserved arguments. 
+ * + * @see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html + */ + private checkNoReservedArgs(defaultArguments?: { [key: string]: string }) { + if (defaultArguments) { + const reservedArgs = new Set(['--conf', '--debug', '--mode', '--JOB_NAME']); + Object.keys(defaultArguments).forEach((arg) => { + if (reservedArgs.has(arg)) { + throw new Error(`The ${arg} argument is reserved by Glue. Don't set it`); + } + }); + } + return defaultArguments; + } + + private executableArguments(config: JobExecutableConfig) { + const args: { [key: string]: string } = {}; + args['--job-language'] = config.language; + if (config.className) { + args['--class'] = config.className; + } + if (config.extraJars && config.extraJars?.length > 0) { + args['--extra-jars'] = config.extraJars.map(code => this.codeS3ObjectUrl(code)).join(','); + } + if (config.extraPythonFiles && config.extraPythonFiles.length > 0) { + args['--extra-py-files'] = config.extraPythonFiles.map(code => this.codeS3ObjectUrl(code)).join(','); + } + if (config.extraFiles && config.extraFiles.length > 0) { + args['--extra-files'] = config.extraFiles.map(code => this.codeS3ObjectUrl(code)).join(','); + } + if (config.extraJarsFirst) { + args['--user-jars-first'] = 'true'; + } + return args; + } + + private setupSparkUI(executable: JobExecutableConfig, role: iam.IRole, props: SparkUIProps) { + if (JobType.PYTHON_SHELL === executable.type) { + throw new Error('Spark UI is not available for JobType.PYTHON_SHELL jobs'); + } + + const bucket = props.bucket ?? 
new s3.Bucket(this, 'SparkUIBucket'); + bucket.grantReadWrite(role); + const args = { + '--enable-spark-ui': 'true', + '--spark-event-logs-path': bucket.s3UrlForObject(props.prefix), + }; + + return { + location: { + prefix: props.prefix, + bucket, + }, + args, + }; + } + + private setupContinuousLogging(role: iam.IRole, props: ContinuousLoggingProps) { + const args: {[key: string]: string} = { + '--enable-continuous-cloudwatch-log': 'true', + '--enable-continuous-log-filter': (props.quiet ?? true).toString(), + }; + + if (props.logGroup) { + args['--continuous-log-logGroup'] = props.logGroup.logGroupName; + props.logGroup.grantWrite(role); + } + + if (props.logStreamPrefix) { + args['--continuous-log-logStreamPrefix'] = props.logStreamPrefix; + } + if (props.conversionPattern) { + args['--continuous-log-conversionPattern'] = props.conversionPattern; + } + return args; + } + + private codeS3ObjectUrl(code: Code) { + const s3Location = code.bind(this, this.role).s3Location; + return `s3://${s3Location.bucketName}/${s3Location.objectKey}`; + } +} + +/** + * Create a CloudWatch Metric that's based on Glue Job events + * {@see https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#glue-event-types} + * The metric has namespace = 'AWS/Events', metricName = 'TriggeredRules' and RuleName = rule.ruleName dimension. 
+ * + * @param rule for use in setting RuleName dimension value + * @param props metric properties + */ +function metricRule(rule: events.IRule, props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return new cloudwatch.Metric({ + namespace: 'AWS/Events', + metricName: 'TriggeredRules', + dimensions: { RuleName: rule.ruleName }, + statistic: cloudwatch.Statistic.SUM, + ...props, + }).attachTo(rule); +} + + +/** + * Returns the job arn + * @param scope + * @param jobName + */ +function jobArn(scope: constructs.Construct, jobName: string) : string { + return cdk.Stack.of(scope).formatArn({ + service: 'glue', + resource: 'job', + resourceName: jobName, + }); +} diff --git a/packages/@aws-cdk/aws-glue/package.json b/packages/@aws-cdk/aws-glue/package.json index 5f11a3895db92..daaba9400606c 100644 --- a/packages/@aws-cdk/aws-glue/package.json +++ b/packages/@aws-cdk/aws-glue/package.json @@ -84,19 +84,29 @@ "pkglint": "0.0.0" }, "dependencies": { + "@aws-cdk/assets": "0.0.0", + "@aws-cdk/aws-cloudwatch": "0.0.0", + "@aws-cdk/aws-events": "0.0.0", "@aws-cdk/aws-ec2": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", + "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-kms": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", + "@aws-cdk/aws-s3-assets": "0.0.0", "@aws-cdk/core": "0.0.0", "constructs": "^3.3.69" }, "homepage": "https://github.com/aws/aws-cdk", "peerDependencies": { + "@aws-cdk/assets": "0.0.0", + "@aws-cdk/aws-cloudwatch": "0.0.0", + "@aws-cdk/aws-events": "0.0.0", "@aws-cdk/aws-ec2": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", + "@aws-cdk/aws-logs": "0.0.0", "@aws-cdk/aws-kms": "0.0.0", "@aws-cdk/aws-s3": "0.0.0", + "@aws-cdk/aws-s3-assets": "0.0.0", "@aws-cdk/core": "0.0.0", "constructs": "^3.3.69" }, @@ -143,7 +153,8 @@ "docs-public-apis:@aws-cdk/aws-glue.ClassificationString.XML", "docs-public-apis:@aws-cdk/aws-glue.ClassificationString.PARQUET", "docs-public-apis:@aws-cdk/aws-glue.ClassificationString.ORC", - "docs-public-apis:@aws-cdk/aws-glue.ClassificationString.value" + 
"docs-public-apis:@aws-cdk/aws-glue.ClassificationString.value", + "events-method-signature:@aws-cdk/aws-glue.Job.onStateChange" ] }, "awscdkio": { diff --git a/packages/@aws-cdk/aws-glue/test/code.test.ts b/packages/@aws-cdk/aws-glue/test/code.test.ts new file mode 100644 index 0000000000000..061f6d26c351f --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/code.test.ts @@ -0,0 +1,304 @@ +import * as path from 'path'; +import { Template } from '@aws-cdk/assertions'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as cdk from '@aws-cdk/core'; +import * as glue from '../lib'; + +describe('Code', () => { + let stack: cdk.Stack; + let script: glue.Code; + + beforeEach(() => { + stack = new cdk.Stack(); + }); + + describe('.fromBucket()', () => { + const key = 'script'; + let bucket: s3.IBucket; + + test('with valid bucket name and key and bound by job sets the right path and grants the job permissions to read from it', () => { + bucket = s3.Bucket.fromBucketName(stack, 'Bucket', 'bucketName'); + script = glue.Code.fromBucket(bucket, key); + new glue.Job(stack, 'Job1', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + }); + + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Command: { + ScriptLocation: 's3://bucketName/script', + }, + }); + + // Role policy should grant reading from the assets bucket + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 's3:GetObject*', + 's3:GetBucket*', + 's3:List*', + ], + Effect: 'Allow', + Resource: [ + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':s3:::bucketName', + ], + ], + }, + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':s3:::bucketName/script', + ], + ], + }, + ], + }, + ], + }, + Roles: [ + { + Ref: 'Job1ServiceRole7AF34CCA', + }, + ], + }); + }); + }); + + 
describe('.fromAsset()', () => { + const filePath = path.join(__dirname, 'job-script/hello_world.py'); + const directoryPath = path.join(__dirname, 'job-script'); + + beforeEach(() => { + script = glue.Code.fromAsset(filePath); + }); + + test("with valid and existing file path and bound to job sets job's script location and permissions stack metadata", () => { + new glue.Job(stack, 'Job1', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + }); + + expect(stack.node.metadata.find(m => m.type === 'aws:cdk:asset')).toBeDefined(); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Command: { + ScriptLocation: { + 'Fn::Join': [ + '', + [ + 's3://', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469', + }, + '/', + { + 'Fn::Select': [ + 0, + { + 'Fn::Split': [ + '||', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763', + }, + ], + }, + ], + }, + { + 'Fn::Select': [ + 1, + { + 'Fn::Split': [ + '||', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763', + }, + ], + }, + ], + }, + ], + ], + }, + }, + }); + // Role policy should grant reading from the assets bucket + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 's3:GetObject*', + 's3:GetBucket*', + 's3:List*', + ], + Effect: 'Allow', + Resource: [ + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':s3:::', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469', + }, + ], + ], + }, + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':s3:::', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469', + }, 
+ '/*', + ], + ], + }, + ], + }, + ], + }, + Roles: [ + { + Ref: 'Job1ServiceRole7AF34CCA', + }, + ], + }); + }); + + test('with an unsupported directory path throws', () => { + expect(() => glue.Code.fromAsset(directoryPath)) + .toThrow(/Only files are supported/); + }); + + test('used in more than 1 job in the same stack should be reused', () => { + new glue.Job(stack, 'Job1', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + }); + new glue.Job(stack, 'Job2', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + }); + const ScriptLocation = { + 'Fn::Join': [ + '', + [ + 's3://', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469', + }, + '/', + { + 'Fn::Select': [ + 0, + { + 'Fn::Split': [ + '||', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763', + }, + ], + }, + ], + }, + { + 'Fn::Select': [ + 1, + { + 'Fn::Split': [ + '||', + { + Ref: 'AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763', + }, + ], + }, + ], + }, + ], + ], + }; + + expect(stack.node.metadata.find(m => m.type === 'aws:cdk:asset')).toBeDefined(); + // Job1 and Job2 use reuse the asset + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Command: { + ScriptLocation, + }, + Role: { + 'Fn::GetAtt': [ + 'Job1ServiceRole7AF34CCA', + 'Arn', + ], + }, + }); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Command: { + ScriptLocation, + }, + Role: { + 'Fn::GetAtt': [ + 'Job2ServiceRole5D2B98FE', + 'Arn', + ], + }, + }); + }); + + test('throws if trying to rebind in another stack', () => { + new glue.Job(stack, 'Job1', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + 
pythonVersion: glue.PythonVersion.THREE, + script, + }), + }); + const differentStack = new cdk.Stack(); + + expect(() => new glue.Job(differentStack, 'Job2', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script: script, + }), + })).toThrow(/associated with another stack/); + }); + }); +}); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/test/integ.job.expected.json b/packages/@aws-cdk/aws-glue/test/integ.job.expected.json new file mode 100644 index 0000000000000..61f4f60434db1 --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/integ.job.expected.json @@ -0,0 +1,571 @@ +{ + "Resources": { + "EtlJobServiceRole837F781B": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "glue.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSGlueServiceRole" + ] + ] + } + ] + } + }, + "EtlJobServiceRoleDefaultPolicy8BFE343B": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*", + "s3:DeleteObject*", + "s3:PutObject", + "s3:Abort*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::GetAtt": [ + "EtlJobSparkUIBucketBF23744B", + "Arn" + ] + }, + { + "Fn::Join": [ + "", + [ + { + "Fn::GetAtt": [ + "EtlJobSparkUIBucketBF23744B", + "Arn" + ] + }, + "/*" + ] + ] + } + ] + }, + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + } + ] + ] + }, + { + 
"Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + }, + "/*" + ] + ] + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "EtlJobServiceRoleDefaultPolicy8BFE343B", + "Roles": [ + { + "Ref": "EtlJobServiceRole837F781B" + } + ] + } + }, + "EtlJobSparkUIBucketBF23744B": { + "Type": "AWS::S3::Bucket", + "UpdateReplacePolicy": "Retain", + "DeletionPolicy": "Retain" + }, + "EtlJob7FC88E45": { + "Type": "AWS::Glue::Job", + "Properties": { + "Command": { + "Name": "glueetl", + "PythonVersion": "3", + "ScriptLocation": { + "Fn::Join": [ + "", + [ + "s3://", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + }, + "/", + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763" + } + ] + } + ] + } + ] + ] + } + }, + "Role": { + "Fn::GetAtt": [ + "EtlJobServiceRole837F781B", + "Arn" + ] + }, + "DefaultArguments": { + "--job-language": "python", + "--enable-continuous-cloudwatch-log": "true", + "--enable-continuous-log-filter": "true", + "--continuous-log-logStreamPrefix": "EtlJob", + "--enable-spark-ui": "true", + "--spark-event-logs-path": { + "Fn::Join": [ + "", + [ + "s3://", + { + "Ref": "EtlJobSparkUIBucketBF23744B" + } + ] + ] + }, + "arg1": "value1", + "arg2": "value2" + }, + "ExecutionProperty": { + "MaxConcurrentRuns": 2 + }, + "GlueVersion": "2.0", + "MaxRetries": 2, + "Name": "EtlJob", + "NotificationProperty": { + "NotifyDelayAfter": 1 + }, + "NumberOfWorkers": 10, + "Tags": { + "key": "value" + }, + "Timeout": 5, + "WorkerType": "G.2X" + } + }, + 
"EtlJobSuccessMetricRuleA72A3EF6": { + "Type": "AWS::Events::Rule", + "Properties": { + "Description": { + "Fn::Join": [ + "", + [ + "Rule triggered when Glue job ", + { + "Ref": "EtlJob7FC88E45" + }, + " is in SUCCEEDED state" + ] + ] + }, + "EventPattern": { + "source": [ + "aws.glue" + ], + "detail-type": [ + "Glue Job State Change", + "Glue Job Run Status" + ], + "detail": { + "jobName": [ + { + "Ref": "EtlJob7FC88E45" + } + ], + "state": [ + "SUCCEEDED" + ] + } + }, + "State": "ENABLED" + } + }, + "StreamingJobServiceRole1B4B8BF9": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "glue.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSGlueServiceRole" + ] + ] + } + ] + } + }, + "StreamingJobServiceRoleDefaultPolicyA0CC4C68": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + } + ] + ] + }, + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + }, + "/*" + ] + ] + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "StreamingJobServiceRoleDefaultPolicyA0CC4C68", + "Roles": [ + { + "Ref": "StreamingJobServiceRole1B4B8BF9" + } + ] + } + }, + "StreamingJob3783CC17": { + "Type": "AWS::Glue::Job", + "Properties": { + "Command": { + "Name": "gluestreaming", + "PythonVersion": "3", + "ScriptLocation": 
{ + "Fn::Join": [ + "", + [ + "s3://", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + }, + "/", + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763" + } + ] + } + ] + } + ] + ] + } + }, + "Role": { + "Fn::GetAtt": [ + "StreamingJobServiceRole1B4B8BF9", + "Arn" + ] + }, + "DefaultArguments": { + "--job-language": "python", + "arg1": "value1", + "arg2": "value2" + }, + "GlueVersion": "2.0", + "Name": "StreamingJob", + "Tags": { + "key": "value" + } + } + }, + "ShellJobServiceRoleCF97BC4B": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "glue.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSGlueServiceRole" + ] + ] + } + ] + } + }, + "ShellJobServiceRoleDefaultPolicy7F22D315": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "s3:GetObject*", + "s3:GetBucket*", + "s3:List*" + ], + "Effect": "Allow", + "Resource": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + } + ] + ] + }, + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":s3:::", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + }, + "/*" + ] + ] + } + ] + } + ], + 
"Version": "2012-10-17" + }, + "PolicyName": "ShellJobServiceRoleDefaultPolicy7F22D315", + "Roles": [ + { + "Ref": "ShellJobServiceRoleCF97BC4B" + } + ] + } + }, + "ShellJob42E81F95": { + "Type": "AWS::Glue::Job", + "Properties": { + "Command": { + "Name": "pythonshell", + "PythonVersion": "3", + "ScriptLocation": { + "Fn::Join": [ + "", + [ + "s3://", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469" + }, + "/", + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763" + } + ] + } + ] + } + ] + ] + } + }, + "Role": { + "Fn::GetAtt": [ + "ShellJobServiceRoleCF97BC4B", + "Arn" + ] + }, + "DefaultArguments": { + "--job-language": "python", + "arg1": "value1", + "arg2": "value2" + }, + "GlueVersion": "1.0", + "Name": "ShellJob", + "Tags": { + "key": "value" + } + } + } + }, + "Parameters": { + "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3Bucket4E517469": { + "Type": "String", + "Description": "S3 bucket for asset \"432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855\"" + }, + "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855S3VersionKeyF7753763": { + "Type": "String", + "Description": "S3 key for asset version \"432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855\"" + }, + "AssetParameters432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855ArtifactHash0C610005": { + "Type": "String", + "Description": "Artifact hash for asset \"432033e3218068a915d2532fa9be7858a12b228a2ae6e5c10faccd9097b1e855\"" + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/test/integ.job.ts 
b/packages/@aws-cdk/aws-glue/test/integ.job.ts new file mode 100644 index 0000000000000..fedbc0b8b8428 --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/integ.job.ts @@ -0,0 +1,89 @@ +import * as path from 'path'; +import * as cdk from '@aws-cdk/core'; +import * as glue from '../lib'; + +/** + * To verify the ability to run jobs created in this test + * + * Run the job using + * `aws glue start-job-run --region us-east-1 --job-name ` + * This will return a runId + * + * Get the status of the job run using + * `aws glue get-job-run --region us-east-1 --job-name --run-id ` + * + * For example, to test the ShellJob + * - Run: `aws glue start-job-run --region us-east-1 --job-name ShellJob` + * - Get Status: `aws glue get-job-run --region us-east-1 --job-name ShellJob --run-id ` + * - Check output: `aws logs get-log-events --region us-east-1 --log-group-name "/aws-glue/python-jobs/output" --log-stream-name ">` which should show "hello world" + */ +const app = new cdk.App(); + +const stack = new cdk.Stack(app, 'aws-glue-job'); + +const script = glue.Code.fromAsset(path.join(__dirname, 'job-script/hello_world.py')); + +const etlJob = new glue.Job(stack, 'EtlJob', { + jobName: 'EtlJob', + executable: glue.JobExecutable.pythonEtl({ + glueVersion: glue.GlueVersion.V2_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + workerType: glue.WorkerType.G_2X, + workerCount: 10, + maxConcurrentRuns: 2, + maxRetries: 2, + timeout: cdk.Duration.minutes(5), + notifyDelayAfter: cdk.Duration.minutes(1), + defaultArguments: { + arg1: 'value1', + arg2: 'value2', + }, + sparkUI: { + enabled: true, + }, + continuousLogging: { + enabled: true, + quiet: true, + logStreamPrefix: 'EtlJob', + }, + tags: { + key: 'value', + }, +}); +etlJob.metricSuccess(); + +new glue.Job(stack, 'StreamingJob', { + jobName: 'StreamingJob', + executable: glue.JobExecutable.pythonStreaming({ + glueVersion: glue.GlueVersion.V2_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + 
defaultArguments: { + arg1: 'value1', + arg2: 'value2', + }, + tags: { + key: 'value', + }, +}); + +new glue.Job(stack, 'ShellJob', { + jobName: 'ShellJob', + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + defaultArguments: { + arg1: 'value1', + arg2: 'value2', + }, + tags: { + key: 'value', + }, +}); + +app.synth(); diff --git a/packages/@aws-cdk/aws-glue/test/job-executable.test.ts b/packages/@aws-cdk/aws-glue/test/job-executable.test.ts new file mode 100644 index 0000000000000..481bd16dc8944 --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/job-executable.test.ts @@ -0,0 +1,106 @@ +import * as s3 from '@aws-cdk/aws-s3'; +import * as cdk from '@aws-cdk/core'; +import * as glue from '../lib'; + +describe('GlueVersion', () => { + test('.V0_9 should set the name correctly', () => expect(glue.GlueVersion.V0_9.name).toEqual('0.9')); + + test('.V1_0 should set the name correctly', () => expect(glue.GlueVersion.V1_0.name).toEqual('1.0')); + + test('.V2_0 should set the name correctly', () => expect(glue.GlueVersion.V2_0.name).toEqual('2.0')); + + test('.V3_0 should set the name correctly', () => expect(glue.GlueVersion.V3_0.name).toEqual('3.0')); + + test('of(customVersion) should set the name correctly', () => expect(glue.GlueVersion.of('CustomVersion').name).toEqual('CustomVersion')); +}); + +describe('JobType', () => { + test('.ETL should set the name correctly', () => expect(glue.JobType.ETL.name).toEqual('glueetl')); + + test('.STREAMING should set the name correctly', () => expect(glue.JobType.STREAMING.name).toEqual('gluestreaming')); + + test('.PYTHON_SHELL should set the name correctly', () => expect(glue.JobType.PYTHON_SHELL.name).toEqual('pythonshell')); + + test('of(customName) should set the name correctly', () => expect(glue.JobType.of('CustomName').name).toEqual('CustomName')); +}); + +describe('JobExecutable', () => { + let stack: cdk.Stack; + let bucket: 
s3.IBucket; + let script: glue.Code; + + beforeEach(() => { + stack = new cdk.Stack(); + bucket = s3.Bucket.fromBucketName(stack, 'Bucket', 'bucketName'); + script = glue.Code.fromBucket(bucket, 'script.py'); + }); + + describe('.of()', () => { + test('with valid config should succeed', () => { + expect(glue.JobExecutable.of({ + glueVersion: glue.GlueVersion.V1_0, + type: glue.JobType.PYTHON_SHELL, + language: glue.JobLanguage.PYTHON, + pythonVersion: glue.PythonVersion.THREE, + script, + })).toBeDefined(); + }); + + test('with JobType.PYTHON_SHELL and a language other than JobLanguage.PYTHON should throw', () => { + expect(() => glue.JobExecutable.of({ + glueVersion: glue.GlueVersion.V3_0, + type: glue.JobType.PYTHON_SHELL, + language: glue.JobLanguage.SCALA, + script, + })).toThrow(/Python shell requires the language to be set to Python/); + }); + + test('with a non JobLanguage.PYTHON and extraPythonFiles set should throw', () => { + expect(() => glue.JobExecutable.of({ + glueVersion: glue.GlueVersion.V3_0, + type: glue.JobType.ETL, + language: glue.JobLanguage.SCALA, + className: 'com.Test', + extraPythonFiles: [script], + script, + })).toThrow(/extraPythonFiles is not supported for languages other than JobLanguage.PYTHON/); + }); + + [glue.GlueVersion.V0_9, glue.GlueVersion.V2_0, glue.GlueVersion.V3_0].forEach((glueVersion) => { + test(`with JobType.PYTHON_SHELL and GlueVersion ${glueVersion} should throw`, () => { + expect(() => glue.JobExecutable.of({ + type: glue.JobType.PYTHON_SHELL, + language: glue.JobLanguage.PYTHON, + pythonVersion: glue.PythonVersion.TWO, + script, + glueVersion, + })).toThrow(`Specified GlueVersion ${glueVersion.name} does not support Python Shell`); + }); + }); + + [glue.GlueVersion.V0_9, glue.GlueVersion.V1_0].forEach((glueVersion) => { + test(`with extraJarsFirst set and GlueVersion ${glueVersion.name} should throw`, () => { + expect(() => glue.JobExecutable.of({ + type: glue.JobType.ETL, + language: glue.JobLanguage.PYTHON, + 
pythonVersion: glue.PythonVersion.TWO, + extraJarsFirst: true, + script, + glueVersion, + })).toThrow(`Specified GlueVersion ${glueVersion.name} does not support extraJarsFirst`); + }); + }); + + [glue.GlueVersion.V2_0, glue.GlueVersion.V3_0].forEach((glueVersion) => { + test(`with PythonVersion.TWO and GlueVersion ${glueVersion} should throw`, () => { + expect(() => glue.JobExecutable.of({ + type: glue.JobType.ETL, + language: glue.JobLanguage.PYTHON, + pythonVersion: glue.PythonVersion.TWO, + script, + glueVersion, + })).toThrow(`Specified GlueVersion ${glueVersion.name} does not support PythonVersion 2`); + }); + }); + }); +}); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/test/job-script/hello_world.py b/packages/@aws-cdk/aws-glue/test/job-script/hello_world.py new file mode 100644 index 0000000000000..e75154b7c390f --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/job-script/hello_world.py @@ -0,0 +1 @@ +print("hello world") \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/test/job-script/hello_world_2.py b/packages/@aws-cdk/aws-glue/test/job-script/hello_world_2.py new file mode 100644 index 0000000000000..e75154b7c390f --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/job-script/hello_world_2.py @@ -0,0 +1 @@ +print("hello world") \ No newline at end of file diff --git a/packages/@aws-cdk/aws-glue/test/job.test.ts b/packages/@aws-cdk/aws-glue/test/job.test.ts new file mode 100644 index 0000000000000..625e4743570fd --- /dev/null +++ b/packages/@aws-cdk/aws-glue/test/job.test.ts @@ -0,0 +1,842 @@ +import { Template } from '@aws-cdk/assertions'; +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; +import * as events from '@aws-cdk/aws-events'; +import * as iam from '@aws-cdk/aws-iam'; +import * as logs from '@aws-cdk/aws-logs'; +import * as s3 from '@aws-cdk/aws-s3'; +import * as cdk from '@aws-cdk/core'; +import * as glue from '../lib'; + +describe('WorkerType', () => { + test('.STANDARD should set the name 
correctly', () => expect(glue.WorkerType.STANDARD.name).toEqual('Standard')); + + test('.G_1X should set the name correctly', () => expect(glue.WorkerType.G_1X.name).toEqual('G.1X')); + + test('.G_2X should set the name correctly', () => expect(glue.WorkerType.G_2X.name).toEqual('G.2X')); + + test('of(customType) should set name correctly', () => expect(glue.WorkerType.of('CustomType').name).toEqual('CustomType')); +}); + +describe('Job', () => { + const jobName = 'test-job'; + let stack: cdk.Stack; + + beforeEach(() => { + stack = new cdk.Stack(); + }); + + describe('.fromJobAttributes()', () => { + test('with required attrs only', () => { + const job = glue.Job.fromJobAttributes(stack, 'ImportedJob', { jobName }); + + expect(job.jobName).toEqual(jobName); + expect(job.jobArn).toEqual(stack.formatArn({ + service: 'glue', + resource: 'job', + resourceName: jobName, + })); + expect(job.grantPrincipal).toEqual(new iam.UnknownPrincipal({ resource: job })); + }); + + test('with all attrs', () => { + const role = iam.Role.fromRoleArn(stack, 'Role', 'arn:aws:iam::123456789012:role/TestRole'); + const job = glue.Job.fromJobAttributes(stack, 'ImportedJob', { jobName, role }); + + expect(job.jobName).toEqual(jobName); + expect(job.jobArn).toEqual(stack.formatArn({ + service: 'glue', + resource: 'job', + resourceName: jobName, + })); + expect(job.grantPrincipal).toEqual(role); + }); + }); + + + describe('new', () => { + const className = 'com.amazon.test.ClassName'; + const codeBucketName = 'bucketName'; + const codeBucketAccessStatement = { + Action: [ + 's3:GetObject*', + 's3:GetBucket*', + 's3:List*', + ], + Effect: 'Allow', + Resource: [ + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + `:s3:::${codeBucketName}`, + ], + ], + }, + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + `:s3:::${codeBucketName}/script`, + ], + ], + }, + ], + }; + let codeBucket: s3.IBucket; + let script: glue.Code; + let extraJars: glue.Code[]; + 
let extraFiles: glue.Code[]; + let extraPythonFiles: glue.Code[]; + let job: glue.Job; + let defaultProps: glue.JobProps; + + beforeEach(() => { + codeBucket = s3.Bucket.fromBucketName(stack, 'CodeBucket', codeBucketName); + script = glue.Code.fromBucket(codeBucket, 'script'); + extraJars = [glue.Code.fromBucket(codeBucket, 'file1.jar'), glue.Code.fromBucket(codeBucket, 'file2.jar')]; + extraPythonFiles = [glue.Code.fromBucket(codeBucket, 'file1.py'), glue.Code.fromBucket(codeBucket, 'file2.py')]; + extraFiles = [glue.Code.fromBucket(codeBucket, 'file1.txt'), glue.Code.fromBucket(codeBucket, 'file2.txt')]; + defaultProps = { + executable: glue.JobExecutable.scalaEtl({ + glueVersion: glue.GlueVersion.V2_0, + className, + script, + }), + }; + }); + + describe('with necessary props only', () => { + beforeEach(() => { + job = new glue.Job(stack, 'Job', defaultProps); + }); + + test('should create a role and use it with the job', () => { + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: 'sts:AssumeRole', + Effect: 'Allow', + Principal: { + Service: 'glue.amazonaws.com', + }, + }, + ], + Version: '2012-10-17', + }, + ManagedPolicyArns: [ + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':iam::aws:policy/service-role/AWSGlueServiceRole', + ], + ], + }, + ], + }); + + // Role policy should grant reading from the assets bucket + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + codeBucketAccessStatement, + ], + }, + Roles: [ + { + Ref: 'JobServiceRole4F432993', + }, + ], + }); + + // check the job using the role + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Command: { + Name: 'glueetl', + ScriptLocation: 's3://bucketName/script', + }, + Role: { + 'Fn::GetAtt': [ + 'JobServiceRole4F432993', + 'Arn', + ], + }, + }); + }); + + test('should return correct jobName and jobArn from 
CloudFormation', () => { + expect(stack.resolve(job.jobName)).toEqual({ Ref: 'JobB9D00F9F' }); + expect(stack.resolve(job.jobArn)).toEqual({ + 'Fn::Join': ['', ['arn:', { Ref: 'AWS::Partition' }, ':glue:', { Ref: 'AWS::Region' }, ':', { Ref: 'AWS::AccountId' }, ':job/', { Ref: 'JobB9D00F9F' }]], + }); + }); + + test('with a custom role should use it and set it in CloudFormation', () => { + const role = iam.Role.fromRoleArn(stack, 'Role', 'arn:aws:iam::123456789012:role/TestRole'); + job = new glue.Job(stack, 'JobWithRole', { + ...defaultProps, + role, + }); + + expect(job.grantPrincipal).toEqual(role); + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Role: role.roleArn, + }); + }); + + test('with a custom jobName should set it in CloudFormation', () => { + job = new glue.Job(stack, 'JobWithName', { + ...defaultProps, + jobName, + }); + + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Name: jobName, + }); + }); + }); + + describe('enabling continuous logging with defaults', () => { + beforeEach(() => { + job = new glue.Job(stack, 'Job', { + ...defaultProps, + continuousLogging: { enabled: true }, + }); + }); + + test('should set minimal default arguments', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + DefaultArguments: { + '--enable-continuous-cloudwatch-log': 'true', + '--enable-continuous-log-filter': 'true', + }, + }); + }); + }); + + describe('enabling continuous logging with all props set', () => { + let logGroup; + + beforeEach(() => { + logGroup = logs.LogGroup.fromLogGroupName(stack, 'LogGroup', 'LogGroupName'); + job = new glue.Job(stack, 'Job', { + ...defaultProps, + continuousLogging: { + enabled: true, + quiet: false, + logStreamPrefix: 'LogStreamPrefix', + conversionPattern: '%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n', + logGroup, + }, + }); + }); + + test('should set all arguments', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + DefaultArguments: 
{ + '--enable-continuous-cloudwatch-log': 'true', + '--enable-continuous-log-filter': 'false', + '--continuous-log-logGroup': 'LogGroupName', + '--continuous-log-logStreamPrefix': 'LogStreamPrefix', + '--continuous-log-conversionPattern': '%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n', + }, + }); + }); + + test('should grant cloudwatch log write permissions', () => { + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 'logs:CreateLogStream', + 'logs:PutLogEvents', + ], + Effect: 'Allow', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':logs:', + { + Ref: 'AWS::Region', + }, + ':', + { + Ref: 'AWS::AccountId', + }, + ':log-group:LogGroupName:*', + ], + ], + }, + }, + codeBucketAccessStatement, + ], + }, + Roles: [ + { + Ref: 'JobServiceRole4F432993', + }, + ], + }); + }); + }); + + describe('enabling spark ui', () => { + describe('with no bucket or path provided', () => { + beforeEach(() => { + job = new glue.Job(stack, 'Job', { + ...defaultProps, + sparkUI: { enabled: true }, + }); + }); + + test('should create spark ui bucket', () => { + Template.fromStack(stack).resourceCountIs('AWS::S3::Bucket', 1); + }); + + test('should grant the role read/write permissions to the spark ui bucket', () => { + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 's3:GetObject*', + 's3:GetBucket*', + 's3:List*', + 's3:DeleteObject*', + 's3:PutObject*', + 's3:Abort*', + ], + Effect: 'Allow', + Resource: [ + { + 'Fn::GetAtt': [ + 'JobSparkUIBucket8E6A0139', + 'Arn', + ], + }, + { + 'Fn::Join': [ + '', + [ + { + 'Fn::GetAtt': [ + 'JobSparkUIBucket8E6A0139', + 'Arn', + ], + }, + '/*', + ], + ], + }, + ], + }, + codeBucketAccessStatement, + ], + Version: '2012-10-17', + }, + PolicyName: 'JobServiceRoleDefaultPolicy03F68F9D', + Roles: [ + { + Ref: 'JobServiceRole4F432993', + }, + ], + }); + }); + + test('should set 
spark arguments on the job', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + DefaultArguments: { + '--enable-spark-ui': 'true', + '--spark-event-logs-path': { + 'Fn::Join': [ + '', + [ + 's3://', + { + Ref: 'JobSparkUIBucket8E6A0139', + }, + ], + ], + }, + }, + }); + }); + }); + + describe('with bucket provided', () => { + const sparkUIBucketName = 'sparkBucketName'; + let sparkUIBucket: s3.IBucket; + + beforeEach(() => { + sparkUIBucket = s3.Bucket.fromBucketName(stack, 'SparkBucketId', sparkUIBucketName); + job = new glue.Job(stack, 'Job', { + ...defaultProps, + sparkUI: { + enabled: true, + bucket: sparkUIBucket, + }, + }); + }); + + test('should grant the role read/write permissions to the provided spark ui bucket', () => { + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: [ + 's3:GetObject*', + 's3:GetBucket*', + 's3:List*', + 's3:DeleteObject*', + 's3:PutObject*', + 's3:Abort*', + ], + Effect: 'Allow', + Resource: [ + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':s3:::sparkBucketName', + ], + ], + }, + { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':s3:::sparkBucketName/*', + ], + ], + }, + ], + }, + codeBucketAccessStatement, + ], + }, + Roles: [ + { + Ref: 'JobServiceRole4F432993', + }, + ], + }); + }); + + test('should set spark arguments on the job', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + DefaultArguments: { + '--enable-spark-ui': 'true', + '--spark-event-logs-path': `s3://${sparkUIBucketName}`, + }, + }); + }); + }); + + describe('with bucket and path provided', () => { + const sparkUIBucketName = 'sparkBucketName'; + const prefix = 'some/path/'; + let sparkUIBucket: s3.IBucket; + + beforeEach(() => { + sparkUIBucket = s3.Bucket.fromBucketName(stack, 'BucketId', sparkUIBucketName); + job = new glue.Job(stack, 'Job', { + ...defaultProps, + sparkUI: { + 
enabled: true, + bucket: sparkUIBucket, + prefix, + }, + }); + }); + + test('should set spark arguments on the job', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + DefaultArguments: { + '--enable-spark-ui': 'true', + '--spark-event-logs-path': `s3://${sparkUIBucketName}/${prefix}`, + }, + }); + }); + }); + }); + + describe('with extended props', () => { + beforeEach(() => { + job = new glue.Job(stack, 'Job', { + ...defaultProps, + jobName, + description: 'test job', + workerType: glue.WorkerType.G_2X, + workerCount: 10, + maxConcurrentRuns: 2, + maxRetries: 2, + timeout: cdk.Duration.minutes(5), + notifyDelayAfter: cdk.Duration.minutes(1), + defaultArguments: { + arg1: 'value1', + arg2: 'value2', + }, + connections: [glue.Connection.fromConnectionName(stack, 'ImportedConnection', 'ConnectionName')], + securityConfiguration: glue.SecurityConfiguration.fromSecurityConfigurationName(stack, 'ImportedSecurityConfiguration', 'SecurityConfigurationName'), + enableProfilingMetrics: true, + tags: { + key: 'value', + }, + }); + }); + + test('should synthesize correctly', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + Command: { + Name: 'glueetl', + ScriptLocation: 's3://bucketName/script', + }, + Role: { + 'Fn::GetAtt': [ + 'JobServiceRole4F432993', + 'Arn', + ], + }, + DefaultArguments: { + '--job-language': 'scala', + '--class': 'com.amazon.test.ClassName', + '--enable-metrics': '', + 'arg1': 'value1', + 'arg2': 'value2', + }, + Description: 'test job', + ExecutionProperty: { + MaxConcurrentRuns: 2, + }, + GlueVersion: '2.0', + MaxRetries: 2, + Name: 'test-job', + NotificationProperty: { + NotifyDelayAfter: 1, + }, + NumberOfWorkers: 10, + Tags: { + key: 'value', + }, + Timeout: 5, + WorkerType: 'G.2X', + Connections: { + Connections: [ + 'ConnectionName', + ], + }, + SecurityConfiguration: 'SecurityConfigurationName', + }); + }); + }); + + test('with reserved args should throw', () => { + ['--conf', 
'--debug', '--mode', '--JOB_NAME'].forEach((arg, index) => { + const defaultArguments: {[key: string]: string} = {}; + defaultArguments[arg] = 'random value'; + + expect(() => new glue.Job(stack, `Job${index}`, { + executable: glue.JobExecutable.scalaEtl({ + glueVersion: glue.GlueVersion.V2_0, + className, + script, + }), + defaultArguments, + })).toThrow(/argument is reserved by Glue/); + }); + }); + + describe('shell job', () => { + test('with unsupported glue version should throw', () => { + expect(() => new glue.Job(stack, 'Job', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V0_9, + pythonVersion: glue.PythonVersion.TWO, + script, + }), + })).toThrow('Specified GlueVersion 0.9 does not support Python Shell'); + }); + + test('with unsupported Spark UI prop should throw', () => { + expect(() => new glue.Job(stack, 'Job', { + executable: glue.JobExecutable.pythonShell({ + glueVersion: glue.GlueVersion.V1_0, + pythonVersion: glue.PythonVersion.THREE, + script, + }), + sparkUI: { enabled: true }, + })).toThrow('Spark UI is not available for JobType.PYTHON_SHELL'); + }); + }); + + + test('etl job with all props should synthesize correctly', () => { + new glue.Job(stack, 'Job', { + executable: glue.JobExecutable.pythonEtl({ + glueVersion: glue.GlueVersion.V2_0, + pythonVersion: glue.PythonVersion.THREE, + extraJarsFirst: true, + script, + extraPythonFiles, + extraJars, + extraFiles, + }), + }); + + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + GlueVersion: '2.0', + Command: { + Name: 'glueetl', + ScriptLocation: 's3://bucketName/script', + PythonVersion: '3', + }, + Role: { + 'Fn::GetAtt': [ + 'JobServiceRole4F432993', + 'Arn', + ], + }, + DefaultArguments: { + '--job-language': 'python', + '--extra-jars': 's3://bucketName/file1.jar,s3://bucketName/file2.jar', + '--extra-py-files': 's3://bucketName/file1.py,s3://bucketName/file2.py', + '--extra-files': 's3://bucketName/file1.txt,s3://bucketName/file2.txt', + 
'--user-jars-first': 'true', + }, + }); + }); + + test('streaming job with all props should synthesize correctly', () => { + new glue.Job(stack, 'Job', { + executable: glue.JobExecutable.scalaStreaming({ + glueVersion: glue.GlueVersion.V2_0, + extraJarsFirst: true, + className, + script, + extraJars, + extraFiles, + }), + }); + + Template.fromStack(stack).hasResourceProperties('AWS::Glue::Job', { + GlueVersion: '2.0', + Command: { + Name: 'gluestreaming', + ScriptLocation: 's3://bucketName/script', + }, + Role: { + 'Fn::GetAtt': [ + 'JobServiceRole4F432993', + 'Arn', + ], + }, + DefaultArguments: { + '--job-language': 'scala', + '--class': 'com.amazon.test.ClassName', + '--extra-jars': 's3://bucketName/file1.jar,s3://bucketName/file2.jar', + '--extra-files': 's3://bucketName/file1.txt,s3://bucketName/file2.txt', + '--user-jars-first': 'true', + }, + }); + }); + + describe('event rules and rule-based metrics', () => { + beforeEach(() => { + job = new glue.Job(stack, 'Job', { + executable: glue.JobExecutable.scalaEtl({ + glueVersion: glue.GlueVersion.V2_0, + className, + script, + }), + }); + }); + + test('.onEvent() should create the expected event rule', () => { + job.onEvent('eventId', {}); + + Template.fromStack(stack).hasResourceProperties('AWS::Events::Rule', { + EventPattern: { + 'source': [ + 'aws.glue', + ], + 'detail-type': [ + 'Glue Job State Change', + 'Glue Job Run Status', + ], + 'detail': { + jobName: [ + { + Ref: 'JobB9D00F9F', + }, + ], + }, + }, + State: 'ENABLED', + }); + }); + + [ + { name: 'onSuccess()', invoke: (testJob: glue.Job) => testJob.onSuccess('SuccessRule'), state: 'SUCCEEDED' }, + { name: 'onFailure()', invoke: (testJob: glue.Job) => testJob.onFailure('FailureRule'), state: 'FAILED' }, + { name: 'onTimeout()', invoke: (testJob: glue.Job) => testJob.onTimeout('TimeoutRule'), state: 'TIMEOUT' }, + ].forEach((testCase) => { + test(`${testCase.name} should create a rule with correct properties`, () => { + testCase.invoke(job); + + 
Template.fromStack(stack).hasResourceProperties('AWS::Events::Rule', { + Description: { + 'Fn::Join': [ + '', + [ + 'Rule triggered when Glue job ', + { + Ref: 'JobB9D00F9F', + }, + ` is in ${testCase.state} state`, + ], + ], + }, + EventPattern: { + 'source': [ + 'aws.glue', + ], + 'detail-type': [ + 'Glue Job State Change', + 'Glue Job Run Status', + ], + 'detail': { + state: [ + testCase.state, + ], + jobName: [ + { + Ref: 'JobB9D00F9F', + }, + ], + }, + }, + State: 'ENABLED', + }); + }); + }); + + [ + { name: '.metricSuccess()', invoke: (testJob: glue.Job) => testJob.metricSuccess(), state: 'SUCCEEDED', ruleId: 'SuccessMetricRule' }, + { name: '.metricFailure()', invoke: (testJob: glue.Job) => testJob.metricFailure(), state: 'FAILED', ruleId: 'FailureMetricRule' }, + { name: '.metricTimeout()', invoke: (testJob: glue.Job) => testJob.metricTimeout(), state: 'TIMEOUT', ruleId: 'TimeoutMetricRule' }, + ].forEach((testCase) => { + test(`${testCase.name} should create the expected singleton event rule and corresponding metric`, () => { + const metric = testCase.invoke(job); + testCase.invoke(job); + + expect(metric).toEqual(new cloudwatch.Metric({ + dimensions: { + RuleName: (job.node.findChild(testCase.ruleId) as events.Rule).ruleName, + }, + metricName: 'TriggeredRules', + namespace: 'AWS/Events', + statistic: 'Sum', + })); + + Template.fromStack(stack).resourceCountIs('AWS::Events::Rule', 1); + Template.fromStack(stack).hasResourceProperties('AWS::Events::Rule', { + Description: { + 'Fn::Join': [ + '', + [ + 'Rule triggered when Glue job ', + { + Ref: 'JobB9D00F9F', + }, + ` is in ${testCase.state} state`, + ], + ], + }, + EventPattern: { + 'source': [ + 'aws.glue', + ], + 'detail-type': [ + 'Glue Job State Change', + 'Glue Job Run Status', + ], + 'detail': { + state: [ + testCase.state, + ], + jobName: [ + { + Ref: 'JobB9D00F9F', + }, + ], + }, + }, + State: 'ENABLED', + }); + }); + }); + }); + + describe('.metric()', () => { + + test('with MetricType.COUNT 
should create a count sum metric', () => { + const metricName = 'glue.driver.aggregate.bytesRead'; + const props = { statistic: cloudwatch.Statistic.SUM }; + + expect(job.metric(metricName, glue.MetricType.COUNT, props)).toEqual(new cloudwatch.Metric({ + metricName, + statistic: 'Sum', + namespace: 'Glue', + dimensions: { + JobName: job.jobName, + JobRunId: 'ALL', + Type: 'count', + }, + })); + }); + + test('with MetricType.GAUGE should create a gauge average metric', () => { + const metricName = 'glue.driver.BlockManager.disk.diskSpaceUsed_MB'; + const props = { statistic: cloudwatch.Statistic.AVERAGE }; + + expect(job.metric(metricName, glue.MetricType.GAUGE, props)).toEqual(new cloudwatch.Metric({ + metricName, + statistic: 'Average', + namespace: 'Glue', + dimensions: { + JobName: job.jobName, + JobRunId: 'ALL', + Type: 'gauge', + }, + })); + }); + }); + }); +});