diff --git a/examples/rtmp/Makefile b/examples/rtmp/Makefile new file mode 100644 index 0000000..1d237d8 --- /dev/null +++ b/examples/rtmp/Makefile @@ -0,0 +1,22 @@ +PREFIX?=/usr +BINDIR?=$(PREFIX)/bin + +GOHOSTOS?=$(shell go env GOHOSTOS) +GOARCH?=$(shell go env GOARCH) + +.PHONY: all +all: rtmp_service rtmp_edge + +.PHONY: clean +clean: + rm rtmp_service rtmp_edge + +.PHONY: rtmp_service +rtmp_service: service/*.go + CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ + go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_service service/*.go + +.PHONY: rtmp_edge +rtmp_edge: edge/*.go + CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \ + go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_edge edge/*.go \ No newline at end of file diff --git a/examples/rtmp/edge/edge.go b/examples/rtmp/edge/edge.go new file mode 100644 index 0000000..3541fa6 --- /dev/null +++ b/examples/rtmp/edge/edge.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + "io" + "net" + "sync" + + "github.com/singchia/frontier/api/dataplane/v1/edge" + "github.com/spf13/pflag" +) + +func main() { + network := pflag.String("network", "tcp", "network to dial") + address := pflag.String("address", "127.0.0.1:30012", "address to dial") + name := pflag.String("name", "alice", "user name to join chatroom") + listen := pflag.String("listen", "127.0.0.1:1935", "rtmp port to proxy") + pflag.Parse() + dialer := func() (net.Conn, error) { + return net.Dial(*network, *address) + } + cli, err := edge.NewNoRetryEdge(dialer, edge.OptionEdgeMeta([]byte(*name))) + if err != nil { + fmt.Println("new edge err:", err) + return + } + for { + ln, err := net.Listen("tcp", *listen) + if err != nil { + return + } + for { + netconn, err := ln.Accept() + if err != nil { + fmt.Printf("accept err: %s\n", err) + break + } + go func() { + st, err := cli.OpenStream("rtmp") + if err != nil { + fmt.Printf("open stream err: %s\n", err) + return + } + wg := new(sync.WaitGroup) + wg.Add(2) + go func() { + defer wg.Done() + io.Copy(st, netconn) + netconn.Close() + st.Close() + }() + go func() { + defer wg.Done() + io.Copy(netconn, st) + netconn.Close() + st.Close() + }() + wg.Wait() + }() + } + } +} diff --git a/examples/rtmp/service/service.go b/examples/rtmp/service/service.go new file mode 100644 index 0000000..c033bd8 --- /dev/null +++ b/examples/rtmp/service/service.go @@ -0,0 +1,86 @@ +package main + +import ( + "fmt" + "net" + "sync" + + "github.com/singchia/frontier/api/dataplane/v1/service" + "github.com/singchia/joy4/av/avutil" + "github.com/singchia/joy4/av/pktque" + "github.com/singchia/joy4/av/pubsub" + "github.com/singchia/joy4/format" + "github.com/singchia/joy4/format/rtmp" + "github.com/spf13/pflag" +) + +func init() { + format.RegisterAll() +} + +func main() { + network := pflag.String("network", "tcp", "network to dial") + address := pflag.String("address", "127.0.0.1:30011", "address to dial") + pflag.Parse() + + // service + dialer := func() (net.Conn, error) { + return net.Dial(*network, *address) + } + svc, err := service.NewService(dialer, service.OptionServiceName("rtmp")) + if err != nil { + fmt.Println("new service err:", err) + return + } + // rtmp service + rtmpserver := &rtmp.Server{} + + l := &sync.RWMutex{} + type Channel struct { + que *pubsub.Queue + } + channels := map[string]*Channel{} + + rtmpserver.HandlePlay = func(conn *rtmp.Conn) { + fmt.Println(conn.URL.Path) + l.RLock() + ch := channels[conn.URL.Path] + l.RUnlock() + + if ch != nil { + cursor := ch.que.Latest() + filters := pktque.Filters{} + + demuxer := &pktque.FilterDemuxer{ + Filter: filters, + Demuxer: cursor, + } + + avutil.CopyFile(conn, demuxer) + } + } + rtmpserver.HandlePublish = func(conn *rtmp.Conn) { + l.Lock() + ch := channels[conn.URL.Path] + if ch == nil { + ch = &Channel{} + ch.que = pubsub.NewQueue() + channels[conn.URL.Path] = ch + } else { + ch = nil + } + l.Unlock() + if ch == nil { + return + } + + avutil.CopyFile(ch.que, conn) + + l.Lock() + delete(channels, conn.URL.Path) + l.Unlock() + ch.que.Close() + } + + rtmpserver.Serve(svc) +} diff --git a/go.mod b/go.mod index f36c879..defddcb 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/rabbitmq/amqp091-go v1.9.0 github.com/singchia/geminio v1.1.7-rc.1 github.com/singchia/go-timer/v2 v2.2.1 + github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6 github.com/soheilhy/cmux v0.1.5 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 @@ -33,6 +34,7 @@ require ( github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tidwall/btree v1.4.2 // indirect github.com/tidwall/gjson v1.14.3 // indirect diff --git a/go.sum b/go.sum index 41ebb33..456a869 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,8 @@ github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 h1:Yp0zFEufLz0H7jzffb4UPXijavlyqlYeOg7dcyVUNnQ= +github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369/go.mod h1:aFJ1ZwLjvHN4yEzE5Bkz8rD8/d8Vlj3UIuvz2yfET7I= github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= @@ -149,6 +151,8 @@ github.com/singchia/geminio v1.1.7-rc.1/go.mod h1:LkgZj4Ddja97vP7NWQk7TffFLZAosH github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4= github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624= github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4= +github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6 h1:B9MVqDiyqKAjHmYYFNjOPYHqhml8rA1ogKs8rYTfZ00= +github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6/go.mod h1:apGwjKmzM7JlKFbd/KANpq6T8Y5Ntr8Jjhq1BmKU/FA= github.com/singchia/yafsm v1.0.1 h1:TTDSX7SBCr2YNdv/DZ76LjTer0rYwm7IPt24odNymUs= github.com/singchia/yafsm v1.0.1/go.mod h1:fSWQl6DCzqc51DhLfwHr3gN2FhGmOEjTAQ2AOKDSBtY= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=