[AIRFLOW-XXX] Fix Docstrings for Operators (#3820)
kaxil authored Aug 31, 2018
1 parent e09b387 commit ba27fca
Showing 11 changed files with 125 additions and 113 deletions.
4 changes: 4 additions & 0 deletions airflow/contrib/hooks/gcp_container_hook.py
@@ -48,6 +48,7 @@ def __init__(self, project_id, location):
def _dict_to_proto(py_dict, proto):
"""
Converts a python dictionary to the proto supplied
:param py_dict: The dictionary to convert
:type py_dict: dict
:param proto: The proto object to merge with dictionary
@@ -63,6 +64,7 @@ def wait_for_operation(self, operation):
"""
Given an operation, continuously fetches the status from Google Cloud until either
completion or an error occurring
:param operation: The Operation to wait for
:type operation: A google.cloud.container_V1.gapic.enums.Operator
:return: A new, updated operation fetched from Google Cloud
@@ -83,6 +85,7 @@ def wait_for_operation(self, operation):
def get_operation(self, operation_name):
"""
Fetches the operation from Google Cloud
:param operation_name: Name of operation to fetch
:type operation_name: str
:return: The new, updated operation from Google Cloud
@@ -196,6 +199,7 @@ def create_cluster(self, cluster, retry=DEFAULT, timeout=DEFAULT):
def get_cluster(self, name, retry=DEFAULT, timeout=DEFAULT):
"""
Gets details of specified cluster
:param name: The name of the cluster to retrieve
:type name: str
:param retry: A retry object used to retry requests. If None is specified,
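For reference, a minimal sketch of how the hook methods documented above fit together. The class name ``GKEClusterHook`` and all project, zone and cluster values are assumptions, not taken from this diff::

    from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook

    # The hook is constructed with a project and a location, per __init__ above.
    hook = GKEClusterHook(project_id='my-gcp-project', location='europe-west1-b')

    # Fetch details of an existing cluster by name.
    cluster = hook.get_cluster(name='analytics-cluster')

    # Look up a long-running operation by name, then block until it
    # completes or an error occurs.
    operation = hook.get_operation(operation_name='operation-1234567890')
    operation = hook.wait_for_operation(operation)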
16 changes: 9 additions & 7 deletions airflow/contrib/operators/awsbatch_operator.py
@@ -42,18 +42,20 @@ class AWSBatchOperator(BaseOperator):
:type job_definition: str
:param job_queue: the queue name on AWS Batch
:type job_queue: str
:param: overrides: the same parameter that boto3 will receive on
containerOverrides (templated):
http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job
:type: overrides: dict
:param max_retries: exponential backoff retries while waiter is not merged, 4200 = 48 hours
:param overrides: the same parameter that boto3 will receive on
containerOverrides (templated).
http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job
:type overrides: dict
:param max_retries: exponential backoff retries while waiter is not merged,
4200 = 48 hours
:type max_retries: int
:param aws_conn_id: connection id of AWS credentials / region name. If None,
credential boto3 strategy will be used
(http://boto3.readthedocs.io/en/latest/guide/configuration.html).
credential boto3 strategy will be used
(http://boto3.readthedocs.io/en/latest/guide/configuration.html).
:type aws_conn_id: str
:param region_name: region name to use in AWS Hook.
Override the region_name in connection (if provided)
:type region_name: str
"""

ui_color = '#c3dae0'
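A minimal DAG sketch for the parameters documented above. The ``job_name`` argument and all names, queues and connection IDs are assumptions; ``overrides`` follows the boto3 ``containerOverrides`` shape linked above::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.awsbatch_operator import AWSBatchOperator

    dag = DAG('aws_batch_example', start_date=datetime(2018, 8, 1), schedule_interval=None)

    submit_job = AWSBatchOperator(
        task_id='submit_batch_job',
        job_name='nightly-etl',          # assumed argument, not shown in this hunk
        job_definition='etl-job-def:3',
        job_queue='default-queue',
        overrides={'command': ['python', 'etl.py', '--date', '{{ ds }}']},  # templated
        max_retries=4200,                # roughly 48 hours of polling with backoff
        aws_conn_id='aws_default',
        region_name='eu-west-1',
        dag=dag,
    )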
6 changes: 3 additions & 3 deletions airflow/contrib/operators/bigquery_check_operator.py
@@ -56,7 +56,7 @@ class BigQueryCheckOperator(CheckOperator):
:param bigquery_conn_id: reference to the BigQuery database
:type bigquery_conn_id: string
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
or standard SQL (false).
:type use_legacy_sql: boolean
"""

@@ -83,7 +83,7 @@ class BigQueryValueCheckOperator(ValueCheckOperator):
:param sql: the sql to be executed
:type sql: string
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
or standard SQL (false).
:type use_legacy_sql: boolean
"""

@@ -125,7 +125,7 @@ class BigQueryIntervalCheckOperator(IntervalCheckOperator):
between the current day, and the prior days_back.
:type metrics_threshold: dict
:param use_legacy_sql: Whether to use legacy SQL (true)
or standard SQL (false).
or standard SQL (false).
:type use_legacy_sql: boolean
"""

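A short sketch of the ``use_legacy_sql`` flag on the check operators documented above. The connection ID and table are placeholders, and ``pass_value`` comes from the underlying ``ValueCheckOperator`` rather than this hunk::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_check_operator import (
        BigQueryCheckOperator,
        BigQueryValueCheckOperator,
    )

    dag = DAG('bq_checks', start_date=datetime(2018, 8, 1), schedule_interval='@daily')

    # Fails the task if the query returns no rows or a falsy first row.
    rows_exist = BigQueryCheckOperator(
        task_id='check_rows_exist',
        sql='SELECT COUNT(*) FROM `my-project.ods.employees`',
        bigquery_conn_id='bigquery_default',
        use_legacy_sql=False,   # standard SQL
        dag=dag,
    )

    # Fails the task if the result deviates from the expected value.
    no_null_salaries = BigQueryValueCheckOperator(
        task_id='check_no_null_salaries',
        sql='SELECT COUNT(*) FROM `my-project.ods.employees` WHERE salary IS NULL',
        pass_value=0,           # assumed argument, not shown in this hunk
        use_legacy_sql=False,
        dag=dag,
    )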
82 changes: 41 additions & 41 deletions airflow/contrib/operators/bigquery_operator.py
@@ -234,49 +234,49 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
:param labels a dictionary containing labels for the table, passed to BigQuery
:param labels: a dictionary containing labels for the table, passed to BigQuery
**Example (with schema JSON in GCS)**: ::
CreateTable = BigQueryCreateEmptyTableOperator(
task_id='BigQueryCreateEmptyTableOperator_task',
dataset_id='ODS',
table_id='Employees',
project_id='internal-gcp-project',
gcs_schema_object='gs://schema-bucket/employee_schema.json',
bigquery_conn_id='airflow-service-account',
google_cloud_storage_conn_id='airflow-service-account'
)
**Corresponding Schema file** (``employee_schema.json``): ::
[
{
"mode": "NULLABLE",
"name": "emp_name",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "salary",
"type": "INTEGER"
}
]
**Example (with schema in the DAG)**: ::
CreateTable = BigQueryCreateEmptyTableOperator(
task_id='BigQueryCreateEmptyTableOperator_task',
dataset_id='ODS',
table_id='Employees',
project_id='internal-gcp-project',
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
bigquery_conn_id='airflow-service-account',
google_cloud_storage_conn_id='airflow-service-account'
)
:type labels: dict
**Example (with schema JSON in GCS)**: ::
CreateTable = BigQueryCreateEmptyTableOperator(
task_id='BigQueryCreateEmptyTableOperator_task',
dataset_id='ODS',
table_id='Employees',
project_id='internal-gcp-project',
gcs_schema_object='gs://schema-bucket/employee_schema.json',
bigquery_conn_id='airflow-service-account',
google_cloud_storage_conn_id='airflow-service-account'
)
**Corresponding Schema file** (``employee_schema.json``): ::
[
{
"mode": "NULLABLE",
"name": "emp_name",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "salary",
"type": "INTEGER"
}
]
**Example (with schema in the DAG)**: ::
CreateTable = BigQueryCreateEmptyTableOperator(
task_id='BigQueryCreateEmptyTableOperator_task',
dataset_id='ODS',
table_id='Employees',
project_id='internal-gcp-project',
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
bigquery_conn_id='airflow-service-account',
google_cloud_storage_conn_id='airflow-service-account'
)
"""
template_fields = ('dataset_id', 'table_id', 'project_id',
'gcs_schema_object', 'labels')
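Since the change above documents the ``labels`` argument, a minimal sketch of passing it alongside an inline schema. Project, dataset, label values and connection IDs are placeholders::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator

    dag = DAG('bq_create_table', start_date=datetime(2018, 8, 1), schedule_interval=None)

    create_table = BigQueryCreateEmptyTableOperator(
        task_id='create_employees_table',
        project_id='internal-gcp-project',
        dataset_id='ODS',
        table_id='Employees',
        schema_fields=[
            {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'salary', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ],
        labels={'team': 'data-eng', 'env': 'prod'},   # the labels argument documented above
        bigquery_conn_id='bigquery_default',
        google_cloud_storage_conn_id='google_cloud_default',
        dag=dag,
    )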
63 changes: 32 additions & 31 deletions airflow/contrib/operators/dataflow_operator.py
@@ -252,6 +252,38 @@ def execute(self, context):


class DataFlowPythonOperator(BaseOperator):
"""
Create a new DataFlowPythonOperator. Note that both
dataflow_default_options and options will be merged to specify pipeline
execution parameter, and dataflow_default_options is expected to save
high-level options, for instances, project and zone information, which
apply to all dataflow operators in the DAG.
.. seealso::
For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params
:param py_file: Reference to the python dataflow pipeline file.py, e.g.,
/some/local/file/path/to/your/python/pipeline/file.
:type py_file: string
:param py_options: Additional python options.
:type py_options: list of strings, e.g., ["-m", "-v"].
:param dataflow_default_options: Map of default job options.
:type dataflow_default_options: dict
:param options: Map of job specific options.
:type options: dict
:param gcp_conn_id: The connection ID to use connecting to Google Cloud
Platform.
:type gcp_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: string
:param poll_sleep: The time in seconds to sleep between polling Google
Cloud Platform for the dataflow job status while the job is in the
JOB_STATE_RUNNING state.
:type poll_sleep: int
"""

template_fields = ['options', 'dataflow_default_options']

@@ -267,38 +299,7 @@ def __init__(
poll_sleep=10,
*args,
**kwargs):
"""
Create a new DataFlowPythonOperator. Note that both
dataflow_default_options and options will be merged to specify pipeline
execution parameter, and dataflow_default_options is expected to save
high-level options, for instances, project and zone information, which
apply to all dataflow operators in the DAG.
.. seealso::
For more detail on job submission have a look at the reference:
https://cloud.google.com/dataflow/pipelines/specifying-exec-params

:param py_file: Reference to the python dataflow pipleline file.py, e.g.,
/some/local/file/path/to/your/python/pipeline/file.
:type py_file: string
:param py_options: Additional python options.
:type pyt_options: list of strings, e.g., ["-m", "-v"].
:param dataflow_default_options: Map of default job options.
:type dataflow_default_options: dict
:param options: Map of job specific options.
:type options: dict
:param gcp_conn_id: The connection ID to use connecting to Google Cloud
Platform.
:type gcp_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: string
:param poll_sleep: The time in seconds to sleep between polling Google
Cloud Platform for the dataflow job status while the job is in the
JOB_STATE_RUNNING state.
:type poll_sleep: int
"""
super(DataFlowPythonOperator, self).__init__(*args, **kwargs)

self.py_file = py_file
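A minimal sketch of how ``dataflow_default_options`` and per-task ``options`` are merged, as described in the relocated docstring above. Paths, buckets, project and zone are placeholders::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    dag = DAG('dataflow_example', start_date=datetime(2018, 8, 1), schedule_interval=None)

    run_pipeline = DataFlowPythonOperator(
        task_id='run_wordcount',
        py_file='/home/airflow/dags/pipelines/wordcount.py',
        py_options=['-m'],
        dataflow_default_options={           # high-level options shared across the DAG
            'project': 'my-gcp-project',
            'zone': 'europe-west1-b',
            'staging_location': 'gs://my-bucket/staging',
        },
        options={'output': 'gs://my-bucket/output'},   # job-specific, merged with the defaults
        gcp_conn_id='google_cloud_default',
        poll_sleep=10,
        dag=dag,
    )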
16 changes: 8 additions & 8 deletions airflow/contrib/operators/dataproc_operator.py
@@ -412,14 +412,14 @@ class DataprocClusterScaleOperator(BaseOperator):
**Example**: ::
t1 = DataprocClusterScaleOperator(
task_id='dataproc_scale',
project_id='my-project',
cluster_name='cluster-1',
num_workers=10,
num_preemptible_workers=10,
graceful_decommission_timeout='1h',
dag=dag)
t1 = DataprocClusterScaleOperator(
task_id='dataproc_scale',
project_id='my-project',
cluster_name='cluster-1',
num_workers=10,
num_preemptible_workers=10,
graceful_decommission_timeout='1h',
dag=dag)
.. seealso::
For more detail on about scaling clusters have a look at the reference:
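For completeness, the same scale call wired into a DAG with the optional ``region`` argument. All values are placeholders, and ``region`` is assumed from the operator's full signature rather than this hunk::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import DataprocClusterScaleOperator

    dag = DAG('dataproc_scale_example', start_date=datetime(2018, 8, 1), schedule_interval=None)

    scale_cluster = DataprocClusterScaleOperator(
        task_id='dataproc_scale',
        project_id='my-project',
        region='global',                      # assumed argument, not shown in this hunk
        cluster_name='cluster-1',
        num_workers=10,
        num_preemptible_workers=10,
        graceful_decommission_timeout='1h',
        dag=dag,
    )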
13 changes: 7 additions & 6 deletions airflow/contrib/operators/ecs_operator.py
@@ -33,17 +33,18 @@ class ECSOperator(BaseOperator):
:type task_definition: str
:param cluster: the cluster name on EC2 Container Service
:type cluster: str
:param: overrides: the same parameter that boto3 will receive (templated):
http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
:type: overrides: dict
:param overrides: the same parameter that boto3 will receive (templated):
http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task
:type overrides: dict
:param aws_conn_id: connection id of AWS credentials / region name. If None,
credential boto3 strategy will be used
(http://boto3.readthedocs.io/en/latest/guide/configuration.html).
credential boto3 strategy will be used
(http://boto3.readthedocs.io/en/latest/guide/configuration.html).
:type aws_conn_id: str
:param region_name: region name to use in AWS Hook.
Override the region_name in connection (if provided)
:type region_name: str
:param launch_type: the launch type on which to run your task ('EC2' or 'FARGATE')
:type: launch_type: str
:type launch_type: str
"""

ui_color = '#f0ede4'
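A minimal sketch using the parameters documented above, including ``launch_type``. The task definition, cluster and container names are placeholders, and ``overrides`` follows the boto3 ``run_task`` shape linked above::

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.ecs_operator import ECSOperator

    dag = DAG('ecs_example', start_date=datetime(2018, 8, 1), schedule_interval=None)

    run_task = ECSOperator(
        task_id='run_ecs_task',
        task_definition='my-task-def:1',
        cluster='default',
        overrides={
            'containerOverrides': [
                {'name': 'app', 'command': ['python', 'job.py', '{{ ds }}']},  # templated
            ],
        },
        launch_type='EC2',
        aws_conn_id='aws_default',
        region_name='eu-west-1',
        dag=dag,
    )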
19 changes: 11 additions & 8 deletions airflow/models.py
@@ -2335,14 +2335,17 @@ class derived from this one results in the creation of a task object,
:param executor_config: Additional task-level configuration parameters that are
interpreted by a specific executor. Parameters are namespaced by the name of
executor.
``example: to run this task in a specific docker container through
the KubernetesExecutor
MyOperator(...,
executor_config={
"KubernetesExecutor":
{"image": "myCustomDockerImage"}
}
)``
**Example**: to run this task in a specific docker container through
the KubernetesExecutor ::
MyOperator(...,
executor_config={
"KubernetesExecutor":
{"image": "myCustomDockerImage"}
}
)
:type executor_config: dict
"""

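A concrete sketch of the ``executor_config`` example above, attached to a real task. The ``PythonOperator`` and the image name are placeholders, and the setting only takes effect when running under the KubernetesExecutor::

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG('executor_config_example', start_date=datetime(2018, 8, 1), schedule_interval=None)

    def build_report():
        print('running inside a custom image')

    report = PythonOperator(
        task_id='build_report',
        python_callable=build_report,
        executor_config={
            'KubernetesExecutor': {'image': 'myCustomDockerImage'},
        },
        dag=dag,
    )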
15 changes: 8 additions & 7 deletions airflow/operators/s3_file_transform_operator.py
@@ -47,15 +47,16 @@ class S3FileTransformOperator(BaseOperator):
:type source_s3_key: str
:param source_aws_conn_id: source s3 connection
:type source_aws_conn_id: str
:parame source_verify: Whether or not to verify SSL certificates for S3 connetion.
:param source_verify: Whether or not to verify SSL certificates for S3 connection.
By default SSL certificates are verified.
You can provide the following values:
- False: do not validate SSL certificates. SSL will still be used
(unless use_ssl is False), but SSL certificates will not be
verified.
- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
You can specify this argument if you want to use a different
CA cert bundle than the one used by botocore.
- ``False``: do not validate SSL certificates. SSL will still be used
(unless use_ssl is False), but SSL certificates will not be
verified.
- ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
You can specify this argument if you want to use a different
CA cert bundle than the one used by botocore.
This is also applicable to ``dest_verify``.
:type source_verify: bool or str
:param dest_s3_key: The key to be written from S3. (templated)
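A minimal sketch combining ``source_verify`` and ``dest_verify`` with the rest of the operator's arguments. ``transform_script`` and ``dest_aws_conn_id`` are assumed from the full signature rather than this hunk, and all keys and paths are placeholders::

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

    dag = DAG('s3_transform_example', start_date=datetime(2018, 8, 1), schedule_interval='@daily')

    transform = S3FileTransformOperator(
        task_id='transform_daily_file',
        source_s3_key='s3://raw-bucket/events/{{ ds }}.csv',   # templated
        dest_s3_key='s3://clean-bucket/events/{{ ds }}.csv',   # templated
        transform_script='/usr/local/bin/clean_csv.py',        # assumed argument, not in this hunk
        source_aws_conn_id='aws_default',
        source_verify='/etc/ssl/certs/ca-bundle.pem',          # custom CA bundle for the source connection
        dest_aws_conn_id='aws_default',                        # assumed argument, not in this hunk
        dest_verify=False,                                     # skip certificate validation on the destination
        dag=dag,
    )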
2 changes: 1 addition & 1 deletion airflow/sensors/s3_key_sensor.py
@@ -43,7 +43,7 @@ class S3KeySensor(BaseSensorOperator):
:type wildcard_match: bool
:param aws_conn_id: a reference to the s3 connection
:type aws_conn_id: str
:parame verify: Whether or not to verify SSL certificates for S3 connection.
:param verify: Whether or not to verify SSL certificates for S3 connection.
By default SSL certificates are verified.
You can provide the following values:
- False: do not validate SSL certificates. SSL will still be used
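A minimal sketch of the sensor with the ``verify`` argument documented above. ``bucket_key`` is assumed from the full signature rather than this hunk, and the key is a placeholder::

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.s3_key_sensor import S3KeySensor

    dag = DAG('s3_key_wait', start_date=datetime(2018, 8, 1), schedule_interval='@daily')

    wait_for_export = S3KeySensor(
        task_id='wait_for_daily_export',
        bucket_key='s3://raw-bucket/exports/latest/_SUCCESS',   # assumed argument, not in this hunk
        wildcard_match=False,
        aws_conn_id='aws_default',
        verify='/etc/ssl/certs/ca-bundle.pem',   # or False to skip certificate validation
        poke_interval=60,
        timeout=60 * 60,
        dag=dag,
    )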
2 changes: 1 addition & 1 deletion airflow/sensors/s3_prefix_sensor.py
@@ -40,7 +40,7 @@ class S3PrefixSensor(BaseSensorOperator):
:type delimiter: str
:param aws_conn_id: a reference to the s3 connection
:type aws_conn_id: str
:parame verify: Whether or not to verify SSL certificates for S3 connection.
:param verify: Whether or not to verify SSL certificates for S3 connection.
By default SSL certificates are verified.
You can provide the following values:
- False: do not validate SSL certificates. SSL will still be used
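And the equivalent sketch for the prefix sensor. ``bucket_name`` and ``prefix`` are assumed from the full signature rather than this hunk, and the values are placeholders::

    from datetime import datetime

    from airflow import DAG
    from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

    dag = DAG('s3_prefix_wait', start_date=datetime(2018, 8, 1), schedule_interval='@daily')

    wait_for_partition = S3PrefixSensor(
        task_id='wait_for_partition',
        bucket_name='raw-bucket',          # assumed argument, not in this hunk
        prefix='events/2018-08-31/',       # assumed argument, not in this hunk
        delimiter='/',
        aws_conn_id='aws_default',
        verify=False,                      # or a path to a custom CA cert bundle
        dag=dag,
    )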
