Skip to content

Commit

Permalink
minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Feb 6, 2025
1 parent 2fd52f4 commit 816470a
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import sys

# force to register the operations to SDK Harness
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

import pyflink.fn_execution.beam.beam_operations # noqa # pylint: disable=unused-import

Expand All @@ -42,7 +42,11 @@ def get_state_cache_size(options):
"""
Return the maximum size of state cache in count.
"""
experiments = options.view_as(DebugOptions).experiments or []
if isinstance(options, PipelineOptions):
experiments = options.view_as(DebugOptions).experiments or []
else:
experiments = options

for experiment in experiments:
# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
Expand Down

0 comments on commit 816470a

Please sign in to comment.