From c6cc0e670eb0e2421a11c73eab4ca9c69593a356 Mon Sep 17 00:00:00 2001 From: Garrett Graves <35381942+GRVYDEV@users.noreply.github.com> Date: Sun, 19 Feb 2023 19:42:24 -0800 Subject: [PATCH] [v2/build] implement running ingest binary from webrtc server (#65) * implement running ingest binary from webrtc server * delete hash file, fix lint * remove hash --- ingest/src/connection.rs | 2 +- webrtc/.gitignore | 2 +- webrtc/go.mod | 2 +- webrtc/ingest/ingest.go | 73 ++++++++++++++++++++++++++++++++++++++++ webrtc/main.go | 35 ++++++++++++++----- 5 files changed, 103 insertions(+), 11 deletions(-) create mode 100644 webrtc/ingest/ingest.go diff --git a/ingest/src/connection.rs b/ingest/src/connection.rs index d51437a..43286f3 100644 --- a/ingest/src/connection.rs +++ b/ingest/src/connection.rs @@ -373,7 +373,7 @@ pub fn read_stream_key(startup: bool, stream_key_env: Option<&str>) -> Vec { if !stream_key.is_empty() { let key = stream_key.as_bytes().to_vec(); print_stream_key(key.to_vec()); - fs::write("hash", hex::encode(&stream_key)) + fs::write("hash", hex::encode(stream_key)) .expect("Unable to write stream key to hash file"); return key; } diff --git a/webrtc/.gitignore b/webrtc/.gitignore index 3dc4204..9fa5645 100644 --- a/webrtc/.gitignore +++ b/webrtc/.gitignore @@ -18,4 +18,4 @@ vendor/ # ide /.idea/ # actual binary -/lightspeed-webrtc \ No newline at end of file +/webrtc diff --git a/webrtc/go.mod b/webrtc/go.mod index bb9577a..6cfaaf1 100644 --- a/webrtc/go.mod +++ b/webrtc/go.mod @@ -1,4 +1,4 @@ -module github.com/GRVYDEV/lightspeed-webrtc +module github.com/GRVYDEV/Project-Lightspeed/webrtc go 1.15 diff --git a/webrtc/ingest/ingest.go b/webrtc/ingest/ingest.go new file mode 100644 index 0000000..81bc064 --- /dev/null +++ b/webrtc/ingest/ingest.go @@ -0,0 +1,73 @@ +package ingest + +import ( + "fmt" + "io" + "log" + "os/exec" +) + +type IngestServer struct { + cmd *exec.Cmd +} + +type IngestServerConfig struct { + Path string + Addr string + StreamKey string + LogFile string +} + +func NewIngestServer(config IngestServerConfig) *IngestServer { + args := make([]string, 0) + if config.Addr != "" { + args = append(args, fmt.Sprintf("-a=%s", config.Addr)) + } + if config.StreamKey != "" { + args = append(args, fmt.Sprintf("-k=%s", config.StreamKey)) + } + if config.LogFile != "" { + args = append(args, fmt.Sprintf("-l=%s", config.LogFile)) + } + + return &IngestServer{ + cmd: exec.Command(config.Path, args...), + } +} + +// StartIngest tries to start the ingest server and logs its stdout +func (s *IngestServer) Start() error { + stdout, err := s.cmd.StdoutPipe() + if err != nil { + return err + } + err = s.cmd.Start() + if err != nil { + return err + } + + //TODO Handle shutdown logic + go s.logger(stdout) + go s.waitForExit() + return nil +} + +func (s *IngestServer) logger(stdout io.ReadCloser) { + for { + buf := make([]byte, 1096) + _, err := stdout.Read(buf) + if err == io.EOF { + break + } + line := string(buf) + fmt.Print(line) + } +} + +func (s *IngestServer) waitForExit() error { + err := s.cmd.Wait() + if err != nil { + log.Fatal(err) + } + return err +} diff --git a/webrtc/main.go b/webrtc/main.go index 5244b8d..f120eab 100644 --- a/webrtc/main.go +++ b/webrtc/main.go @@ -14,7 +14,8 @@ import ( "strings" "sync" - "github.com/GRVYDEV/lightspeed-webrtc/ws" + "github.com/GRVYDEV/Project-Lightspeed/webrtc/ingest" + "github.com/GRVYDEV/Project-Lightspeed/webrtc/ws" "github.com/gorilla/websocket" "github.com/pion/interceptor" @@ -23,13 +24,19 @@ import ( ) var ( - addr = flag.String("addr", "localhost", "http service address") - ip = flag.String("ip", "none", "IP address for webrtc") - wsPort = flag.Int("ws-port", 8080, "Port for websocket") - rtpPort = flag.Int("rtp-port", 65535, "Port for RTP") - ports = flag.String("ports", "20000-20500", "Port range for webrtc") - sslCert = flag.String("ssl-cert", "", "Ssl cert for websocket (optional)") - sslKey = flag.String("ssl-key", "", "Ssl key for websocket (optional)") + addr = flag.String("addr", "localhost", "http service address") + ip = flag.String("ip", "none", "IP address for webrtc") + wsPort = flag.Int("ws-port", 8080, "Port for websocket") + rtpPort = flag.Int("rtp-port", 65535, "Port for RTP") + ports = flag.String("ports", "20000-20500", "Port range for webrtc") + sslCert = flag.String("ssl-cert", "", "Ssl cert for websocket (optional)") + sslKey = flag.String("ssl-key", "", "Ssl key for websocket (optional)") + + ingestPath = flag.String("ingest-path", "lightspeed-ingest", "Path to the lightspeed-ingest binary") + ingestAddr = flag.String("ingest-addr", "", "The address for the ingest server to bind to default 0.0.0.0") + ingestStreamKey = flag.String("ingest-stream-key", "", "The stream key for the ingest server to use") + ingestLogFile = flag.String("ingest-log-file", "", "A path to store ingest logs") + upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } @@ -45,6 +52,18 @@ func main() { flag.Parse() log.SetFlags(0) + iServer := ingest.NewIngestServer(ingest.IngestServerConfig{ + Path: *ingestPath, + Addr: *ingestAddr, + StreamKey: *ingestStreamKey, + LogFile: *ingestLogFile, + }) + + err := iServer.Start() + if err != nil { + log.Fatalf("error starting ingest server %v", err) + } + // Open a UDP Listener for RTP Packets on port 65535 listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(*addr), Port: *rtpPort}) if err != nil {