Files
sub2api-cn-relay-manager/internal/app/http_batch_runs.go

218 lines
7.5 KiB
Go

package app
import (
"context"
"database/sql"
"fmt"
"net/http"
"strings"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
func handleGetBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, string) (batch.RunSummaryProjection, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "get-batch-import-run action is not configured"})
return
}
runID := strings.TrimSpace(r.PathValue("run_id"))
if runID == "" {
writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id is required"})
return
}
run, err := fn(r.Context(), runID)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusOK, map[string]any{
"run": run,
"recent_warnings": run.RecentWarnings,
})
}
func handleListBatchImportRunItems(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunItemsRequest) (ListBatchImportRunItemsResponse, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-batch-import-run-items action is not configured"})
return
}
runID := strings.TrimSpace(r.PathValue("run_id"))
if runID == "" {
writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id is required"})
return
}
req := ListBatchImportRunItemsRequest{
RunID: runID,
CurrentStage: strings.TrimSpace(r.URL.Query().Get("current_stage")),
ConfirmationStatus: strings.TrimSpace(r.URL.Query().Get("confirmation_status")),
AccessStatus: strings.TrimSpace(r.URL.Query().Get("access_status")),
ProviderID: strings.TrimSpace(r.URL.Query().Get("provider_id")),
MatchedAccountState: strings.TrimSpace(r.URL.Query().Get("matched_account_state")),
AccountResolution: strings.TrimSpace(r.URL.Query().Get("account_resolution")),
Query: strings.TrimSpace(r.URL.Query().Get("q")),
Cursor: strings.TrimSpace(r.URL.Query().Get("cursor")),
Limit: parsePositiveInt(r.URL.Query().Get("limit")),
}
if hasWarningRaw := strings.TrimSpace(r.URL.Query().Get("has_warning")); hasWarningRaw != "" {
value := strings.EqualFold(hasWarningRaw, "true")
req.HasWarning = &value
}
items, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
if items.Items == nil {
items.Items = []batch.ItemSummaryProjection{}
}
writeJSON(w, http.StatusOK, items)
}
func handleGetBatchImportRunItem(w http.ResponseWriter, r *http.Request, fn func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "get-batch-import-run-item action is not configured"})
return
}
req := GetBatchImportRunItemRequest{
RunID: strings.TrimSpace(r.PathValue("run_id")),
ItemID: strings.TrimSpace(r.PathValue("item_id")),
}
if req.RunID == "" || req.ItemID == "" {
writeHTTPError(w, &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: "run_id and item_id are required"})
return
}
item, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusOK, item)
}
func buildGetBatchImportRunAction(sqliteDSN string) func(context.Context, string) (batch.RunSummaryProjection, error) {
return func(ctx context.Context, runID string) (batch.RunSummaryProjection, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return batch.RunSummaryProjection{}, err
}
defer store.Close()
run, err := store.ImportRuns().GetByRunID(ctx, runID)
if err != nil {
if err == sql.ErrNoRows {
return batch.RunSummaryProjection{}, fmt.Errorf("run not found: %s", runID)
}
return batch.RunSummaryProjection{}, err
}
return batch.ProjectRunSummary(run), nil
}
}
func buildListBatchImportRunItemsAction(sqliteDSN string) func(context.Context, ListBatchImportRunItemsRequest) (ListBatchImportRunItemsResponse, error) {
return func(ctx context.Context, req ListBatchImportRunItemsRequest) (ListBatchImportRunItemsResponse, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return ListBatchImportRunItemsResponse{}, err
}
defer store.Close()
if _, err := store.ImportRuns().GetByRunID(ctx, req.RunID); err != nil {
if err == sql.ErrNoRows {
return ListBatchImportRunItemsResponse{}, fmt.Errorf("run not found: %s", req.RunID)
}
return ListBatchImportRunItemsResponse{}, err
}
items, err := store.ImportRunItems().ListByRunID(ctx, req.RunID)
if err != nil {
return ListBatchImportRunItemsResponse{}, err
}
limit := defaultPositiveInt(req.Limit, 50)
result := make([]batch.ItemSummaryProjection, 0, limit)
nextCursor := (*string)(nil)
started := strings.TrimSpace(req.Cursor) == ""
for _, item := range items {
if !started {
if item.ItemID == strings.TrimSpace(req.Cursor) {
started = true
}
continue
}
view := batch.ProjectItemSummary(item)
if !matchesItemFilters(view, req) {
continue
}
if len(result) >= limit {
cursor := item.ItemID
nextCursor = &cursor
break
}
result = append(result, view)
}
return ListBatchImportRunItemsResponse{Items: result, NextCursor: nextCursor}, nil
}
}
func buildGetBatchImportRunItemAction(sqliteDSN string) func(context.Context, GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
return func(ctx context.Context, req GetBatchImportRunItemRequest) (batch.ItemDetailProjection, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return batch.ItemDetailProjection{}, err
}
defer store.Close()
item, err := store.ImportRunItems().GetByItemID(ctx, req.ItemID)
if err != nil {
if err == sql.ErrNoRows {
return batch.ItemDetailProjection{}, fmt.Errorf("item not found: %s", req.ItemID)
}
return batch.ItemDetailProjection{}, err
}
if item.RunID != req.RunID {
return batch.ItemDetailProjection{}, fmt.Errorf("item not found in run %s", req.RunID)
}
events, err := store.ImportRunEvents().ListByItemID(ctx, req.ItemID)
if err != nil {
return batch.ItemDetailProjection{}, err
}
return batch.ProjectItemDetail(item, events)
}
}
func matchesItemFilters(view batch.ItemSummaryProjection, req ListBatchImportRunItemsRequest) bool {
if req.CurrentStage != "" && view.CurrentStage != req.CurrentStage {
return false
}
if req.ConfirmationStatus != "" && view.ConfirmationStatus != req.ConfirmationStatus {
return false
}
if req.AccessStatus != "" && view.AccessStatus != req.AccessStatus {
return false
}
if req.ProviderID != "" && view.ProviderID != req.ProviderID {
return false
}
if req.MatchedAccountState != "" && view.MatchedAccountState != req.MatchedAccountState {
return false
}
if req.AccountResolution != "" && view.AccountResolution != req.AccountResolution {
return false
}
if req.HasWarning != nil {
hasWarning := len(view.AdvisoryMessages) > 0
if hasWarning != *req.HasWarning {
return false
}
}
if req.Query != "" {
query := strings.ToLower(req.Query)
if !strings.Contains(strings.ToLower(view.ItemID), query) &&
!strings.Contains(strings.ToLower(view.ProviderID), query) &&
!strings.Contains(strings.ToLower(view.BaseURL), query) {
return false
}
}
return true
}