mirror of https://github.com/agola-io/agola
449 lines
13 KiB
Go
449 lines
13 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
stdsql "database/sql"
|
|
"strings"
|
|
|
|
sq "github.com/huandu/go-sqlbuilder"
|
|
"github.com/rs/zerolog"
|
|
"github.com/sorintlab/errors"
|
|
|
|
"agola.io/agola/internal/services/runservice/db/objects"
|
|
"agola.io/agola/internal/sqlg"
|
|
"agola.io/agola/internal/sqlg/sql"
|
|
"agola.io/agola/services/runservice/types"
|
|
)
|
|
|
|
//go:generate ../../../../tools/bin/dbgenerator -type db -component runservice
|
|
|
|
type DB struct {
|
|
log zerolog.Logger
|
|
sdb *sql.DB
|
|
}
|
|
|
|
func NewDB(log zerolog.Logger, sdb *sql.DB) (*DB, error) {
|
|
return &DB{
|
|
log: log,
|
|
sdb: sdb,
|
|
}, nil
|
|
}
|
|
|
|
func (d *DB) DBType() sql.Type {
|
|
return d.sdb.Type()
|
|
}
|
|
|
|
func (d *DB) DB() *sql.DB {
|
|
return d.sdb
|
|
}
|
|
|
|
func (d *DB) Do(ctx context.Context, f func(tx *sql.Tx) error) error {
|
|
return errors.WithStack(d.sdb.Do(ctx, f))
|
|
}
|
|
|
|
func (d *DB) ObjectsInfo() []sqlg.ObjectInfo {
|
|
return objects.ObjectsInfo
|
|
}
|
|
|
|
func (d *DB) Flavor() sq.Flavor {
|
|
switch d.sdb.Type() {
|
|
case sql.Postgres:
|
|
return sq.PostgreSQL
|
|
case sql.Sqlite3:
|
|
return sq.SQLite
|
|
}
|
|
|
|
return sq.PostgreSQL
|
|
}
|
|
|
|
func (d *DB) exec(tx *sql.Tx, rq sq.Builder) (stdsql.Result, error) {
|
|
q, args := rq.BuildWithFlavor(d.Flavor())
|
|
// d.log.Debug().Msgf("q: %s, args: %s", q, util.Dump(args))
|
|
|
|
r, err := tx.Exec(q, args...)
|
|
return r, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) query(tx *sql.Tx, rq sq.Builder) (*stdsql.Rows, error) {
|
|
q, args := rq.BuildWithFlavor(d.Flavor())
|
|
// d.log.Debug().Msgf("start q: %s, args: %s", q, util.Dump(args))
|
|
|
|
r, err := tx.Query(q, args...)
|
|
// d.log.Debug().Msgf("end q: %s, args: %s", q, util.Dump(args))
|
|
return r, errors.WithStack(err)
|
|
}
|
|
|
|
func mustSingleRow[T any](s []*T) (*T, error) {
|
|
if len(s) > 1 {
|
|
return nil, errors.Errorf("too many rows returned")
|
|
}
|
|
if len(s) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
return s[0], nil
|
|
}
|
|
|
|
func (d *DB) GetChangeGroups(tx *sql.Tx) ([]*types.ChangeGroup, error) {
|
|
q := changeGroupSelect()
|
|
changeGroups, _, err := d.fetchChangeGroups(tx, q)
|
|
|
|
return changeGroups, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetChangeGroupsByNames(tx *sql.Tx, changeGroupsNames []string) ([]*types.ChangeGroup, error) {
|
|
if len(changeGroupsNames) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
q := changeGroupSelect()
|
|
q.Where(q.In("name", sq.Flatten(changeGroupsNames)...))
|
|
changeGroups, _, err := d.fetchChangeGroups(tx, q)
|
|
|
|
return changeGroups, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetRun(tx *sql.Tx, runID string) (*types.Run, error) {
|
|
q := runSelect()
|
|
q.Where(q.E("run.id", runID))
|
|
runs, _, err := d.fetchRuns(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(runs)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetRunByGroup(tx *sql.Tx, groupPath string, runCounter uint64) (*types.Run, error) {
|
|
q := runSelect()
|
|
|
|
groupPath = strings.TrimSuffix(groupPath, "/")
|
|
// search exact path or child path (add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02))
|
|
q.Where(q.And(q.Or(q.E("run.run_group", groupPath), q.Like("run.run_group", groupPath+"/%")), q.E("run.counter", runCounter)))
|
|
runs, _, err := d.fetchRuns(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(runs)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetRuns(tx *sql.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunSequence uint64, limit int, sortDirection types.SortDirection) ([]*types.Run, error) {
|
|
return d.getRunsFiltered(tx, groups, lastRun, phaseFilter, resultFilter, startRunSequence, limit, sortDirection)
|
|
}
|
|
|
|
func (d *DB) getRunsFilteredQuery(phaseFilter []types.RunPhase, resultFilter []types.RunResult, groups []string, lastRun bool, startRunSequence uint64, limit int, sortDirection types.SortDirection) *sq.SelectBuilder {
|
|
useSubquery := false
|
|
if len(groups) > 0 && lastRun {
|
|
useSubquery = true
|
|
}
|
|
|
|
q := runSelect()
|
|
if useSubquery {
|
|
q = sq.NewSelectBuilder().Select("max(run.sequence)").From("run")
|
|
}
|
|
|
|
w := []string{}
|
|
having := []string{}
|
|
if len(phaseFilter) > 0 {
|
|
w = append(w, q.In("run.phase", sq.Flatten(phaseFilter)...))
|
|
}
|
|
if len(resultFilter) > 0 {
|
|
w = append(w, q.In("run.result", sq.Flatten(resultFilter)...))
|
|
}
|
|
if startRunSequence > 0 {
|
|
if lastRun {
|
|
switch sortDirection {
|
|
case types.SortDirectionAsc:
|
|
having = append(having, q.G("run.sequence", startRunSequence))
|
|
case types.SortDirectionDesc:
|
|
having = append(having, q.L("run.sequence", startRunSequence))
|
|
}
|
|
} else {
|
|
switch sortDirection {
|
|
case types.SortDirectionAsc:
|
|
w = append(w, q.G("run.sequence", startRunSequence))
|
|
case types.SortDirectionDesc:
|
|
w = append(w, q.L("run.sequence", startRunSequence))
|
|
}
|
|
}
|
|
}
|
|
if limit > 0 {
|
|
q.Limit(limit)
|
|
}
|
|
|
|
if len(groups) > 0 {
|
|
cond := []string{}
|
|
for _, groupPath := range groups {
|
|
groupPath = strings.TrimSuffix(groupPath, "/")
|
|
|
|
// search exact path or child path (add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02))
|
|
cond = append(cond, q.E("run.run_group", groupPath), q.Like("run.run_group", groupPath+"/%"))
|
|
}
|
|
w = append(w, q.Or(cond...))
|
|
|
|
if lastRun {
|
|
q.GroupBy("run.run_group")
|
|
}
|
|
}
|
|
|
|
q.Where(w...)
|
|
q.Having(having...)
|
|
|
|
if useSubquery {
|
|
sq := runSelect()
|
|
sq.Where(sq.In("run.sequence", q))
|
|
|
|
switch sortDirection {
|
|
case types.SortDirectionAsc:
|
|
sq.OrderBy("run.sequence").Asc()
|
|
case types.SortDirectionDesc:
|
|
sq.OrderBy("run.sequence").Desc()
|
|
}
|
|
return sq
|
|
}
|
|
|
|
switch sortDirection {
|
|
case types.SortDirectionAsc:
|
|
q.OrderBy("run.sequence").Asc()
|
|
case types.SortDirectionDesc:
|
|
q.OrderBy("run.sequence").Desc()
|
|
}
|
|
|
|
return q
|
|
}
|
|
|
|
func (d *DB) getRunsFiltered(tx *sql.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunSequence uint64, limit int, sortDirection types.SortDirection) ([]*types.Run, error) {
|
|
q := d.getRunsFilteredQuery(phaseFilter, resultFilter, groups, lastRun, startRunSequence, limit, sortDirection)
|
|
|
|
runs, _, err := d.fetchRuns(tx, q)
|
|
|
|
return runs, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetUnarchivedRuns(tx *sql.Tx) ([]*types.Run, error) {
|
|
q := runSelect()
|
|
q.Where(q.E("archived", false))
|
|
|
|
runs, _, err := d.fetchRuns(tx, q)
|
|
|
|
return runs, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetGroupRuns(tx *sql.Tx, group string, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunCounter uint64, limit int, sortDirection types.SortDirection) ([]*types.Run, error) {
|
|
return d.getGroupRunsFiltered(tx, group, phaseFilter, resultFilter, startRunCounter, limit, sortDirection)
|
|
}
|
|
|
|
func (d *DB) getGroupRunsFilteredQuery(phaseFilter []types.RunPhase, resultFilter []types.RunResult, groupPath string, startRunCounter uint64, limit int, sortDirection types.SortDirection, objectstorage bool) *sq.SelectBuilder {
|
|
q := runSelect()
|
|
|
|
switch sortDirection {
|
|
case types.SortDirectionAsc:
|
|
q.OrderBy("run.counter").Asc()
|
|
case types.SortDirectionDesc:
|
|
q.OrderBy("run.counter").Desc()
|
|
}
|
|
if len(phaseFilter) > 0 {
|
|
q.Where(q.In("phase", sq.Flatten(phaseFilter)...))
|
|
}
|
|
if len(resultFilter) > 0 {
|
|
q.Where(q.In("result", sq.Flatten(resultFilter)...))
|
|
}
|
|
if startRunCounter > 0 {
|
|
switch sortDirection {
|
|
case types.SortDirectionAsc:
|
|
q.Where(q.G("run.counter", startRunCounter))
|
|
case types.SortDirectionDesc:
|
|
q.Where(q.L("run.counter", startRunCounter))
|
|
}
|
|
}
|
|
if limit > 0 {
|
|
q.Limit(limit)
|
|
}
|
|
|
|
groupPath = strings.TrimSuffix(groupPath, "/")
|
|
|
|
// search exact path or child path (add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02))
|
|
q.Where(q.Or(q.E("run.run_group", groupPath), q.Like("run.run_group", groupPath+"/%")))
|
|
|
|
return q
|
|
}
|
|
|
|
func (d *DB) getGroupRunsFiltered(tx *sql.Tx, group string, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunCounter uint64, limit int, sortDirection types.SortDirection) ([]*types.Run, error) {
|
|
q := d.getGroupRunsFilteredQuery(phaseFilter, resultFilter, group, startRunCounter, limit, sortDirection, false)
|
|
|
|
runs, _, err := d.fetchRuns(tx, q)
|
|
|
|
return runs, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetRunConfig(tx *sql.Tx, runConfigID string) (*types.RunConfig, error) {
|
|
q := runConfigSelect()
|
|
q.Where(q.E("runconfig.id", runConfigID))
|
|
|
|
runConfigs, _, err := d.fetchRunConfigs(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(runConfigs)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetRunCounter(tx *sql.Tx, groupID string) (*types.RunCounter, error) {
|
|
q := runCounterSelect()
|
|
q.Where(q.E("group_id", groupID))
|
|
|
|
runCounters, _, err := d.fetchRunCounters(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(runCounters)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) NextRunCounter(tx *sql.Tx, groupID string) (uint64, error) {
|
|
// TODO(sgotti) Postgres currently (as of v15) returns unique constraint
|
|
// errors hiding serializable errors also if we check for the existance
|
|
// before the insert.
|
|
// If we have a not existing runcounter for groupid and multiple concurrent
|
|
// transactions try to insert the new runcounter only one will succeed and
|
|
// the others will receive a unique constraint violation error instead of a
|
|
// serialization error and won't by retried
|
|
// During an update of an already existing runcounter instead a serialiation
|
|
// error will be returned.
|
|
//
|
|
// This is probably related to this issue with multiple unique indexes
|
|
// https://www.postgresql.org/message-id/flat/CAGPCyEZG76zjv7S31v_xPeLNRuzj-m%3DY2GOY7PEzu7vhB%3DyQog%40mail.gmail.com
|
|
|
|
// This is a very unprobable event. To avoid it we could wait for postgres
|
|
// updates or use an upsert.
|
|
runCounter, err := d.GetRunCounter(tx, groupID)
|
|
if err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
if runCounter == nil {
|
|
runCounter = types.NewRunCounter(tx, groupID)
|
|
}
|
|
|
|
runCounter.Value++
|
|
|
|
if err := d.InsertOrUpdateRunCounter(tx, runCounter); err != nil {
|
|
return 0, errors.WithStack(err)
|
|
}
|
|
|
|
return runCounter.Value, nil
|
|
}
|
|
|
|
func (d *DB) GetRunEventsFromSequence(tx *sql.Tx, startSequence uint64, limit int) ([]*types.RunEvent, error) {
|
|
q := runEventSelect().OrderBy("sequence").Asc()
|
|
q.Where(q.G("sequence", startSequence))
|
|
|
|
if limit > 0 {
|
|
q.Limit(limit)
|
|
}
|
|
|
|
runEvents, _, err := d.fetchRunEvents(tx, q)
|
|
return runEvents, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetLastRunEvent(tx *sql.Tx) (*types.RunEvent, error) {
|
|
q := runEventSelect().OrderBy("sequence").Desc().Limit(1)
|
|
|
|
runEvents, _, err := d.fetchRunEvents(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(runEvents)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutor(tx *sql.Tx, id string) (*types.Executor, error) {
|
|
q := executorSelect()
|
|
q.Where(q.E("executor.id", id))
|
|
|
|
executors, _, err := d.fetchExecutors(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(executors)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutorByExecutorID(tx *sql.Tx, executorID string) (*types.Executor, error) {
|
|
q := executorSelect()
|
|
q.Where(q.E("executor.executor_id", executorID))
|
|
|
|
executors, _, err := d.fetchExecutors(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(executors)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutors(tx *sql.Tx) ([]*types.Executor, error) {
|
|
q := executorSelect()
|
|
executors, _, err := d.fetchExecutors(tx, q)
|
|
|
|
return executors, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutorTasks(tx *sql.Tx) ([]*types.ExecutorTask, error) {
|
|
q := executorTaskSelect()
|
|
executorTasks, _, err := d.fetchExecutorTasks(tx, q)
|
|
|
|
return executorTasks, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutorTask(tx *sql.Tx, executorTaskID string) (*types.ExecutorTask, error) {
|
|
q := executorTaskSelect()
|
|
q.Where(q.E("executortask.id", executorTaskID))
|
|
|
|
executorTasks, _, err := d.fetchExecutorTasks(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(executorTasks)
|
|
return out, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutorTasksByExecutor(tx *sql.Tx, executorID string) ([]*types.ExecutorTask, error) {
|
|
q := executorTaskSelect()
|
|
q.Where(q.E("executor_id", executorID))
|
|
|
|
executorTasks, _, err := d.fetchExecutorTasks(tx, q)
|
|
|
|
return executorTasks, errors.WithStack(err)
|
|
|
|
}
|
|
|
|
func (d *DB) GetExecutorTasksByRun(tx *sql.Tx, runID string) ([]*types.ExecutorTask, error) {
|
|
q := executorTaskSelect()
|
|
q.Where(q.E("run_id", runID))
|
|
|
|
executorTasks, _, err := d.fetchExecutorTasks(tx, q)
|
|
|
|
return executorTasks, errors.WithStack(err)
|
|
}
|
|
|
|
func (d *DB) GetExecutorTaskByRunTask(tx *sql.Tx, runID, runTaskID string) (*types.ExecutorTask, error) {
|
|
q := executorTaskSelect()
|
|
q.Where(q.And(q.E("run_id", runID), q.E("run_task_id", runTaskID)))
|
|
|
|
executorTasks, _, err := d.fetchExecutorTasks(tx, q)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
out, err := mustSingleRow(executorTasks)
|
|
return out, errors.WithStack(err)
|
|
}
|