Skip to content
This repository has been archived by the owner on Dec 18, 2019. It is now read-only.

support redis over tcp #103

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/analyzer/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from time import time
from msgpack import unpackb, packb
from redis import StrictRedis
import utils

from settings import (
ALGORITHMS,
Expand All @@ -15,15 +15,14 @@
MAX_TOLERABLE_BOREDOM,
MIN_TOLERABLE_LENGTH,
STALE_PERIOD,
REDIS_SOCKET_PATH,
ENABLE_SECOND_ORDER,
BOREDOM_SET_SIZE,
)

from algorithm_exceptions import *

logger = logging.getLogger("AnalyzerLog")
redis_conn = StrictRedis(unix_socket_path=REDIS_SOCKET_PATH)
redis_conn = utils.redis_conn()

"""
This is no man's land. Do anything you want in here,
Expand Down
8 changes: 4 additions & 4 deletions src/analyzer/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
from Queue import Empty
from redis import StrictRedis
from time import time, sleep
from threading import Thread
from collections import defaultdict
Expand All @@ -16,6 +15,7 @@
from alerters import trigger_alert
from algorithms import run_selected_algorithm
from algorithm_exceptions import *
from utils import redis_conn, redis_conn_string

logger = logging.getLogger("AnalyzerLog")

Expand All @@ -26,7 +26,7 @@ def __init__(self, parent_pid):
Initialize the Analyzer
"""
super(Analyzer, self).__init__()
self.redis_conn = StrictRedis(unix_socket_path = settings.REDIS_SOCKET_PATH)
self.redis_conn = redis_conn()
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
Expand Down Expand Up @@ -136,9 +136,9 @@ def run(self):
try:
self.redis_conn.ping()
except:
logger.error('skyline can\'t connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
logger.error('skyline can\'t connect to redis at socket path %s' % redis_conn_string())
sleep(10)
self.redis_conn = StrictRedis(unix_socket_path = settings.REDIS_SOCKET_PATH)
self.redis_conn = redis_conn()
continue

# Discover unique metrics
Expand Down
9 changes: 5 additions & 4 deletions src/horizon/roomba.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from os import kill
from redis import StrictRedis, WatchError
from redis import WatchError
from multiprocessing import Process
from threading import Thread
from msgpack import Unpacker, packb
from types import TupleType
from time import time, sleep
from utils import redis_conn, redis_conn_string

import logging
import settings
Expand All @@ -18,7 +19,7 @@ class Roomba(Thread):
"""
def __init__(self, parent_pid, skip_mini):
super(Roomba, self).__init__()
self.redis_conn = StrictRedis(unix_socket_path = settings.REDIS_SOCKET_PATH)
self.redis_conn = redis_conn()
self.daemon = True
self.parent_pid = parent_pid
self.skip_mini = skip_mini
Expand Down Expand Up @@ -158,9 +159,9 @@ def run(self):
try:
self.redis_conn.ping()
except:
logger.error('roomba can\'t connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
logger.error('roomba can\'t connect to redis at socket path %s' % redis_conn_string())
sleep(10)
self.redis_conn = StrictRedis(unix_socket_path = settings.REDIS_SOCKET_PATH)
self.redis_conn = redis_conn()
continue

# Spawn processes
Expand Down
9 changes: 5 additions & 4 deletions src/horizon/worker.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from os import kill, system
from redis import StrictRedis, WatchError
from redis import WatchError
from multiprocessing import Process
from Queue import Empty
from msgpack import packb
from time import time, sleep
from utils import redis_conn, redis_conn_string

import logging
import socket
Expand All @@ -19,7 +20,7 @@ class Worker(Process):
"""
def __init__(self, queue, parent_pid, skip_mini, canary=False):
super(Worker, self).__init__()
self.redis_conn = StrictRedis(unix_socket_path = settings.REDIS_SOCKET_PATH)
self.redis_conn = redis_conn()
self.q = queue
self.parent_pid = parent_pid
self.daemon = True
Expand Down Expand Up @@ -74,9 +75,9 @@ def run(self):
try:
self.redis_conn.ping()
except:
logger.error('worker can\'t connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
logger.error('worker can\'t connect to redis at socket path %s' % redis_conn_string())
sleep(10)
self.redis_conn = StrictRedis(unix_socket_path = settings.REDIS_SOCKET_PATH)
self.redis_conn = redis_conn()
pipe = self.redis_conn.pipeline()
continue

Expand Down
2 changes: 2 additions & 0 deletions src/settings.py.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Shared settings

# The path for the Redis unix socket
REDIS_SOCKET_PATH = '/tmp/redis.sock'
# Or you can use a tcp socket
# REDIS_HOST_PORT = ('localhost', 6379)

# The Skyline logs directory. Do not include a trailing slash.
LOG_PATH = '/var/log/skyline'
Expand Down
18 changes: 18 additions & 0 deletions src/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from redis import StrictRedis

import settings


def redis_conn_string():
if settings.REDIS_HOST_PORT is None:
return settings.REDIS_SOCKET_PATH
else:
return settings.REDIS_HOST_PORT


def redis_conn():
if settings.REDIS_HOST_PORT is not None:
(host, port) = settings.REDIS_HOST_PORT
return StrictRedis(host=host, port=port)
else:
return StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
5 changes: 3 additions & 2 deletions src/webapp/webapp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import redis
import logging
import simplejson as json
import sys
Expand All @@ -7,11 +6,13 @@
from daemon import runner
from os.path import dirname, abspath

from utils import redis_conn

# add the shared settings file to namespace
sys.path.insert(0, dirname(dirname(abspath(__file__))))
import settings

REDIS_CONN = redis.StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
REDIS_CONN = redis_conn()

app = Flask(__name__)
app.config['PROPAGATE_EXCEPTIONS'] = True
Expand Down
4 changes: 2 additions & 2 deletions utils/continuity.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import redis
import msgpack
import sys
import time
from os.path import dirname, abspath

from utils import redis_conn
# add the shared settings file to namespace
sys.path.insert(0, ''.join((dirname(dirname(abspath(__file__))), "/src")))
import settings
Expand All @@ -12,7 +12,7 @@


def check_continuity(metric, mini = False):
r = redis.StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
r = redis_conn()
if mini:
raw_series = r.get(settings.MINI_NAMESPACE + metric)
else:
Expand Down
4 changes: 2 additions & 2 deletions utils/seed_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from multiprocessing import Manager, Process, log_to_stderr
from struct import Struct, pack

import redis
import msgpack
from utils import redis_conn

# Get the current working directory of this file.
# http://stackoverflow.com/a/4060259/120999
Expand Down Expand Up @@ -44,7 +44,7 @@ def seed():
sock.sendto(packet, (socket.gethostname(), settings.UDP_PORT))

print "Connecting to Redis..."
r = redis.StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
r = redis_conn()
time.sleep(5)

try:
Expand Down