Skip to content

Commit

Permalink
a22-traffic-quality: refactor, fix possible resource leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
clezag committed Feb 12, 2024
1 parent a71cf47 commit 64d9e74
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 103 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.compile.nullAnalysis.mode": "disabled"
}
1 change: 1 addition & 0 deletions traffic-a22-data-quality/src/bdplib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func GetStations(stationType string, origin string) ([]Station, error) {
if err != nil {
return nil, fmt.Errorf("error performing ninja request: %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, errors.New("ninja request returned non-OK status: " + strconv.Itoa(res.StatusCode))
Expand Down
103 changes: 0 additions & 103 deletions traffic-a22-data-quality/src/dc/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"time"
"traffic-a22-data-quality/bdplib"
"traffic-a22-data-quality/ninja"

"golang.org/x/exp/maps"
)

type NinjaMeasurement struct {
Expand Down Expand Up @@ -181,104 +179,3 @@ func getNinjaData(stationCode string, typeName string, from time.Time, to time.T
err := ninja.History(req, res)
return res, err
}

func sumParentJob() {
req := ninja.DefaultNinjaRequest()
req.DataTypes = append(baseDataTypes, TotalType.Name)
req.Select = "tname,mvalue,pcode,stype"
req.Where = fmt.Sprintf("sorigin.eq.%s,sactive.eq.true,mperiod.eq.%d", origin, periodAggregate)
req.Limit = -1

res := &ninja.NinjaResponse[[]struct {
Tstamp ninja.NinjaTime `json:"_timestamp"`
DType string `json:"tname"`
Value uint64 `json:"mvalue"`
Parent string `json:"pcode"`
Stype string `json:"stype"`
}]{}

err := ninja.Latest(req, res)
if err != nil {
slog.Error("sumParent: Error in ninja call. aborting", "err", err)
return
}

type window = struct {
from time.Time
to time.Time
}

// parentId / datatype
parents := make(map[string]map[string]window)

// For each parent/type find out where the elaboration window starts/ends
for _, m := range res.Data {
if _, exists := parents[m.Parent]; !exists {
parents[m.Parent] = make(map[string]window)
}
t := parents[m.Parent][m.DType]
// There is only one parent record per data type
if m.Stype == parentStationType {
t.from = m.Tstamp.Time
} else {
if m.Tstamp.Time.After(t.to) {
t.to = m.Tstamp.Time
}
}
parents[m.Parent][m.DType] = t
}

for parId, types := range parents {
// Do only a single request per parent. So we determine the max window.
// Note that when a data type does not exist yet on parent station, but in a child station, the window defaults to from = 0000-00-00...
var from time.Time
var to time.Time
for _, tp := range types {
if tp.from.Before(from) {
from = tp.from
}
if tp.to.After(to) {
to = tp.to
}
}

req := ninja.DefaultNinjaRequest()
req.DataTypes = maps.Keys(types) // Any data type we found base data for
req.AddStationType(baseStationType)
req.Select = "tname,mvalue"
req.From = from
req.To = to.Add(time.Minute) // Ninja is open interval, need to get the exact timestamp, too
req.Where = fmt.Sprintf("sorigin.eq.%s,sactive.eq.true,mperiod.eq.86400,pcode.eq.%s", origin, parId)
req.Limit = -1

res := &ninja.NinjaResponse[[]struct {
Tstamp ninja.NinjaTime `json:"_timestamp"`
DType string `json:"tname"`
Value uint64 `json:"mvalue"`
}]{}

err := ninja.History(req, res)
if err != nil {
slog.Error("sumParent: Error in ninja call. aborting", "err", err)
return
}

sums := make(map[string]map[time.Time]uint64) // datatype / timestamp / sum value

// build sums per datatype and timestamp (should be full days)
for _, m := range res.Data {
if _, exists := sums[m.DType]; !exists {
sums[m.DType] = make(map[time.Time]uint64)
}
sums[m.DType][m.Tstamp.Time] = sums[m.DType][m.Tstamp.Time] + m.Value
}

recs := bdplib.DataMap{}
for dType, times := range sums {
for timestamp, value := range times {
recs.AddRecord(parId, dType, bdplib.CreateRecord(timestamp.UnixMilli(), value, periodAggregate))
}
}
bdplib.PushData(parentStationType, recs)
}
}
116 changes: 116 additions & 0 deletions traffic-a22-data-quality/src/dc/sum_parent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// SPDX-FileCopyrightText: NOI Techpark <digital@noi.bz.it>

// SPDX-License-Identifier: AGPL-3.0-or-later

package dc

import (
"fmt"
"log/slog"
"time"
"traffic-a22-data-quality/bdplib"
"traffic-a22-data-quality/ninja"

"golang.org/x/exp/maps"
)

func sumParentJob() {
req := ninja.DefaultNinjaRequest()
req.DataTypes = append(baseDataTypes, TotalType.Name)
req.Select = "tname,mvalue,pcode,stype"
req.Where = fmt.Sprintf("sorigin.eq.%s,sactive.eq.true,mperiod.eq.%d", origin, periodAggregate)
req.Limit = -1

res := &ninja.NinjaResponse[[]struct {
Tstamp ninja.NinjaTime `json:"_timestamp"`
DType string `json:"tname"`
Value uint64 `json:"mvalue"`
Parent string `json:"pcode"`
Stype string `json:"stype"`
}]{}

err := ninja.Latest(req, res)
if err != nil {
slog.Error("sumParent: Error in ninja call. aborting", "err", err)
return
}

type window = struct {
from time.Time
to time.Time
}

// parentId / datatype
parents := make(map[string]map[string]window)

// For each parent/type find out where the elaboration window starts/ends
for _, m := range res.Data {
if _, exists := parents[m.Parent]; !exists {
parents[m.Parent] = make(map[string]window)
}
t := parents[m.Parent][m.DType]
// There is only one parent record per data type
if m.Stype == parentStationType {
t.from = m.Tstamp.Time
} else {
if m.Tstamp.Time.After(t.to) {
t.to = m.Tstamp.Time
}
}
parents[m.Parent][m.DType] = t
}

for parId, types := range parents {
// Do only a single request per parent. So we determine the max window.
// Note that when a data type does not exist yet on parent station, but in a child station, the window defaults to from = 0000-00-00...
var from time.Time
var to time.Time
for _, tp := range types {
if tp.from.Before(from) {
from = tp.from
}
if tp.to.After(to) {
to = tp.to
}
}

req := ninja.DefaultNinjaRequest()
req.DataTypes = maps.Keys(types) // Any data type we found base data for
req.AddStationType(baseStationType)
req.Select = "tname,mvalue"
req.From = from
req.To = to.Add(time.Minute) // Ninja is open interval, need to get the exact timestamp, too
req.Where = fmt.Sprintf("sorigin.eq.%s,sactive.eq.true,mperiod.eq.86400,pcode.eq.%s", origin, parId)
req.Limit = -1

res := &ninja.NinjaResponse[[]struct {
Tstamp ninja.NinjaTime `json:"_timestamp"`
DType string `json:"tname"`
Value uint64 `json:"mvalue"`
}]{}

err := ninja.History(req, res)
if err != nil {
slog.Error("sumParent: Error in ninja call. aborting", "err", err)
return
}

sums := make(map[string]map[time.Time]uint64) // datatype / timestamp / sum value

// build sums per datatype and timestamp (should be full days)
for _, m := range res.Data {
if _, exists := sums[m.DType]; !exists {
sums[m.DType] = make(map[time.Time]uint64)
}
sums[m.DType][m.Tstamp.Time] = sums[m.DType][m.Tstamp.Time] + m.Value
}

recs := bdplib.DataMap{}
for dType, times := range sums {
for timestamp, value := range times {
recs.AddRecord(parId, dType, bdplib.CreateRecord(timestamp.UnixMilli(), value, periodAggregate))
}
}
bdplib.PushData(parentStationType, recs)
}
}
1 change: 1 addition & 0 deletions traffic-a22-data-quality/src/ninja/ninja.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func requestUrl[T any](reqUrl *url.URL, result *NinjaResponse[T]) error {
if err != nil {
return fmt.Errorf("error performing ninja request: %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return errors.New("ninja request returned non-OK status: " + strconv.Itoa(res.StatusCode))
Expand Down

0 comments on commit 64d9e74

Please sign in to comment.