Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add rtmp for realtime message proxy #69

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions examples/rtmp/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
PREFIX?=/usr
BINDIR?=$(PREFIX)/bin

GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)

.PHONY: all
all: rtmp_service rtmp_edge

.PHONY: clean
clean:
rm rtmp_service rtmp_edge

.PHONY: rtmp_service
rtmp_service: service/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_service service/*.go

.PHONY: rtmp_edge
rtmp_edge: edge/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_edge edge/*.go
62 changes: 62 additions & 0 deletions examples/rtmp/edge/edge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"fmt"
"io"
"net"
"sync"

"github.com/singchia/frontier/api/dataplane/v1/edge"
"github.com/spf13/pflag"
)

func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30012", "address to dial")
name := pflag.String("name", "alice", "user name to join chatroom")
listen := pflag.String("listen", "127.0.0.1:1935", "rtmp port to proxy")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
cli, err := edge.NewNoRetryEdge(dialer, edge.OptionEdgeMeta([]byte(*name)))
if err != nil {
fmt.Println("new edge err:", err)
return
}
for {
ln, err := net.Listen("tcp", *listen)
if err != nil {
return
}
for {
netconn, err := ln.Accept()
if err != nil {
fmt.Printf("accept err: %s\n", err)
break
}
go func() {
st, err := cli.OpenStream("rtmp")
if err != nil {
fmt.Printf("open stream err: %s\n", err)
return
}
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
defer wg.Done()
io.Copy(st, netconn)
netconn.Close()
st.Close()
}()
go func() {
defer wg.Done()
io.Copy(netconn, st)
netconn.Close()
st.Close()
}()
wg.Wait()
}()
}
}
}
86 changes: 86 additions & 0 deletions examples/rtmp/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"fmt"
"net"
"sync"

"github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/joy4/av/avutil"
"github.com/singchia/joy4/av/pktque"
"github.com/singchia/joy4/av/pubsub"
"github.com/singchia/joy4/format"
"github.com/singchia/joy4/format/rtmp"
"github.com/spf13/pflag"
)

func init() {
format.RegisterAll()
}

func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30011", "address to dial")
pflag.Parse()

// service
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
svc, err := service.NewService(dialer, service.OptionServiceName("rtmp"))
if err != nil {
fmt.Println("new service err:", err)
return
}
// rtmp service
rtmpserver := &rtmp.Server{}

l := &sync.RWMutex{}
type Channel struct {
que *pubsub.Queue
}
channels := map[string]*Channel{}

rtmpserver.HandlePlay = func(conn *rtmp.Conn) {
fmt.Println(conn.URL.Path)
l.RLock()
ch := channels[conn.URL.Path]
l.RUnlock()

if ch != nil {
cursor := ch.que.Latest()
filters := pktque.Filters{}

demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}

avutil.CopyFile(conn, demuxer)
}
}
rtmpserver.HandlePublish = func(conn *rtmp.Conn) {
l.Lock()
ch := channels[conn.URL.Path]
if ch == nil {
ch = &Channel{}
ch.que = pubsub.NewQueue()
channels[conn.URL.Path] = ch
} else {
ch = nil
}
l.Unlock()
if ch == nil {
return
}

avutil.CopyFile(ch.que, conn)

l.Lock()
delete(channels, conn.URL.Path)
l.Unlock()
ch.que.Close()
}

rtmpserver.Serve(svc)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/rabbitmq/amqp091-go v1.9.0
github.com/singchia/geminio v1.1.7-rc.1
github.com/singchia/go-timer/v2 v2.2.1
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6
github.com/soheilhy/cmux v0.1.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
Expand All @@ -33,6 +34,7 @@ require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/btree v1.4.2 // indirect
github.com/tidwall/gjson v1.14.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 h1:Yp0zFEufLz0H7jzffb4UPXijavlyqlYeOg7dcyVUNnQ=
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369/go.mod h1:aFJ1ZwLjvHN4yEzE5Bkz8rD8/d8Vlj3UIuvz2yfET7I=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand All @@ -149,6 +151,8 @@ github.com/singchia/geminio v1.1.7-rc.1/go.mod h1:LkgZj4Ddja97vP7NWQk7TffFLZAosH
github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624=
github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6 h1:B9MVqDiyqKAjHmYYFNjOPYHqhml8rA1ogKs8rYTfZ00=
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6/go.mod h1:apGwjKmzM7JlKFbd/KANpq6T8Y5Ntr8Jjhq1BmKU/FA=
github.com/singchia/yafsm v1.0.1 h1:TTDSX7SBCr2YNdv/DZ76LjTer0rYwm7IPt24odNymUs=
github.com/singchia/yafsm v1.0.1/go.mod h1:fSWQl6DCzqc51DhLfwHr3gN2FhGmOEjTAQ2AOKDSBtY=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
Expand Down
Loading