-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_functions.py
264 lines (202 loc) · 11.5 KB
/
data_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
########################################
# @author: HALISSON SOUZA GOMIDES
# halisson.gomides@gmail.com
# ver: 1.0 21/05/2021
########################################
import pandas as pd
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import time
# --------------------------------------------------------------------------------------------
# Função de carga de dados
# --------------------------------------------------------------------------------------------
def load_data(data_url: str, tipo: str = 'csv', date_f: list = [], **kwargs) -> 'DataFrame':
'''
Função para carregar um conjunto de dados
:rtype: pd.Dataframe
:param data_url: caminho completo do arquivo a ser carregado
:param tipo: tipo de arquivo: csv|xls|xlsx|json etc
:param date_f: lista com os nomes dos campos a ser convertidos para datetime
:param kwargs: argumentos especificos para a carga do arquivo, conforme o parametro 'tipo'
:return: Pandas DataFrame
'''
if tipo == 'csv':
if 'chunksize' in kwargs:
kwargs['iterator'] = True
data = pd.DataFrame()
_df_chunked = pd.read_csv(data_url, **kwargs)
for _df in _df_chunked:
data = data.append(_df)
else:
data = pd.read_csv(data_url, **kwargs)
elif tipo == 'xls' or tipo == 'xlsx':
data = pd.read_excel(data_url, **kwargs)
elif tipo == 'json':
import json
with open(data_url) as f:
data = json.load(f)
return data
else:
if 'chunksize' in kwargs:
kwargs['iterator'] = True
data = pd.DataFrame()
_df_chunked = pd.read_table(data_url, **kwargs)
for _df in _df_chunked:
data = data.append(_df)
else:
data = pd.read_table(data_url, **kwargs)
for dt_field in date_f:
data[dt_field] = pd.to_datetime(data[dt_field])
return data
# --------------------------------------------------------------------------------------------
# Funções de Transformação dos dados
# --------------------------------------------------------------------------------------------
async def transform_dfbr(df, cols: list = []) -> 'DataFrame':
'''
Função para transformar o DataFrame df_br, filtra os dados para o Brasil, seleciona colunas especificadas
e adiciona novas colunas
:param df: dataframe com dados sobre a covid do Brasil
:param cols: lista com os nomes das colunas que deverão conter no DataFrame de retorno. Não sendo informado,
retorna todas as colunas do DataFrame.
:return: Pandas DataFrame
'''
colunas = cols if len(cols) > 0 else df.columns.to_list()
# Filtra os dados para o Brasil e seleciona colunas específicas
_df_BR = df.query("state == 'TOTAL'")[colunas]
# cria novas colunas
_df_BR['activeCases'] = _df_BR['totalCases'] - _df_BR['deaths'] - _df_BR['recovered']
_df_BR['activeCasesMS'] = _df_BR['totalCasesMS'] - _df_BR['deathsMS'] - _df_BR['recovered']
_df_BR['activeCasesDiff'] = _df_BR['activeCases'] - _df_BR['activeCasesMS']
_df_BR['deathsDiff'] = _df_BR['deaths'] - _df_BR['deathsMS']
_df_BR['newVaccinated'] = _df_BR['vaccinated'].diff()
_df_BR['newVaccinated_second'] = _df_BR['vaccinated_second'].diff()
return _df_BR
async def transform_popuf(df_munic) -> 'DataFrame':
'''
Função para transformar o DataFrame df_popmunic, trata o dado de POPULAÇÃO ESTIMADA e retonar outro dataframe
agregado por UF
:param df_munic: Dicionario de DataFrames com dados populacionais dos municípios e por UF
:return: Pandas DataFrame
'''
# ---- Trata o DataFrame de População do Brasil e UFs ----
# Exclui uma coluna vazia e as linhas que contém a string 'Brasil' e 'Região'
pop_uf = df_munic['BRASIL E UFs'].drop(columns='Unnamed: 1').drop(
np.where(df_munic['BRASIL E UFs'].iloc[:, 0].str.contains('Brasil|Região', case=False))[0]
).reset_index(drop=True)
# Trata os valores populacionais que contém referências entre ()
pop_uf['POPULAÇÃO ESTIMADA'] = pop_uf['POPULAÇÃO ESTIMADA'].apply(
lambda x: int(x.split('(')[0].replace('.', '')) if isinstance(x, str) else x)
# Renomeia as colunas que sobraram
pop_uf.rename(columns={'BRASIL E UNIDADES DA FEDERAÇÃO': 'NM_UF', 'POPULAÇÃO ESTIMADA': 'POPULACAO'}, inplace=True)
# ---- Trata o DataFrame de População de Municípios ----
_pop_munic = df_munic['Municípios'].drop(index=df_munic['Municípios'].index[-9:])
_pop_munic['POPULAÇÃO ESTIMADA'] = _pop_munic['POPULAÇÃO ESTIMADA'].apply(lambda x: str(x).split('(')[0])
_pop_munic['POPULAÇÃO ESTIMADA'] = _pop_munic['POPULAÇÃO ESTIMADA'].astype(int)
# Adciona a sigla da UF no DataFrame pop_uf baseado na populacao do DataFrame _pop_munic
pop_uf['UF'] = pop_uf['POPULACAO'].map(_pop_munic[['UF', 'POPULAÇÃO ESTIMADA']].groupby('UF')
.sum().reset_index().set_index('POPULAÇÃO ESTIMADA')['UF'])
return pop_uf
async def transform_dfcities(df_cities, df_gps_cities) -> 'DataFrame':
'''
Função para transformar o DataFrame df_cities, acrescentando informações de latitude e longitude a partir do df_gps_cities
:param df_cities: dataframe com dados de covid por município
:param df_gps_cities: dataframe com dados de coordenadas geográficas dos municípios
:return: Pandas DataFrame
'''
# filtra pela data mais recente
_df = df_cities.query('date == @df_cities.date.max()').copy()
# removendo as linhas cujo campo ibgeID está faltando
_df_gps = df_gps_cities.dropna(subset=['ibgeID']).copy()
# convertendo o tipo da coluna ibeID do df_gps_cities para o mesmo tipo da coluna ibgeID do df_cities
_df_gps.loc[:, 'ibgeID'] = _df_gps['ibgeID'].astype(int)
# definindo as colunas 'lat' e 'lon' no df_cities com base no 'ibgeID' do df_gps_cities
_df['lat'] = _df.loc[:, 'ibgeID'].map(_df_gps.set_index('ibgeID').loc[:, 'lat'])
_df['lon'] = _df.loc[:, 'ibgeID'].map(_df_gps.set_index('ibgeID').loc[:, 'lon'])
return _df
async def transform_dfuf(df, df_popuf) -> 'DataFrame':
'''
Função para transformar o DataFrame df_br, filtra os dados por UF e adiciona nova coluna de percentual
da população vacinada de cada UF com base na informação de população do df_popuf
:param df: dataframe com dados sobre a covid do Brasil
:param df_popuf: dataframe com dados sobre a população estimada por UF
:return: Pandas DataFrame
'''
# Filtra os dados para as UFs e para a data mais recente
_df_UF = df.query("state != 'TOTAL' and date == @df['date'].max()").copy()
# cria novas colunas
_df_UF['perc_vac'] = (_df_UF.loc[:, 'vaccinated'] / _df_UF.loc[:, 'state'].map(
df_popuf.set_index('UF').loc[:, 'POPULACAO'])) * 100
_df_UF['NM_UF'] = _df_UF.loc[:, 'state'].map(df_popuf.set_index('UF').loc[:, 'NM_UF'])
# Definindo faixa de valores de população vacinada por UF
limite_inferior = int(round(_df_UF['perc_vac'].min(), 0))
limite_superior = int(round(_df_UF['perc_vac'].max(), 0))
cut_bins = np.linspace(limite_inferior - 2, limite_superior + 2, num=5)
cut_bins = np.ceil(cut_bins).astype(int)
cut_labels = [f'{cut_bins[i]}-{cut_bins[i + 1]}%' if i + 2 < len(cut_bins) else f'> {cut_bins[i]}%' for i in
range(0, len(cut_bins) - 1)]
_df_UF['faixa_perc'] = pd.cut(
_df_UF['perc_vac'],
bins=cut_bins,
labels=cut_labels,
)
return _df_UF
# --------------------------------------------------------------------------------------------
# Função para retornar os dataframes carregados e transformados
# --------------------------------------------------------------------------------------------
async def fetch_dataframes(url_br, url_cities, url_popmunic, url_gpscities, url_geojson_br, chunk_size, logger):
'''
:param url_br: caminho do conjunto de dados sobre a covid referente ao Brasil e aos estados brasileiros
:param url_cities: caminho do conjunto de dados sobre a covid nos municípios brasileiros
:param url_popmunic: caminho do conjunto de dados sobre a população estimada dos municípios brasileiros
:param url_gpscities: caminho do conjunto de dados contendo informações geográficas de latitude e longitude dos municípios brasileiros
:param url_geojson_br: caminho do arquivo geojson do mapa geográfico brasileiro, dividido por UF
:param chunk_size: quantidade de linhas por parte, usado para particionar o carregamento de DataFrames muito grandes
:param logger: biblioteca para registrar os passos dos processamentos dos DatFrames
:return: df_br, -> DataFrame com dados de covid no Brasil transformado
df_cities, -> DataFrame com dados de covid nos municípios brasileiros transformado
df_popuf, -> DataFrame com dados populacionais por UF
df_uf -> DataFrame com dados de covid agregados por UF e com percentual de vacinados por UF
'''
# Dicionário cuja chave é o nome do Dataframe a ser carregado e o valor são os parâmetros a serem
# passados para a função que irá carregar o DataFrame - load_data
D_ARGS = {
'df_br': dict(data_url=url_br, date_f=['date']),
'df_cities': dict(data_url=url_cities, date_f=['date'], compression='gzip', chunksize=chunk_size),
'df_popmunic': dict(data_url=url_popmunic, tipo='xls', sheet_name=['BRASIL E UFs', 'Municípios'], skiprows=1,
skipfooter=7),
'df_gpscities': dict(data_url=url_gpscities),
'gj_br': dict(data_url=url_geojson_br, tipo='json'),
}
# Inicializa o Dicionário que irá conter os DataFrames carregados
datasets = {}
logger.info('Iniciando carga de dados...')
# Marca o tempo de início da execução da carga
start = time.perf_counter()
# Execução paralela da carga de dados
with ThreadPoolExecutor() as executor:
# Inicia as tarefas de carga e atribui a cada tarefa o nome da chave correspondente a cada conjunto de dados
future_data_loader = {executor.submit(load_data, **valor): chave for chave, valor in D_ARGS.items()}
for task in as_completed(future_data_loader):
try:
datasets[future_data_loader[task]] = task.result()
except Exception as exc:
logger.error(f'{future_data_loader[task]} generated an exception: {exc}')
else:
logger.info(
f'dataset {future_data_loader[task]} carregado - {len(datasets[future_data_loader[task]])} linhas')
logger.info(f'Carga de dados finalizada. Tempo de execução: {time.perf_counter() - start} ')
logger.info('Iniciando tratamento dos dados...')
# Marca o tempo de início da transformação dos dados
start = time.perf_counter()
df_br, df_cities, df_popuf = await asyncio.gather(
transform_dfbr(datasets['df_br'],
cols=['date', 'state', 'newDeaths', 'deaths', 'deathsMS', 'newCases', 'totalCases',
'totalCasesMS', 'recovered', 'tests', 'vaccinated', 'vaccinated_second']),
transform_dfcities(datasets['df_cities'], datasets['df_gpscities']),
transform_popuf(datasets['df_popmunic']),
)
df_uf = await transform_dfuf(datasets['df_br'], df_popuf)
logger.info(f'Tratamento os dados finalizada. Tempo de execução: {time.perf_counter() - start} ')
return df_br, df_cities, df_popuf, df_uf, datasets['gj_br']