Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert ReproSchema2RedCap from js to py #31

Merged
merged 18 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,39 @@ convert
redcap2reproschema Convert REDCap CSV files to Reproschema format.
serve
validate
reproschema2redcap
```

## `reproschema2redcap` Usage

### Command-Line Usage

You can use this feature directly from the command line. To convert a ReproSchema protocol to REDCap CSV format, use the following command:

```
reproschema reproschema2redcap <input_dir_path> <output_csv_filename>
```

- `<input_dir_path>`: The path to the root folder of a protocol. For example, to convert the reproschema-demo-protocol provided by ReproNim, you can use the following commands:
```bash
git clone https://github.com/ReproNim/reproschema-demo-protocol.git
cd reproschema-demo-protocol
pwd
```
In this case, the output from `pwd` (which shows your current directory path) should be your `<input_dir_path>`.
- `<output_csv_filename>`: The name of the output CSV file where the converted data will be saved.

### Python Function Usage

You can also use the `reproschema2redcap` function from the `reproschema-py` package in your Python code.

```python
from pathlib import Path

from reproschema.reproschema2redcap import main as reproschema2redcap

input_dir_path = Path("path-to/reproschema-demo-protocol")
output_csv_filename = "output.csv"

reproschema2redcap(input_dir_path, output_csv_filename)
```

## redcap2reproschema Usage
Expand Down
17 changes: 17 additions & 0 deletions reproschema/cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import click
from pathlib import Path

from . import get_logger, set_logger_level
from . import __version__
from .redcap2reproschema import redcap2reproschema as redcap2rs
from .reproschema2redcap import main as rs2redcap

lgr = get_logger()

Expand Down Expand Up @@ -110,3 +112,18 @@ def redcap2reproschema(csv_path, yaml_path):
click.echo("Converted REDCap data dictionary to Reproschema format.")
except Exception as e:
raise click.ClickException(f"Error during conversion: {e}")


# CLI entry point: `reproschema reproschema2redcap <input_path> <output_csv_path>`.
# input_path must be an existing directory (the converter iterates over its
# sub-folders), output_csv_path is the CSV file to create.
@main.command()
@click.argument(
    "input_path",
    type=click.Path(exists=True, file_okay=False, dir_okay=True),
)
@click.argument("output_csv_path", type=click.Path(writable=True))
def reproschema2redcap(input_path, output_csv_path):
    """
    Convert reproschema protocol to Redcap CSV format.
    """
    try:
        # The converter walks the protocol folder with pathlib methods, so
        # hand it a Path instead of the raw string click provides.
        rs2redcap(Path(input_path), output_csv_path)
    except Exception as e:
        # Same error-reporting style as the redcap2reproschema command:
        # show a short click error message instead of a raw traceback.
        raise click.ClickException(f"Error during conversion: {e}") from e
    click.echo(
        f"Converted reproschema protocol from {input_path} to Redcap CSV at {output_csv_path}"
    )
184 changes: 184 additions & 0 deletions reproschema/reproschema2redcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import sys
import json
import csv
from pathlib import Path


def read_json_file(file_path):
    """
    Load and return the JSON document stored at file_path.

    Args:
        file_path (str or Path): Location of a UTF-8 encoded JSON file.

    Returns:
        The decoded JSON value, or None when the file cannot be opened or
        does not contain valid UTF-8 JSON. The failure is reported on stdout.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            return json.load(file)
    except (OSError, ValueError) as e:
        # OSError covers missing or unreadable paths; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError. Anything else is a
        # programming error and should not be hidden.
        print(f"Error reading file {file_path}: {e}")
        return None


def find_Ftype_and_colH(item_json, row_data):
    """
    Find the field type and column header based on the given item_json.

    The item's ``ui.inputType`` is translated as follows:
    ``integer`` -> field type ``text`` with validation ``number``,
    ``select`` -> field type ``dropdown``,
    ``date`` -> field type ``text`` with validation ``date_mdy``.
    Any other input type is copied through unchanged with no validation.

    Args:
        item_json (dict): The JSON object containing the item information.
        row_data (dict): The row data dictionary. It is updated in place.

    Returns:
        dict: The same row_data dictionary, with ``field_type`` always set
        and ``val_type_OR_slider`` set only when a validation type applies.

    """
    # Extract the input type from the item_json
    f_type = item_json.get("ui", {}).get("inputType", "")
    col_h = ""

    # Check the input type and update the field type and column header accordingly
    if f_type == "integer":
        f_type = "text"
        col_h = "number"
    elif f_type == "select":
        f_type = "dropdown"
    elif f_type == "date":
        f_type = "text"
        # REDCap names this validation type "date_mdy"; the previous
        # "ddate_mdy" spelling is not a recognised validation type.
        col_h = "date_mdy"

    # Update the row_data dictionary with the field type
    row_data["field_type"] = f_type

    # Update the row_data dictionary with the column header if available
    if col_h:
        row_data["val_type_OR_slider"] = col_h

    return row_data


def _format_choice(choice):
    """Render one response choice as a ``"value, label"`` string."""
    value = choice.get("schema:value", choice.get("value", ""))
    label = choice.get("schema:name", choice.get("name", ""))
    if isinstance(label, dict):
        # A label may be a language map; use its English entry, mirroring
        # how the question text is handled in process_item.
        label = label.get("en", "")
    return f"{value}, {label}"


def process_item(item_json, activity_name):
    """
    Process an item in JSON format and extract relevant information into a dictionary.

    Args:
        item_json (dict): The JSON object representing the item.
        activity_name (str): The name of the activity.

    Returns:
        dict: A dictionary containing the extracted information, with the
        keys ``val_min``, ``val_max``, ``choices``, ``required``,
        ``field_notes``, ``var_name``, ``activity``, ``field_label``,
        ``field_type`` and, when applicable, ``val_type_OR_slider``.
    """
    row_data = {}

    # Extract min and max values from response options, if available
    response_options = item_json.get("responseOptions", {})
    if not isinstance(response_options, dict):
        # Anything other than an inline object (for example a reference
        # string) carries no values to extract here; treat it as empty
        # instead of failing on .get().
        response_options = {}
    row_data["val_min"] = response_options.get("schema:minValue", "")
    row_data["val_max"] = response_options.get("schema:maxValue", "")

    choices = response_options.get("choices")
    if isinstance(choices, list):
        # Join the "value, label" pairs with ' | '
        row_data["choices"] = " | ".join(_format_choice(ch) for ch in choices)
    elif isinstance(choices, str):
        row_data["choices"] = choices
    else:
        row_data["choices"] = ""

    row_data["required"] = response_options.get("requiredValue", "")

    row_data["field_notes"] = item_json.get("skos:altLabel", "")

    row_data["var_name"] = item_json.get("@id", "")
    row_data["activity"] = activity_name

    # The question is either a language map (English entry used) or a string
    question = item_json.get("question")
    if isinstance(question, dict):
        row_data["field_label"] = question.get("en", "")
    elif isinstance(question, str):
        row_data["field_label"] = question
    else:
        row_data["field_label"] = ""

    # Call helper function to find Ftype and colH values and update row_data
    row_data = find_Ftype_and_colH(item_json, row_data)

    return row_data


def _drop_leading_relative_parts(relative_path):
    """Return relative_path without its leading ``/``, ``./`` and ``../`` parts."""
    parts = relative_path.split("/")
    while parts and parts[0] in ("", ".", ".."):
        parts.pop(0)
    return "/".join(parts)


def get_csv_data(dir_path):
    """
    Collect one CSV row per item of the first protocol found under dir_path.

    The sub-directories of dir_path are scanned for a file matching
    ``*_schema``; the first one found is treated as the protocol. Each entry
    of its ``ui.order`` list is an activity path, resolved against dir_path
    after leading ``../`` parts are dropped. Each entry of an activity's
    ``ui.order`` list is an item path relative to the activity's folder.
    Activities or items that cannot be read are skipped.

    Args:
        dir_path (str or Path): Root folder of the protocol repository.

    Returns:
        list: Row dictionaries as produced by process_item.

    Raises:
        ValueError: If the protocol schema file cannot be read as a JSON object.
    """
    # Accept plain strings as well as Path objects
    dir_path = Path(dir_path)
    csv_data = []

    # Iterate over directories in dir_path
    for protocol_dir in dir_path.iterdir():
        if not protocol_dir.is_dir():
            continue

        # Check for a _schema file in each directory
        schema_file = next(protocol_dir.glob("*_schema"), None)
        if not schema_file:
            continue

        # Process the found _schema file
        parsed_protocol_json = read_json_file(schema_file)
        if not isinstance(parsed_protocol_json, dict):
            # read_json_file returns None on failure; without the protocol
            # there is nothing meaningful to convert.
            raise ValueError(f"Could not read protocol schema {schema_file}")

        activity_order = parsed_protocol_json.get("ui", {}).get("order", [])
        for relative_activity_path in activity_order:
            # Drop whole leading "../" components. str.lstrip("../") would
            # strip individual characters and mangle names like ".hidden".
            normalized_relative_path = Path(
                _drop_leading_relative_parts(relative_activity_path)
            )
            activity_path = dir_path / normalized_relative_path
            print(f"Processing activity {activity_path}")
            parsed_activity_json = read_json_file(activity_path)
            if not parsed_activity_json:
                continue

            item_order = parsed_activity_json.get("ui", {}).get("order", [])
            for item in item_order:
                item_path = activity_path.parent / item
                item_json = read_json_file(item_path)
                if item_json:
                    csv_data.append(process_item(item_json, activity_path.stem))

        # Break after finding the first _schema file
        break

    return csv_data


def write_to_csv(csv_data, output_csv_filename):
    """
    Write the collected rows to a CSV file and report success on stdout.

    Args:
        csv_data (iterable of dict): Rows keyed by column name. Columns a row
            does not provide are left empty.
        output_csv_filename (str or Path): File to create or overwrite.
    """
    # Column order follows the JavaScript version of this converter
    column_names = [
        "var_name",
        "activity",
        "section",
        "field_type",
        "field_label",
        "choices",
        "field_notes",
        "val_type_OR_slider",
        "val_min",
        "val_max",
        "identifier",
        "visibility",
        "required",
    ]

    # newline="" lets the csv module control line endings itself
    with open(output_csv_filename, "w", newline="", encoding="utf-8") as out_file:
        dict_writer = csv.DictWriter(out_file, fieldnames=column_names)
        dict_writer.writeheader()
        dict_writer.writerows(csv_data)

    print("The CSV file was written successfully")


def main(input_dir_path, output_csv_filename):
    """
    Convert the protocol under input_dir_path into a CSV file.

    Args:
        input_dir_path (str or Path): Root folder of the protocol.
        output_csv_filename (str or Path): CSV file to create or overwrite.
    """
    # get_csv_data uses pathlib methods on the folder, so coerce plain
    # strings (as passed by Python API callers) to a Path first.
    csv_data = get_csv_data(Path(input_dir_path))
    write_to_csv(csv_data, output_csv_filename)


if __name__ == "__main__":
    # Script mode: both positional arguments are required; any further
    # arguments are ignored.
    if len(sys.argv) >= 3:
        input_dir_path = Path(sys.argv[1])
        output_csv_filename = sys.argv[2]
        main(input_dir_path, output_csv_filename)
    else:
        print(
            "Usage: python reproschema2redcap.py <input_dir_path> <output_csv_filename>"
        )
        sys.exit(1)
47 changes: 47 additions & 0 deletions reproschema/tests/test_reproschema2redcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import pytest
from click.testing import CliRunner
from ..cli import main
from shutil import copytree
from pathlib import Path
import csv


def test_reproschema2redcap_success():
    """Run the reproschema2redcap CLI command on the bundled test protocol."""
    cli_runner = CliRunner()

    with cli_runner.isolated_filesystem():
        # Stage the fixture protocol inside the sandboxed working directory
        fixture_dir = os.path.join(os.path.dirname(__file__), "test_rs2redcap_data")
        copytree(fixture_dir, "input_data")

        input_path = Path("input_data")
        output_csv_path = "output.csv"

        # Run the command exactly as a user would from the shell
        cli_args = ["reproschema2redcap", str(input_path), output_csv_path]
        result = cli_runner.invoke(main, cli_args)

        # Shown by pytest on failure, to help debugging
        print(result.output)

        # The command must succeed and produce the output file
        assert result.exit_code == 0
        assert os.path.exists(output_csv_path)

        # Dump the generated CSV for inspection
        with open(output_csv_path, "r", encoding="utf-8") as csv_file:
            csv_contents = list(csv.reader(csv_file))
        print("CSV File Contents:")
        for row in csv_contents:
            print(row)

        # The file must not be empty
        assert len(csv_contents) > 0
Loading
Loading