Skip to content

Commit

Permalink
Merge pull request #1 from bgpkit/add-filtering
Browse files Browse the repository at this point in the history
add filtering by subscription
  • Loading branch information
digizeph committed Nov 28, 2021
2 parents fb4256f + cd4bb07 commit e9577a0
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 8 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ris-live-rs"
version = "0.0.1"
version = "0.1.0"
edition = "2021"
authors = ["Mingwei Zhang <[email protected]>"]
readme = "README.md"
Expand All @@ -18,9 +18,12 @@ keywords = ["bgp"]

serde={version="1.0", features=["derive"]}
serde_json = "1.0.69"
bgp-models = "0.4.0"
bgp-models = "0.5.0"

# cli-tool dependencies
tungstenite="0.16.0"
url = "2.1.0"
clap = "3.0.0-beta.5"

[[bin]]
name="ris-live-reader"
Expand Down
69 changes: 69 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,75 @@ fn main() {
}
```

## Filtering

`ris-live-rs` support filtering message by composing customized
ris-live subscription message. Use the `compose_subscription_message`
function to create a filtering message.

```rust
pub fn compose_subscription_message(
host: Option<String>,
msg_type: Option<String>,
require: Option<String>,
peer: Option<String>,
prefix: Option<String>,
path: Option<String>,
more_specific: bool,
less_specific: bool,
) -> String {
...
}

