diff --git a/cmd/pgreplay/main.go b/cmd/pgreplay/main.go index e655931..1bf1fe6 100644 --- a/cmd/pgreplay/main.go +++ b/cmd/pgreplay/main.go @@ -21,25 +21,27 @@ var logger kitlog.Logger var Version string // assigned during build var ( - app = kingpin.New("pgreplay", "Replay Postgres logs against database").Version(Version) - host = app.Flag("host", "PostgreSQL database host").Required().String() - port = app.Flag("port", "PostgreSQL database port").Default("5432").Uint16() - datname = app.Flag("database", "PostgreSQL root database").Default("postgres").String() - user = app.Flag("user", "PostgreSQL root user").Default("postgres").String() + app = kingpin.New("pgreplay", "Replay Postgres logs against database").Version(Version) + + // Global flags applying to every command + debug = app.Flag("debug", "Enable debug logging").Default("false").Bool() errlogFile = app.Flag("errlog-file", "Path to PostgreSQL errlog").Required().ExistingFile() replayRate = app.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float() startFlag = app.Flag("start", "Play logs from this time onward ("+pgreplay.PostgresTimestampFormat+")").String() finishFlag = app.Flag("finish", "Stop playing logs at this time ("+pgreplay.PostgresTimestampFormat+")").String() - debug = app.Flag("debug", "Enable debug logging").Default("false").Bool() - pollInterval = app.Flag("poll-interval", "Interval between polling for finish").Default("5s").Duration() metricsAddress = app.Flag("metrics-address", "Address to bind HTTP metrics listener").Default("127.0.0.1").String() metricsPort = app.Flag("metrics-port", "Port to bind HTTP metrics listener").Default("9445").Uint16() + + run = app.Command("run", "Replay from log files against a real database") + runHost = run.Flag("host", "PostgreSQL database host").Required().String() + runPort = run.Flag("port", "PostgreSQL database port").Default("5432").Uint16() + runDatname = run.Flag("database", "PostgreSQL root database").Default("postgres").String() + runUser = run.Flag("user", "PostgreSQL root user").Default("postgres").String() + runPollInterval = run.Flag("poll-interval", "Interval between polling for finish").Default("5s").Duration() ) func main() { - if _, err := app.Parse(os.Args[1:]); err != nil { - kingpin.Fatalf("%s, try --help", err) - } + command := kingpin.MustParse(app.Parse(os.Args[1:])) logger = kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr)) logger = level.NewFilter(logger, level.AllowInfo()) @@ -57,74 +59,77 @@ func main() { http.ListenAndServe(fmt.Sprintf("%s:%v", *metricsAddress, *metricsPort), nil) }() - errlog, err := os.Open(*errlogFile) - if err != nil { - logger.Log("event", "logfile.error", "error", err) - os.Exit(255) - } - - database, err := pgreplay.NewDatabase(pgx.ConnConfig{ - Host: *host, - Port: *port, - Database: *datname, - User: *user, - }) - - if err != nil { - logger.Log("event", "postgres.error", "error", err) - os.Exit(255) - } - - items, logerrs, done := pgreplay.Parse(errlog) + switch command { + case run.FullCommand(): + errlog, err := os.Open(*errlogFile) + if err != nil { + logger.Log("event", "logfile.error", "error", err) + os.Exit(255) + } - go func() { - logger.Log("event", "parse.finished", "error", <-done) - }() + database, err := pgreplay.NewDatabase(pgx.ConnConfig{ + Host: *runHost, + Port: *runPort, + Database: *runDatname, + User: *runUser, + }) - go func() { - for err := range logerrs { - level.Debug(logger).Log("event", "parse.error", "error", err) + if err != nil { + logger.Log("event", "postgres.error", "error", err) + os.Exit(255) } - }() - - var start, finish *time.Time - if start, err = parseTimestamp(*startFlag); err != nil { - kingpin.Fatalf("--start flag %s", err) - } + items, logerrs, done := pgreplay.Parse(errlog) - if finish, err = parseTimestamp(*finishFlag); err != nil { - kingpin.Fatalf("--finish flag %s", err) - } + go func() { + logger.Log("event", "parse.finished", "error", <-done) + }() - stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *replayRate) - if err != nil { - kingpin.Fatalf("failed to start streamer: %s", err) - } + go func() { + for err := range logerrs { + level.Debug(logger).Log("event", "parse.error", "error", err) + } + }() - errs, consumeDone := database.Consume(stream) - poller := time.NewTicker(*pollInterval) + var start, finish *time.Time - var status int + if start, err = parseTimestamp(*startFlag); err != nil { + kingpin.Fatalf("--start flag %s", err) + } - for { - select { - case err := <-errs: - if err != nil { - logger.Log("event", "consume.error", "error", err) - } - case err := <-consumeDone: - if err != nil { - status = 255 - } + if finish, err = parseTimestamp(*finishFlag); err != nil { + kingpin.Fatalf("--finish flag %s", err) + } - logger.Log("event", "consume.finished", "error", err, "status", status) - os.Exit(status) + stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *replayRate) + if err != nil { + kingpin.Fatalf("failed to start streamer: %s", err) + } - // Poll our consumer to determine how much work remains - case <-poller.C: - if connections, items := database.Pending(); connections > 0 { - logger.Log("event", "consume.pending", "connections", connections, "items", items) + errs, consumeDone := database.Consume(stream) + poller := time.NewTicker(*runPollInterval) + + var status int + + for { + select { + case err := <-errs: + if err != nil { + logger.Log("event", "consume.error", "error", err) + } + case err := <-consumeDone: + if err != nil { + status = 255 + } + + logger.Log("event", "consume.finished", "error", err, "status", status) + os.Exit(status) + + // Poll our consumer to determine how much work remains + case <-poller.C: + if connections, items := database.Pending(); connections > 0 { + logger.Log("event", "consume.pending", "connections", connections, "items", items) + } } } }