Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3 from gocardless/lawrence-simplify
Browse files Browse the repository at this point in the history
Refactor to simplify
  • Loading branch information
lawrencejones committed Mar 2, 2019
2 parents ca1c8cd + 7be74e9 commit a7723a9
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 236 deletions.
4 changes: 2 additions & 2 deletions cmd/pgreplay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func main() {

// Poll our consumer to determine how much work remains
case <-poller.C:
if conns, pending := database.Pending(); pending > 0 {
logger.Log("event", "consume.pending", "connections", len(conns), "items", pending)
if connections, items := database.Pending(); connections > 0 {
logger.Log("event", "consume.pending", "connections", connections, "items", items)
}
}
}
Expand Down
179 changes: 79 additions & 100 deletions pkg/pgreplay/database.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pgreplay

import (
"fmt"
"sync"

"github.com/eapache/channels"
Expand Down Expand Up @@ -50,17 +49,13 @@ func NewDatabase(cfg pgx.ConnConfig) (*Database, error) {
return nil, err
}

return &Database{
cfg: cfg,
ConnInfo: conn.ConnInfo,
connections: map[SessionID]Connection{},
}, conn.Close()
return &Database{cfg, conn.ConnInfo, map[SessionID]*Conn{}}, conn.Close()
}

type Database struct {
cfg pgx.ConnConfig
ConnInfo *pgtype.ConnInfo
connections map[SessionID]Connection
pgx.ConnConfig
*pgtype.ConnInfo
conns map[SessionID]*Conn
}

// Consume iterates through all the items in the given channel and attempts to process
Expand All @@ -69,59 +64,56 @@ type Database struct {
// indicate unrecoverable failures.
//
// Once all items have finished processing, both channels will be closed.
func (d *Database) Consume(items chan ReplayItem) (chan error, chan error) {
errs, done := make(chan error), make(chan error)
func (d *Database) Consume(items chan Item) (chan error, chan error) {
var wg sync.WaitGroup

errs, done := make(chan error, 10), make(chan error)

go func() {
for item := range items {
if item == nil {
continue
}

Connect:

if item, ok := item.(*ConnectItem); ok {
if conn, err := d.Connect(&wg, item.Database(), item.User()); err == nil {
d.connections[item.SessionID()] = conn
go conn.Start()
var err error
conn, ok := d.conns[item.GetSessionID()]

// Connection did not exist, so create a new one
if !ok {
if conn, err = d.Connect(item); err != nil {
errs <- err
continue
}

continue
}
d.conns[item.GetSessionID()] = conn

if conn, ok := d.connections[item.SessionID()]; ok {
conn.items.In() <- item
wg.Add(1)
ConnectionsEstablishedTotal.Inc()
ConnectionsActive.Inc()

// If we're going to disconnect, then remove this connection from our pool and
// close the channel.
if _, ok := item.(*DisconnectItem); ok {
conn.items.Close()
delete(d.connections, item.SessionID())
}
} else {
errs <- fmt.Errorf("no connection for session %s", item.SessionID())
item = &ConnectItem{
Item{session: item.SessionID(), database: item.Database(), user: item.User()},
}
go func(conn *Conn) {
defer wg.Done()
defer ConnectionsActive.Dec()

goto Connect
if err := conn.Start(); err != nil {
errs <- err
}
}(conn)
}

conn.In() <- item
}

for _, conn := range d.connections {
if !conn.closed {
// Non-blocking channel op to avoid race between thinking the conn is closed and
// it actually being closed.
// Flush disconnects down each of our connection sessions, ensuring even connections
// that we don't have disconnects for in our logs get closed.
for _, conn := range d.conns {
if !conn.IsAlive() {
// Non-blocking channel op to avoid read-write-race between checking whether the
// connection is alive and the channel having been closed
select {
case conn.items.In() <- &DisconnectItem{}:
case conn.In() <- &Disconnect{}:
default:
}
}

conn.items.Close()
}

// Wait for every connection to terminate
wg.Wait()

close(errs)
Expand All @@ -131,82 +123,69 @@ func (d *Database) Consume(items chan ReplayItem) (chan error, chan error) {
return errs, done
}

// Pending returns a slice of connections that are yet to be closed, and the number of
// pending items that are still to be processed by all connections.
func (d *Database) Pending() (conns []Connection, pending int) {
conns = []Connection{}
for _, conn := range d.connections {
if !conn.closed {
conns = append(conns, conn)
pending += conn.items.Len()
}
}

return
}

// Connect establishes a new connection to the database, reusing the ConnInfo that was
// generated when the Database was constructed. The wg is incremented whenever we
// establish a new connection and decremented when we disconnect.
func (d *Database) Connect(wg *sync.WaitGroup, database, user string) (Connection, error) {
cfg := d.cfg
cfg.Database, cfg.User = database, user
// cfg.CustomConnInfo = func(_ *pgx.Conn) (*pgtype.ConnInfo, error) {
// return d.ConnInfo.DeepCopy(), nil
// }
func (d *Database) Connect(item Item) (*Conn, error) {
cfg := d.ConnConfig
cfg.Database, cfg.User = item.GetDatabase(), item.GetUser()
cfg.CustomConnInfo = func(_ *pgx.Conn) (*pgtype.ConnInfo, error) {
return d.ConnInfo.DeepCopy(), nil
}

conn, err := pgx.Connect(cfg)
if err == nil {
wg.Add(1)
ConnectionsEstablishedTotal.Inc()
ConnectionsActive.Inc()
if err != nil {
return nil, err
}

return &Conn{conn, channels.NewInfiniteChannel()}, nil
}

// Pending returns a count of the connections that are still active, and how many items
// are currently pending against all connections.
//
// There is a small risk that calling this function could result in a segfault, as Golang
// maps aren't race-safe with concurrent writes and reads. The alternative implementation
// is a lot messier though, so let's do this until there become problems.
func (d *Database) Pending() (connections, items int) {
for _, conn := range d.conns {
if conn.IsAlive() {
connections++
items += conn.Len()
}
}

return Connection{
Conn: conn,
database: d,
items: channels.NewInfiniteChannel(),
wg: wg,
}, err
return
}

type Connection struct {
// Conn represents a single database connection handling a stream of work Items
type Conn struct {
*pgx.Conn
database *Database
items channels.Channel
wg *sync.WaitGroup
closed bool
err error
channels.Channel
}

// Start begins to process the items that are placed into the Connection's channel. For
// every item we'll run the appropriate action for the current connection.
func (c Connection) Start() {
items := make(chan ReplayItem)
channels.Unwrap(c.items, items)
// Start begins to process the items that are placed into the Conn's channel. We'll finish
// once the connection has died or we run out of items to process.
func (c *Conn) Start() error {
items := make(chan Item)
channels.Unwrap(c.Channel, items)
defer c.Channel.Close()

for item := range items {
if item == nil {
continue
}

ItemsProcessedTotal.Inc()
ItemsMostRecentTimestamp.Set(float64(item.Time().Unix()))

switch item := item.(type) {
case *ExecuteItem:
c.Exec(item.Query, item.Parameters...)
case *StatementItem:
c.Exec(item.Query)
case *DisconnectItem:
if c.Conn != nil {
c.err = c.Close()
}
c.closed = true
c.wg.Done()
ItemsMostRecentTimestamp.Set(float64(item.GetTimestamp().Unix()))

err := item.Handle(c.Conn)

ConnectionsActive.Dec()
return
// If we're no longer alive, then we know we can no longer process items
if !c.IsAlive() {
return err
}
}

return nil
}
Loading

0 comments on commit a7723a9

Please sign in to comment.