// subscribe to messages from one collector
let msg = compose_subscription_message(
opts.host,
opts.msg_type,
opts.require,
opts.peer,
opts.prefix,
opts.path,
opts.more_specific,
opts.less_specific
);
println!("{}", &msg);
socket.write_message(Message::Text(msg)).unwrap();
```

## `ris-live-reader`

`ris-live-rs` library also comes with a simple command-line program
that supports filtering and different output formats: `ris-live-reader`.

Full command-line options are:
```
ris-live-rs 0.1.0
Mingwei Zhang <[email protected]>
ris-live-reader is a simple cli tool that can stream BGP data from RIS-Live project with websocket
USAGE:
ris-live-reader [OPTIONS]
OPTIONS:
--client <CLIENT> client name to identify the stream [default: ris-live-rs]
-h, --help Print help information
--host <HOST> Filter by RRC host: e.g. rrc01
--json Output as JSON objects
--less-specific Match prefixes that are less specific (contain) `prefix`
--more-specific Match prefixes that are more specific (part of) `prefix`
--msg-type <MSG_TYPE> Only include messages of a given BGP or RIS type: UPDATE, OPEN,
NOTIFICATION, KEEPALIVE, or RIS_PEER_STATE
--path <PATH> ASN or pattern to match against the AS PATH attribute
--peer <PEER> Only include messages sent by the given BGP peer
--prefix <PREFIX> Filter UPDATE messages by prefixes in announcements or withdrawals
--pretty Pretty-print JSON output
--raw Print out raw message without parsing
--require <REQUIRE> Only include messages containing a given key
-V, --version Print version information
```

## Built with ❤️ by BGPKIT Team

BGPKIT is a small-team start-up that focus on building the best tooling for BGP data in Rust. We have 10 years of
Expand Down
57 changes: 57 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,63 @@ macro_rules! unwrap_or_return {
}
}

pub fn compose_subscription_message(
host: Option<String>,
msg_type: Option<String>,
require: Option<String>,
peer: Option<String>,
prefix: Option<String>,
path: Option<String>,
more_specific: bool,
less_specific: bool,
) -> String {
let mut options: Vec<String> = vec![];

if let Some(host) = host {
options.push(format!("\"host\": \"{}\"", host))
}

if let Some(msg_type) = msg_type {
options.push(format!("\"type\": \"{}\"", msg_type))
}

if let Some(require) = require {
options.push(format!("\"require\": \"{}\"", require))
}

if let Some(peer) = peer {
options.push(format!("\"peer\": \"{}\"", peer))
}

if let Some(prefix) = prefix {
options.push(format!("\"prefix\": \"{}\"", prefix))
}

if let Some(path) = path {
options.push(format!("\"path\": \"{}\"", path))
}

match more_specific {
true => {
options.push(format!("\"moreSpecific\": true"))
}
false => {
options.push(format!("\"moreSpecific\": false"))
}
}

match less_specific {
true => {
options.push(format!("\"lessSpecific\": true"))
}
false => {
options.push(format!("\"lessSpecific\": false"))
}
}

format!("{{\"type\": \"ris_subscribe\", \"data\":{{ {} }} }}", options.join(","))
}

/// This function parses one message and returns a result of a vector of [BgpElem]s or an error
pub fn parse_ris_live_message(msg_str: &str) -> Result<Vec<BgpElem>, ParserRisliveError> {

Expand Down
91 changes: 85 additions & 6 deletions src/reader/main.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,112 @@
use serde_json::json;
use tungstenite::{connect, Message};
use url::Url;
use ris_live_rs::error::ParserRisliveError;
use ris_live_rs::parse_ris_live_message;
use ris_live_rs::{compose_subscription_message, parse_ris_live_message};
use clap::Parser;

const RIS_LIVE_URL: &str = "ws://ris-live.ripe.net/v1/ws/?client=rust-bgpkit-parser";
const RIS_LIVE_URL_BASE: &str = "ws://ris-live.ripe.net/v1/ws/";

/// ris-live-reader is a simple cli tool that can stream BGP data from RIS-Live project with websocket.
#[derive(Parser)]
#[clap(version = "0.1.0", author = "Mingwei Zhang <[email protected]>")]
struct Opts {

/// client name to identify the stream
#[clap(long, default_value="ris-live-rs")]
client: String,

/// Filter by RRC host: e.g. rrc01
#[clap(long)]
host: Option<String>,

/// Only include messages of a given BGP or RIS type: UPDATE, OPEN, NOTIFICATION, KEEPALIVE, or RIS_PEER_STATE
#[clap(long)]
msg_type: Option<String>,

/// Only include messages containing a given key
#[clap(long)]
require: Option<String>,

/// Only include messages sent by the given BGP peer
#[clap(long)]
peer: Option<String>,

/// Filter UPDATE messages by prefixes in announcements or withdrawals
#[clap(long)]
prefix: Option<String>,

/// Match prefixes that are more specific (part of) `prefix`
#[clap(long, parse(from_flag = std::ops::Not::not))]
more_specific: bool,

/// Match prefixes that are less specific (contain) `prefix`
#[clap(long)]
less_specific: bool,

/// ASN or pattern to match against the AS PATH attribute
#[clap(long)]
path: Option<String>,

/// Output as JSON objects
#[clap(long)]
json: bool,

/// Pretty-print JSON output
#[clap(long)]
pretty: bool,

/// Print out raw message without parsing
#[clap(long)]
raw: bool,
}

/// This is an example of subscribing to RIS-Live's streaming data.
///
/// For more RIS-Live details, check out their documentation at https://ris-live.ripe.net/manual/
fn main() {
let opts: Opts = Opts::parse();

let url = format!("{}?client={}", RIS_LIVE_URL_BASE, opts.client);
// connect to RIPE RIS Live websocket server
let (mut socket, _response) =
connect(Url::parse(RIS_LIVE_URL).unwrap())
connect(Url::parse(url.as_str()).unwrap())
.expect("Can't connect to RIS Live websocket server");

// subscribe to messages from one collector
let msg = json!({"type": "ris_subscribe", "data": null}).to_string();
let msg = compose_subscription_message(
opts.host,
opts.msg_type,
opts.require,
opts.peer,
opts.prefix,
opts.path,
opts.more_specific,
opts.less_specific
);
println!("{}", &msg);
socket.write_message(Message::Text(msg)).unwrap();

loop {
let msg = socket.read_message().expect("Error reading message").to_string();
if msg.is_empty() {
continue
}
if opts.raw {
println!("{}", msg.as_str());
continue
}
match parse_ris_live_message(msg.as_str()) {
Ok(elems) => {
for e in elems {
println!("{}", e);
if opts.json{
if opts.pretty {
println!("{}", serde_json::to_string_pretty(&e).unwrap());
} else {
println!("{}", serde_json::json!(e));
}
} else {
println!("{}", e);
}
}
}
Err(error) => {
Expand Down

0 comments on commit e9577a0

Please sign in to comment.