mirror of https://github.com/agola-io/agola
1356 lines
31 KiB
Go
1356 lines
31 KiB
Go
// Code generated by go generate; DO NOT EDIT.
|
|
package db
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
stdsql "database/sql"
|
|
"time"
|
|
|
|
"github.com/sorintlab/errors"
|
|
sq "github.com/huandu/go-sqlbuilder"
|
|
|
|
"agola.io/agola/internal/sqlg"
|
|
"agola.io/agola/internal/sqlg/sql"
|
|
|
|
types "agola.io/agola/services/runservice/types"
|
|
)
|
|
|
|
var (
|
|
changeGroupSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"changegroup.id", "changegroup.revision", "changegroup.creation_time", "changegroup.update_time", "changegroup.name", "changegroup.value"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
changeGroupSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(changeGroupSelectColumns(additionalCols...)...).From("changegroup")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateChangeGroup(tx *sql.Tx, v *types.ChangeGroup) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertChangeGroup(tx, v)
|
|
} else {
|
|
err = d.UpdateChangeGroup(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertChangeGroup(tx *sql.Tx, v *types.ChangeGroup) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawChangeGroupPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertChangeGroupSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert changegroup")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateChangeGroup(tx *sql.Tx, v *types.ChangeGroup) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateChangeGroupPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateChangeGroupSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update changegroup")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update changegroup")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteChangeGroup(tx *sql.Tx, changeGroupID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("changegroup").Where(q.E("id", changeGroupID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete changeGroup")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteChangeGroup(tx *sql.Tx, id string) error {
|
|
return d.deleteChangeGroup(tx, id)
|
|
}
|
|
|
|
// insertRawChangeGroup should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawChangeGroup(tx *sql.Tx, v *types.ChangeGroup) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawChangeGroupPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawChangeGroupSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert changegroup")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
runConfigSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"runconfig.id", "runconfig.revision", "runconfig.creation_time", "runconfig.update_time", "runconfig.name", "runconfig.run_group", "runconfig.setup_errors", "runconfig.annotations", "runconfig.static_environment", "runconfig.environment", "runconfig.tasks", "runconfig.cache_group"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
runConfigSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(runConfigSelectColumns(additionalCols...)...).From("runconfig")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateRunConfig(tx *sql.Tx, v *types.RunConfig) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertRunConfig(tx, v)
|
|
} else {
|
|
err = d.UpdateRunConfig(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertRunConfig(tx *sql.Tx, v *types.RunConfig) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunConfigPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRunConfigSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert runconfig")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateRunConfig(tx *sql.Tx, v *types.RunConfig) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateRunConfigPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateRunConfigSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update runconfig")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update runconfig")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteRunConfig(tx *sql.Tx, runConfigID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("runconfig").Where(q.E("id", runConfigID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete runConfig")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteRunConfig(tx *sql.Tx, id string) error {
|
|
return d.deleteRunConfig(tx, id)
|
|
}
|
|
|
|
// insertRawRunConfig should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawRunConfig(tx *sql.Tx, v *types.RunConfig) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunConfigPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawRunConfigSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert runconfig")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
runSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"run.id", "run.revision", "run.creation_time", "run.update_time", "run.sequence", "run.name", "run.run_config_id", "run.counter", "run.run_group", "run.annotations", "run.phase", "run.result", "run.stop", "run.tasks", "run.enqueue_time", "run.start_time", "run.end_time", "run.archived"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
runSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(runSelectColumns(additionalCols...)...).From("run")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateRun(tx *sql.Tx, v *types.Run) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertRun(tx, v)
|
|
} else {
|
|
err = d.UpdateRun(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertRun(tx *sql.Tx, v *types.Run) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
var nextSeq uint64
|
|
|
|
nextSeq, err = d.nextSequence(tx, "run_sequence_seq")
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to create next sequence for run_sequence_seq")
|
|
}
|
|
v.Sequence = nextSeq
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRunSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert run")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateRun(tx *sql.Tx, v *types.Run) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateRunPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateRunSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update run")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update run")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteRun(tx *sql.Tx, runID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("run").Where(q.E("id", runID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete run")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteRun(tx *sql.Tx, id string) error {
|
|
return d.deleteRun(tx, id)
|
|
}
|
|
|
|
// insertRawRun should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawRun(tx *sql.Tx, v *types.Run) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawRunSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert run")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
runCounterSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"runcounter.id", "runcounter.revision", "runcounter.creation_time", "runcounter.update_time", "runcounter.group_id", "runcounter.value"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
runCounterSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(runCounterSelectColumns(additionalCols...)...).From("runcounter")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateRunCounter(tx *sql.Tx, v *types.RunCounter) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertRunCounter(tx, v)
|
|
} else {
|
|
err = d.UpdateRunCounter(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertRunCounter(tx *sql.Tx, v *types.RunCounter) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunCounterPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRunCounterSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert runcounter")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateRunCounter(tx *sql.Tx, v *types.RunCounter) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateRunCounterPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateRunCounterSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update runcounter")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update runcounter")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteRunCounter(tx *sql.Tx, runCounterID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("runcounter").Where(q.E("id", runCounterID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete runCounter")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteRunCounter(tx *sql.Tx, id string) error {
|
|
return d.deleteRunCounter(tx, id)
|
|
}
|
|
|
|
// insertRawRunCounter should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawRunCounter(tx *sql.Tx, v *types.RunCounter) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunCounterPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawRunCounterSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert runcounter")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
runEventSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"runevent.id", "runevent.revision", "runevent.creation_time", "runevent.update_time", "runevent.sequence", "runevent.run_id", "runevent.phase", "runevent.result"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
runEventSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(runEventSelectColumns(additionalCols...)...).From("runevent")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateRunEvent(tx *sql.Tx, v *types.RunEvent) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertRunEvent(tx, v)
|
|
} else {
|
|
err = d.UpdateRunEvent(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertRunEvent(tx *sql.Tx, v *types.RunEvent) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
var nextSeq uint64
|
|
|
|
nextSeq, err = d.nextSequence(tx, "runevent_sequence_seq")
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to create next sequence for runevent_sequence_seq")
|
|
}
|
|
v.Sequence = nextSeq
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunEventPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRunEventSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert runevent")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateRunEvent(tx *sql.Tx, v *types.RunEvent) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateRunEventPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateRunEventSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update runevent")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update runevent")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteRunEvent(tx *sql.Tx, runEventID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("runevent").Where(q.E("id", runEventID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete runEvent")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteRunEvent(tx *sql.Tx, id string) error {
|
|
return d.deleteRunEvent(tx, id)
|
|
}
|
|
|
|
// insertRawRunEvent should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawRunEvent(tx *sql.Tx, v *types.RunEvent) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawRunEventPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawRunEventSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert runevent")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
executorSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"executor.id", "executor.revision", "executor.creation_time", "executor.update_time", "executor.executor_id", "executor.listen_url", "executor.archs", "executor.labels", "executor.allow_privileged_containers", "executor.active_tasks_limit", "executor.active_tasks", "executor.dynamic", "executor.executor_group", "executor.siblings_executors"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
executorSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(executorSelectColumns(additionalCols...)...).From("executor")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateExecutor(tx *sql.Tx, v *types.Executor) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertExecutor(tx, v)
|
|
} else {
|
|
err = d.UpdateExecutor(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertExecutor(tx *sql.Tx, v *types.Executor) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawExecutorPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertExecutorSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert executor")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateExecutor(tx *sql.Tx, v *types.Executor) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateExecutorPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateExecutorSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update executor")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update executor")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteExecutor(tx *sql.Tx, executorID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("executor").Where(q.E("id", executorID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete executor")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteExecutor(tx *sql.Tx, id string) error {
|
|
return d.deleteExecutor(tx, id)
|
|
}
|
|
|
|
// insertRawExecutor should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawExecutor(tx *sql.Tx, v *types.Executor) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawExecutorPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawExecutorSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert executor")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var (
|
|
executorTaskSelectColumns = func(additionalCols ...string) []string {
|
|
columns := []string{"executortask.id", "executortask.revision", "executortask.creation_time", "executortask.update_time", "executortask.executor_id", "executortask.run_id", "executortask.run_task_id", "executortask.stop", "executortask.phase", "executortask.timedout", "executortask.fail_error", "executortask.start_time", "executortask.end_time", "executortask.setup_step", "executortask.steps"}
|
|
columns = append(columns, additionalCols...)
|
|
|
|
return columns
|
|
}
|
|
|
|
executorTaskSelect = func(additionalCols ...string) *sq.SelectBuilder {
|
|
return sq.NewSelectBuilder().Select(executorTaskSelectColumns(additionalCols...)...).From("executortask")
|
|
}
|
|
)
|
|
|
|
func (d *DB) InsertOrUpdateExecutorTask(tx *sql.Tx, v *types.ExecutorTask) error {
|
|
var err error
|
|
if v.Revision == 0 {
|
|
err = d.InsertExecutorTask(tx, v)
|
|
} else {
|
|
err = d.UpdateExecutorTask(tx, v)
|
|
}
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) InsertExecutorTask(tx *sql.Tx, v *types.ExecutorTask) error {
|
|
if v.Revision != 0 {
|
|
return errors.Errorf("expected revision 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not created by this transaction")
|
|
}
|
|
|
|
v.Revision = 1
|
|
|
|
now := time.Now()
|
|
v.CreationTime = now
|
|
v.UpdateTime = now
|
|
|
|
var err error
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawExecutorTaskPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertExecutorTaskSqlite3(tx, v);
|
|
}
|
|
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert executortask")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UpdateExecutorTask(tx *sql.Tx, v *types.ExecutorTask) error {
|
|
if v.Revision < 1 {
|
|
return errors.Errorf("expected revision > 0 got %d", v.Revision)
|
|
}
|
|
|
|
if v.TxID != tx.ID() {
|
|
return errors.Errorf("object was not fetched by this transaction")
|
|
}
|
|
|
|
curRevision := v.Revision
|
|
v.Revision++
|
|
|
|
v.UpdateTime = time.Now()
|
|
|
|
var res stdsql.Result
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
res, err = d.updateExecutorTaskPostgres(tx, curRevision, v);
|
|
case sql.Sqlite3:
|
|
res, err = d.updateExecutorTaskSqlite3(tx, curRevision, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update executortask")
|
|
}
|
|
|
|
rows, err := res.RowsAffected()
|
|
if err != nil {
|
|
v.Revision = curRevision
|
|
return errors.Wrap(err, "failed to update executortask")
|
|
}
|
|
|
|
if rows != 1 {
|
|
v.Revision = curRevision
|
|
return sqlg.ErrConcurrent
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) deleteExecutorTask(tx *sql.Tx, executorTaskID string) error {
|
|
q := sq.NewDeleteBuilder()
|
|
q.DeleteFrom("executortask").Where(q.E("id", executorTaskID))
|
|
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return errors.Wrap(err, "failed to delete executorTask")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) DeleteExecutorTask(tx *sql.Tx, id string) error {
|
|
return d.deleteExecutorTask(tx, id)
|
|
}
|
|
|
|
// insertRawExecutorTask should be used only for import.
|
|
// * It won't update object times.
|
|
// * It will insert values for sequences.
|
|
func (d *DB) insertRawExecutorTask(tx *sql.Tx, v *types.ExecutorTask) error {
|
|
v.Revision = 1
|
|
|
|
var err error
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
err = d.insertRawExecutorTaskPostgres(tx, v);
|
|
case sql.Sqlite3:
|
|
err = d.insertRawExecutorTaskSqlite3(tx, v);
|
|
}
|
|
if err != nil {
|
|
v.Revision = 0
|
|
return errors.Wrap(err, "failed to insert executortask")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) UnmarshalExportObject(data []byte) (sqlg.Object, error) {
|
|
type exportObjectExportMeta struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
}
|
|
|
|
var om exportObjectExportMeta
|
|
if err := json.Unmarshal(data, &om); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
var obj sqlg.Object
|
|
|
|
switch om.ExportMeta.Kind {
|
|
case "ChangeGroup":
|
|
obj = &types.ChangeGroup{}
|
|
case "RunConfig":
|
|
obj = &types.RunConfig{}
|
|
case "Run":
|
|
obj = &types.Run{}
|
|
case "RunCounter":
|
|
obj = &types.RunCounter{}
|
|
case "RunEvent":
|
|
obj = &types.RunEvent{}
|
|
case "Executor":
|
|
obj = &types.Executor{}
|
|
case "ExecutorTask":
|
|
obj = &types.ExecutorTask{}
|
|
|
|
default:
|
|
panic(errors.Errorf("unknown object kind %q, data: %s", om.ExportMeta.Kind, data))
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &obj); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
return obj, nil
|
|
}
|
|
|
|
func (d *DB) InsertRawObject(tx *sql.Tx, obj sqlg.Object) error {
|
|
switch o := obj.(type) {
|
|
case *types.ChangeGroup:
|
|
return d.insertRawChangeGroup(tx, o)
|
|
case *types.RunConfig:
|
|
return d.insertRawRunConfig(tx, o)
|
|
case *types.Run:
|
|
return d.insertRawRun(tx, o)
|
|
case *types.RunCounter:
|
|
return d.insertRawRunCounter(tx, o)
|
|
case *types.RunEvent:
|
|
return d.insertRawRunEvent(tx, o)
|
|
case *types.Executor:
|
|
return d.insertRawExecutor(tx, o)
|
|
case *types.ExecutorTask:
|
|
return d.insertRawExecutorTask(tx, o)
|
|
|
|
default:
|
|
panic(errors.Errorf("unknown object type %T", obj))
|
|
}
|
|
}
|
|
|
|
func (d *DB) SelectObject(kind string) *sq.SelectBuilder {
|
|
switch kind {
|
|
case "ChangeGroup":
|
|
return changeGroupSelect()
|
|
case "RunConfig":
|
|
return runConfigSelect()
|
|
case "Run":
|
|
return runSelect()
|
|
case "RunCounter":
|
|
return runCounterSelect()
|
|
case "RunEvent":
|
|
return runEventSelect()
|
|
case "Executor":
|
|
return executorSelect()
|
|
case "ExecutorTask":
|
|
return executorTaskSelect()
|
|
|
|
default:
|
|
panic(errors.Errorf("unknown object kind %q", kind))
|
|
}
|
|
}
|
|
|
|
func (d *DB) FetchObjects(tx *sql.Tx, kind string, q sq.Builder) ([]sqlg.Object, error) {
|
|
switch kind {
|
|
case "ChangeGroup":
|
|
fobjs, _, err := d.fetchChangeGroups(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
case "RunConfig":
|
|
fobjs, _, err := d.fetchRunConfigs(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
case "Run":
|
|
fobjs, _, err := d.fetchRuns(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
case "RunCounter":
|
|
fobjs, _, err := d.fetchRunCounters(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
case "RunEvent":
|
|
fobjs, _, err := d.fetchRunEvents(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
case "Executor":
|
|
fobjs, _, err := d.fetchExecutors(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
case "ExecutorTask":
|
|
fobjs, _, err := d.fetchExecutorTasks(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
objs := make([]sqlg.Object, len(fobjs))
|
|
for i, fobj := range fobjs {
|
|
objs[i] = fobj
|
|
}
|
|
|
|
return objs, nil
|
|
|
|
default:
|
|
panic(errors.Errorf("unknown object kind %q", kind))
|
|
}
|
|
}
|
|
|
|
func (d *DB) ObjectToExportJSON(obj sqlg.Object, e *json.Encoder) error {
|
|
switch o := obj.(type) {
|
|
case *types.ChangeGroup:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.ChangeGroup
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "ChangeGroup" }, ChangeGroup: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
case *types.RunConfig:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.RunConfig
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "RunConfig" }, RunConfig: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
case *types.Run:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.Run
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "Run" }, Run: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
case *types.RunCounter:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.RunCounter
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "RunCounter" }, RunCounter: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
case *types.RunEvent:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.RunEvent
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "RunEvent" }, RunEvent: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
case *types.Executor:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.Executor
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "Executor" }, Executor: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
case *types.ExecutorTask:
|
|
type exportObject struct {
|
|
ExportMeta sqlg.ExportMeta `json:"exportMeta"`
|
|
|
|
*types.ExecutorTask
|
|
}
|
|
|
|
if err := e.Encode(&exportObject{ExportMeta: sqlg.ExportMeta{ Kind: "ExecutorTask" }, ExecutorTask: o}); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
return nil
|
|
|
|
default:
|
|
panic(errors.Errorf("unknown object kind %T", obj))
|
|
}
|
|
}
|
|
|
|
func (d *DB) GetSequence(tx *sql.Tx, sequenceName string) (uint64, error) {
|
|
var q *sq.SelectBuilder
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
q = sq.NewSelectBuilder().Select("last_value").From(sequenceName)
|
|
|
|
case sql.Sqlite3:
|
|
q = sq.NewSelectBuilder().Select("value").From("sequence_t")
|
|
q.Where(q.E("name", sequenceName))
|
|
}
|
|
|
|
rows, err := d.query(tx, q)
|
|
if err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var value uint64
|
|
if !rows.Next() {
|
|
return value, nil
|
|
}
|
|
|
|
if err := rows.Scan(&value); err != nil {
|
|
return 0, errors.Wrap(err, "failed to scan rows")
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
|
|
return value, nil
|
|
}
|
|
|
|
func (d *DB) nextSequence(tx *sql.Tx, sequenceName string) (uint64, error) {
|
|
var value uint64
|
|
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
q := fmt.Sprintf("SELECT nextval('%s');", sequenceName)
|
|
|
|
rows, err := tx.Query(q)
|
|
if err != nil {
|
|
return 0, errors.Wrapf(err, "failed to get sequence %s nextval", sequenceName)
|
|
}
|
|
|
|
defer rows.Close()
|
|
|
|
if !rows.Next() {
|
|
return value, nil
|
|
}
|
|
|
|
if err := rows.Scan(&value); err != nil {
|
|
return 0, errors.Wrap(err, "failed to scan rows")
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
|
|
case sql.Sqlite3:
|
|
var err error
|
|
value, err = d.GetSequence(tx, sequenceName)
|
|
if err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
|
|
if value == 0 {
|
|
value++
|
|
q := sq.NewInsertBuilder()
|
|
q.InsertInto("sequence_t").Cols("name", "value").Values(sequenceName, value)
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
} else {
|
|
value++
|
|
q := sq.NewUpdateBuilder()
|
|
q.Update("sequence_t").Set(q.Assign("value", value)).Where(q.E("name", sequenceName))
|
|
if _, err := d.exec(tx, q); err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return value, nil
|
|
}
|
|
|
|
func (d *DB) PopulateSequences(tx *sql.Tx) error {
|
|
switch d.DBType() {
|
|
case sql.Postgres:
|
|
return d.populateSequencesPostgres(tx);
|
|
case sql.Sqlite3:
|
|
return d.populateSequencesSqlite3(tx);
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) populateSequencesPostgres(tx *sql.Tx) error {
|
|
var q string
|
|
q = "SELECT setval('run_sequence_seq', (SELECT COALESCE(MAX(sequence), 1) FROM run));"
|
|
if _, err := tx.Exec(q); err != nil {
|
|
return errors.Wrap(err, "failed to update sequence run_sequence_seq")
|
|
}
|
|
q = "SELECT setval('runevent_sequence_seq', (SELECT COALESCE(MAX(sequence), 1) FROM runevent));"
|
|
if _, err := tx.Exec(q); err != nil {
|
|
return errors.Wrap(err, "failed to update sequence runevent_sequence_seq")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DB) populateSequencesSqlite3(tx *sql.Tx) error {
|
|
var q string
|
|
q = "INSERT INTO sequence_t (name, value) VALUES ('run_sequence_seq', (SELECT COALESCE(MAX(sequence), 1) FROM run));"
|
|
if _, err := tx.Exec(q); err != nil {
|
|
return errors.Wrap(err, "failed to update sequence for run_sequence_seq")
|
|
}
|
|
q = "INSERT INTO sequence_t (name, value) VALUES ('runevent_sequence_seq', (SELECT COALESCE(MAX(sequence), 1) FROM runevent));"
|
|
if _, err := tx.Exec(q); err != nil {
|
|
return errors.Wrap(err, "failed to update sequence for runevent_sequence_seq")
|
|
}
|
|
|
|
return nil
|
|
}
|