Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix storelogs #222

Merged
merged 2 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 56 additions & 55 deletions pkg/db/db_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,89 +6,90 @@ import (

func (d *dbConn) DBInit() {
db := d.db
_, err := db.Exec("CREATE TABLE IF NOT EXISTS studies" +
"(id CHAR(16) PRIMARY KEY, " +
"name VARCHAR(255), " +
"owner VARCHAR(255), " +
"optimization_type TINYINT, " +
"optimization_goal DOUBLE, " +
"parameter_configs TEXT, " +
"tags TEXT, " +
"objective_value_name VARCHAR(255), " +
"metrics TEXT, " +
"job_id TEXT)")
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS studies
(id CHAR(16) PRIMARY KEY,
name VARCHAR(255),
owner VARCHAR(255),
optimization_type TINYINT,
optimization_goal DOUBLE,
parameter_configs TEXT,
tags TEXT,
objective_value_name VARCHAR(255),
metrics TEXT,
job_id TEXT)`)
if err != nil {
log.Fatalf("Error creating studies table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS study_permissions" +
"(study_id CHAR(16) NOT NULL, " +
"access_permission VARCHAR(255), " +
"PRIMARY KEY (study_id, access_permission))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS study_permissions
(study_id CHAR(16) NOT NULL,
access_permission VARCHAR(255),
PRIMARY KEY (study_id, access_permission))`)
if err != nil {
log.Fatalf("Error creating study_permissions table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS trials" +
"(id CHAR(16) PRIMARY KEY, " +
"study_id CHAR(16), " +
"parameters TEXT, " +
"objective_value VARCHAR(255), " +
"tags TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS trials
(id CHAR(16) PRIMARY KEY,
study_id CHAR(16),
parameters TEXT,
objective_value VARCHAR(255),
tags TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id))`)
if err != nil {
log.Fatalf("Error creating trials table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS workers" +
"(id CHAR(16) PRIMARY KEY, " +
"study_id CHAR(16), " +
"trial_id CHAR(16), " +
"type VARCHAR(255), " +
"status TINYINT, " +
"template_path TEXT, " +
"tags TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id), " +
"FOREIGN KEY(trial_id) REFERENCES trials(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS workers
(id CHAR(16) PRIMARY KEY,
study_id CHAR(16),
trial_id CHAR(16),
type VARCHAR(255),
status TINYINT,
template_path TEXT,
tags TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id),
FOREIGN KEY(trial_id) REFERENCES trials(id))`)
if err != nil {
log.Fatalf("Error creating workers table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS worker_metrics" +
"(worker_id CHAR(16) NOT NULL, " +
"id INT AUTO_INCREMENT PRIMARY KEY, " +
"time DATETIME(6), " +
"name VARCHAR(255), " +
"value TEXT, " +
"is_objective TINYINT)")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS worker_metrics
(worker_id CHAR(16) NOT NULL,
id INT AUTO_INCREMENT PRIMARY KEY,
time DATETIME(6),
name VARCHAR(255),
value TEXT,
is_objective TINYINT,
FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE)`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you replace " + to ` on other tables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was to minimize the size of diff. It might be better to tidy all the things now. Just a sec.

if err != nil {
log.Fatalf("Error creating worker_metrics table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS worker_lastlogs" +
"(worker_id CHAR(16) PRIMARY KEY, " +
"time DATETIME(6), " +
"value TEXT)")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS worker_lastlogs
(worker_id CHAR(16) PRIMARY KEY,
time DATETIME(6),
FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE CASCADE)`)
if err != nil {
log.Fatalf("Error creating worker_lastlogs table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS suggestion_param" +
"(id CHAR(16) PRIMARY KEY," +
"suggestion_algo TEXT, " +
"study_id CHAR(16), " +
"parameters TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS suggestion_param
(id CHAR(16) PRIMARY KEY,
suggestion_algo TEXT,
study_id CHAR(16),
parameters TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id))`)
if err != nil {
log.Fatalf("Error creating suggestion_param table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS earlystop_param" +
"(id CHAR(16) PRIMARY KEY, " +
"earlystop_argo TEXT, " +
"study_id CHAR(16), " +
"parameters TEXT, " +
"FOREIGN KEY(study_id) REFERENCES studies(id))")
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS earlystop_param
(id CHAR(16) PRIMARY KEY,
earlystop_argo TEXT,
study_id CHAR(16),
parameters TEXT,
FOREIGN KEY(study_id) REFERENCES studies(id))`)
if err != nil {
log.Fatalf("Error creating earlystop_param table: %v", err)
}
Expand Down
133 changes: 84 additions & 49 deletions pkg/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,61 @@ func (d *dbConn) GetWorkerLogs(id string, opts *GetWorkerLogOpts) ([]*WorkerLog,
return result, nil
}

func (d *dbConn) getWorkerLastlog(id string, value *string) (*time.Time, error) {
var lastTimestamp string
func (d *dbConn) getWorkerLastlogs(id string) (time.Time, []*WorkerLog, error) {
var timeStr string
var timeVal time.Time
var err error

if value != nil {
row := d.db.QueryRow("SELECT time, value FROM worker_lastlogs WHERE worker_id = ?", id)
err = row.Scan(&lastTimestamp, value)
} else {
row := d.db.QueryRow("SELECT time FROM worker_lastlogs WHERE worker_id = ?", id)
err = row.Scan(&lastTimestamp)
// Use LEFT JOIN to ensure a result even if there's no matching
// in worker_metrics.
rows, err := d.db.Query(
`SELECT worker_lastlogs.time, name, value FROM worker_lastlogs
LEFT JOIN worker_metrics
ON (worker_lastlogs.worker_id = worker_metrics.worker_id AND worker_lastlogs.time = worker_metrics.time)
WHERE worker_lastlogs.worker_id = ?`, id)
if err != nil {
return timeVal, nil, err
}

var result []*WorkerLog
for rows.Next() {
log1 := new(WorkerLog)
var thisTime string
var name, value sql.NullString

err := rows.Scan(&thisTime, &name, &value)
if err != nil {
log.Printf("Error scanning log: %v", err)
continue
}
if timeStr == "" {
timeStr = thisTime
timeVal, err = time.Parse(mysqlTimeFmt, timeStr)
if err != nil {
log.Printf("Error parsing time %s: %v", timeStr, err)
return timeVal, nil, err
}
} else if timeStr != thisTime {
log.Printf("Unexpected query result %s != %s",
timeStr, thisTime)
}
log1.Time = timeVal
if !name.Valid {
continue
}
(*log1).Name = name.String
(*log1).Value = value.String
result = append(result, log1)
}
return timeVal, result, nil
}

func (d *dbConn) GetWorkerTimestamp(id string) (*time.Time, error) {
var lastTimestamp string

row := d.db.QueryRow("SELECT time FROM worker_lastlogs WHERE worker_id = ?", id)
err := row.Scan(&lastTimestamp)

switch {
case err == sql.ErrNoRows:
return nil, nil
Expand All @@ -483,10 +526,6 @@ func (d *dbConn) getWorkerLastlog(id string, value *string) (*time.Time, error)
}
}

func (d *dbConn) GetWorkerTimestamp(id string) (*time.Time, error) {
return d.getWorkerLastlog(id, nil)
}

func (d *dbConn) storeWorkerLog(workerID string, time string, metricsName string, metricsValue string, objectiveValueName string) error {
isObjective := 0
if metricsName == objectiveValueName {
Expand All @@ -502,9 +541,8 @@ func (d *dbConn) storeWorkerLog(workerID string, time string, metricsName string

func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error {
var lasterr error
var lastValue string

dbT, err := d.getWorkerLastlog(workerID, &lastValue)
dbT, lastLogs, err := d.getWorkerLastlogs(workerID)
if err != nil {
log.Printf("Error getting last log timestamp: %v", err)
}
Expand All @@ -519,69 +557,66 @@ func (d *dbConn) StoreWorkerLogs(workerID string, logs []*api.MetricsLog) error
return err
}

// Store logs when
// 1. a log is newer than dbT, or,
// 2. a log is not yet in the DB when the timestamps are equal
var formattedTime string
var ls []string
var lastTime time.Time
for _, mlog := range logs {
metricsName := mlog.Name
logLoop:
for _, mv := range mlog.Values {
t, err := time.Parse(time.RFC3339Nano, mv.Time)
if err != nil {
log.Printf("Error parsing time %s: %v", mv.Time, err)
lasterr = err
continue
}
if dbT != nil && !t.After(*dbT) {
if t.Before(dbT) {
// dbT is from mysql and has microsec precision.
// This code assumes nanosec fractions are rounded down.
continue
}
// use UTC as mysql DATETIME lacks timezone
formattedTime = t.UTC().Format(mysqlTimeFmt)
if dbT != nil {
// Parse again to get rounding effect
//reparsed_time, err := time.Parse(mysqlTimeFmt, formattedTime)
//if reparsed_time == *dbT {
// if mv.Value == lastValue {
// stored_logs are already in DB
// This assignment ensures the remaining
// logs will be stored in DB.
// dbT = nil
// continue
// }
// // We don't know this is necessary or not yet.
// stored_logs = append(stored_logs, &mv.Value)
// continue
//}
// (reparsed_time > *dbT) can be assumed
err = d.storeWorkerLog(workerID,
dbT.UTC().Format(mysqlTimeFmt),
metricsName, mv.Value,
objectiveValueName)
if !dbT.IsZero() {
// Parse again to get rounding effect, otherwise
// the next comparison will be almost always false.
reparsed_time, err := time.Parse(mysqlTimeFmt, formattedTime)
if err != nil {
log.Printf("Error storing log %s: %v", mv.Value, err)
log.Printf("Error parsing time %s: %v", formattedTime, err)
lasterr = err
continue
}
dbT = nil
} else {
err = d.storeWorkerLog(workerID,
formattedTime,
metricsName, mv.Value,
objectiveValueName)
if err != nil {
log.Printf("Error storing log %s: %v", mv.Value, err)
lasterr = err
if reparsed_time == dbT {
for _, l := range lastLogs {
if l.Name == metricsName && l.Value == mv.Value {
continue logLoop
}
}
}
}
err = d.storeWorkerLog(workerID,
formattedTime,
metricsName, mv.Value,
objectiveValueName)
if err != nil {
log.Printf("Error storing log %s: %v", mv.Value, err)
lasterr = err
} else if t.After(lastTime) {
lastTime = t
}
}
}
if lasterr != nil {
// If lastlog were updated, logs that couldn't be saved
// would be lost.
return lasterr
}
if len(ls) == 2 {
_, err = d.db.Exec("REPLACE INTO worker_lastlogs VALUES (?, ?, ?)",
workerID, formattedTime, ls[1])
if !lastTime.IsZero() {
formattedTime = lastTime.UTC().Format(mysqlTimeFmt)
_, err = d.db.Exec("REPLACE INTO worker_lastlogs VALUES (?, ?)",
workerID, formattedTime)
}
return err
}
Expand Down