[Segment Replication] Introduce new API to fetch existing metrics related to segment replication. #4554

Closed
Tracked by #5147
Rishikesh1159 opened this issue Sep 19, 2022 · 10 comments
Labels
distributed framework enhancement Enhancement or improvement to existing feature or request

Comments

@Rishikesh1159
Member

Rishikesh1159 commented Sep 19, 2022

Is your feature request related to a problem? Please describe.
Currently we just log all the metrics related to segment replication. Instead we need to create an API to fetch all relevant segment replication metrics, so that we can just call the API from opensearch-benchmarks to get the metrics and include them in the results of a benchmark run.

@Rishikesh1159 Rishikesh1159 added enhancement Enhancement or improvement to existing feature or request untriaged distributed framework and removed untriaged labels Sep 19, 2022
@Rishikesh1159 Rishikesh1159 changed the title [Segment Replication] Create an API to fetch metrics related to segment replication. [Segment Replication] Update existing API to fetch existing metrics related to segment replication. Sep 30, 2022
@Poojita-Raj
Contributor

Poojita-Raj commented Nov 15, 2022

These are the 4 API changes (updates/additions) we need to fetch metrics related to segment replication:

  1. To surface segment replication timing data, our current approach is to follow the existing convention for surfacing timing data (as done for peer recovery). Timing data is the time spent on each replication event, with a breakdown of the time spent in each stage of the replication. Endpoints: GET /_cat/segment_replication/ and GET /_cat/segment_replication/index-name
    1. A specific index can be given; otherwise the API should return data for all indices on the cluster, with shard-level granularity. This is the minimum required task in order to surface data for benchmarking and measuring performance.
    2. It should have the following attributes:
      1. index | i,idx | index name
      2. shard | s,sh | shard name
      3. segment | seg | segment -> what uniquely identifies a segment?
      4. start_time | start | replication start time
      5. start_time_millis | start_millis | replication start time in epoch milliseconds
      6. stop_time | stop | replication stop time
      7. stop_time_millis | stop_millis | replication stop time in epoch milliseconds
      8. stage | st | replication stage
      9. source_host | shost | source host
      10. source_node | snode | source node name
      11. target_host | thost | target host
      12. target_node | tnode | target node name
    3. If we set detailed=true in the query parameters, it should also show the time spent in each replication stage.
    4. Query parameters
      1. active_only
        (Optional, Boolean) If true, the response only includes ongoing segment replications. Defaults to false.
      2. format
        (Optional, string) Short version of the HTTP accept header. Valid values include JSON, YAML, etc.
      3. h
        (Optional, string) Comma-separated list of column names to display.
      4. help
        (Optional, Boolean) If true, the response includes help information. Defaults to false.
      5. index
        (Optional, string) Comma-separated list or wildcard expression of index names used to limit the request.
      6. time
        (Optional) Unit used to display time values. milliseconds by default.
      7. v
        (Optional, Boolean) If true, the response includes column headings. Defaults to false.
    5. Currently info for all stages is present in SegmentReplicationState’s timingData object. For each of the above stages we have a long value for time taken in each stage which is in milliseconds (uses ReplicationTimer for all stages).
    6. This records timing values for each replication event. At the end of the event (at ReplicationDone), we need to persist these values in order to surface them later on.
    7. STAGES: DONE((byte) 0),
      INIT((byte) 1),
      REPLICATING((byte) 2),
      GET_CHECKPOINT_INFO((byte) 3),
      FILE_DIFF((byte) 4),
      GET_FILES((byte) 5),
      FINALIZE_REPLICATION((byte) 6),
      CANCELLED((byte) 7);
  2. The segment_replication API can be extended to include information about the data sent for each replication event once we have implemented the timing data. Currently, we are not tracking this as part of the replication state; it is only accessed as part of the getSegmentFiles call. This would include:
    1. files | f | number of files to replicate
    2. files_recovered | fr | files replicated
    3. files_percent | fp | percent of files replicated
    4. files_total | tf | total number of files
    5. bytes | b | number of bytes to replicate
    6. bytes_recovered | br | bytes replicated
    7. bytes_percent | bp | percent of bytes replicated
    8. bytes_total | tb | total number of bytes
  3. Index stats api is the right place for high-level aggregation and statistics, holding index level statistics and accumulated values across replicas and primaries. This makes it the right place to hold information like refresh latency (time index is searchable across all shards).
    This would also be the right place to include aggregated timing data of all past replication events.
  4. GET /index-name/_segment_replication/ - more robust detailed api that is meant for production/application usage.
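
To illustrate the stage ids and per-stage timing described in point 1, here is a minimal Python sketch. The enum values mirror the STAGES list above. The `StageTimer` class and its `set_stage` method are hypothetical names for illustration only; they are not the actual `SegmentReplicationState` or `ReplicationTimer` implementation.

```python
import time
from enum import IntEnum


class Stage(IntEnum):
    """Stage ids copied from the STAGES list in the proposal."""
    DONE = 0
    INIT = 1
    REPLICATING = 2
    GET_CHECKPOINT_INFO = 3
    FILE_DIFF = 4
    GET_FILES = 5
    FINALIZE_REPLICATION = 6
    CANCELLED = 7


class StageTimer:
    """Hypothetical helper: records whole milliseconds spent in each stage."""

    def __init__(self, clock_ns=time.monotonic_ns):
        self._clock_ns = clock_ns
        self._stage = Stage.INIT
        self._entered_ns = clock_ns()
        self.timing_ms = {}

    def set_stage(self, next_stage):
        # Close out the current stage, then start timing the next one.
        now_ns = self._clock_ns()
        elapsed_ms = (now_ns - self._entered_ns) // 1_000_000
        name = self._stage.name
        self.timing_ms[name] = self.timing_ms.get(name, 0) + elapsed_ms
        self._stage = next_stage
        self._entered_ns = now_ns
```

Once the event reaches `Stage.DONE`, `timing_ms` holds one value per stage visited, which is the kind of data that would need to be persisted in order to surface it later.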

@Rishikesh1159
Member Author

Rishikesh1159 commented Jan 5, 2023

Thanks @Poojita-Raj for putting up the design. I started working on point '1.', creating a GET /_cat/segment_replication/ API, in this PR. I have made a few changes while implementing point '1.' of the comment:

-> Instead of persisting data for every segment replication event on a shard, we will store only the latest segment replication event that happened on the shard. So the result of a GET /_cat/segment_replication/ API call would be the metrics of only the last segment replication event on each shard. [Later, when needed for benchmarks, we will use a telemetry device that calls this API at a regular interval and calculates P90 and P99.]

-> We will use a replication id, which will help us identify a segment replication event on a shard.

For now these two are the only changes made during implementation. The rest of the design is the same as mentioned here.
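
As a rough sketch of the interval-based polling and P90/P99 calculation mentioned above, the following Python uses a nearest-rank percentile. The function names and the `fetch_total_times_ms` callback are hypothetical; this is not the opensearch-benchmark telemetry device API.

```python
import math
import time


def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list of numbers."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def poll(fetch_total_times_ms, rounds, interval_s=1.0, sleep=time.sleep):
    """Call the (hypothetical) fetch callback `rounds` times, pausing in between."""
    samples = []
    for _ in range(rounds):
        samples.extend(fetch_total_times_ms())
        sleep(interval_s)
    return samples


def summarize(samples_ms):
    return {"p90": percentile(samples_ms, 90), "p99": percentile(samples_ms, 99)}
```

Because the API reports only the latest event per shard, two consecutive polls can return the same event if no new replication happened in between. A real device would likely need to de-duplicate samples, for example by start time.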

With the completion of point "1." of the comment in the PR, the result of a GET /_cat/segment_replication/ API call would look something like this:

{
  INDEX_NAME : {
    "shards" : [
      {
        INDEX_NAME : ,
        SHARD_ID : ,
        REPLICATION_ID : ,
        OVERALL_TIME : ,
        START_TIME : ,
        START_TIME_MILLIS : ,
        STOP_TIME : ,
        STOP_TIME_MILLIS : ,
        STAGE : ,
        SOURCE_HOST : ,
        SOURCE_NODE : ,
        TARGET_HOST: ,
        
      }
    ]
  }
}

@mch2
Member

mch2 commented Jan 6, 2023

@Rishikesh1159 This looks good, can we also add total bytes?

@Rishikesh1159
Member Author

Rishikesh1159 commented Jan 9, 2023

-> In the latest commit of the PR, I have added a metric for the total bytes transferred in a single segment replication event.

-> First two points of the initial design proposal have been implemented.

-> Sample output/response of segment_replication API:
~$ curl -X GET "localhost:9200/_cat/segment_replication?pretty"

{
  "test1" : {
    "shards" : [
      {
        "id" : 0,
        "stage" : "DONE",
        "replication_id" : 4,
        "number_of_segment_replication_events" : 3,
        "start_time_in_millis" : 1673279271464,
        "stop_time_in_millis" : 1673279271474,
        "total_time_in_millis" : 10,
        "source" : {
          "id" : "iQgovTKeSeS5Ahjbqz7B0Q",
          "host" : "127.0.0.1",
          "transport_address" : "127.0.0.1:9301",
          "ip" : "127.0.0.1",
          "name" : "runTask-1"
        },
        "target" : {
          "id" : "DDpK8D2CRResK9YZQkgRvA",
          "host" : "127.0.0.1",
          "transport_address" : "127.0.0.1:9302",
          "ip" : "127.0.0.1",
          "name" : "runTask-2"
        },
        "replicating_stage" : {
          "total_time_in_millis" : 0
        },
        "get_checkpoint_info_stage" : {
          "total_time_in_millis" : 1
        },
        "file_diff_stage" : {
          "size_of_files_in_bytes" : 3664,
          "total_time_in_millis" : 1
        },
        "get_files_stage" : {
          "size" : {
            "total_in_bytes" : 3664,
            "reused_in_bytes" : 0,
            "recovered_in_bytes" : 3664,
            "percent" : "100.0%"
          },
          "files" : {
            "total" : 3,
            "reused" : 0,
            "recovered" : 3,
            "percent" : "100.0%"
          },
          "total_time_in_millis" : 0,
          "source_throttle_time_in_millis" : 0,
          "target_throttle_time_in_millis" : 0,
          "total_get_files_stage_in_millis" : 3
        },
        "finalize_replication_stage" : {
          "total_time_in_millis" : 3
        }
      },
      {
        "id" : 0,
        "stage" : "DONE",
        "replication_id" : 3,
        "number_of_segment_replication_events" : 2,
        "start_time_in_millis" : 1673279271467,
        "stop_time_in_millis" : 1673279271474,
        "total_time_in_millis" : 7,
        "source" : {
          "id" : "iQgovTKeSeS5Ahjbqz7B0Q",
          "host" : "127.0.0.1",
          "transport_address" : "127.0.0.1:9301",
          "ip" : "127.0.0.1",
          "name" : "runTask-1"
        },
        "target" : {
          "id" : "_HU9AI4-QLexkD0LK7nM_A",
          "host" : "127.0.0.1",
          "transport_address" : "127.0.0.1:9300",
          "ip" : "127.0.0.1",
          "name" : "runTask-0"
        },
        "replicating_stage" : {
          "total_time_in_millis" : 0
        },
        "get_checkpoint_info_stage" : {
          "total_time_in_millis" : 0
        },
        "file_diff_stage" : {
          "size_of_files_in_bytes" : 3664,
          "total_time_in_millis" : 0
        },
        "get_files_stage" : {
          "size" : {
            "total_in_bytes" : 3664,
            "reused_in_bytes" : 0,
            "recovered_in_bytes" : 3664,
            "percent" : "100.0%"
          },
          "files" : {
            "total" : 3,
            "reused" : 0,
            "recovered" : 3,
            "percent" : "100.0%"
          },
          "total_time_in_millis" : 0,
          "source_throttle_time_in_millis" : 0,
          "target_throttle_time_in_millis" : 0,
          "total_get_files_stage_in_millis" : 2
        },
        "finalize_replication_stage" : {
          "total_time_in_millis" : 3
        }
      }
    ]
  }
}

Testing of this new API and implementation of the final two points in the design are still pending. I will add these in upcoming commits to the PR.
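
For anyone consuming this intermediate JSON shape, here is a small Python sketch that flattens it into one row per shard copy with a per-stage time breakdown. It assumes exactly the field names shown in the sample above; `stage_breakdown` is a hypothetical helper, and the response format may change in the PR.

```python
# Stage keys as they appear in the sample response above.
STAGE_KEYS = (
    "replicating_stage",
    "get_checkpoint_info_stage",
    "file_diff_stage",
    "get_files_stage",
    "finalize_replication_stage",
)


def stage_breakdown(response):
    """Return one dict per shard entry with its per-stage times in milliseconds."""
    rows = []
    for index_name, body in response.items():
        for shard in body.get("shards", []):
            stages = {
                key: shard[key]["total_time_in_millis"]
                for key in STAGE_KEYS
                if key in shard
            }
            rows.append({
                "index": index_name,
                "shard": shard["id"],
                "target": shard["target"]["name"],
                "total_ms": shard["total_time_in_millis"],
                "stages_ms": stages,
            })
    return rows
```

Given the parsed JSON body, for example from `json.loads`, each returned row can be fed directly into a benchmark result table.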

@nandi-github

Can you consider using a random number as the seed for "replication_id"? It gives a better feel for security.

How is backward compatibility handled if a new variable is required in the future?

@Rishikesh1159
Member Author

Rishikesh1159 commented Jan 31, 2023

More detailed description of the segment replication API, from the PR:

Detailed description of API :

Overview:

The purpose of this API is to return metric information about ongoing and latest completed segment replication events on replica shards. It returns metrics at the shard level and should only be called on indices with segment replication enabled.

Paths:

GET /_cat/segment_replication

GET /_cat/segment_replication/<index>

If you want to get information for more than one index, separate the indices with commas:

GET /_cat/segment_replication/index1,index2,index3

Description:

→ The cat segment_replication API returns metric information about ongoing and latest completed segment replication events.
→ Segment replication is the process of copying segment files from the primary shard to replica shards. When the primary sends a checkpoint to replica shards on a refresh, a new segment replication event is triggered on the replica shards.
→ A segment replication event occurs in the following cases:

  • When a new replica shard is added to the cluster.
  • When there are segment file changes on a primary shard refresh.
  • When recovering replica shards from the primary shard using peer recovery.

Query Parameters:

  • active_only
    (Optional, Boolean) If true, the response only includes ongoing segment replications. Defaults to false.
  • detailed
    (Optional, Boolean) If true, the response includes detailed metrics for each stage of a segment replication event. Defaults to false.
  • completed_only
    (Optional, Boolean) If true, the response only includes latest completed segment replications. Defaults to false.
  • shards
    (Optional, string) Comma-separated list of shards to display.
  • format
    (Optional, string) Short version of the HTTP accept header. Valid values include JSON, YAML, etc.
  • h
    (Optional, string) Comma-separated list of column names to display.
  • help
    (Optional, Boolean) If true, the response includes help information. Defaults to false.
  • index
    (Optional, string) Comma-separated list or wildcard expression of index names used to limit the request.
  • time
    (Optional) Unit used to display time values. milliseconds by default.
  • v
    (Optional, Boolean) If true, the response includes column headings. Defaults to false.
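
To show how the paths and query parameters above combine, here is a minimal Python sketch that builds a request URL. The helper name `segment_replication_url` and the base URL are assumptions for illustration; only the path and parameter names come from the description above.

```python
from urllib.parse import quote, urlencode


def segment_replication_url(base, indices=(), **params):
    """Build a _cat/segment_replication URL for optional indices and query parameters."""
    path = "/_cat/segment_replication"
    if indices:
        # Multiple indices are separated with commas; '*' is kept for wildcards.
        path += "/" + ",".join(quote(name, safe="*") for name in indices)
    # Booleans are rendered as lowercase true/false; None values are skipped.
    query = {
        key: str(value).lower() if isinstance(value, bool) else value
        for key, value in params.items()
        if value is not None
    }
    url = base.rstrip("/") + path
    return url + ("?" + urlencode(query) if query else "")
```

For example, `segment_replication_url("http://localhost:9200", ["index1", "index2"], v=True, detailed=True)` yields a URL equivalent to the curl calls shown in the examples of this comment.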

Metric Fields:

  • index | i | idx : Name of the Index
  • shardId | s : Id of a specific shard
  • start_time | start : segment replication start time. Show up only when default=true
  • start_time_millis | start_millis : segment replication start time in epoch milliseconds. Show up only when default=true
  • stop_time | stop : segment replication stop time. Show up only when default=true
  • stop_time_millis | stop_millis : segment replication stop time in epoch milliseconds. Show up only when default=true
  • time | t | ti : time taken to complete segment replication event in milliseconds
  • stage | st : current stage of segment replication event.
  • source_description | sdesc : Description of source
  • target_host | thost : target host
  • target_node | tnode : target node name
  • files_fetched | ff : count of files fetched until now in segment replication event
  • files_percent | fp : percent of files fetched until now in segment replication event
  • bytes_fetched | bf : amount of bytes fetched until now in segment replication event
  • bytes_percent | bp : percent of bytes fetched until now in segment replication event

All metrics listed below are present in the response only when the query parameter detailed=true is set.

  • files | f : count of files that need to be fetched in a segment replication event
  • files_total | tf : total number of files that are part of this recovery, both re-used and recovered
  • bytes | b : amount of bytes that need to be fetched in a segment replication event
  • bytes_total | tb : total number of bytes in the shard
  • replicating_stage_time_taken | rstt : Time taken to complete “replicating” stage of segment replication event
  • get_checkpoint_info_stage_time_taken | gcistt : Time taken to complete “get checkpoint info” stage of segment replication event
  • file_diff_stage_time_taken | fdstt : Time taken to complete “file diff” stage of segment replication event
  • get_files_stage_time_taken | gfstt : Time taken to complete “get files” stage of segment replication event
  • finalize_replication_stage_time_taken | frstt : Time taken to complete “finalize replication” stage of segment replication event
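
The percent fields above can be read as a simple ratio. The sketch below assumes that `files_percent` is `files_fetched` relative to `files`, and `bytes_percent` is `bytes_fetched` relative to `bytes`; that relationship and the zero-total behaviour are assumptions, not confirmed by the PR.

```python
def percent_fetched(fetched, to_fetch):
    """Format fetched/to_fetch as a one-decimal percentage string such as '100.0%'."""
    if to_fetch <= 0:
        # Assumption: report 0.0% when there is nothing to fetch.
        return "0.0%"
    return f"{100.0 * fetched / to_fetch:.1f}%"
```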

Example Response of API:

→ Sample response with no ongoing segment replication events:

$ curl -X GET "localhost:9200/_cat/segment_replication?v=true"

index shardId time stage source_description target_host target_node files_fetched files_percent bytes_fetched bytes_percent
index1 0 19ms done runTask-1 127.0.0.1 runTask-2 3 100.0% 3661 100.0%

→ Sample response with query parameter shards=0, which limits the response to shards with ID 0:

curl -X GET "localhost:9200/_cat/segment_replication?v=true&shards=0"

index shardId time stage source_description target_host target_node files_fetched files_percent bytes_fetched bytes_percent
index2 0 8ms done runTask-1 127.0.0.1 runTask-2 3 100.0% 3661 100.0%
index1 0 19ms done runTask-1 127.0.0.1 runTask-2 3 100.0% 3661 100.0%

→ Sample response with query parameter detailed=true, which gives more detailed information about each stage of the segment replication event:

curl -X GET "localhost:9200/_cat/segment_replication?v=true&detailed=true"

index shardId time stage source_description target_host target_node files_fetched files_percent bytes_fetched bytes_percent files files_total bytes bytes_total replicating_stage_time_taken get_checkpoint_info_stage_time_taken file_diff_stage_time_taken get_files_stage_time_taken finalize_replication_stage_time_taken
index2 0 21ms done runTask-1 127.0.0.1 runTask-2 3 100.0% 3661 100.0% 3 3 3661 3661 0s 2ms 0s 4ms 14ms
index1 0 9ms done runTask-1 127.0.0.1 runTask-2 3 100.0% 3661 100.0% 3 3 3661 3661 0s 2ms 0s 3ms 3ms
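
For benchmark tooling that consumes the text output above, a minimal Python sketch of parsing it could look like this. It assumes v=true (so the first line is the header) and that no column value contains spaces. The helper names are hypothetical, and only the `ms` and `s` units seen in the samples are handled.

```python
import re

# Only the units that appear in the sample responses are supported here.
_UNIT_TO_MS = {"ms": 1, "s": 1000}


def parse_cat_table(text):
    """Turn whitespace-separated _cat output with a header row into a list of dicts."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return []
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]


def to_millis(value):
    """Convert a time value such as '19ms' or '0s' to milliseconds."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)(ms|s)", value)
    if match is None:
        raise ValueError(f"unsupported time value: {value!r}")
    number, unit = match.groups()
    return float(number) * _UNIT_TO_MS[unit]
```

Each parsed row is keyed by column name, so `to_millis(row["time"])` gives the overall replication time for that shard.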

@mch2
Member

mch2 commented Feb 1, 2023

@Rishikesh1159 Thanks for writing this up! A few nits:

start_time | start : segment replication start time. Show up only when default=true
start_time_millis | start_millis : segment replication start time in epoch milliseconds. Show up only when default=true
stop_time | stop : segment replication stop time. Show up only when default=true
stop_time_millis | stop_millis : segment replication stop time in epoch milliseconds. Show up only when default=true

Not sure what you mean here by default=true?

replication_id : Id of the ongoing/completed segment replication event

I don't know that this is useful for this API given we are only showing ongoing & most recently completed replication events. Thoughts? Also on that note, let's clarify that this API returns only the most recently completed event.

files_total | tf : total number of files that are part of this recovery, both re-used and recovered

nit - replication

@Rishikesh1159
Member Author

@mch2 By default, a few metrics (start_time, start_time_millis, stop_time, stop_time_millis) are not shown in the response. Only if we put default=true in the query parameters of the API call do we get these metrics. It is similar to having detailed=true.

For replication_id, I also don't see any use as of now, but we might need it in the future, so I included it. But yeah, I can totally remove it from the response. My main thought behind including a replication_id was to uniquely identify each segment replication event on a shard. But as we are returning only the most recently completed event, it won't be of any use.

Yes, I will clarify in the documentation that this API returns only the most recently completed event.

@Rishikesh1159 Rishikesh1159 changed the title [Segment Replication] Update existing API to fetch existing metrics related to segment replication. [Segment Replication] Introduce new API to fetch existing metrics related to segment replication. Feb 3, 2023
@mch2
Member

mch2 commented Feb 9, 2023

@Rishikesh1159 Closing this issue, let's cut smaller issues to track enhancements to this API.

@Rishikesh1159
Member Author

Updated the above comment with the latest additional changes to the API.

Projects
Status: Done

4 participants