Skip to content

Commit

Permalink
Port to GCP. Refactor for both later.
Browse files Browse the repository at this point in the history
  • Loading branch information
gh-mlfowler committed Dec 16, 2016
1 parent 2655779 commit 64f8ece
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 59 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
varnish-purge-proxy
===================

[![Build Status](https://travis-ci.org/BashtonLtd/varnish-purge-proxy.svg?branch=master)](https://travis-ci.org/BashtonLtd/varnish-purge-proxy)

Proxy purge requests to multiple varnish servers

Specify tags to limit instances that receive the purge request, multiple tags can be used. You must specify at least one tag.
You must specify the following options:

`./varnish-purge-proxy Service:varnish Environment:live`
`./varnish-purge-proxy --region=europe-west1 --project=my-google-project --credentials=/path/to/credentials.json`

You can also specify host and port to listen on:

Expand All @@ -24,7 +22,7 @@ varnish-purge-proxy will cache the IP lookup for 60 seconds, you can change this
Authentication
--------------

AWS access key and secret key can be added as environment variables, using either `AWS_ACCESS_KEY_ID` or `AWS_SECRET_ACCESS_KEY`. If these are not available then IAM credentials for the instance will be checked.
A service account needs to be created in Google that has at least "Compute Network Viewer".

Building
--------
Expand Down
108 changes: 54 additions & 54 deletions varnish-purge-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

/*
* varnish-purge-proxy
* (C) Copyright Bashton Ltd, 2014
* (C) Copyright Bashton Ltd, 2016
*
* varnish-purge-proxy is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -19,21 +19,22 @@ package main
*
*/
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"log/syslog"
"net/http"
"net/url"
"strings"
"os"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1"

"gopkg.in/alecthomas/kingpin.v1"
)

Expand All @@ -42,9 +43,11 @@ var (
cache = kingpin.Flag("cache", "Time in seconds to cache instance IP lookup.").Default("60").Int()
destport = kingpin.Flag("destport", "The destination port of the varnish server to target.").Default("80").Int()
listen = kingpin.Flag("listen", "Host address to listen on, defaults to 127.0.0.1").Default("127.0.0.1").String()
tags = kingpin.Arg("tag", "Key:value pair of tags to match EC2 instances.").Strings()
region = kingpin.Flag("region", "Google region to discover varnish servers").Required().String()
project = kingpin.Flag("project", "Google project to discover varnish servers").Required().String()
debug = kingpin.Flag("debug", "Log additional debug messages.").Bool()
region string
credentials = kingpin.Flag("credentials", "Path to service account JSON credentials").Required().String()
nameprefix = kingpin.Flag("nameprefix", "Instance name prefix, eg. varnish").Default("varnish").String()
resetAfter time.Time
taggedInstances = []string{}
)
Expand All @@ -67,33 +70,35 @@ func main() {
log.SetOutput(sl)
}

if len(*tags) == 0 {
fmt.Println("No tags specified")
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", *credentials)
src, err := google.DefaultTokenSource(oauth2.NoContext, compute.ComputeReadonlyScope)
if err != nil {
log.Fatalf("Unable to acquire token source: %v", err)
return
}

region, err = ec2metadata.New(session.New()).Region()
oauthClient := oauth2.NewClient(context.Background(), src)

service, err := compute.New(oauthClient)
if err != nil {
log.Printf("Unable to retrieve the region from the EC2 instance %v\n", err)
log.Fatalf("Unable to get client: ", err)
return
}

// Set up access to ec2
svc := ec2.New(session.New(), &aws.Config{Region: &region})

go serveHTTP(*port, *listen, svc)
go serveHTTP(*port, *listen, service)

select {}
}

func serveHTTP(port int, host string, ec2region *ec2.EC2) {
func serveHTTP(port int, host string, service *compute.Service) {
timeout := time.Duration(5 * time.Second)
client := http.Client{
Timeout: timeout,
}

mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
requestHandler(w, r, &client, ec2region)
requestHandler(w, r, &client, service)
})

addr := fmt.Sprintf("%v:%d", host, port)
Expand All @@ -110,7 +115,7 @@ func serveHTTP(port int, host string, ec2region *ec2.EC2) {
log.Println(err.Error())
}

func requestHandler(w http.ResponseWriter, r *http.Request, client *http.Client, ec2region *ec2.EC2) {
func requestHandler(w http.ResponseWriter, r *http.Request, client *http.Client, service *compute.Service) {
// check that request is PURGE and has X-Purge-Regex header set
if _, exists := r.Header["X-Purge-Regex"]; !exists || r.Method != "PURGE" {
if *debug {
Expand All @@ -123,7 +128,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request, client *http.Client,
privateIPs := taggedInstances
// Check instance cache
if time.Now().After(resetAfter) {
privateIPs = getPrivateIPs(ec2region)
privateIPs = getPrivateIPs(service)
resetAfter = time.Now().Add(time.Duration(*cache*1000) * time.Millisecond)
taggedInstances = privateIPs
}
Expand Down Expand Up @@ -180,51 +185,46 @@ func copyRequest(src *http.Request) (*http.Request, error) {
return req, nil
}

func getPrivateIPs(ec2region *ec2.EC2) []string {
func getPrivateIPs(service *compute.Service) []string {
instances := []string{}
filters, err := buildFilter(*tags)
if err != nil {
log.Println(err)
}

request := ec2.DescribeInstancesInput{Filters: filters}
result, err := ec2region.DescribeInstances(&request)
if err != nil {
log.Println(err)
ctx := context.Background()

zones := make([]string, 0)
{
call := service.Zones.List(*project)
call.Filter("region eq .*" + *region + ".*")
if err := call.Pages(ctx, func(page *compute.ZoneList) error {
for _, v := range page.Items {
log.Printf("Found zone: %s", v.Name)
zones = append(zones, v.Name)
}
return nil
}); err != nil {
log.Fatalf("Failed to list zones: ", err)
}
}

for _, reservation := range result.Reservations {
for _, instance := range reservation.Instances {
if instance.PrivateIpAddress != nil {
if *debug {
log.Printf("Adding %s to IP list\n", *instance.PrivateIpAddress)
for _, zone := range zones {
log.Printf("Checking zone: %s", zone)
call := service.Instances.List(*project, zone)
call.Filter("(name eq .*" + *nameprefix + ".*) (status eq RUNNING)")
if err := call.Pages(ctx, func(page *compute.InstanceList) error {
for _, v := range page.Items {
log.Printf("Found instance: %s", v.Name)
for _, n := range v.NetworkInterfaces {
log.Printf("Found address: %s", n.NetworkIP)
instances = append(instances, n.NetworkIP)
}
instances = append(instances, *instance.PrivateIpAddress)
}
return nil
}); err != nil {
log.Fatalf("Failed to list instances: ", err)
}
}

return instances
}

func buildFilter(tags []string) ([]*ec2.Filter, error) {
filters := []*ec2.Filter{}

for _, tag := range tags {
parts := strings.SplitN(tag, ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("expected TAG:VALUE got %s", tag)
}
tagName := fmt.Sprintf("tag:%s", *aws.String(parts[0]))
filters = append(filters, &ec2.Filter{
Name: &tagName,
Values: []*string{aws.String(parts[1])},
})
}
return filters, nil

}

func forwardRequest(r *http.Request, ip string, destport int, client http.Client, requesturl string, responseChannel chan int, wg *sync.WaitGroup) {
defer wg.Done()
r.Host = ip
Expand Down

0 comments on commit 64f8ece

Please sign in to comment.