package importers

import (
    "context"
    "encoding/csv"
    "io"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)
// Row is one parsed CSV record destined for the users table.
type Row struct {
    Email string
    Name  string
}
// ImportUsers streams CSV records from r and upserts them into the users
// table in batches, so memory stays bounded regardless of input size.
func ImportUsers(ctx context.Context, db *pgxpool.Pool, r io.Reader) error {
    const batchSize = 500

    cr := csv.NewReader(r)
    batch := make([]Row, 0, batchSize)

    // flush sends the buffered rows as one pgx batch and resets the buffer.
    flush := func() error {
        if len(batch) == 0 {
            return nil
        }
        b := &pgx.Batch{}
        for _, row := range batch {
            b.Queue(`INSERT INTO users (email, name) VALUES ($1, $2)
                ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name`,
                row.Email, row.Name)
        }
        br := db.SendBatch(ctx, b)
        // Each queued statement produces its own result; read every one so
        // an error from any insert surfaces, then close the batch results.
        for range batch {
            if _, err := br.Exec(); err != nil {
                br.Close()
                return err
            }
        }
        if err := br.Close(); err != nil {
            return err
        }
        batch = batch[:0]
        return nil
    }
    for {
        rec, err := cr.Read()
        if err == io.EOF {
            // Flush whatever remains in the final partial batch.
            return flush()
        }
        if err != nil {
            return err
        }
        // Minimal validation: skip records too short to hold both fields.
        // A real importer would count and report these rejects.
        if len(rec) < 2 {
            continue
        }
        batch = append(batch, Row{Email: rec[0], Name: rec[1]})
        if len(batch) >= batchSize {
            if err := flush(); err != nil {
                return err
            }
        }
    }
}
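For context, here is a minimal sketch of how the importer might be wired up, assuming it lives in the same package with "os" added to the imports; the run helper, DSN, and file name are hypothetical and not part of the snippet above.

// run is a hypothetical caller: it opens a connection pool and a local CSV
// file, then streams the file through ImportUsers.
func run(ctx context.Context) error {
    pool, err := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/app") // hypothetical DSN
    if err != nil {
        return err
    }
    defer pool.Close()

    f, err := os.Open("users.csv") // hypothetical input file
    if err != nil {
        return err
    }
    defer f.Close()

    return ImportUsers(ctx, pool, f)
}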
CSV imports are where memory usage quietly explodes if you read everything at once. I stream rows with encoding/csv, validate each record, and batch inserts to keep DB overhead low. The important detail is backpressure: if the database is slow, the importer must naturally slow down rather than buffering millions of rows. I usually set a maximum batch size and flush on EOF. For validation, I keep a counter of rejected rows and return a summary so operators can see what happened without scanning logs. In production, I pair this with idempotency (import IDs) and a staging table so the main tables aren’t left half-updated on failures; the merge step is sketched below. The snippet above focuses on the core loop: read, validate, batch, flush. It’s simple, fast, and safe for large datasets.
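To make the staging-table step concrete, here is a minimal sketch of the merge, assuming a hypothetical users_staging(import_id, email, name) table that the batched inserts were pointed at first; the MergeStagedUsers name and schema are mine, not part of the snippet above.

// MergeStagedUsers folds one import run from users_staging into users in a
// single statement, so a failed run never leaves users half-updated.
// DISTINCT ON guards against duplicate emails within the same run; without
// an ORDER BY the surviving row is arbitrary, which is fine for a sketch.
func MergeStagedUsers(ctx context.Context, db *pgxpool.Pool, importID string) error {
    _, err := db.Exec(ctx, `
        INSERT INTO users (email, name)
        SELECT DISTINCT ON (email) email, name
        FROM users_staging
        WHERE import_id = $1
        ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name`,
        importID)
    return err
}

In practice I would run the merge and the deletion of the processed staging rows in one transaction, keyed by the import ID, so retrying the same import stays idempotent.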