S3DeleteObjectsOperator: add ability to filter keys by last modified time #39151

Merged
34 changes: 27 additions & 7 deletions airflow/providers/amazon/aws/operators/s3.py
@@ -30,8 +30,9 @@
from airflow.utils.helpers import exactly_one

if TYPE_CHECKING:
from airflow.utils.context import Context
from datetime import datetime

from airflow.utils.context import Context

BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist"

@@ -473,6 +474,10 @@ class S3DeleteObjectsOperator(BaseOperator):

:param prefix: Prefix of objects to delete. (templated)
All objects matching this prefix in the bucket will be deleted.
:param from_datetime: Lower bound on the LastModified date of objects to delete. (templated)
All objects in the bucket whose LastModified date is greater than this datetime will be deleted.
:param to_datetime: Upper bound on the LastModified date of objects to delete. (templated)
All objects in the bucket whose LastModified date is less than this datetime will be deleted.
:param aws_conn_id: Connection id of the S3 connection to use
:param verify: Whether or not to verify SSL certificates for S3 connection.
By default SSL certificates are verified.
@@ -487,14 +492,16 @@
CA cert bundle than the one used by botocore.
"""

template_fields: Sequence[str] = ("keys", "bucket", "prefix")
template_fields: Sequence[str] = ("keys", "bucket", "prefix", "from_datetime", "to_datetime")

def __init__(
self,
*,
bucket: str,
keys: str | list | None = None,
prefix: str | None = None,
from_datetime: datetime | None = None,
to_datetime: datetime | None = None,
aws_conn_id: str | None = "aws_default",
verify: str | bool | None = None,
**kwargs,
@@ -503,23 +510,36 @@ def __init__(
self.bucket = bucket
self.keys = keys
self.prefix = prefix
self.from_datetime = from_datetime
self.to_datetime = to_datetime
self.aws_conn_id = aws_conn_id
self.verify = verify

self._keys: str | list[str] = ""

if not exactly_one(prefix is None, keys is None):
raise AirflowException("Either keys or prefix should be set.")
if not exactly_one(keys is None, all(var is None for var in [prefix, from_datetime, to_datetime])):
raise AirflowException(
"Either keys or at least one of prefix, from_datetime, to_datetime should be set."
)

def execute(self, context: Context):
if not exactly_one(self.keys is None, self.prefix is None):
raise AirflowException("Either keys or prefix should be set.")
if not exactly_one(
self.keys is None, all(var is None for var in [self.prefix, self.from_datetime, self.to_datetime])
):
raise AirflowException(
"Either keys or at least one of prefix, from_datetime, to_datetime should be set."
)

if isinstance(self.keys, (list, str)) and not self.keys:
return
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
keys = self.keys or s3_hook.list_keys(
bucket_name=self.bucket,
prefix=self.prefix,
from_datetime=self.from_datetime,
to_datetime=self.to_datetime,
)
if keys:
s3_hook.delete_objects(bucket=self.bucket, keys=keys)
self._keys = keys
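For orientation, a minimal usage sketch of the new filters (a hypothetical example assuming an Amazon provider release that includes this change; the bucket name, prefix, and dates are illustrative):

from datetime import datetime, timezone

from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator

# Delete every object whose LastModified timestamp falls between the two
# bounds. `keys` must be left unset: the operator accepts either `keys` or
# any combination of `prefix`, `from_datetime`, and `to_datetime`, never
# both groups at once. Since all three filters are template fields, they
# can also be supplied as Jinja expressions.
delete_stale = S3DeleteObjectsOperator(
    task_id="delete_stale_objects",
    bucket="my-bucket",
    prefix="staging/",  # optional; may be combined with the datetime bounds
    from_datetime=datetime(2024, 1, 1, tzinfo=timezone.utc),
    to_datetime=datetime(2024, 2, 1, tzinfo=timezone.utc),
)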
94 changes: 82 additions & 12 deletions tests/providers/amazon/aws/operators/test_s3.py
@@ -51,6 +51,7 @@
S3PutBucketTaggingOperator,
)
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.timezone import datetime, utcnow

BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-airflow-bucket")
S3_KEY = "test-airflow-key"
@@ -531,6 +532,37 @@ def test_s3_delete_multiple_objects(self):
# There should be no object found in the bucket created earlier
assert "Contents" not in conn.list_objects(Bucket=bucket, Prefix=key_pattern)

def test_s3_delete_from_to_datetime(self):
bucket = "testbucket"
key_pattern = "path/data"
n_keys = 3
keys = [key_pattern + str(i) for i in range(n_keys)]

conn = boto3.client("s3")
conn.create_bucket(Bucket=bucket)
for k in keys:
conn.upload_fileobj(Bucket=bucket, Key=k, Fileobj=BytesIO(b"input"))

# The objects should be detected before the DELETE action is taken
objects_in_dest_bucket = conn.list_objects(Bucket=bucket)
assert len(objects_in_dest_bucket["Contents"]) == n_keys
assert sorted(x["Key"] for x in objects_in_dest_bucket["Contents"]) == sorted(keys)

now = utcnow()
from_datetime = now.replace(year=now.year - 1)
to_datetime = now.replace(year=now.year + 1)

op = S3DeleteObjectsOperator(
task_id="test_task_s3_delete_prefix",
bucket=bucket,
from_datetime=from_datetime,
to_datetime=to_datetime,
)
op.execute(None)

# There should be no object found in the bucket created earlier
assert "Contents" not in conn.list_objects(Bucket=bucket)

def test_s3_delete_prefix(self):
bucket = "testbucket"
key_pattern = "path/data"
@@ -598,31 +630,64 @@ def test_s3_delete_empty_string(self):
assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test

@pytest.mark.parametrize(
"keys, prefix",
"keys, prefix, from_datetime, to_datetime",
[
pytest.param("path/data.txt", "path/data", id="single-key-and-prefix"),
pytest.param(["path/data.txt"], "path/data", id="multiple-keys-and-prefix"),
pytest.param(None, None, id="both-none"),
pytest.param("path/data.txt", "path/data", None, None, id="single-key-and-prefix"),
pytest.param(["path/data.txt"], "path/data", None, None, id="multiple-keys-and-prefix"),
pytest.param(
["path/data.txt"],
"path/data",
datetime(1992, 3, 8, 18, 52, 51),
None,
id="keys-prefix-and-from_datetime",
),
pytest.param(
["path/data.txt"],
"path/data",
datetime(1992, 3, 8, 18, 52, 51),
datetime(1993, 3, 8, 18, 52, 51),
id="keys-prefix-and-from-to_datetime",
),
pytest.param(None, None, None, None, id="all-none"),
],
)
def test_validate_keys_and_prefix_in_constructor(self, keys, prefix):
with pytest.raises(AirflowException, match=r"Either keys or prefix should be set\."):
def test_validate_keys_and_filters_in_constructor(self, keys, prefix, from_datetime, to_datetime):
with pytest.raises(
AirflowException,
match=r"Either keys or at least one of prefix, from_datetime, to_datetime should be set.",
):
S3DeleteObjectsOperator(
task_id="test_validate_keys_and_prefix_in_constructor",
bucket="foo-bar-bucket",
keys=keys,
prefix=prefix,
from_datetime=from_datetime,
to_datetime=to_datetime,
)

@pytest.mark.parametrize(
"keys, prefix",
"keys, prefix, from_datetime, to_datetime",
[
pytest.param("path/data.txt", "path/data", id="single-key-and-prefix"),
pytest.param(["path/data.txt"], "path/data", id="multiple-keys-and-prefix"),
pytest.param(None, None, id="both-none"),
pytest.param("path/data.txt", "path/data", None, None, id="single-key-and-prefix"),
pytest.param(["path/data.txt"], "path/data", None, None, id="multiple-keys-and-prefix"),
pytest.param(
["path/data.txt"],
"path/data",
datetime(1992, 3, 8, 18, 52, 51),
None,
id="keys-prefix-and-from_datetime",
),
pytest.param(
["path/data.txt"],
"path/data",
datetime(1992, 3, 8, 18, 52, 51),
datetime(1993, 3, 8, 18, 52, 51),
id="keys-prefix-and-from-to_datetime",
),
pytest.param(None, None, None, None, id="all-none"),
],
)
def test_validate_keys_and_prefix_in_execute(self, keys, prefix):
def test_validate_keys_and_prefix_in_execute(self, keys, prefix, from_datetime, to_datetime):
bucket = "testbucket"
key_of_test = "path/data.txt"

@@ -639,13 +704,18 @@ def test_validate_keys_and_prefix_in_execute(self, keys, prefix):
)
op.keys = keys
op.prefix = prefix
op.from_datetime = from_datetime
op.to_datetime = to_datetime

# The object should be detected before the DELETE action is tested
objects_in_dest_bucket = conn.list_objects(Bucket=bucket, Prefix=key_of_test)
assert len(objects_in_dest_bucket["Contents"]) == 1
assert objects_in_dest_bucket["Contents"][0]["Key"] == key_of_test

with pytest.raises(AirflowException, match=r"Either keys or prefix should be set\."):
with pytest.raises(
AirflowException,
match=r"Either keys or at least one of prefix, from_datetime, to_datetime should be set.",
):
op.execute(None)

# The object found in the bucket created earlier should still be there
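As a reading aid, the exactly_one guard exercised by these tests can be paraphrased as: exactly one of "keys is provided" and "no filter is provided" must be true. A small standalone sketch of the accepted and rejected combinations (the is_valid wrapper is illustrative, not part of the PR; exactly_one is the same Airflow helper imported in the operator module):

from airflow.utils.helpers import exactly_one

def is_valid(keys, prefix, from_datetime, to_datetime) -> bool:
    # Mirrors the operator's check: valid iff keys alone is set, or keys
    # is unset and at least one of prefix/from_datetime/to_datetime is set.
    return exactly_one(keys is None, all(v is None for v in (prefix, from_datetime, to_datetime)))

assert is_valid(None, "path/", None, None)  # prefix only: accepted
assert is_valid(["a.txt"], None, None, None)  # keys only: accepted
assert not is_valid(["a.txt"], "path/", None, None)  # keys plus a filter: the operator raises
assert not is_valid(None, None, None, None)  # nothing set: the operator raises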