-
Notifications
You must be signed in to change notification settings - Fork 958
Building a new parser
A parser is a python script that defines one or more functions that fetch data for a particular zone and/or exchange.
A parser can be built if there is a public URL that contains electricity generation data for a specific zone. See the Technical Requirements for Parsers to verify that the source you have found does indeed fit these requirements.
If you're looking to contribute, but don't have a specific zone in mind, you can take a look at the Missing Countries Overview which has information on where we think a parser might be buildable.
In 2023, we implemented a new data structure to be returned by the parsers. The aim of this new structure is to make sure that all parsed data is compliant with the expected format. This prevents errors further down the pipeline.
A parser should return an EventList
object. An Event object represents any kind of grid events that we are capable of processing. The backend will automatically update past values properly.
The Event
class is abstract and is used to create the following sub-classes representing the different data kind we accept at the moment:
- Exchange
- TotalProduction
- ProductionBreakdown
- TotalConsumption
- Price
In cases where the parser function collects data for more than one datetime, the data should be stored in the following objects:
- ExchangeList
- TotalProductionList
- ProductionBreakdownList
- TotalConsumptionList
- PriceList
Each event has a sourceType
attribute:
- measured
- estimated
- forecasted
By default, the sourceType
is set to measured
and this attribute doesn't need to be set. For forecasted data (ie. consumption forecast), the sourceType
needs to be set to forecasted
at the creation of the event.
Each event represents a data point that provides the value for a given datetime. The different Event
objects are found here The expected format for a production parser event follows:
ProductionBreakdown.create(
logger=logger,
datetime=datetime(2023,5,8,16,34, tzinfo=ZoneInfo('timezone'),
zoneKey=ZoneKey('SOME_ZONE'),
source='mysource.com’
production=ProductionMix(wind=7, solar=8, hydro=10)
)
The expected format for a consumption parser event follows:
TotalConsumption(
datetime=datetime(2023,5,8,16,34, tzinfo=ZoneInfo('timezone'),
zoneKey=ZoneKey('SOME_ZONE'),
source='mysource.com’
consumption= 1234.56
)
A zone key identifier is required for each parser event. This identifier needs to be included in ZONES_CONFIG
or EXCHANGES_CONFIG
.
The zone key object can be imported from config
.
from electricitymap.contrib.config import ZoneKey
ProductionBreakdown(zoneKey=ZoneKey('DE'), ***values)
and for exchange parsers, the zone identifier are sorted zone keys:
Exchange(zoneKey=ZoneKey('DE->FR'), ***values)
For historical reasons we need for now to convert the EventList object to a list of plain dictionaries. This is done by using the method to_list()
from the EventList
class. This requirement will be removed at a later stage, once all parsers are using the same structure.
return ProductionBreakdownList(logger).to_list()
All timezones should either be a zoneinfo.ZoneInfo
object or datetime.timezone.utc
if the returned datetime is UTC, we no longer accept parsers that use pytz or arrow for timezones.
These can imported like this:
from datetime import timezone.utc
from zoneinfo import ZoneInfo
Then they can be used like the following if the data has a specific timezone:
Price(
datetime=datetime.fromisoformat("2023-11-13T10:30").replace(tzinfo=ZoneInfo('Europe/Copenhagen')),
**otherValues
)
or like this if the time is UTC:
Price(
datetime=datetime.fromisoformat("2023-11-13T10:30").replace(tzinfo=timezone.utc),
**otherValues
)
All parsers must contain the following arguments:
- zone key information.
zone_key
if the parser only fetches data for a single zone.zone_key1
andzone_key2
if the parser fetches data for an exchange. -
session
: a python request session that you can re-use to make HTTP requests. -
target_datetime
: used to fetch historical data when available. -
logger
: alogging.Logger
whose output is publicly available for anyone to monitor correct functioning of the parsers.
To help you get started, you can find a parser template here as well as an example to build a fetch_production
function.
See below for complete signatures
Return the consumption at the current time as a TotalConsumption
object if only one data point is available or a TotalConsumptionList
if the function parses data for multiple datetimes.
The consumption
values (MW) should never be negative.
def fetch_consumption(
zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
all_consumption_events = # all events with a datetime and a consumption value
consumption_list = TotalConsumptionList(logger)
for event in all_consumption_events:
consumption_list.append(
zoneKey=zone_key,
datetime=event.datetime,
consumption=event.consumption_value,
source='mysource.com'
)
return consumption_list.to_list()
Return the price per MWh at the current time as a Price
object if only one data point is available or a PriceList
if the function parses data for multiple datetimes.
The currency
values should be a three letter string representing the currency of the price
value. View the code-symbol mapping from the currency-symbol-map node package.
price
values is price per MWh and can be both positive and negative. It should, when possible, represent the day-ahead prices of the zone.
def fetch_price(
zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
all_price_events = # all events with a datetime and a price value
price_list = PriceList(logger)
for event in all_price_events:
price_list.append(
zoneKey=zone_key,
datetime=event.datetime,
currency='EUR',
price=event.price_value,
source='mysource.com'
)
return price_list.to_list()
Here, the Event kind is ProductionBreakdown
as the function returns the production mix at the current time. It can either be a ProductionBreakdown
with the below fields if there is only one data point available, or a ProductionBreakdownList
if multiple time values can be fetched.
The production
values (MW) should never be negative. Use None
, or omit the key if a specific production mode is not known.
storage
values can be both positive (when storing energy) or negative (when the storage is discharged).
def fetch_production(
zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
all_production_events = # all events with a datetime and a production breakdown and/or a storage breakdown
production_list = ProductionBreakdownList(logger)
for event in all_production_events:
production_list.append(
zoneKey=zone_key,
datetime=event.datetime,
production=event.production_mix,
storage=event.storage_mix,
source='mysource.com'
)
return production_list.to_list()
The function returns a TotalConsumption
object. If there are multiple datapoints available, the function should return a TotalConsumptionList
.
The sourceType
is EventSourceType.forecasted
.
def fetch_consumption_forecast(
zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
all_consumption_events = # all events with a datetime and a consumption value
consumption_list = TotalConsumptionList(logger)
for event in all_consumption_events:
consumption_list.append(
zoneKey=zone_key,
datetime=event.datetime,
consumption=event.consumption_value,
source='mysource.com',
sourceType=EventSourceType.forecasted
)
return consumption_list.to_list()
Returns data for total generation. The Event kind is TotalProduction
as no power breakdown is available. If there are multiple events available, the function should return a TotalProductionList
.
The sourceType
is EventSourceType.forecasted
def fetch_generation_forecast(
zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
all_generation_events = # all events with a datetime and a generation value
generation_list = TotalProductionList(logger)
for event in all_generation_events:
consumption_list.append(
zoneKey=zone_key,
datetime=event.datetime,
value=event.generation_value,
source='mysource.com',
sourceType=EventSourceType.forecasted
)
return generation_list.to_list()
In this case, as the function collects forecasted production by mode, it returns a ProductionBreakdown
object. If there are multiple datapoints available, the function should return a ProductionBreakdownList
.
As the data is forecasted, the sourceType
is EventSourceType.forecasted
def fetch_wind_solar_forecasts(
zone_key: ZoneKey = ZoneKey('SOME_ZONE'),
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
all_production_events = # all events with a datetime and a production breakdown
production_list = ProductionBreakdownList(logger)
for event in all_production_events:
production_list.append(
zoneKey=ZoneKey(zone_key),
datetime=event.datetime,
production=event.production_mix,
source='mysource.com',
sourceType=EventSourceType.forecasted
)
return production_list.to_list()
Return the cross-border flow at the current time as an Exchange
or an ExchangeList
if data is available for multiple datetimes.
The sortedZoneKeys
value should be a string in the format zone_keyA->zone_keyB deciding which to put first based on alphabetical order.
The netFlow
value can be positive or negative, dictating the direction of the flow. Respecting the alphabetical sort of the zone keys, a positive value means the first zone is exporting to the second zone while a negative value means the first zone is importing from the second zone.
def fetch_exchange(
zone_key1: ZoneKey,
zone_key2: ZoneKey,
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
sortedZoneKeys = '->'.join(sorted([zone_key1, zone_key2]))
all_exchange_events = # all events with a datetime and an exchange value
exchange_list = ExchangeList(logger)
for event in all_exchange_events:
exchange_list.append(
zoneKey=ZoneKey(sortedZoneKeys),
datetime=event.datetime,
netFlow=event.exchange_value,
source='mysource.com'
)
return exchange_list.to_list()
Return the cross-border flow at the current time as an Exchange
or an ExchangeList
if data is available for multiple datetimes.
As the data is forecasted, the sourceType
is EventSourceType.forecasted
def fetch_exchange(
zone_key1: ZoneKey,
zone_key2: ZoneKey,
session: Session | None = None,
target_datetime: datetime | None = None,
logger: Logger = getLogger(__name__),
) -> list[dict[str, Any]]:
session = session or Session()
sortedZoneKeys = '->'.join(sorted([zone_key1, zone_key2]))
all_exchange_events = # all events with a datetime and an exchange value
exchange_list = ExchangeList(logger)
for event in all_exchange_events:
exchange_list.append(
zoneKey=ZoneKey(sortedZoneKeys),
datetime=event.datetime,
netFlow=event.exchange_value,
source='mysource.com',
sourceType=EventSourceType.forecasted
)
return exchange_list.to_list()
Once you're done, add your parser to the zones.json and exchanges.json configuration files. Finally update the real-time sources.
After setting up your local development environment, you can run all of the parser tests with the following command from the root directory:
poetry run test
For more info, check out the example parser or browse existing parsers.
-
Set up your local development environment
-
Ensure dependencies are installed:
poetry install -E parsers
-
From the root folder, use the
test_parser.py
command line utility:poetry run test_parser FR price # get latest price parser for France poetry run test_parser FR # defaults to production if no data type is given # test a specific datetime (parser needs to be able to fetch past datetimes) poetry run test_parser DE --target_datetime 2018-01-01T08:00 poetry run test_parser "CH->FR" exchange # get the exchange data between Switzerland & France
Many of the tests require API keys of the data or web service providers, and therefore fail with an error message like
Exception: No ENTSOE_TOKEN found! Please add it into secrets.env!
In such cases, please browse the website related to the provider and ask for an API key. Once you get hold of the API key, make it an environment variable. This fixes the error.
Do you have a question or an idea for improvements? Open a new discussion here