Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Split pgreplay binary into a run command
Browse files · Browse the repository at this point in the history
This enables us to add a new command that can do filtering of logs into
a pre-processed form.
  • Loading branch information
lawrencejones committed Mar 5, 2019
1 parent a7723a9 commit 4d621d6
Showing 1 changed file with 72 additions and 67 deletions.
139 changes: 72 additions & 67 deletions cmd/pgreplay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,27 @@ var logger kitlog.Logger
var Version string // assigned during build

var (
app = kingpin.New("pgreplay", "Replay Postgres logs against database").Version(Version)
host = app.Flag("host", "PostgreSQL database host").Required().String()
port = app.Flag("port", "PostgreSQL database port").Default("5432").Uint16()
datname = app.Flag("database", "PostgreSQL root database").Default("postgres").String()
user = app.Flag("user", "PostgreSQL root user").Default("postgres").String()
app = kingpin.New("pgreplay", "Replay Postgres logs against database").Version(Version)

// Global flags applying to every command
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
errlogFile = app.Flag("errlog-file", "Path to PostgreSQL errlog").Required().ExistingFile()
replayRate = app.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float()
startFlag = app.Flag("start", "Play logs from this time onward ("+pgreplay.PostgresTimestampFormat+")").String()
finishFlag = app.Flag("finish", "Stop playing logs at this time ("+pgreplay.PostgresTimestampFormat+")").String()
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
pollInterval = app.Flag("poll-interval", "Interval between polling for finish").Default("5s").Duration()
metricsAddress = app.Flag("metrics-address", "Address to bind HTTP metrics listener").Default("127.0.0.1").String()
metricsPort = app.Flag("metrics-port", "Port to bind HTTP metrics listener").Default("9445").Uint16()

run = app.Command("run", "Replay from log files against a real database")
runHost = run.Flag("host", "PostgreSQL database host").Required().String()
runPort = run.Flag("port", "PostgreSQL database port").Default("5432").Uint16()
runDatname = run.Flag("database", "PostgreSQL root database").Default("postgres").String()
runUser = run.Flag("user", "PostgreSQL root user").Default("postgres").String()
runPollInterval = run.Flag("poll-interval", "Interval between polling for finish").Default("5s").Duration()
)

func main() {
if _, err := app.Parse(os.Args[1:]); err != nil {
kingpin.Fatalf("%s, try --help", err)
}
command := kingpin.MustParse(app.Parse(os.Args[1:]))

logger = kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr))
logger = level.NewFilter(logger, level.AllowInfo())
Expand All @@ -57,74 +59,77 @@ func main() {
http.ListenAndServe(fmt.Sprintf("%s:%v", *metricsAddress, *metricsPort), nil)
}()

errlog, err := os.Open(*errlogFile)
if err != nil {
logger.Log("event", "logfile.error", "error", err)
os.Exit(255)
}

database, err := pgreplay.NewDatabase(pgx.ConnConfig{
Host: *host,
Port: *port,
Database: *datname,
User: *user,
})

if err != nil {
logger.Log("event", "postgres.error", "error", err)
os.Exit(255)
}

items, logerrs, done := pgreplay.Parse(errlog)
switch command {
case run.FullCommand():
errlog, err := os.Open(*errlogFile)
if err != nil {
logger.Log("event", "logfile.error", "error", err)
os.Exit(255)
}

go func() {
logger.Log("event", "parse.finished", "error", <-done)
}()
database, err := pgreplay.NewDatabase(pgx.ConnConfig{
Host: *runHost,
Port: *runPort,
Database: *runDatname,
User: *runUser,
})

go func() {
for err := range logerrs {
level.Debug(logger).Log("event", "parse.error", "error", err)
if err != nil {
logger.Log("event", "postgres.error", "error", err)
os.Exit(255)
}
}()

var start, finish *time.Time

if start, err = parseTimestamp(*startFlag); err != nil {
kingpin.Fatalf("--start flag %s", err)
}
items, logerrs, done := pgreplay.Parse(errlog)

if finish, err = parseTimestamp(*finishFlag); err != nil {
kingpin.Fatalf("--finish flag %s", err)
}
go func() {
logger.Log("event", "parse.finished", "error", <-done)
}()

stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *replayRate)
if err != nil {
kingpin.Fatalf("failed to start streamer: %s", err)
}
go func() {
for err := range logerrs {
level.Debug(logger).Log("event", "parse.error", "error", err)
}
}()

errs, consumeDone := database.Consume(stream)
poller := time.NewTicker(*pollInterval)
var start, finish *time.Time

var status int
if start, err = parseTimestamp(*startFlag); err != nil {
kingpin.Fatalf("--start flag %s", err)
}

for {
select {
case err := <-errs:
if err != nil {
logger.Log("event", "consume.error", "error", err)
}
case err := <-consumeDone:
if err != nil {
status = 255
}
if finish, err = parseTimestamp(*finishFlag); err != nil {
kingpin.Fatalf("--finish flag %s", err)
}

logger.Log("event", "consume.finished", "error", err, "status", status)
os.Exit(status)
stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *replayRate)
if err != nil {
kingpin.Fatalf("failed to start streamer: %s", err)
}

// Poll our consumer to determine how much work remains
case <-poller.C:
if connections, items := database.Pending(); connections > 0 {
logger.Log("event", "consume.pending", "connections", connections, "items", items)
errs, consumeDone := database.Consume(stream)
poller := time.NewTicker(*runPollInterval)

var status int

for {
select {
case err := <-errs:
if err != nil {
logger.Log("event", "consume.error", "error", err)
}
case err := <-consumeDone:
if err != nil {
status = 255
}

logger.Log("event", "consume.finished", "error", err, "status", status)
os.Exit(status)

// Poll our consumer to determine how much work remains
case <-poller.C:
if connections, items := database.Pending(); connections > 0 {
logger.Log("event", "consume.pending", "connections", connections, "items", items)
}
}
}
}
Expand Down

0 comments on commit 4d621d6

Please sign in to comment.